You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:25 UTC

[10/11] flink git commit: [FLINK-4877] Rename ProcessingTimeCallback.trigger() to onProcessingTime()

[FLINK-4877] Rename ProcessingTimeCallback.trigger() to onProcessingTime()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/770f2f83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/770f2f83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/770f2f83

Branch: refs/heads/master
Commit: 770f2f83a81b2810aff171b2f56390ef686f725a
Parents: 94a3f25
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Oct 18 11:11:10 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 21 19:03:05 2016 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java    |  2 +-
 .../kafka/internals/AbstractFetcher.java          |  2 +-
 .../api/operators/HeapInternalTimerService.java   |  2 +-
 .../api/operators/StreamSourceContexts.java       |  2 +-
 .../operators/ExtractTimestampsOperator.java      |  2 +-
 .../TimestampsAndPeriodicWatermarksOperator.java  |  2 +-
 ...stractAlignedProcessingTimeWindowOperator.java |  2 +-
 .../runtime/tasks/ProcessingTimeCallback.java     |  2 +-
 .../tasks/SystemProcessingTimeService.java        |  2 +-
 .../runtime/tasks/TestProcessingTimeService.java  |  4 ++--
 .../runtime/operators/StreamTaskTimerTest.java    |  4 ++--
 .../operators/TestProcessingTimeServiceTest.java  |  4 ++--
 .../tasks/SystemProcessingTimeServiceTest.java    | 18 +++++++++---------
 .../streaming/runtime/StreamTaskTimerITCase.java  |  4 ++--
 14 files changed, 26 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 66e704c..52de00d 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -422,7 +422,7 @@ public class BucketingSink<T>
 	}
 
 	@Override
