You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:25 UTC
[10/11] flink git commit: [FLINK-4877] Rename
ProcessingTimeCallback.trigger() to onProcessingTime()
[FLINK-4877] Rename ProcessingTimeCallback.trigger() to onProcessingTime()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/770f2f83
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/770f2f83
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/770f2f83
Branch: refs/heads/master
Commit: 770f2f83a81b2810aff171b2f56390ef686f725a
Parents: 94a3f25
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Oct 18 11:11:10 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 21 19:03:05 2016 +0200
----------------------------------------------------------------------
.../connectors/fs/bucketing/BucketingSink.java | 2 +-
.../kafka/internals/AbstractFetcher.java | 2 +-
.../api/operators/HeapInternalTimerService.java | 2 +-
.../api/operators/StreamSourceContexts.java | 2 +-
.../operators/ExtractTimestampsOperator.java | 2 +-
.../TimestampsAndPeriodicWatermarksOperator.java | 2 +-
...stractAlignedProcessingTimeWindowOperator.java | 2 +-
.../runtime/tasks/ProcessingTimeCallback.java | 2 +-
.../tasks/SystemProcessingTimeService.java | 2 +-
.../runtime/tasks/TestProcessingTimeService.java | 4 ++--
.../runtime/operators/StreamTaskTimerTest.java | 4 ++--
.../operators/TestProcessingTimeServiceTest.java | 4 ++--
.../tasks/SystemProcessingTimeServiceTest.java | 18 +++++++++---------
.../streaming/runtime/StreamTaskTimerITCase.java | 4 ++--
14 files changed, 26 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 66e704c..52de00d 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -422,7 +422,7 @@ public class BucketingSink<T>
}
@Override
- public void trigger(long timestamp) throws Exception {
+ public void onProcessingTime(long timestamp) throws Exception {
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
checkForInactiveBuckets(currentProcessingTime);
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 58bca52..3350b06 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -495,7 +495,7 @@ public abstract class AbstractFetcher<T, KPH> {
}
@Override
- public void trigger(long timestamp) throws Exception {
+ public void onProcessingTime(long timestamp) throws Exception {
long minAcrossAll = Long.MAX_VALUE;
for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
index 15258cf..8884c3d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
@@ -170,7 +170,7 @@ public class HeapInternalTimerService<K, N> implements InternalTimerService<N>,
}
@Override
- public void trigger(long time) throws Exception {
+ public void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index 66d2ac2..a6a273f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -215,7 +215,7 @@ public class StreamSourceContexts {
}
@Override
- public void trigger(long timestamp) {
+ public void onProcessingTime(long timestamp) {
final long currentTime = timeService.getCurrentProcessingTime();
if (currentTime > nextWatermarkTime) {
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index 5f5028a..a10e457 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -73,7 +73,7 @@ public class ExtractTimestampsOperator<T>
}
@Override
- public void trigger(long timestamp) throws Exception {
+ public void onProcessingTime(long timestamp) throws Exception {
// register next timer
long newWatermark = userFunction.getCurrentWatermark();
if (newWatermark > currentWatermark) {
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index ba72659..4defb96 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -69,7 +69,7 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
}
@Override
- public void trigger(long timestamp) throws Exception {
+ public void onProcessingTime(long timestamp) throws Exception {
// register next timer
Watermark newWatermark = userFunction.getCurrentWatermark();
if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 80a317e..24fd0de 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -207,7 +207,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
}
@Override
- public void trigger(long timestamp) throws Exception {
+ public void onProcessingTime(long timestamp) throws Exception {
// first we check if we actually trigger the window function
if (timestamp == nextEvaluationTime) {
// compute and output the results
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
index aca1718..035939f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
@@ -36,5 +36,5 @@ public interface ProcessingTimeCallback {
*
* @param timestamp The timestamp for which the trigger event was scheduled.
*/
- void trigger(long timestamp) throws Exception ;
+ void onProcessingTime(long timestamp) throws Exception ;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index b433f8d..153aedf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -179,7 +179,7 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
public void run() {
synchronized (lock) {
try {
- target.trigger(timestamp);
+ target.onProcessingTime(timestamp);
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index 3e6c273..2ca287a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -67,7 +67,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: toRun) {
long now = tasks.getKey();
for (ScheduledTimerFuture task: tasks.getValue()) {
- task.getProcessingTimeCallback().trigger(now);
+ task.getProcessingTimeCallback().onProcessingTime(now);
}
}
}
@@ -89,7 +89,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
if (timestamp <= currentTime) {
try {
- target.trigger(timestamp);
+ target.onProcessingTime(timestamp);
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 87241dd..f23c6d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -67,7 +67,7 @@ public class StreamTaskTimerTest {
// first one spawns thread
mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
@Override
- public void trigger(long timestamp) {
+ public void onProcessingTime(long timestamp) {
}
});
@@ -163,7 +163,7 @@ public class StreamTaskTimerTest {
}
@Override
- public void trigger(long timestamp) {
+ public void onProcessingTime(long timestamp) {
try {
assertEquals(expectedTimestamp, timestamp);
assertEquals(expectedInSequence, numInSequence);
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index db56717..a3b231b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -73,14 +73,14 @@ public class TestProcessingTimeServiceTest {
// register 2 tasks
mapTask.getProcessingTimeService().registerTimer(30, new ProcessingTimeCallback() {
@Override
- public void trigger(long timestamp) {
+ public void onProcessingTime(long timestamp) {
}
});
mapTask.getProcessingTimeService().registerTimer(40, new ProcessingTimeCallback() {
@Override
- public void trigger(long timestamp) {
+ public void onProcessingTime(long timestamp) {
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index dc679ab..797e18a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -52,7 +52,7 @@ public class SystemProcessingTimeServiceTest {
// schedule something
ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
@Override
- public void trigger(long timestamp) {
+ public void onProcessingTime(long timestamp) {
assertTrue(Thread.holdsLock(lock));
}
});
@@ -88,7 +88,7 @@ public class SystemProcessingTimeServiceTest {
// the task should trigger immediately and should block until terminated with interruption
timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
@Override
- public void trigger(long timestamp) throws Exception {
+ public void onProcessingTime(long timestamp) throws Exception {
latch.trigger();
Thread.sleep(100000000);
}
@@ -106,7 +106,7 @@ public class SystemProcessingTimeServiceTest {
try {
timer.registerTimer(System.currentTimeMillis() + 1000, new ProcessingTimeCallback() {
@Override
- public void trigger(long timestamp) {}
+ public void onProcessingTime(long timestamp) {}
});
fail("should result in an exception");
@@ -142,7 +142,7 @@ public class SystemProcessingTimeServiceTest {
timer.registerTimer(System.currentTimeMillis() + 20, new ProcessingTimeCallback() {
@Override
- public void trigger(long timestamp) throws Exception {
+ public void onProcessingTime(long timestamp) throws Exception {
scopeLock.lock();
try {
latch.trigger();
@@ -164,7 +164,7 @@ public class SystemProcessingTimeServiceTest {
// should be able to schedule more tasks (that never get executed)
ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new ProcessingTimeCallback() {
@Override
- public void trigger(long timestamp) throws Exception {
+ public void onProcessingTime(long timestamp) throws Exception {
throw new Exception("test");
}
});
@@ -199,7 +199,7 @@ public class SystemProcessingTimeServiceTest {
// schedule something
ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new ProcessingTimeCallback() {
@Override
- public void trigger(long timestamp) {}
+ public void onProcessingTime(long timestamp) {}
});
assertEquals(1, timer.getNumTasksScheduled());
@@ -234,7 +234,7 @@ public class SystemProcessingTimeServiceTest {
timeServiceProvider.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
@Override
- public void trigger(long timestamp) throws Exception {
+ public void onProcessingTime(long timestamp) throws Exception {
throw new Exception("Exception in Timer");
}
});
@@ -258,7 +258,7 @@ public class SystemProcessingTimeServiceTest {
// to register some additional timers out of order
timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
@Override
- public void trigger(long timestamp) throws Exception {
+ public void onProcessingTime(long timestamp) throws Exception {
sync.await();
}
});
@@ -273,7 +273,7 @@ public class SystemProcessingTimeServiceTest {
final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4);
ProcessingTimeCallback trigger = new ProcessingTimeCallback() {
@Override
- public void trigger(long timestamp) {
+ public void onProcessingTime(long timestamp) {
timestamps.add(timestamp);
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/770f2f83/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index c0cd0be..48e6fae 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -201,7 +201,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
}
@Override
- public void trigger(long time) throws Exception {
+ public void onProcessingTime(long time) throws Exception {
if (!semaphore.tryAcquire()) {
Assert.fail("Concurrent invocation of operator functions.");
}
@@ -276,7 +276,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
@Override
- public void trigger(long time) throws Exception {
+ public void onProcessingTime(long time) throws Exception {
if (!semaphore.tryAcquire()) {
Assert.fail("Concurrent invocation of operator functions.");
}