You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:20 UTC
[05/11] flink git commit: [FLINK-4877] Rename TimeServiceProvider to
ProcessingTimeService
[FLINK-4877] Rename TimeServiceProvider to ProcessingTimeService
The name clashes with the soon-to-be-added
TimerService/InternalTimerService, which is meant as an interface for
dealing with both processing time and event time.
TimeServiceProvider is renamed to ProcessingTimeService to reflect the
fact that it is a low-level utility that only deals with "physical"
processing-time trigger tasks.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e112a632
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e112a632
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e112a632
Branch: refs/heads/master
Commit: e112a63208006b4e348d75f3df84d2fd4b091797
Parents: 71d2e3e
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sun Sep 25 20:58:16 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 21 19:03:04 2016 +0200
----------------------------------------------------------------------
.../hdfstests/ContinuousFileMonitoringTest.java | 4 +-
.../connectors/fs/bucketing/BucketingSink.java | 6 +-
.../kafka/internals/AbstractFetcher.java | 8 +-
.../AbstractFetcherTimestampsTest.java | 8 +-
.../kafka/testutils/MockRuntimeContext.java | 8 +-
.../source/ContinuousFileReaderOperator.java | 2 +-
.../api/operators/AbstractStreamOperator.java | 17 +-
.../streaming/api/operators/StreamSource.java | 2 +-
.../api/operators/StreamSourceContexts.java | 14 +-
.../api/operators/StreamingRuntimeContext.java | 6 +-
.../operators/ExtractTimestampsOperator.java | 8 +-
...TimestampsAndPeriodicWatermarksOperator.java | 8 +-
...ractAlignedProcessingTimeWindowOperator.java | 6 +-
.../windowing/EvictingWindowOperator.java | 2 +-
.../operators/windowing/WindowOperator.java | 10 +-
.../tasks/DefaultTimeServiceProvider.java | 262 ----------------
.../runtime/tasks/ProcessingTimeService.java | 83 +++++
.../streaming/runtime/tasks/StreamTask.java | 16 +-
.../tasks/SystemProcessingTimeService.java | 262 ++++++++++++++++
.../tasks/TestProcessingTimeService.java | 172 ++++++++++
.../runtime/tasks/TestTimeServiceProvider.java | 172 ----------
.../runtime/tasks/TimeServiceProvider.java | 83 -----
.../operators/StreamSourceOperatorTest.java | 47 +--
.../runtime/operators/StreamTaskTimerTest.java | 6 +-
.../TestProcessingTimeServiceTest.java | 113 +++++++
.../runtime/operators/TestTimeProviderTest.java | 113 -------
...stampsAndPeriodicWatermarksOperatorTest.java | 13 +-
...AlignedProcessingTimeWindowOperatorTest.java | 49 ++-
...AlignedProcessingTimeWindowOperatorTest.java | 56 ++--
.../operators/windowing/NoOpTimerService.java | 4 +-
.../operators/windowing/WindowOperatorTest.java | 14 +-
.../tasks/DefaultTimeServiceProviderTest.java | 313 -------------------
.../runtime/tasks/StreamTaskTestHarness.java | 6 +-
.../tasks/SystemProcessingTimeServiceTest.java | 313 +++++++++++++++++++
.../KeyedOneInputStreamOperatorTestHarness.java | 8 +-
.../flink/streaming/util/MockContext.java | 23 --
.../util/OneInputStreamOperatorTestHarness.java | 43 ++-
.../streaming/util/WindowingTestHarness.java | 6 +-
.../runtime/StreamTaskTimerITCase.java | 10 +-
39 files changed, 1141 insertions(+), 1155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 36b5c5e..971d5f8 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
@@ -127,7 +127,7 @@ public class ContinuousFileMonitoringTest {
ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
reader.setOutputType(typeInfo, executionConfig);
- final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+ final TestProcessingTimeService timeServiceProvider = new TestProcessingTimeService();
final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 5a5cade..6f8a739 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -285,7 +285,7 @@ public class BucketingSink<T>
private transient Clock clock;
- private transient TimeServiceProvider processingTimeService;
+ private transient ProcessingTimeService processingTimeService;
/**
* Creates a new {@code BucketingSink} that writes files to the given base directory.
@@ -324,7 +324,7 @@ public class BucketingSink<T>
refTruncate = reflectTruncate(fs);
processingTimeService =
- ((StreamingRuntimeContext) getRuntimeContext()).getTimeServiceProvider();
+ ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService();
long currentProcessingTime = processingTimeService.getCurrentProcessingTime();
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index eb01b78..065b54f 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.SerializedValue;
import java.io.IOException;
@@ -118,7 +118,7 @@ public abstract class AbstractFetcher<T, KPH> {
(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
PeriodicWatermarkEmitter periodicEmitter =
- new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext.getTimeServiceProvider(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval());
+ new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext.getProcessingTimeService(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval());
periodicEmitter.start();
}
}
@@ -466,7 +466,7 @@ public abstract class AbstractFetcher<T, KPH> {
private final SourceContext<?> emitter;
- private final TimeServiceProvider timerService;
+ private final ProcessingTimeService timerService;
private final long interval;
@@ -477,7 +477,7 @@ public abstract class AbstractFetcher<T, KPH> {
PeriodicWatermarkEmitter(
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
SourceContext<?> emitter,
- TimeServiceProvider timerService,
+ ProcessingTimeService timerService,
long autoWatermarkInterval)
{
this.allPartitions = checkNotNull(allPartitions);
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 7db6ba4..0782cb9 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -25,10 +25,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.SerializedValue;
import org.junit.Test;
@@ -128,7 +128,7 @@ public class AbstractFetcherTimestampsTest {
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
- final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ final ProcessingTimeService timerService = new SystemProcessingTimeService(
new ReferenceSettingExceptionHandler(errorRef), sourceContext.getCheckpointLock());
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index e1ec4cb..f16eacd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -38,7 +38,7 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import java.io.Serializable;
import java.util.Collections;
@@ -53,7 +53,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
private final ExecutionConfig execConfig;
- private final TimeServiceProvider timeServiceProvider;
+ private final ProcessingTimeService timeServiceProvider;
public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig());
@@ -70,7 +70,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
int numberOfParallelSubtasks,
int indexOfThisSubtask,
ExecutionConfig execConfig,
- TimeServiceProvider timeServiceProvider) {
+ ProcessingTimeService timeServiceProvider) {
super(new MockStreamOperator(),
new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
@@ -188,7 +188,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
}
@Override
- public TimeServiceProvider getTimeServiceProvider() {
+ public ProcessingTimeService getProcessingTimeService() {
if (timeServiceProvider == null) {
throw new UnsupportedOperationException();
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 769cb6f..be22677 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -107,7 +107,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
this.readerContext = StreamSourceContexts.getSourceContext(
- timeCharacteristic, getTimerService(), checkpointLock, output, watermarkInterval);
+ timeCharacteristic, getProcessingTimeService(), checkpointLock, output, watermarkInterval);
// and initialize the split reading thread
this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, readerState);
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 9184e93..b789c95 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -51,11 +51,12 @@ import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,7 +119,7 @@ public abstract class AbstractStreamOperator<OUT>
/** Keyed state store view on the keyed backend */
private transient DefaultKeyedStateStore keyedStateStore;
-
+
/** Operator state backend / store */
private transient OperatorStateBackend operatorStateBackend;
@@ -246,7 +247,7 @@ public abstract class AbstractStreamOperator<OUT>
keySerializer,
container.getConfiguration().getNumberOfKeyGroups(getUserCodeClassloader()),
subTaskKeyGroupRange);
-
+
this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
}
@@ -396,11 +397,11 @@ public abstract class AbstractStreamOperator<OUT>
}
/**
- * Returns the {@link TimeServiceProvider} responsible for getting the current
+ * Returns the {@link ProcessingTimeService} responsible for getting the current
* processing time and registering timers.
*/
- protected TimeServiceProvider getTimerService() {
- return container.getTimerService();
+ protected ProcessingTimeService getProcessingTimeService() {
+ return container.getProcessingTimeService();
}
/**
@@ -421,9 +422,9 @@ public abstract class AbstractStreamOperator<OUT>
*/
@SuppressWarnings("unchecked")
protected <S extends State, N> S getPartitionedState(
- N namespace, TypeSerializer<N> namespaceSerializer,
+ N namespace, TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, ?> stateDescriptor) throws Exception {
-
+
if (keyedStateStore != null) {
return keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index a07e6b7..5a16db0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -69,7 +69,7 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
this.ctx = StreamSourceContexts.getSourceContext(
- timeCharacteristic, getTimerService(), lockingObject, collector, watermarkInterval);
+ timeCharacteristic, getProcessingTimeService(), lockingObject, collector, watermarkInterval);
try {
userFunction.run(ctx);
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index d0c4e15..01ae55c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import java.util.concurrent.ScheduledFuture;
@@ -42,7 +42,7 @@ public class StreamSourceContexts {
* </ul>
* */
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
- TimeCharacteristic timeCharacteristic, TimeServiceProvider timeService,
+ TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService,
Object checkpointLock, Output<StreamRecord<OUT>> output, long watermarkInterval) {
final SourceFunction.SourceContext<OUT> ctx;
@@ -51,7 +51,7 @@ public class StreamSourceContexts {
ctx = new ManualWatermarkContext<>(checkpointLock, output);
break;
case IngestionTime:
- ctx = new AutomaticWatermarkContext<>(timeService, checkpointLock, output, watermarkInterval);
+ ctx = new AutomaticWatermarkContext<>(processingTimeService, checkpointLock, output, watermarkInterval);
break;
case ProcessingTime:
ctx = new NonTimestampContext<>(checkpointLock, output);
@@ -111,7 +111,7 @@ public class StreamSourceContexts {
*/
private static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
- private final TimeServiceProvider timeService;
+ private final ProcessingTimeService timeService;
private final Object lock;
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;
@@ -122,7 +122,7 @@ public class StreamSourceContexts {
private volatile long nextWatermarkTime;
private AutomaticWatermarkContext(
- final TimeServiceProvider timeService,
+ final ProcessingTimeService timeService,
final Object checkpointLock,
final Output<StreamRecord<T>> output,
final long watermarkInterval) {
@@ -201,12 +201,12 @@ public class StreamSourceContexts {
private class WatermarkEmittingTask implements Triggerable {
- private final TimeServiceProvider timeService;
+ private final ProcessingTimeService timeService;
private final Object lock;
private final Output<StreamRecord<T>> output;
private WatermarkEmittingTask(
- TimeServiceProvider timeService,
+ ProcessingTimeService timeService,
Object checkpointLock,
Output<StreamRecord<T>> output) {
this.timeService = timeService;
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index cd0489f..fc9e39e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import java.util.List;
import java.util.Map;
@@ -77,8 +77,8 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
return taskEnvironment.getInputSplitProvider();
}
- public TimeServiceProvider getTimeServiceProvider() {
- return operator.getTimerService();
+ public ProcessingTimeService getProcessingTimeService() {
+ return operator.getProcessingTimeService();
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index c92ff34..0798ed4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -54,8 +54,8 @@ public class ExtractTimestampsOperator<T>
super.open();
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
if (watermarkInterval > 0) {
- long now = getTimerService().getCurrentProcessingTime();
- getTimerService().registerTimer(now + watermarkInterval, this);
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
currentWatermark = Long.MIN_VALUE;
}
@@ -81,8 +81,8 @@ public class ExtractTimestampsOperator<T>
output.emitWatermark(new Watermark(currentWatermark));
}
- long now = getTimerService().getCurrentProcessingTime();
- getTimerService().registerTimer(now + watermarkInterval, this);
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index f791723..b1402ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -54,8 +54,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
if (watermarkInterval > 0) {
- long now = getTimerService().getCurrentProcessingTime();
- getTimerService().registerTimer(now + watermarkInterval, this);
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
}
@@ -77,8 +77,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
output.emitWatermark(newWatermark);
}
- long now = getTimerService().getCurrentProcessingTime();
- getTimerService().registerTimer(now + watermarkInterval, this);
+ long now = getProcessingTimeService().getCurrentProcessingTime();
+ getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index b39b760..d331d4d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -125,7 +125,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
// decide when to first compute the window and when to slide it
// the values should align with the start of time (that is, the UNIX epoch, not the big bang)
- final long now = getTimerService().getCurrentProcessingTime();
+ final long now = getProcessingTimeService().getCurrentProcessingTime();
nextEvaluationTime = now + windowSlide - (now % windowSlide);
nextSlideTime = now + paneSize - (now % paneSize);
@@ -166,7 +166,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
}
// make sure the first window happens
- getTimerService().registerTimer(firstTriggerTime, this);
+ getProcessingTimeService().registerTimer(firstTriggerTime, this);
}
@Override
@@ -230,7 +230,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
}
long nextTriggerTime = Math.min(nextEvaluationTime, nextSlideTime);
- getTimerService().registerTimer(nextTriggerTime, this);
+ getProcessingTimeService().registerTimer(nextTriggerTime, this);
}
private void computeWindow(long timestamp) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 6609e4d..141b5b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -307,7 +307,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
}
if (timer != null) {
- nextTimer = getTimerService().registerTimer(timer.timestamp, this);
+ nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 4d8f655..459c679 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -223,7 +223,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
@Override
public long getCurrentProcessingTime() {
- return WindowOperator.this.getTimerService().getCurrentProcessingTime();
+ return WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
}
};
@@ -233,7 +233,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// re-register the restored timers (if any)
if (processingTimeTimersQueue.size() > 0) {
- nextTimer = getTimerService().registerTimer(processingTimeTimersQueue.peek().timestamp, this);
+ nextTimer = getProcessingTimeService().registerTimer(processingTimeTimersQueue.peek().timestamp, this);
}
}
@@ -495,7 +495,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
if (timer != null) {
- nextTimer = getTimerService().registerTimer(timer.timestamp, this);
+ nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this);
}
}
@@ -697,7 +697,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@Override
public long getCurrentProcessingTime() {
- return WindowOperator.this.getTimerService().getCurrentProcessingTime();
+ return WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
}
@Override
@@ -717,7 +717,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
if (nextTimer != null) {
nextTimer.cancel(false);
}
- nextTimer = getTimerService().registerTimer(time, WindowOperator.this);
+ nextTimer = getProcessingTimeService().registerTimer(time, WindowOperator.this);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
deleted file mode 100644
index d2c743f..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import javax.annotation.Nonnull;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link TimeServiceProvider} which assigns as current processing time the result of calling
- * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
- */
-public class DefaultTimeServiceProvider extends TimeServiceProvider {
-
- private static final int STATUS_ALIVE = 0;
- private static final int STATUS_QUIESCED = 1;
- private static final int STATUS_SHUTDOWN = 2;
-
- // ------------------------------------------------------------------------
-
- /** The containing task that owns this time service provider. */
- private final AsyncExceptionHandler task;
-
- /** The lock that timers acquire upon triggering */
- private final Object checkpointLock;
-
- /** The executor service that schedules and calls the triggers of this task*/
- private final ScheduledThreadPoolExecutor timerService;
-
- private final AtomicInteger status;
-
-
- public DefaultTimeServiceProvider(AsyncExceptionHandler failureHandler, Object checkpointLock) {
- this(failureHandler, checkpointLock, null);
- }
-
- public DefaultTimeServiceProvider(
- AsyncExceptionHandler task,
- Object checkpointLock,
- ThreadFactory threadFactory) {
-
- this.task = checkNotNull(task);
- this.checkpointLock = checkNotNull(checkpointLock);
-
- this.status = new AtomicInteger(STATUS_ALIVE);
-
- if (threadFactory == null) {
- this.timerService = new ScheduledThreadPoolExecutor(1);
- } else {
- this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
- }
-
- // tasks should be removed if the future is canceled
- this.timerService.setRemoveOnCancelPolicy(true);
-
- // make sure shutdown removes all pending tasks
- this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- }
-
- @Override
- public long getCurrentProcessingTime() {
- return System.currentTimeMillis();
- }
-
- @Override
- public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
- long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
-
- // we directly try to register the timer and only react to the status on exception
- // that way we save unnecessary volatile accesses for each timer
- try {
- return timerService.schedule(
- new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
- }
- catch (RejectedExecutionException e) {
- final int status = this.status.get();
- if (status == STATUS_QUIESCED) {
- return new NeverCompleteFuture(delay);
- }
- else if (status == STATUS_SHUTDOWN) {
- throw new IllegalStateException("Timer service is shut down");
- }
- else {
- // something else happened, so propagate the exception
- throw e;
- }
- }
- }
-
- @Override
- public boolean isTerminated() {
- return status.get() == STATUS_SHUTDOWN;
- }
-
- @Override
- public void quiesceAndAwaitPending() throws InterruptedException {
- if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
- timerService.shutdown();
-
- // await forever (almost)
- timerService.awaitTermination(365, TimeUnit.DAYS);
- }
- }
-
- @Override
- public void shutdownService() {
- if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) ||
- status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN))
- {
- timerService.shutdownNow();
- }
- }
-
- // safety net to destroy the thread pool
- @Override
- protected void finalize() throws Throwable {
- super.finalize();
- timerService.shutdownNow();
- }
-
- @VisibleForTesting
- int getNumTasksScheduled() {
- BlockingQueue<?> queue = timerService.getQueue();
- if (queue == null) {
- return 0;
- } else {
- return queue.size();
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Internal task that is invoked by the timer service and triggers the target.
- */
- private static final class TriggerTask implements Runnable {
-
- private final Object lock;
- private final Triggerable target;
- private final long timestamp;
- private final AsyncExceptionHandler exceptionHandler;
-
- TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, Triggerable target, long timestamp) {
- this.exceptionHandler = exceptionHandler;
- this.lock = lock;
- this.target = target;
- this.timestamp = timestamp;
- }
-
- @Override
- public void run() {
- synchronized (lock) {
- try {
- target.trigger(timestamp);
- } catch (Throwable t) {
- TimerException asyncException = new TimerException(t);
- exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
- }
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static final class NeverCompleteFuture implements ScheduledFuture<Object> {
-
- private final Object lock = new Object();
-
- private final long delayMillis;
-
- private volatile boolean canceled;
-
-
- private NeverCompleteFuture(long delayMillis) {
- this.delayMillis = delayMillis;
- }
-
- @Override
- public long getDelay(@Nonnull TimeUnit unit) {
- return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public int compareTo(@Nonnull Delayed o) {
- long otherMillis = o.getDelay(TimeUnit.MILLISECONDS);
- return Long.compare(this.delayMillis, otherMillis);
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- synchronized (lock) {
- canceled = true;
- lock.notifyAll();
- }
- return true;
- }
-
- @Override
- public boolean isCancelled() {
- return canceled;
- }
-
- @Override
- public boolean isDone() {
- return false;
- }
-
- @Override
- public Object get() throws InterruptedException {
- synchronized (lock) {
- while (!canceled) {
- lock.wait();
- }
- }
- throw new CancellationException();
- }
-
- @Override
- public Object get(long timeout, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException {
- synchronized (lock) {
- while (!canceled) {
- unit.timedWait(lock, timeout);
- }
-
- if (canceled) {
- throw new CancellationException();
- } else {
- throw new TimeoutException();
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
new file mode 100644
index 0000000..15c3ebb
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Defines the current processing time and handles all related actions,
+ * such as registering timers for tasks to be executed in the future.
+ *
+ * <p>The access to the time via {@link #getCurrentProcessingTime()} is always available, regardless of
+ * whether the timer service has been shut down.
+ *
+ * <p>The registration of timers follows a life cycle of three phases:
+ * <ol>
+ * <li>In the initial state, it accepts timer registrations and triggers when the time is reached.</li>
+ * <li>After calling {@link #quiesceAndAwaitPending()}, further calls to
+ * {@link #registerTimer(long, Triggerable)} will not register any further timers, and will
+ * return a "dummy" future as a result. This is used for clean shutdown, where currently firing
+ * timers are waited for and no future timers can be scheduled, without causing hard exceptions.</li>
+ * <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, Triggerable)}
+ * will result in a hard exception.</li>
+ * </ol>
+ */
+public abstract class ProcessingTimeService {
+
+ /**
+ * Returns the current processing time.
+ */
+ public abstract long getCurrentProcessingTime();
+
+ /**
+ * Registers a task to be executed when (processing) time is {@code timestamp}.
+ *
+ * @param timestamp Time when the task is to be executed (in processing time)
+ * @param target The task to be executed
+ *
+ * @return The future that represents the scheduled task. A "dummy" future is also returned after
+ * the service was quiesced; after a hard shutdown an exception is thrown instead
+ */
+ public abstract ScheduledFuture<?> registerTimer(long timestamp, Triggerable target);
+
+ /**
+ * Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise.
+ */
+ public abstract boolean isTerminated();
+
+ /**
+ * This method puts the service into a state where it does not register new timers, but
+ * returns for each call to {@link #registerTimer(long, Triggerable)} only a "mock" future.
+ * Furthermore, the method clears all not yet started timers, and awaits the completion
+ * of currently executing timers.
+ *
+ * <p>This method can be used to cleanly shut down the timer service. The using components
+ * will not notice that the service is shut down (as for example via exceptions when registering
+ * a new timer), but the service will simply not fire any timer any more.
+ */
+ public abstract void quiesceAndAwaitPending() throws InterruptedException;
+
+ /**
+ * Shuts down and cleans up the timer service provider hard and immediately. This does not wait
+ * for any timer to complete. Any further call to {@link #registerTimer(long, Triggerable)}
+ * will result in a hard exception.
+ */
+ public abstract void shutdownService();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 77efc7b..905782b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -146,11 +146,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private AbstractKeyedStateBackend<?> keyedStateBackend;
/**
- * The internal {@link TimeServiceProvider} used to define the current
+ * The internal {@link ProcessingTimeService} used to define the current
* processing time (default = {@code System.currentTimeMillis()}) and
* register timers for tasks to be executed in the future.
*/
- private TimeServiceProvider timerService;
+ private ProcessingTimeService timerService;
/** The map of user-defined accumulators of this task */
private Map<String, Accumulator<?, ?>> accumulatorMap;
@@ -190,13 +190,13 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// ------------------------------------------------------------------------
/**
- * Allows the user to specify his own {@link TimeServiceProvider TimerServiceProvider}.
- * By default a {@link DefaultTimeServiceProvider DefaultTimerService} is going to be provided.
+ * Allows the user to specify his own {@link ProcessingTimeService}.
+ * By default a {@link SystemProcessingTimeService} is going to be provided.
* Changing it can be useful for testing processing time functionality, such as
* {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}
* and {@link org.apache.flink.streaming.api.windowing.triggers.Trigger Triggers}.
* */
- public void setTimeService(TimeServiceProvider timeProvider) {
+ public void setProcessingTimeService(ProcessingTimeService timeProvider) {
if (timeProvider == null) {
throw new RuntimeException("The timeProvider cannot be set to null.");
}
@@ -224,7 +224,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
ThreadFactory timerThreadFactory =
new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
- timerService = new DefaultTimeServiceProvider(this, getCheckpointLock(), timerThreadFactory);
+ timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
}
operatorChain = new OperatorChain<>(this, getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
@@ -765,10 +765,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
/**
- * Returns the {@link TimeServiceProvider} responsible for telling the current
+ * Returns the {@link ProcessingTimeService} responsible for telling the current
* processing time and registering timers.
*/
- public TimeServiceProvider getTimerService() {
+ public ProcessingTimeService getProcessingTimeService() {
if (timerService == null) {
throw new IllegalStateException("The timer service has not been initialized.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
new file mode 100644
index 0000000..3fd4202
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ProcessingTimeService} which assigns as current processing time the result of calling
+ * {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
+ */
+public class SystemProcessingTimeService extends ProcessingTimeService {
+
+ private static final int STATUS_ALIVE = 0;
+ private static final int STATUS_QUIESCED = 1;
+ private static final int STATUS_SHUTDOWN = 2;
+
+ // ------------------------------------------------------------------------
+
+ /** The containing task that owns this time service provider. */
+ private final AsyncExceptionHandler task;
+
+ /** The lock that timers acquire upon triggering */
+ private final Object checkpointLock;
+
+ /** The executor service that schedules and calls the triggers of this task*/
+ private final ScheduledThreadPoolExecutor timerService;
+
+ private final AtomicInteger status;
+
+
+ public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
+ this(failureHandler, checkpointLock, null);
+ }
+
+ public SystemProcessingTimeService(
+ AsyncExceptionHandler task,
+ Object checkpointLock,
+ ThreadFactory threadFactory) {
+
+ this.task = checkNotNull(task);
+ this.checkpointLock = checkNotNull(checkpointLock);
+
+ this.status = new AtomicInteger(STATUS_ALIVE);
+
+ if (threadFactory == null) {
+ this.timerService = new ScheduledThreadPoolExecutor(1);
+ } else {
+ this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
+ }
+
+ // tasks should be removed if the future is canceled
+ this.timerService.setRemoveOnCancelPolicy(true);
+
+ // make sure shutdown removes all pending tasks
+ this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ }
+
+ @Override
+ public long getCurrentProcessingTime() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
+ long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
+
+ // we directly try to register the timer and only react to the status on exception
+ // that way we save unnecessary volatile accesses for each timer
+ try {
+ return timerService.schedule(
+ new TriggerTask(task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
+ }
+ catch (RejectedExecutionException e) {
+ final int status = this.status.get();
+ if (status == STATUS_QUIESCED) {
+ return new NeverCompleteFuture(delay);
+ }
+ else if (status == STATUS_SHUTDOWN) {
+ throw new IllegalStateException("Timer service is shut down");
+ }
+ else {
+ // something else happened, so propagate the exception
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return status.get() == STATUS_SHUTDOWN;
+ }
+
+ @Override
+ public void quiesceAndAwaitPending() throws InterruptedException {
+ if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
+ timerService.shutdown();
+
+ // await forever (almost)
+ timerService.awaitTermination(365, TimeUnit.DAYS);
+ }
+ }
+
+ @Override
+ public void shutdownService() {
+ if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) ||
+ status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN))
+ {
+ timerService.shutdownNow();
+ }
+ }
+
+ // safety net to destroy the thread pool
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ timerService.shutdownNow();
+ }
+
+ @VisibleForTesting
+ int getNumTasksScheduled() {
+ BlockingQueue<?> queue = timerService.getQueue();
+ if (queue == null) {
+ return 0;
+ } else {
+ return queue.size();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Internal task that is invoked by the timer service and triggers the target.
+ */
+ private static final class TriggerTask implements Runnable {
+
+ private final Object lock;
+ private final Triggerable target;
+ private final long timestamp;
+ private final AsyncExceptionHandler exceptionHandler;
+
+ TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, Triggerable target, long timestamp) {
+ this.exceptionHandler = exceptionHandler;
+ this.lock = lock;
+ this.target = target;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public void run() {
+ synchronized (lock) {
+ try {
+ target.trigger(timestamp);
+ } catch (Throwable t) {
+ TimerException asyncException = new TimerException(t);
+ exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
+ }
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class NeverCompleteFuture implements ScheduledFuture<Object> {
+
+ private final Object lock = new Object();
+
+ private final long delayMillis;
+
+ private volatile boolean canceled;
+
+
+ private NeverCompleteFuture(long delayMillis) {
+ this.delayMillis = delayMillis;
+ }
+
+ @Override
+ public long getDelay(@Nonnull TimeUnit unit) {
+ return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public int compareTo(@Nonnull Delayed o) {
+ long otherMillis = o.getDelay(TimeUnit.MILLISECONDS);
+ return Long.compare(this.delayMillis, otherMillis);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ synchronized (lock) {
+ canceled = true;
+ lock.notifyAll();
+ }
+ return true;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return canceled;
+ }
+
+ @Override
+ public boolean isDone() {
+ return false;
+ }
+
+ @Override
+ public Object get() throws InterruptedException {
+ synchronized (lock) {
+ while (!canceled) {
+ lock.wait();
+ }
+ }
+ throw new CancellationException();
+ }
+
+ @Override
+ public Object get(long timeout, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException {
+ synchronized (lock) {
+ while (!canceled) {
+ unit.timedWait(lock, timeout);
+ }
+
+ if (canceled) {
+ throw new CancellationException();
+ } else {
+ throw new TimeoutException();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
new file mode 100644
index 0000000..d2bf133
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is a {@link ProcessingTimeService} used <b>strictly for testing</b> the
+ * processing time functionality.
+ * */
+public class TestProcessingTimeService extends ProcessingTimeService {
+
+ private volatile long currentTime = 0;
+
+ private volatile boolean isTerminated;
+ private volatile boolean isQuiesced;
+
+ // sorts the timers by timestamp so that they are processed in the correct order.
+ private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>();
+
+
+ public void setCurrentTime(long timestamp) throws Exception {
+ this.currentTime = timestamp;
+
+ if (!isQuiesced) {
+ // decide which timers to fire and put them in a list
+ // we do not fire them here to be able to accommodate timers
+ // that register other timers.
+
+ Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator();
+ List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>();
+ while (it.hasNext()) {
+ Map.Entry<Long, List<Triggerable>> t = it.next();
+ if (t.getKey() <= this.currentTime) {
+ toRun.add(t);
+ it.remove();
+ }
+ }
+
+ // now do the actual firing.
+ for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
+ long now = tasks.getKey();
+ for (Triggerable task: tasks.getValue()) {
+ task.trigger(now);
+ }
+ }
+ }
+ }
+
+ @Override
+ public long getCurrentProcessingTime() {
+ return currentTime;
+ }
+
+ @Override
+ public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
+ if (isTerminated) {
+ throw new IllegalStateException("terminated");
+ }
+ if (isQuiesced) {
+ return new DummyFuture();
+ }
+
+ if (timestamp <= currentTime) {
+ try {
+ target.trigger(timestamp);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ List<Triggerable> tasks = registeredTasks.get(timestamp);
+ if (tasks == null) {
+ tasks = new ArrayList<>();
+ registeredTasks.put(timestamp, tasks);
+ }
+ tasks.add(target);
+
+ return new DummyFuture();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return isTerminated;
+ }
+
+ @Override
+ public void quiesceAndAwaitPending() {
+ if (!isTerminated) {
+ isQuiesced = true;
+ registeredTasks.clear();
+ }
+ }
+
+ @Override
+ public void shutdownService() {
+ this.isTerminated = true;
+ }
+
+ public int getNumRegisteredTimers() {
+ int count = 0;
+ for (List<Triggerable> tasks: registeredTasks.values()) {
+ count += tasks.size();
+ }
+ return count;
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static class DummyFuture implements ScheduledFuture<Object> {
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return true;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isDone() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object get() throws InterruptedException, ExecutionException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
deleted file mode 100644
index 9eb6cd1..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * This is a {@link TimeServiceProvider} used <b>strictly for testing</b> the
- * processing time functionality.
- * */
-public class TestTimeServiceProvider extends TimeServiceProvider {
-
- private volatile long currentTime = 0;
-
- private volatile boolean isTerminated;
- private volatile boolean isQuiesced;
-
- // sorts the timers by timestamp so that they are processed in the correct order.
- private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>();
-
-
- public void setCurrentTime(long timestamp) throws Exception {
- this.currentTime = timestamp;
-
- if (!isQuiesced) {
- // decide which timers to fire and put them in a list
- // we do not fire them here to be able to accommodate timers
- // that register other timers.
-
- Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator();
- List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>();
- while (it.hasNext()) {
- Map.Entry<Long, List<Triggerable>> t = it.next();
- if (t.getKey() <= this.currentTime) {
- toRun.add(t);
- it.remove();
- }
- }
-
- // now do the actual firing.
- for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
- long now = tasks.getKey();
- for (Triggerable task: tasks.getValue()) {
- task.trigger(now);
- }
- }
- }
- }
-
- @Override
- public long getCurrentProcessingTime() {
- return currentTime;
- }
-
- @Override
- public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
- if (isTerminated) {
- throw new IllegalStateException("terminated");
- }
- if (isQuiesced) {
- return new DummyFuture();
- }
-
- if (timestamp <= currentTime) {
- try {
- target.trigger(timestamp);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- List<Triggerable> tasks = registeredTasks.get(timestamp);
- if (tasks == null) {
- tasks = new ArrayList<>();
- registeredTasks.put(timestamp, tasks);
- }
- tasks.add(target);
-
- return new DummyFuture();
- }
-
- @Override
- public boolean isTerminated() {
- return isTerminated;
- }
-
- @Override
- public void quiesceAndAwaitPending() {
- if (!isTerminated) {
- isQuiesced = true;
- registeredTasks.clear();
- }
- }
-
- @Override
- public void shutdownService() {
- this.isTerminated = true;
- }
-
- public int getNumRegisteredTimers() {
- int count = 0;
- for (List<Triggerable> tasks: registeredTasks.values()) {
- count += tasks.size();
- }
- return count;
- }
-
- // ------------------------------------------------------------------------
-
- private static class DummyFuture implements ScheduledFuture<Object> {
-
- @Override
- public long getDelay(TimeUnit unit) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public int compareTo(Delayed o) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return true;
- }
-
- @Override
- public boolean isCancelled() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean isDone() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object get() throws InterruptedException, ExecutionException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- throw new UnsupportedOperationException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
deleted file mode 100644
index afa6f35..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import java.util.concurrent.ScheduledFuture;
-
-/**
- * Defines the current processing time and handles all related actions,
- * such as register timers for tasks to be executed in the future.
- *
- * <p>The access to the time via {@link #getCurrentProcessingTime()} is always available, regardless of
- * whether the timer service has been shut down.
- *
- * <p>The registration of timers follows a life cycle of three phases:
- * <ol>
- * <li>In the initial state, it accepts timer registrations and triggers when the time is reached.</li>
- * <li>After calling {@link #quiesceAndAwaitPending()}, further calls to
- * {@link #registerTimer(long, Triggerable)} will not register any further timers, and will
- * return a "dummy" future as a result. This is used for clean shutdown, where currently firing
- * timers are waited for and no future timers can be scheduled, without causing hard exceptions.</li>
- * <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long, Triggerable)}
- * will result in a hard exception.</li>
- * </ol>
- */
-public abstract class TimeServiceProvider {
-
- /**
- * Returns the current processing time.
- */
- public abstract long getCurrentProcessingTime();
-
- /**
- * Registers a task to be executed when (processing) time is {@code timestamp}.
- *
- * @param timestamp Time when the task is to be executed (in processing time)
- * @param target The task to be executed
- *
- * @return The future that represents the scheduled task. This always returns some future,
- * even if the timer was shut down
- */
- public abstract ScheduledFuture<?> registerTimer(long timestamp, Triggerable target);
-
- /**
- * Returns <tt>true</tt> if the service has been shut down, <tt>false</tt> otherwise.
- */
- public abstract boolean isTerminated();
-
- /**
- * This method puts the service into a state where it does not register new timers, but
- * returns for each call to {@link #registerTimer(long, Triggerable)} only a "mock" future.
- * Furthermore, the method clears all not yet started timers, and awaits the completion
- * of currently executing timers.
- *
- * <p>This method can be used to cleanly shut down the timer service. The using components
- * will not notice that the service is shut down (as for example via exceptions when registering
- * a new timer), but the service will simply not fire any timer any more.
- */
- public abstract void quiesceAndAwaitPending() throws InterruptedException;
-
- /**
- * Shuts down and clean up the timer service provider hard and immediately. This does not wait
- * for any timer to complete. Any further call to {@link #registerTimer(long, Triggerable)}
- * will result in a hard exception.
- */
- public abstract void shutdownService();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index 42087b4..f87b5ef 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -38,8 +38,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -67,7 +67,7 @@ public class StreamSourceOperatorTest {
final List<StreamElement> output = new ArrayList<>();
- setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
+ setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
operator.run(new Object(), new CollectorOutput<String>(output));
assertEquals(1, output.size());
@@ -84,7 +84,7 @@ public class StreamSourceOperatorTest {
new StreamSource<>(new InfiniteSource<String>());
- setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
+ setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
operator.cancel();
// run and exit
@@ -104,7 +104,7 @@ public class StreamSourceOperatorTest {
new StreamSource<>(new InfiniteSource<String>());
- setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
+ setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
// trigger an async cancel in a bit
new Thread("canceler") {
@@ -137,7 +137,7 @@ public class StreamSourceOperatorTest {
new StoppableStreamSource<>(new InfiniteSource<String>());
- setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
+ setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
operator.stop();
// run and stop
@@ -156,7 +156,7 @@ public class StreamSourceOperatorTest {
new StoppableStreamSource<>(new InfiniteSource<String>());
- setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
+ setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0);
// trigger an async cancel in a bit
new Thread("canceler") {
@@ -189,7 +189,7 @@ public class StreamSourceOperatorTest {
new StoppableStreamSource<>(new InfiniteSource<String>());
// emit latency marks every 10 milliseconds.
- setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 10, null);
+ setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 10);
// trigger an async cancel in a bit
new Thread("canceler") {
@@ -225,15 +225,15 @@ public class StreamSourceOperatorTest {
new StoppableStreamSource<>(new InfiniteSource<String>());
long watermarkInterval = 10;
- TestTimeServiceProvider timeProvider = new TestTimeServiceProvider();
- timeProvider.setCurrentTime(0);
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ processingTimeService.setCurrentTime(0);
- setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, 0, timeProvider);
+ setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, 0, processingTimeService);
final List<StreamElement> output = new ArrayList<>();
StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime,
- operator.getContainingTask().getTimerService(),
+ operator.getContainingTask().getProcessingTimeService(),
operator.getContainingTask().getCheckpointLock(),
new CollectorOutput<String>(output),
operator.getExecutionConfig().getAutoWatermarkInterval());
@@ -243,7 +243,7 @@ public class StreamSourceOperatorTest {
// going to be aligned with the watermark interval.
for (long i = 1; i < 100; i += watermarkInterval) {
- timeProvider.setCurrentTime(i);
+ processingTimeService.setCurrentTime(i);
}
assertTrue(output.size() == 9);
@@ -257,13 +257,21 @@ public class StreamSourceOperatorTest {
}
// ------------------------------------------------------------------------
-
+
+ // Convenience overload for tests that do not drive time themselves: supplies a fresh TestProcessingTimeService.
+ private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
+ TimeCharacteristic timeChar,
+ long watermarkInterval,
+ long latencyMarkInterval) {
+ setupSourceOperator(operator, timeChar, watermarkInterval, latencyMarkInterval, new TestProcessingTimeService());
+ }
+
@SuppressWarnings("unchecked")
private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
TimeCharacteristic timeChar,
long watermarkInterval,
long latencyMarkInterval,
- final TimeServiceProvider timeProvider) {
+ final ProcessingTimeService timeProvider) {
ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setAutoWatermarkInterval(watermarkInterval);
@@ -284,12 +292,15 @@ public class StreamSourceOperatorTest {
when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
- doAnswer(new Answer<TimeServiceProvider>() {
+ doAnswer(new Answer<ProcessingTimeService>() {
@Override
- public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable {
+ public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
+ if (timeProvider == null) {
+ throw new RuntimeException("The time provider is null.");
+ }
return timeProvider;
}
- }).when(mockTask).getTimerService();
+ }).when(mockTask).getProcessingTimeService();
operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 98058e8..fb1fab5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -64,7 +64,7 @@ public class StreamTaskTimerTest {
testHarness.waitForTaskRunning();
// first one spawns thread
- mapTask.getTimerService().registerTimer(System.currentTimeMillis(), new Triggerable() {
+ mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), new Triggerable() {
@Override
public void trigger(long timestamp) {
}
@@ -106,7 +106,7 @@ public class StreamTaskTimerTest {
final long t3 = System.currentTimeMillis() + 100;
final long t4 = System.currentTimeMillis() + 200;
- TimeServiceProvider timeService = mapTask.getTimerService();
+ ProcessingTimeService timeService = mapTask.getProcessingTimeService();
timeService.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0));
timeService.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1));
timeService.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2));
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
new file mode 100644
index 0000000..9c2cee3
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+
+ @RunWith(PowerMockRunner.class)
+ @PrepareForTest({ResultPartitionWriter.class})
+ @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+ public class TestProcessingTimeServiceTest {
+ /** Checks that a task reads its time from an injected TestProcessingTimeService and that the timer count drops as time advances. */
+ @Test
+ public void testCustomTimeServiceProvider() throws Throwable {
+ TestProcessingTimeService tp = new TestProcessingTimeService();
+
+ final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
+ mapTask.setProcessingTimeService(tp);
+
+ final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+ mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+
+ StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>());
+ streamConfig.setStreamOperator(mapOperator);
+
+ testHarness.invoke();
+ // JUnit's argument order is (expected, actual); the clock is expected to read 0 until the test moves it
+ assertEquals(0, testHarness.getProcessingTimeService().getCurrentProcessingTime());
+
+ tp.setCurrentTime(11);
+ assertEquals(11, testHarness.getProcessingTimeService().getCurrentProcessingTime());
+
+ tp.setCurrentTime(15);
+ tp.setCurrentTime(16);
+ assertEquals(16, testHarness.getProcessingTimeService().getCurrentProcessingTime());
+
+ // register 2 tasks
+ mapTask.getProcessingTimeService().registerTimer(30, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {
+ // no-op: only the registration bookkeeping is checked
+ }
+ });
+
+ mapTask.getProcessingTimeService().registerTimer(40, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {
+ // no-op: only the registration bookkeeping is checked
+ }
+ });
+ // current time is 16, so the timers for 30 and 40 are both expected to be pending
+ assertEquals(2, tp.getNumRegisteredTimers());
+ // after advancing to 35 the timer for 30 is expected to be gone
+ tp.setCurrentTime(35);
+ assertEquals(1, tp.getNumRegisteredTimers());
+ // after advancing to 40 no timer is expected to remain
+ tp.setCurrentTime(40);
+ assertEquals(0, tp.getNumRegisteredTimers());
+
+ tp.shutdownService();
+ }
+
+ // ------------------------------------------------------------------------
+ /** Stores an asynchronous exception in the supplied reference unless the reference already holds a value. */
+ public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler {
+
+ private final AtomicReference<Throwable> errorReference;
+
+ public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) {
+ this.errorReference = errorReference;
+ }
+
+ @Override
+ public void handleAsyncException(String message, Throwable exception) {
+ errorReference.compareAndSet(null, exception); // written only while the reference is still null
+ }
+ }
+ }