-	public void trigger(long timestamp) throws Exception {
+	public void onProcessingTime(long timestamp) throws Exception {
 		long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
 
 		checkForInactiveBuckets(currentProcessingTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 58bca52..3350b06 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -495,7 +495,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		}
 		
 		@Override
-		public void trigger(long timestamp) throws Exception {
+		public void onProcessingTime(long timestamp) throws Exception {
 
 			long minAcrossAll = Long.MAX_VALUE;
 			for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
index 15258cf..8884c3d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
@@ -170,7 +170,7 @@ public class HeapInternalTimerService<K, N> implements InternalTimerService<N>,
 	}
 
 	@Override
-	public void trigger(long time) throws Exception {
+	public void onProcessingTime(long time) throws Exception {
 		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
 		// inside the callback.
 		nextTimer = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index 66d2ac2..a6a273f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -215,7 +215,7 @@ public class StreamSourceContexts {
 			}
 
 			@Override
-			public void trigger(long timestamp) {
+			public void onProcessingTime(long timestamp) {
 				final long currentTime = timeService.getCurrentProcessingTime();
 
 				if (currentTime > nextWatermarkTime) {

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index 5f5028a..a10e457 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -73,7 +73,7 @@ public class ExtractTimestampsOperator<T>
 	}
 
 	@Override
-	public void trigger(long timestamp) throws Exception {
+	public void onProcessingTime(long timestamp) throws Exception {
 		// register next timer
 		long newWatermark = userFunction.getCurrentWatermark();
 		if (newWatermark > currentWatermark) {

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index ba72659..4defb96 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -69,7 +69,7 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
 	}
 
 	@Override
-	public void trigger(long timestamp) throws Exception {
+	public void onProcessingTime(long timestamp) throws Exception {
 		// register next timer
 		Watermark newWatermark = userFunction.getCurrentWatermark();
 		if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 80a317e..24fd0de 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -207,7 +207,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	}
 
 	@Override
-	public void trigger(long timestamp) throws Exception {
+	public void onProcessingTime(long timestamp) throws Exception {
 		// first we check if we actually trigger the window function
 		if (timestamp == nextEvaluationTime) {
 			// compute and output the results

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
index aca1718..035939f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
@@ -36,5 +36,5 @@ public interface ProcessingTimeCallback {
 	 * 
 	 * @param timestamp The timestamp for which the trigger event was scheduled.
 	 */
-	void trigger(long timestamp) throws Exception ;
+	void onProcessingTime(long timestamp) throws Exception ;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index b433f8d..153aedf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -179,7 +179,7 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
 		public void run() {
 			synchronized (lock) {
 				try {
-					target.trigger(timestamp);
+					target.onProcessingTime(timestamp);
 				} catch (Throwable t) {
 					TimerException asyncException = new TimerException(t);
 					exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 3e6c273..2ca287a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -67,7 +67,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 			for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: toRun) {
 				long now = tasks.getKey();
 				for (ScheduledTimerFuture task: tasks.getValue()) {
-					task.getProcessingTimeCallback().trigger(now);
+					task.getProcessingTimeCallback().onProcessingTime(now);
 				}
 			}
 		}
@@ -89,7 +89,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 
 		if (timestamp <= currentTime) {
 			try {
-				target.trigger(timestamp);
+				target.onProcessingTime(timestamp);
 			} catch (Exception e) {
 				throw new RuntimeException(e);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 87241dd..f23c6d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -67,7 +67,7 @@ public class StreamTaskTimerTest {
 		// first one spawns thread
 		mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
 			@Override
-			public void trigger(long timestamp) {
+			public void onProcessingTime(long timestamp) {
 			}
 		});
 
@@ -163,7 +163,7 @@ public class StreamTaskTimerTest {
 		}
 
 		@Override
-		public void trigger(long timestamp) {
+		public void onProcessingTime(long timestamp) {
 			try {
 				assertEquals(expectedTimestamp, timestamp);
 				assertEquals(expectedInSequence, numInSequence);

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index db56717..a3b231b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -73,14 +73,14 @@ public class TestProcessingTimeServiceTest {
 		// register 2 tasks
 		mapTask.getProcessingTimeService().registerTimer(30, new ProcessingTimeCallback() {
 			@Override
-			public void trigger(long timestamp) {
+			public void onProcessingTime(long timestamp) {
 
 			}
 		});
 
 		mapTask.getProcessingTimeService().registerTimer(40, new ProcessingTimeCallback() {
 			@Override
-			public void trigger(long timestamp) {
+			public void onProcessingTime(long timestamp) {
 
 			}
 		});

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index dc679ab..797e18a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -52,7 +52,7 @@ public class SystemProcessingTimeServiceTest {
 			// schedule something
 			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
 				@Override
-				public void trigger(long timestamp) {
+				public void onProcessingTime(long timestamp) {
 					assertTrue(Thread.holdsLock(lock));
 				}
 			});
@@ -88,7 +88,7 @@ public class SystemProcessingTimeServiceTest {
 			// the task should trigger immediately and should block until terminated with interruption
 			timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
 				@Override
-				public void trigger(long timestamp) throws Exception {
+				public void onProcessingTime(long timestamp) throws Exception {
 					latch.trigger();
 					Thread.sleep(100000000);
 				}
@@ -106,7 +106,7 @@ public class SystemProcessingTimeServiceTest {
 			try {
 				timer.registerTimer(System.currentTimeMillis() + 1000, new ProcessingTimeCallback() {
 					@Override
-					public void trigger(long timestamp) {}
+					public void onProcessingTime(long timestamp) {}
 				});
 
 				fail("should result in an exception");
@@ -142,7 +142,7 @@ public class SystemProcessingTimeServiceTest {
 
 			timer.registerTimer(System.currentTimeMillis() + 20, new ProcessingTimeCallback() {
 				@Override
-				public void trigger(long timestamp) throws Exception {
+				public void onProcessingTime(long timestamp) throws Exception {
 					scopeLock.lock();
 					try {
 						latch.trigger();
@@ -164,7 +164,7 @@ public class SystemProcessingTimeServiceTest {
 			// should be able to schedule more tasks (that never get executed)
 			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new ProcessingTimeCallback() {
 				@Override
-				public void trigger(long timestamp) throws Exception {
+				public void onProcessingTime(long timestamp) throws Exception {
 					throw new Exception("test");
 				}
 			});
@@ -199,7 +199,7 @@ public class SystemProcessingTimeServiceTest {
 			// schedule something
 			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new ProcessingTimeCallback() {
 				@Override
-				public void trigger(long timestamp) {}
+				public void onProcessingTime(long timestamp) {}
 			});
 			assertEquals(1, timer.getNumTasksScheduled());
 
@@ -234,7 +234,7 @@ public class SystemProcessingTimeServiceTest {
 		
 		timeServiceProvider.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
 			@Override
-			public void trigger(long timestamp) throws Exception {
+			public void onProcessingTime(long timestamp) throws Exception {
 				throw new Exception("Exception in Timer");
 			}
 		});
@@ -258,7 +258,7 @@ public class SystemProcessingTimeServiceTest {
 			// to register some additional timers out of order
 			timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
 				@Override
-				public void trigger(long timestamp) throws Exception {
+				public void onProcessingTime(long timestamp) throws Exception {
 					sync.await();
 				}
 			});
@@ -273,7 +273,7 @@ public class SystemProcessingTimeServiceTest {
 			final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4);
 			ProcessingTimeCallback trigger = new ProcessingTimeCallback() {
 				@Override
-				public void trigger(long timestamp) {
+				public void onProcessingTime(long timestamp) {
 					timestamps.add(timestamp);
 				}
 			};

http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index c0cd0be..48e6fae 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -201,7 +201,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
-		public void trigger(long time) throws Exception {
+		public void onProcessingTime(long time) throws Exception {
 			if (!semaphore.tryAcquire()) {
 				Assert.fail("Concurrent invocation of operator functions.");
 			}
@@ -276,7 +276,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 
 
 		@Override
-		public void trigger(long time) throws Exception {
+		public void onProcessingTime(long time) throws Exception {
 			if (!semaphore.tryAcquire()) {
 				Assert.fail("Concurrent invocation of operator functions.");
 			}