You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/10/05 22:16:52 UTC
[02/17] flink git commit: [hotfix] Various code cleanups around time
service and asynchronous exceptions
[hotfix] Various code cleanups around time service and asynchronous exceptions
- DefaultTimeServiceProvider now owns scheduled executor
- Enforce that an asynchronous exception handler is always set
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/954ef08f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/954ef08f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/954ef08f
Branch: refs/heads/master
Commit: 954ef08f374d7e7c1f2b469201b1ea05c6376b33
Parents: 8ff451b
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Oct 4 16:15:05 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Oct 5 19:36:13 2016 +0200
----------------------------------------------------------------------
.../AbstractFetcherTimestampsTest.java | 122 +++++++++++--------
.../kafka/testutils/MockRuntimeContext.java | 40 +++---
.../api/operators/StreamSourceContexts.java | 6 +-
.../runtime/io/StreamInputProcessor.java | 4 +-
.../runtime/tasks/AsyncExceptionHandler.java | 1 +
.../runtime/tasks/AsynchronousException.java | 11 +-
.../tasks/DefaultTimeServiceProvider.java | 57 +++++----
.../runtime/tasks/OneInputStreamTask.java | 2 +-
.../streaming/runtime/tasks/StreamTask.java | 18 +--
.../runtime/tasks/TestTimeServiceProvider.java | 2 +-
.../runtime/tasks/TwoInputStreamTask.java | 2 +-
.../runtime/operators/TimeProviderTest.java | 45 +++++--
...AlignedProcessingTimeWindowOperatorTest.java | 84 ++++++++-----
...AlignedProcessingTimeWindowOperatorTest.java | 101 +++++++++------
.../operators/windowing/NoOpTimerService.java | 49 ++++++++
.../util/OneInputStreamOperatorTestHarness.java | 6 +-
16 files changed, 347 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 8c68fbe..c3ba7b7 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -25,7 +25,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.apache.flink.util.SerializedValue;
import org.junit.Test;
@@ -34,6 +37,7 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
@@ -110,6 +114,7 @@ public class AbstractFetcherTimestampsTest {
@Test
public void testPeriodicWatermarks() throws Exception {
+
ExecutionConfig config = new ExecutionConfig();
config.setAutoWatermarkInterval(10);
@@ -120,61 +125,70 @@ public class AbstractFetcherTimestampsTest {
TestSourceContext<Long> sourceContext = new TestSourceContext<>();
- TestFetcher<Long> fetcher = new TestFetcher<>(
- sourceContext, originalPartitions,
- new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
- null, new MockRuntimeContext(17, 3, config, sourceContext.getCheckpointLock()));
-
- final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
- final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
- final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
-
- // elements generate a watermark if the timestamp is a multiple of three
-
- // elements for partition 1
- fetcher.emitRecord(1L, part1, 1L);
- fetcher.emitRecord(2L, part1, 2L);
- fetcher.emitRecord(3L, part1, 3L);
- assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
-
- // elements for partition 2
- fetcher.emitRecord(12L, part2, 1L);
- assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(errorRef), sourceContext.getCheckpointLock());
- // elements for partition 3
- fetcher.emitRecord(101L, part3, 1L);
- fetcher.emitRecord(102L, part3, 2L);
- assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
-
- // now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
- assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
-
- // advance partition 3
- fetcher.emitRecord(1003L, part3, 3L);
- fetcher.emitRecord(1004L, part3, 4L);
- fetcher.emitRecord(1005L, part3, 5L);
- assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
-
- // advance partition 1 beyond partition 2 - this bumps the watermark
- fetcher.emitRecord(30L, part1, 4L);
- assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
- assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
-
- // this blocks until the periodic thread emitted the watermark
- assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
-
- // advance partition 2 again - this bumps the watermark
- fetcher.emitRecord(13L, part2, 2L);
- fetcher.emitRecord(14L, part2, 3L);
- fetcher.emitRecord(15L, part2, 3L);
-
- // this blocks until the periodic thread emitted the watermark
- long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
- assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+ try {
+ TestFetcher<Long> fetcher = new TestFetcher<>(
+ sourceContext, originalPartitions,
+ new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
+ null, new MockRuntimeContext(17, 3, config, timerService));
+
+ final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+ final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+ final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+
+ // elements generate a watermark if the timestamp is a multiple of three
+
+ // elements for partition 1
+ fetcher.emitRecord(1L, part1, 1L);
+ fetcher.emitRecord(2L, part1, 2L);
+ fetcher.emitRecord(3L, part1, 3L);
+ assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+
+ // elements for partition 2
+ fetcher.emitRecord(12L, part2, 1L);
+ assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+
+ // elements for partition 3
+ fetcher.emitRecord(101L, part3, 1L);
+ fetcher.emitRecord(102L, part3, 2L);
+ assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+ // now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
+ assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 3
+ fetcher.emitRecord(1003L, part3, 3L);
+ fetcher.emitRecord(1004L, part3, 4L);
+ fetcher.emitRecord(1005L, part3, 5L);
+ assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+ // advance partition 1 beyond partition 2 - this bumps the watermark
+ fetcher.emitRecord(30L, part1, 4L);
+ assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+ assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+
+ // this blocks until the periodic thread emitted the watermark
+ assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+ // advance partition 2 again - this bumps the watermark
+ fetcher.emitRecord(13L, part2, 2L);
+ fetcher.emitRecord(14L, part2, 3L);
+ fetcher.emitRecord(15L, part2, 3L);
+
+ // this blocks until the periodic thread emitted the watermark
+ long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
+ assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+ }
+ finally {
+ timerService.shutdownService();
+ }
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 5be4195..e1ec4cb 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -32,45 +32,46 @@ import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
@SuppressWarnings("deprecation")
public class MockRuntimeContext extends StreamingRuntimeContext {
private final int numberOfParallelSubtasks;
private final int indexOfThisSubtask;
-
- private final ExecutionConfig execConfig;
- private final TimeServiceProvider timerService;
+ private final ExecutionConfig execConfig;
+ private final TimeServiceProvider timeServiceProvider;
+
public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
- this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), new Object());
+ this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig());
}
public MockRuntimeContext(
- int numberOfParallelSubtasks,
- int indexOfThisSubtask,
- ExecutionConfig execConfig,
- Object checkpointLock) {
-
+ int numberOfParallelSubtasks,
+ int indexOfThisSubtask,
+ ExecutionConfig execConfig) {
+ this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, null);
+ }
+
+ public MockRuntimeContext(
+ int numberOfParallelSubtasks,
+ int indexOfThisSubtask,
+ ExecutionConfig execConfig,
+ TimeServiceProvider timeServiceProvider) {
+
super(new MockStreamOperator(),
new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
Collections.<String, Accumulator<?, ?>>emptyMap());
@@ -78,8 +79,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
this.numberOfParallelSubtasks = numberOfParallelSubtasks;
this.indexOfThisSubtask = indexOfThisSubtask;
this.execConfig = execConfig;
- this.timerService = DefaultTimeServiceProvider.
- createForTesting(Executors.newSingleThreadScheduledExecutor(), checkpointLock);
+ this.timeServiceProvider = timeServiceProvider;
}
@Override
@@ -189,7 +189,11 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
@Override
public TimeServiceProvider getTimeServiceProvider() {
- return timerService;
+ if (timeServiceProvider == null) {
+ throw new UnsupportedOperationException();
+ } else {
+ return timeServiceProvider;
+ }
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index abaf4e7..a290deb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -35,9 +35,9 @@ public class StreamSourceContexts {
* Depending on the {@link TimeCharacteristic}, this method will return the adequate
* {@link org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}. That is:
* <ul>
- * <li> {@link TimeCharacteristic#IngestionTime} = {@link AutomaticWatermarkContext}
- * <li> {@link TimeCharacteristic#ProcessingTime} = {@link NonTimestampContext}
- * <li> {@link TimeCharacteristic#EventTime} = {@link ManualWatermarkContext}
+ * <li>{@link TimeCharacteristic#IngestionTime} = {@link AutomaticWatermarkContext}</li>
+ * <li>{@link TimeCharacteristic#ProcessingTime} = {@link NonTimestampContext}</li>
+ * <li>{@link TimeCharacteristic#EventTime} = {@link ManualWatermarkContext}</li>
* </ul>
* */
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 85e9297..2dbc6d4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -83,7 +83,9 @@ public class StreamInputProcessor<IN> {
private Counter numRecordsIn;
@SuppressWarnings("unchecked")
- public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
+ public StreamInputProcessor(
+ InputGate[] inputGates,
+ TypeSerializer<IN> inputSerializer,
StatefulTask checkpointedTask,
CheckpointingMode checkpointMode,
IOManager ioManager,
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
index 4c55055..a8125c3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.tasks;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
index 311e0cd..cda0511 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
@@ -15,22 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.annotation.Internal;
/**
- * {@code RuntimeException} for wrapping exceptions that are thrown in Threads that are not the
- * main compute Thread.
+ * An exception for wrapping exceptions that are thrown by an operator in threads other than the
+ * main compute thread of that operator.
*/
@Internal
-public class AsynchronousException extends RuntimeException {
+public class AsynchronousException extends Exception {
private static final long serialVersionUID = 1L;
public AsynchronousException(Throwable cause) {
super(cause);
}
+ public AsynchronousException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
@Override
public String toString() {
return "AsynchronousException{" + getCause() + "}";
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index 9534b3c..5664eac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -17,15 +17,15 @@
package org.apache.flink.streaming.runtime.tasks;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.util.Preconditions;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A {@link TimeServiceProvider} which assigns as current processing time the result of calling
* {@link System#currentTimeMillis()} and registers timers using a {@link ScheduledThreadPoolExecutor}.
@@ -35,24 +35,34 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
/** The containing task that owns this time service provider. */
private final AsyncExceptionHandler task;
+ /** The lock that timers acquire upon triggering */
private final Object checkpointLock;
/** The executor service that schedules and calls the triggers of this task*/
- private final ScheduledExecutorService timerService;
+ private final ScheduledThreadPoolExecutor timerService;
+
- public static DefaultTimeServiceProvider create(
- AsyncExceptionHandler exceptionHandler,
- ScheduledExecutorService executor,
- Object checkpointLock) {
- return new DefaultTimeServiceProvider(exceptionHandler, executor, checkpointLock);
+ public DefaultTimeServiceProvider(AsyncExceptionHandler failureHandler, Object checkpointLock) {
+ this(failureHandler, checkpointLock, null);
}
- private DefaultTimeServiceProvider(AsyncExceptionHandler task,
- ScheduledExecutorService threadPoolExecutor,
- Object checkpointLock) {
- this.task = Preconditions.checkNotNull(task);
- this.timerService = Preconditions.checkNotNull(threadPoolExecutor);
- this.checkpointLock = Preconditions.checkNotNull(checkpointLock);
+ public DefaultTimeServiceProvider(
+ AsyncExceptionHandler task,
+ Object checkpointLock,
+ ThreadFactory threadFactory) {
+
+ this.task = checkNotNull(task);
+ this.checkpointLock = checkNotNull(checkpointLock);
+
+ if (threadFactory == null) {
+ this.timerService = new ScheduledThreadPoolExecutor(1);
+ } else {
+ this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
+ }
+
+ // allow trigger tasks to be removed if all timers for
+ // that timestamp are removed by user
+ this.timerService.setRemoveOnCancelPolicy(true);
}
@Override
@@ -76,6 +86,13 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
timerService.shutdownNow();
}
+ // safety net to destroy the thread pool
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ timerService.shutdownNow();
+ }
+
/**
* Internal task that is invoked by the timer service and triggers the target.
*/
@@ -105,14 +122,4 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
}
}
}
-
- @VisibleForTesting
- public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
- return new DefaultTimeServiceProvider(new AsyncExceptionHandler() {
- @Override
- public void handleAsyncException(String message, Throwable exception) {
- exception.printStackTrace();
- }
- }, executor, checkpointLock);
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index cf8853e..0a6534b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -64,7 +64,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
final Object lock = getCheckpointLock();
while (running && inputProcessor.processInput(operator, lock)) {
-
+ // all the work happens in the "processInput" method
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 33317fa..040ec66 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -67,7 +67,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
/**
* Base class for all streaming tasks. A task is the unit of local processing that is deployed
@@ -223,15 +223,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// if the clock is not already set, then assign a default TimeServiceProvider
if (timerService == null) {
+ ThreadFactory timerThreadFactory =
+ new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
- ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
- new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
-
- // allow trigger tasks to be removed if all timers for
- // that timestamp are removed by user
- executor.setRemoveOnCancelPolicy(true);
-
- timerService = DefaultTimeServiceProvider.create(this, executor, getCheckpointLock());
+ timerService = new DefaultTimeServiceProvider(this, getCheckpointLock(), timerThreadFactory);
}
operatorChain = new OperatorChain<>(this, getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
@@ -305,10 +300,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
// stop all timers and threads
if (timerService != null) {
try {
- if (!timerService.isTerminated()) {
- LOG.info("Timer service is shutting down.");
- timerService.shutdownService();
- }
+ timerService.shutdownService();
}
catch (Throwable t) {
// catch and log the exception to not replace the original exception
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
index a21a2e1..81faec9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
@@ -86,7 +86,7 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
return isTerminated;
}
- public int getNoOfRegisteredTimers() {
+ public int getNumRegisteredTimers() {
int count = 0;
for (List<Triggerable> tasks: registeredTasks.values()) {
count += tasks.size();
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 0197c53..fb08959 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -88,7 +88,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
final Object lock = getCheckpointLock();
while (running && inputProcessor.processInput(operator, lock)) {
-
+ // all the work happens in the "processInput" method
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
index 0351978..8d3e621 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.streaming.runtime.operators;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -28,6 +29,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -37,13 +39,14 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
+@PrepareForTest({ResultPartitionWriter.class})
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
public class TimeProviderTest {
@@ -52,8 +55,10 @@ public class TimeProviderTest {
final OneShotLatch latch = new OneShotLatch();
final Object lock = new Object();
- TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider
- .createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(error), lock);
final List<Long> timestamps = new ArrayList<>();
@@ -114,6 +119,8 @@ public class TimeProviderTest {
lastTs = timestamp;
counter++;
}
+
+ assertNull(error.get());
}
@Test
@@ -124,14 +131,14 @@ public class TimeProviderTest {
final Object lock = new Object();
- TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider
- .create(new AsyncExceptionHandler() {
+ TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider(
+ new AsyncExceptionHandler() {
@Override
public void handleAsyncException(String message, Throwable exception) {
exceptionWasThrown.compareAndSet(false, true);
latch.trigger();
}
- }, Executors.newSingleThreadScheduledExecutor(), lock);
+ }, lock);
long now = System.currentTimeMillis();
timeServiceProvider.registerTimer(now, new Triggerable() {
@@ -182,7 +189,7 @@ public class TimeProviderTest {
}
});
- Assert.assertEquals(provider.getNoOfRegisteredTimers(), 4);
+ Assert.assertEquals(provider.getNumRegisteredTimers(), 4);
provider.setCurrentTime(100);
long seen = 0;
@@ -233,14 +240,30 @@ public class TimeProviderTest {
}
});
- assertEquals(2, tp.getNoOfRegisteredTimers());
+ assertEquals(2, tp.getNumRegisteredTimers());
tp.setCurrentTime(35);
- assertEquals(1, tp.getNoOfRegisteredTimers());
+ assertEquals(1, tp.getNumRegisteredTimers());
tp.setCurrentTime(40);
- assertEquals(0, tp.getNoOfRegisteredTimers());
+ assertEquals(0, tp.getNumRegisteredTimers());
tp.shutdownService();
}
+
+ // ------------------------------------------------------------------------
+
+ public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler {
+
+ private final AtomicReference<Throwable> errorReference;
+
+ public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) {
+ this.errorReference = errorReference;
+ }
+
+ @Override
+ public void handleAsyncException(String message, Throwable exception) {
+ errorReference.compareAndSet(null, exception);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 30f38e3..4c6d391 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -58,7 +59,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -182,14 +183,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testWindowTriggerTimeAlignment() throws Exception {
- final Object lock = new Object();
- TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
-
try {
@SuppressWarnings("unchecked")
final Output<StreamRecord<String>> mockOut = mock(Output.class);
- StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+ final TimeServiceProvider timerService = new NoOpTimerService();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, new Object());
AccumulatingProcessingTimeWindowOperator<String, String, String> op;
@@ -201,11 +199,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
- timerService.shutdownService();
- timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
- mockTask = createMockTaskWithTimer(timerService, lock);
-
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -214,11 +207,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
- timerService.shutdownService();
- timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
- mockTask = createMockTaskWithTimer(timerService, lock);
-
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -227,11 +215,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
- timerService.shutdownService();
- timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
- mockTask = createMockTaskWithTimer(timerService, lock);
-
op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -244,16 +227,15 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
e.printStackTrace();
fail(e.getMessage());
}
- finally {
- timerService.shutdownService();
- }
}
@Test
public void testTumblingWindow() throws Exception {
final Object lock = new Object();
- final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(error), lock);
try {
final int windowSize = 50;
@@ -285,6 +267,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
synchronized (lock) {
op.close();
}
+
+ shutdownTimerServiceAndWait(timerService);
op.dispose();
List<Integer> result = out.getElements();
@@ -294,6 +278,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
for (int i = 0; i < numElements; i++) {
assertEquals(i, result.get(i).intValue());
}
+
+ if (error.get() != null) {
+ throw new Exception(error.get());
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -307,8 +295,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testSlidingWindow() throws Exception {
final Object lock = new Object();
- final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(error), lock);
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
@@ -335,6 +325,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
synchronized (lock) {
op.close();
}
+
+ shutdownTimerServiceAndWait(timerService);
op.dispose();
// get and verify the result
@@ -361,6 +353,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
lastCount = 1;
}
}
+
+ if (error.get() != null) {
+ throw new Exception(error.get());
+ }
} finally {
timerService.shutdownService();
}
@@ -369,8 +365,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testTumblingWindowSingleElements() throws Exception {
final Object lock = new Object();
- final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(error), lock);
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
@@ -412,7 +410,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
synchronized (lock) {
op.close();
}
+
+ shutdownTimerServiceAndWait(timerService);
op.dispose();
+
+ if (error.get() != null) {
+ throw new Exception(error.get());
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -426,8 +430,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testSlidingWindowSingleElements() throws Exception {
final Object lock = new Object();
- final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(error), lock);
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
@@ -460,7 +466,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
synchronized (lock) {
op.close();
}
+
+ shutdownTimerServiceAndWait(timerService);
op.dispose();
+
+ if (error.get() != null) {
+ throw new Exception(error.get());
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -798,4 +810,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
return result;
}
+
+ private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception {
+ timers.shutdownService();
+
+ while (!timers.isTerminated()) {
+ Thread.sleep(2);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 7539c2d..88e28bc 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -40,14 +40,15 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
import org.junit.After;
import org.junit.Test;
@@ -60,7 +61,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -190,15 +191,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testWindowTriggerTimeAlignment() throws Exception {
- final Object lock = new Object();
- TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
-
try {
@SuppressWarnings("unchecked")
final Output<StreamRecord<String>> mockOut = mock(Output.class);
- StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
+ final TimeServiceProvider timerService = new NoOpTimerService();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, new Object());
+
AggregatingProcessingTimeWindowOperator<String, String> op;
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
@@ -209,11 +207,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
- timerService.shutdownService();
- timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
- mockTask = createMockTaskWithTimer(timerService, lock);
-
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -222,11 +215,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
- timerService.shutdownService();
- timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
- mockTask = createMockTaskWithTimer(timerService, lock);
-
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -235,11 +223,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(op.getNextEvaluationTime() % 1000 == 0);
op.dispose();
- timerService.shutdownService();
- timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
- mockTask = createMockTaskWithTimer(timerService, lock);
-
op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
@@ -251,16 +234,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
- } finally {
- timerService.shutdownService();
}
}
@Test
public void testTumblingWindowUniqueElements() throws Exception {
final Object lock = new Object();
- final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(error), lock);
try {
final int windowSize = 50;
@@ -297,6 +280,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
synchronized (lock) {
op.close();
}
+
+ shutdownTimerServiceAndWait(timerService);
op.dispose();
@@ -305,6 +290,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
assertEquals(i, result.get(i).f0.intValue());
assertEquals(i, result.get(i).f1.intValue());
}
+
+ if (error.get() != null) {
+ throw new Exception(error.get());
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -318,8 +307,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testTumblingWindowDuplicateElements() throws Exception {
final Object lock = new Object();
- final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(error), lock);
try {
final int windowSize = 50;
final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
@@ -364,6 +355,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
synchronized (lock) {
op.close();
}
+
+ shutdownTimerServiceAndWait(timerService);
op.dispose();
// we have ideally one element per window. we may have more, when we emitted a value into the
@@ -373,6 +366,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
// deduplicate for more accurate checks
HashSet<Tuple2<Integer, Integer>> set = new HashSet<>(result);
assertTrue(set.size() == 10);
+
+ if (error.get() != null) {
+ throw new Exception(error.get());
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -386,8 +383,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testSlidingWindow() throws Exception {
final Object lock = new Object();
- final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(error), lock);
try {
final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
@@ -418,6 +417,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
synchronized (lock) {
op.close();
}
+
+ shutdownTimerServiceAndWait(timerService);
op.dispose();
// get and verify the result
@@ -445,6 +446,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
lastCount = 1;
}
}
+
+ if (error.get() != null) {
+ throw new Exception(error.get());
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -458,8 +463,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testSlidingWindowSingleElements() throws Exception {
final Object lock = new Object();
- final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(error), lock);
try {
final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
@@ -504,7 +511,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
synchronized (lock) {
op.close();
}
+
+ shutdownTimerServiceAndWait(timerService);
op.dispose();
+
+ if (error.get() != null) {
+ throw new Exception(error.get());
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -518,8 +531,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
@Test
public void testPropagateExceptionsFromProcessElement() throws Exception {
final Object lock = new Object();
- final TimeServiceProvider timerService = DefaultTimeServiceProvider.createForTesting(
- Executors.newSingleThreadScheduledExecutor(), lock);
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ new ReferenceSettingExceptionHandler(error), lock);
try {
final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
@@ -556,7 +571,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
assertTrue(e.getMessage().contains("Artificial Test Exception"));
}
+ shutdownTimerServiceAndWait(timerService);
op.dispose();
+
+ if (error.get() != null) {
+ throw new Exception(error.get());
+ }
}
catch (Exception e) {
e.printStackTrace();
@@ -971,8 +991,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
}
private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer, int numberOfKeGroups) {
- StreamConfig cfg = new StreamConfig(new Configuration());
- return cfg;
+ return new StreamConfig(new Configuration());
}
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -985,4 +1004,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
}
return result;
}
+
+ private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception {
+ timers.shutdownService();
+
+ while (!timers.isTerminated()) {
+ Thread.sleep(2);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
new file mode 100644
index 0000000..16e658e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+
+import java.util.concurrent.ScheduledFuture;
+
+class NoOpTimerService extends TimeServiceProvider {
+
+ private volatile boolean terminated;
+
+ @Override
+ public long getCurrentProcessingTime() {
+ return System.currentTimeMillis();
+ }
+
+ @Override
+ public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
+ return null;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return terminated;
+ }
+
+ @Override
+ public void shutdownService() throws Exception {
+ terminated = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index d8a0ee2..9d8e6a5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -42,12 +42,12 @@ import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -145,7 +145,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
@Override
public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable {
- final StreamOperator operator = (StreamOperator) invocationOnMock.getArguments()[0];
+ final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName());
}
}).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class));
@@ -154,7 +154,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
}
timeServiceProvider = testTimeProvider != null ? testTimeProvider :
- DefaultTimeServiceProvider.create(mockTask, Executors.newSingleThreadScheduledExecutor(), this.checkpointLock);
+ new DefaultTimeServiceProvider(mockTask, this.checkpointLock);
doAnswer(new Answer<TimeServiceProvider>() {
@Override