You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:19 UTC
[04/11] flink git commit: [FLINK-4877] Rename TimeServiceProvider to
ProcessingTimeService
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
deleted file mode 100644
index a8f2dc4..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-public class TestTimeProviderTest {
-
- @Test
- public void testCustomTimeServiceProvider() throws Throwable {
- TestTimeServiceProvider tp = new TestTimeServiceProvider();
-
- final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
- mapTask.setTimeService(tp);
-
- final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
- mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
-
- StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>());
- streamConfig.setStreamOperator(mapOperator);
-
- testHarness.invoke();
-
- assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0);
-
- tp.setCurrentTime(11);
- assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11);
-
- tp.setCurrentTime(15);
- tp.setCurrentTime(16);
- assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16);
-
- // register 2 tasks
- mapTask.getTimerService().registerTimer(30, new Triggerable() {
- @Override
- public void trigger(long timestamp) {
-
- }
- });
-
- mapTask.getTimerService().registerTimer(40, new Triggerable() {
- @Override
- public void trigger(long timestamp) {
-
- }
- });
-
- assertEquals(2, tp.getNumRegisteredTimers());
-
- tp.setCurrentTime(35);
- assertEquals(1, tp.getNumRegisteredTimers());
-
- tp.setCurrentTime(40);
- assertEquals(0, tp.getNumRegisteredTimers());
-
- tp.shutdownService();
- }
-
- // ------------------------------------------------------------------------
-
- public static class ReferenceSettingExceptionHandler implements AsyncExceptionHandler {
-
- private final AtomicReference<Throwable> errorReference;
-
- public ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) {
- this.errorReference = errorReference;
- }
-
- @Override
- public void handleAsyncException(String message, Throwable exception) {
- errorReference.compareAndSet(null, exception);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index 4d5c881..af99d0d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;
@@ -41,9 +42,13 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
final ExecutionConfig config = new ExecutionConfig();
config.setAutoWatermarkInterval(50);
+
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
OneInputStreamOperatorTestHarness<Long, Long> testHarness =
- new OneInputStreamOperatorTestHarness<Long, Long>(operator, config);
+ new OneInputStreamOperatorTestHarness<Long, Long>(operator, config, processingTimeService);
+
+ long currentTime = 0;
testHarness.open();
@@ -71,7 +76,8 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
// check the invariant
assertTrue(lastWatermark < nextElementValue);
} else {
- Thread.sleep(10);
+ currentTime = currentTime + 10;
+ processingTimeService.setCurrentTime(currentTime);
}
}
@@ -102,7 +108,8 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
// check the invariant
assertTrue(lastWatermark < nextElementValue);
} else {
- Thread.sleep(10);
+ currentTime = currentTime + 10;
+ processingTimeService.setCurrentTime(currentTime);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index e96109e..128c88b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -39,12 +39,12 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
@@ -186,8 +186,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
try {
@SuppressWarnings("unchecked")
final Output<StreamRecord<String>> mockOut = mock(Output.class);
- final TimeServiceProvider timerService = new NoOpTimerService();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, new Object());
+ final ProcessingTimeService timerService = new NoOpTimerService();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
AccumulatingProcessingTimeWindowOperator<String, String, String> op;
@@ -234,13 +234,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
final AtomicReference<Throwable> error = new AtomicReference<>();
- final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ final ProcessingTimeService timerService = new SystemProcessingTimeService(
new ReferenceSettingExceptionHandler(error), lock);
try {
final int windowSize = 50;
final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
// tumbling window that triggers every 20 milliseconds
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -299,12 +299,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
final AtomicReference<Throwable> error = new AtomicReference<>();
- final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ final ProcessingTimeService timerService = new SystemProcessingTimeService(
new ReferenceSettingExceptionHandler(error), lock);
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
// tumbling window that triggers every 20 milliseconds
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -371,12 +371,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
final AtomicReference<Throwable> error = new AtomicReference<>();
- final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ final ProcessingTimeService timerService = new SystemProcessingTimeService(
new ReferenceSettingExceptionHandler(error), lock);
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
// tumbling window that triggers every 20 milliseconds
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -438,12 +438,12 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
final AtomicReference<Throwable> error = new AtomicReference<>();
- final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ final ProcessingTimeService timerService = new SystemProcessingTimeService(
new ReferenceSettingExceptionHandler(error), lock);
try {
final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
// tumbling window that triggers every 20 milliseconds
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -503,7 +503,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, IntSerializer.INSTANCE,
windowSize, windowSize);
- TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+ TestProcessingTimeService timerService = new TestProcessingTimeService();
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
@@ -542,7 +542,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, IntSerializer.INSTANCE,
windowSize, windowSize);
- timerService = new TestTimeServiceProvider();
+ timerService = new TestProcessingTimeService();
testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
testHarness.setup();
@@ -583,7 +583,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
final int windowSlide = 50;
final int windowSize = factor * windowSlide;
- TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+ TestProcessingTimeService timerService = new TestProcessingTimeService();
// sliding window (200 msecs) every 50 msecs
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
@@ -631,7 +631,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, IntSerializer.INSTANCE,
windowSize, windowSlide);
- timerService = new TestTimeServiceProvider();
+ timerService = new TestProcessingTimeService();
testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
testHarness.setup();
@@ -684,7 +684,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
new StatefulFunction(), identitySelector,
IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
- TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+ TestProcessingTimeService timerService = new TestProcessingTimeService();
OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
@@ -777,7 +777,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
// ------------------------------------------------------------------------
- private static StreamTask<?, ?> createMockTask(Object lock) {
+ private static StreamTask<?, ?> createMockTask() {
Configuration configuration = new Configuration();
configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
@@ -785,7 +785,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
when(task.getName()).thenReturn("Test task name");
when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
- when(task.getCheckpointLock()).thenReturn(lock);
final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
@@ -801,10 +800,10 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
}
private static StreamTask<?, ?> createMockTaskWithTimer(
- final TimeServiceProvider timerService, final Object lock)
+ final ProcessingTimeService timerService)
{
- StreamTask<?, ?> mockTask = createMockTask(lock);
- when(mockTask.getTimerService()).thenReturn(timerService);
+ StreamTask<?, ?> mockTask = createMockTask();
+ when(mockTask.getProcessingTimeService()).thenReturn(timerService);
return mockTask;
}
@@ -819,7 +818,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
return result;
}
- private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception {
+ private static void shutdownTimerServiceAndWait(ProcessingTimeService timers) throws Exception {
timers.shutdownService();
while (!timers.isTerminated()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 802329b..bb64a08 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -40,12 +40,13 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -194,8 +195,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
try {
@SuppressWarnings("unchecked")
final Output<StreamRecord<String>> mockOut = mock(Output.class);
- final TimeServiceProvider timerService = new NoOpTimerService();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, new Object());
+ final ProcessingTimeService timerService = new NoOpTimerService();
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
AggregatingProcessingTimeWindowOperator<String, String> op;
@@ -242,7 +243,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
final AtomicReference<Throwable> error = new AtomicReference<>();
- final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ final ProcessingTimeService timerService = new SystemProcessingTimeService(
new ReferenceSettingExceptionHandler(error), lock);
try {
@@ -255,7 +256,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSize);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE, 10), out);
op.open();
@@ -311,13 +312,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
final AtomicReference<Throwable> error = new AtomicReference<>();
- final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ final ProcessingTimeService timerService = new SystemProcessingTimeService(
new ReferenceSettingExceptionHandler(error), lock);
try {
final int windowSize = 50;
final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
new AggregatingProcessingTimeWindowOperator<>(
@@ -389,13 +390,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
final AtomicReference<Throwable> error = new AtomicReference<>();
- final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ final ProcessingTimeService timerService = new SystemProcessingTimeService(
new ReferenceSettingExceptionHandler(error), lock);
try {
final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
// tumbling window that triggers every 20 milliseconds
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -471,12 +472,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
final AtomicReference<Throwable> error = new AtomicReference<>();
- final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ final ProcessingTimeService timerService = new SystemProcessingTimeService(
new ReferenceSettingExceptionHandler(error), lock);
try {
final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
// tumbling window that triggers every 20 milliseconds
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -541,12 +542,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final Object lock = new Object();
final AtomicReference<Throwable> error = new AtomicReference<>();
- final TimeServiceProvider timerService = new DefaultTimeServiceProvider(
+ final ProcessingTimeService timerService = new SystemProcessingTimeService(
new ReferenceSettingExceptionHandler(error), lock);
try {
final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+ final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService);
ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100);
@@ -605,7 +606,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
try {
final int windowSize = 200;
- TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+ TestProcessingTimeService timerService = new TestProcessingTimeService();
// tumbling window that triggers every 50 milliseconds
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -655,7 +656,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSize);
- timerService = new TestTimeServiceProvider();
+ timerService = new TestProcessingTimeService();
testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
testHarness.setup();
@@ -698,7 +699,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final int windowSlide = 50;
final int windowSize = factor * windowSlide;
- TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+ TestProcessingTimeService timerService = new TestProcessingTimeService();
// sliding window (200 msecs) every 50 msecs
AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
@@ -748,7 +749,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSlide);
- timerService = new TestTimeServiceProvider();
+ timerService = new TestProcessingTimeService();
testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
testHarness.setup();
@@ -796,7 +797,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
try {
final long twoSeconds = 2000;
- TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+ TestProcessingTimeService timerService = new TestProcessingTimeService();
StatefulFunction.globalCounts.clear();
@@ -850,7 +851,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
final int windowSlide = 50;
final int windowSize = factor * windowSlide;
- TestTimeServiceProvider timerService = new TestTimeServiceProvider();
+ TestProcessingTimeService timerService = new TestProcessingTimeService();
StatefulFunction.globalCounts.clear();
@@ -977,7 +978,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
// ------------------------------------------------------------------------
- private static StreamTask<?, ?> createMockTask(Object lock) {
+ private static StreamTask<?, ?> createMockTask() {
Configuration configuration = new Configuration();
configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
@@ -985,7 +986,6 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
when(task.getName()).thenReturn("Test task name");
when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
- when(task.getCheckpointLock()).thenReturn(lock);
final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = mock(TaskManagerRuntimeInfo.class);
when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
@@ -996,10 +996,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
return task;
}
- private static StreamTask<?, ?> createMockTaskWithTimer(final TimeServiceProvider timerService, final Object lock)
+ private static StreamTask<?, ?> createMockTaskWithTimer(final ProcessingTimeService timerService)
{
- StreamTask<?, ?> mockTask = createMockTask(lock);
- when(mockTask.getTimerService()).thenReturn(timerService);
+ StreamTask<?, ?> mockTask = createMockTask();
+ when(mockTask.getProcessingTimeService()).thenReturn(timerService);
return mockTask;
}
@@ -1018,7 +1018,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
return result;
}
- private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception {
+ private static void shutdownTimerServiceAndWait(ProcessingTimeService timers) throws Exception {
timers.shutdownService();
while (!timers.isTerminated()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
index d0c5050..a7a71cf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
@@ -19,11 +19,11 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import java.util.concurrent.ScheduledFuture;
-class NoOpTimerService extends TimeServiceProvider {
+class NoOpTimerService extends ProcessingTimeService {
private volatile boolean terminated;
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 2b0b915..38f0778 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -63,7 +63,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -850,8 +850,6 @@ public class WindowOperatorTest extends TestLogger {
TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
- TestTimeServiceProvider timer = new TestTimeServiceProvider();
-
ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents",
new SumReducer(),
inputType.createSerializer(new ExecutionConfig()));
@@ -869,7 +867,7 @@ public class WindowOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(
- operator, new ExecutionConfig(), timer,
+ operator, new ExecutionConfig(),
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
@@ -898,7 +896,7 @@ public class WindowOperatorTest extends TestLogger {
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> otherTestHarness =
new KeyedOneInputStreamOperatorTestHarness<>(
- otherOperator, new ExecutionConfig(), timer,
+ otherOperator, new ExecutionConfig(),
new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
otherTestHarness.setup();
@@ -928,7 +926,7 @@ public class WindowOperatorTest extends TestLogger {
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(), 0);
- TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
+ TestProcessingTimeService testTimeProvider = new TestProcessingTimeService();
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -987,7 +985,7 @@ public class WindowOperatorTest extends TestLogger {
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(), 0);
- TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
+ TestProcessingTimeService testTimeProvider = new TestProcessingTimeService();
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1059,7 +1057,7 @@ public class WindowOperatorTest extends TestLogger {
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(), 0);
- TestTimeServiceProvider testTimeProvider = new TestTimeServiceProvider();
+ TestProcessingTimeService testTimeProvider = new TestProcessingTimeService();
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
deleted file mode 100644
index 29e13ed..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import org.junit.Test;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class DefaultTimeServiceProviderTest {
-
- @Test
- public void testTriggerHoldsLock() throws Exception {
-
- final Object lock = new Object();
- final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
- final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
- new ReferenceSettingExceptionHandler(errorRef), lock);
-
- try {
- assertEquals(0, timer.getNumTasksScheduled());
-
- // schedule something
- ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
- @Override
- public void trigger(long timestamp) {
- assertTrue(Thread.holdsLock(lock));
- }
- });
-
- // wait until the execution is over
- future.get();
- assertEquals(0, timer.getNumTasksScheduled());
-
- // check that no asynchronous error was reported
- if (errorRef.get() != null) {
- throw new Exception(errorRef.get());
- }
- }
- finally {
- timer.shutdownService();
- }
- }
-
- @Test
- public void testImmediateShutdown() throws Exception {
-
- final Object lock = new Object();
- final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
- final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
- new ReferenceSettingExceptionHandler(errorRef), lock);
-
- try {
- assertFalse(timer.isTerminated());
-
- final OneShotLatch latch = new OneShotLatch();
-
- // the task should trigger immediately and should block until terminated with interruption
- timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
- @Override
- public void trigger(long timestamp) throws Exception {
- latch.trigger();
- Thread.sleep(100000000);
- }
- });
-
- latch.await();
- timer.shutdownService();
-
- // can only enter this scope after the triggerable is interrupted
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (lock) {
- assertTrue(timer.isTerminated());
- }
-
- try {
- timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() {
- @Override
- public void trigger(long timestamp) {}
- });
-
- fail("should result in an exception");
- }
- catch (IllegalStateException e) {
- // expected
- }
-
- // obviously, we have an asynchronous interrupted exception
- assertNotNull(errorRef.get());
- assertTrue(errorRef.get().getCause() instanceof InterruptedException);
-
- assertEquals(0, timer.getNumTasksScheduled());
- }
- finally {
- timer.shutdownService();
- }
- }
-
- @Test
- public void testQuiescing() throws Exception {
-
- final Object lock = new Object();
- final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
- final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
- new ReferenceSettingExceptionHandler(errorRef), lock);
-
- try {
- final OneShotLatch latch = new OneShotLatch();
-
- final ReentrantLock scopeLock = new ReentrantLock();
-
- timer.registerTimer(System.currentTimeMillis() + 20, new Triggerable() {
- @Override
- public void trigger(long timestamp) throws Exception {
- scopeLock.lock();
- try {
- latch.trigger();
- // delay a bit before leaving the method
- Thread.sleep(5);
- } finally {
- scopeLock.unlock();
- }
- }
- });
-
- // after the task triggered, shut the timer down cleanly, waiting for the task to finish
- latch.await();
- timer.quiesceAndAwaitPending();
-
- // should be able to immediately acquire the lock, since the task must have exited by now
- assertTrue(scopeLock.tryLock());
-
- // should be able to schedule more tasks (that never get executed)
- ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() {
- @Override
- public void trigger(long timestamp) throws Exception {
- throw new Exception("test");
- }
- });
- assertNotNull(future);
-
- // nothing should be scheduled right now
- assertEquals(0, timer.getNumTasksScheduled());
-
- // check that no asynchronous error was reported - that ensures that the newly scheduled
- // triggerable did, in fact, not trigger
- if (errorRef.get() != null) {
- throw new Exception(errorRef.get());
- }
- }
- finally {
- timer.shutdownService();
- }
- }
-
- @Test
- public void testFutureCancellation() throws Exception {
-
- final Object lock = new Object();
- final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
- final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
- new ReferenceSettingExceptionHandler(errorRef), lock);
-
- try {
- assertEquals(0, timer.getNumTasksScheduled());
-
- // schedule something
- ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() {
- @Override
- public void trigger(long timestamp) {}
- });
- assertEquals(1, timer.getNumTasksScheduled());
-
- future.cancel(false);
-
- assertEquals(0, timer.getNumTasksScheduled());
-
- // check that no asynchronous error was reported
- if (errorRef.get() != null) {
- throw new Exception(errorRef.get());
- }
- }
- finally {
- timer.shutdownService();
- }
- }
-
- @Test
- public void testExceptionReporting() throws InterruptedException {
- final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
- final OneShotLatch latch = new OneShotLatch();
- final Object lock = new Object();
-
- TimeServiceProvider timeServiceProvider = new DefaultTimeServiceProvider(
- new AsyncExceptionHandler() {
- @Override
- public void handleAsyncException(String message, Throwable exception) {
- exceptionWasThrown.set(true);
- latch.trigger();
- }
- }, lock);
-
- timeServiceProvider.registerTimer(System.currentTimeMillis(), new Triggerable() {
- @Override
- public void trigger(long timestamp) throws Exception {
- throw new Exception("Exception in Timer");
- }
- });
-
- latch.await();
- assertTrue(exceptionWasThrown.get());
- }
-
- @Test
- public void testTimerSorting() throws Exception {
- final Object lock = new Object();
- final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
- final DefaultTimeServiceProvider timer = new DefaultTimeServiceProvider(
- new ReferenceSettingExceptionHandler(errorRef), lock);
-
- try {
- final OneShotLatch sync = new OneShotLatch();
-
- // we block the timer execution to make sure we have all the time
- // to register some additional timers out of order
- timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
- @Override
- public void trigger(long timestamp) throws Exception {
- sync.await();
- }
- });
-
- // schedule two timers out of order something
- final long now = System.currentTimeMillis();
- final long time1 = now + 6;
- final long time2 = now + 5;
- final long time3 = now + 8;
- final long time4 = now - 2;
-
- final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4);
- Triggerable trigger = new Triggerable() {
- @Override
- public void trigger(long timestamp) {
- timestamps.add(timestamp);
- }
- };
-
- // schedule
- ScheduledFuture<?> future1 = timer.registerTimer(time1, trigger);
- ScheduledFuture<?> future2 = timer.registerTimer(time2, trigger);
- ScheduledFuture<?> future3 = timer.registerTimer(time3, trigger);
- ScheduledFuture<?> future4 = timer.registerTimer(time4, trigger);
-
- // now that everything is scheduled, unblock the timer service
- sync.trigger();
-
- // wait until both are complete
- future1.get();
- future2.get();
- future3.get();
- future4.get();
-
- // verify that the order is 4 - 2 - 1 - 3
- assertEquals(4, timestamps.size());
- assertEquals(time4, timestamps.take().longValue());
- assertEquals(time2, timestamps.take().longValue());
- assertEquals(time1, timestamps.take().longValue());
- assertEquals(time3, timestamps.take().longValue());
-
- // check that no asynchronous error was reported
- if (errorRef.get() != null) {
- throw new Exception(errorRef.get());
- }
- }
- finally {
- timer.shutdownService();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index ce62624..ab7bf69 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -113,11 +113,11 @@ public class StreamTaskTestHarness<OUT> {
outputStreamRecordSerializer = new StreamElementSerializer<OUT>(outputSerializer);
}
- public TimeServiceProvider getTimerService() {
+ public ProcessingTimeService getProcessingTimeService() {
if (!(task instanceof StreamTask)) {
- throw new UnsupportedOperationException("getTimerService() only supported on StreamTasks.");
+ throw new UnsupportedOperationException("getProcessingTimeService() only supported on StreamTasks.");
}
- return ((StreamTask) task).getTimerService();
+ return ((StreamTask) task).getProcessingTimeService();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
new file mode 100644
index 0000000..e7944df
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import org.junit.Test;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SystemProcessingTimeServiceTest {
+
+ @Test
+ public void testTriggerHoldsLock() throws Exception {
+
+ final Object lock = new Object();
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+ final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+ new ReferenceSettingExceptionHandler(errorRef), lock);
+
+ try {
+ assertEquals(0, timer.getNumTasksScheduled());
+
+ // schedule something
+ ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {
+ assertTrue(Thread.holdsLock(lock));
+ }
+ });
+
+ // wait until the execution is over
+ future.get();
+ assertEquals(0, timer.getNumTasksScheduled());
+
+ // check that no asynchronous error was reported
+ if (errorRef.get() != null) {
+ throw new Exception(errorRef.get());
+ }
+ }
+ finally {
+ timer.shutdownService();
+ }
+ }
+
+ @Test
+ public void testImmediateShutdown() throws Exception {
+
+ final Object lock = new Object();
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+ final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+ new ReferenceSettingExceptionHandler(errorRef), lock);
+
+ try {
+ assertFalse(timer.isTerminated());
+
+ final OneShotLatch latch = new OneShotLatch();
+
+ // the task should trigger immediately and should block until terminated with interruption
+ timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ latch.trigger();
+ Thread.sleep(100000000);
+ }
+ });
+
+ latch.await();
+ timer.shutdownService();
+
+ // can only enter this scope after the triggerable is interrupted
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (lock) {
+ assertTrue(timer.isTerminated());
+ }
+
+ try {
+ timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {}
+ });
+
+ fail("should result in an exception");
+ }
+ catch (IllegalStateException e) {
+ // expected
+ }
+
+ // the interrupted triggerable must have reported an InterruptedException asynchronously
+ assertNotNull(errorRef.get());
+ assertTrue(errorRef.get().getCause() instanceof InterruptedException);
+
+ assertEquals(0, timer.getNumTasksScheduled());
+ }
+ finally {
+ timer.shutdownService();
+ }
+ }
+
+ @Test
+ public void testQuiescing() throws Exception {
+
+ final Object lock = new Object();
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+ final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+ new ReferenceSettingExceptionHandler(errorRef), lock);
+
+ try {
+ final OneShotLatch latch = new OneShotLatch();
+
+ final ReentrantLock scopeLock = new ReentrantLock();
+
+ timer.registerTimer(System.currentTimeMillis() + 20, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ scopeLock.lock();
+ try {
+ latch.trigger();
+ // delay a bit before leaving the method
+ Thread.sleep(5);
+ } finally {
+ scopeLock.unlock();
+ }
+ }
+ });
+
+ // after the task triggered, shut the timer down cleanly, waiting for the task to finish
+ latch.await();
+ timer.quiesceAndAwaitPending();
+
+ // should be able to immediately acquire the lock, since the task must have exited by now
+ assertTrue(scopeLock.tryLock());
+
+ // should be able to schedule more tasks (that never get executed)
+ ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ throw new Exception("test");
+ }
+ });
+ assertNotNull(future);
+
+ // nothing should be scheduled right now
+ assertEquals(0, timer.getNumTasksScheduled());
+
+ // check that no asynchronous error was reported - that ensures that the newly scheduled
+ // triggerable did, in fact, not trigger
+ if (errorRef.get() != null) {
+ throw new Exception(errorRef.get());
+ }
+ }
+ finally {
+ timer.shutdownService();
+ }
+ }
+
+ @Test
+ public void testFutureCancellation() throws Exception {
+
+ final Object lock = new Object();
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+ final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+ new ReferenceSettingExceptionHandler(errorRef), lock);
+
+ try {
+ assertEquals(0, timer.getNumTasksScheduled());
+
+ // schedule something
+ ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {}
+ });
+ assertEquals(1, timer.getNumTasksScheduled());
+
+ future.cancel(false);
+
+ assertEquals(0, timer.getNumTasksScheduled());
+
+ // check that no asynchronous error was reported
+ if (errorRef.get() != null) {
+ throw new Exception(errorRef.get());
+ }
+ }
+ finally {
+ timer.shutdownService();
+ }
+ }
+
+ @Test
+ public void testExceptionReporting() throws InterruptedException {
+ final AtomicBoolean exceptionWasThrown = new AtomicBoolean(false);
+ final OneShotLatch latch = new OneShotLatch();
+ final Object lock = new Object();
+
+ ProcessingTimeService timeServiceProvider = new SystemProcessingTimeService(
+ new AsyncExceptionHandler() {
+ @Override
+ public void handleAsyncException(String message, Throwable exception) {
+ exceptionWasThrown.set(true);
+ latch.trigger();
+ }
+ }, lock);
+
+ timeServiceProvider.registerTimer(System.currentTimeMillis(), new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ throw new Exception("Exception in Timer");
+ }
+ });
+
+ latch.await();
+ assertTrue(exceptionWasThrown.get());
+ }
+
+ @Test
+ public void testTimerSorting() throws Exception {
+ final Object lock = new Object();
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+ final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+ new ReferenceSettingExceptionHandler(errorRef), lock);
+
+ try {
+ final OneShotLatch sync = new OneShotLatch();
+
+ // we block the timer execution to make sure we have all the time
+ // to register some additional timers out of order
+ timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ sync.await();
+ }
+ });
+
+ // schedule four timers out of order
+ final long now = System.currentTimeMillis();
+ final long time1 = now + 6;
+ final long time2 = now + 5;
+ final long time3 = now + 8;
+ final long time4 = now - 2;
+
+ final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4);
+ Triggerable trigger = new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {
+ timestamps.add(timestamp);
+ }
+ };
+
+ // schedule
+ ScheduledFuture<?> future1 = timer.registerTimer(time1, trigger);
+ ScheduledFuture<?> future2 = timer.registerTimer(time2, trigger);
+ ScheduledFuture<?> future3 = timer.registerTimer(time3, trigger);
+ ScheduledFuture<?> future4 = timer.registerTimer(time4, trigger);
+
+ // now that everything is scheduled, unblock the timer service
+ sync.trigger();
+
+ // wait until all four are complete
+ future1.get();
+ future2.get();
+ future3.get();
+ future4.get();
+
+ // verify that the order is 4 - 2 - 1 - 3
+ assertEquals(4, timestamps.size());
+ assertEquals(time4, timestamps.take().longValue());
+ assertEquals(time2, timestamps.take().longValue());
+ assertEquals(time1, timestamps.take().longValue());
+ assertEquals(time3, timestamps.take().longValue());
+
+ // check that no asynchronous error was reported
+ if (errorRef.get() != null) {
+ throw new Exception(errorRef.get());
+ }
+ }
+ finally {
+ timer.shutdownService();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 41968e6..6ad684b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -32,9 +32,9 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -50,7 +50,7 @@ import static org.mockito.Mockito.doAnswer;
/**
* Extension of {@link OneInputStreamOperatorTestHarness} that allows the operator to get
- * a {@link AbstractKeyedStateBackend}.
+ * a {@link KeyedStateBackend}.
*
*/
public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
@@ -94,7 +94,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
ExecutionConfig executionConfig,
- TestTimeServiceProvider testTimeProvider,
+ ProcessingTimeService testTimeProvider,
KeySelector<IN, K> keySelector,
TypeInformation<K> keyType) {
super(operator, executionConfig, testTimeProvider);
@@ -187,7 +187,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
}
/**
- *
+ *
*/
@Override
public void restore(StreamStateHandle snapshot) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 2dd2163..c2763d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -19,24 +19,14 @@ package org.apache.flink.streaming.util;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class MockContext<IN, OUT> {
@@ -95,17 +85,4 @@ public class MockContext<IN, OUT> {
return result;
}
-
- private static StreamTask<?, ?> createMockTaskWithTimer(
- final TimeServiceProvider timerService, final Object lock)
- {
- StreamTask<?, ?> task = mock(StreamTask.class);
- when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
- when(task.getName()).thenReturn("Test task name");
- when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
- when(task.getEnvironment()).thenReturn(new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024));
- when(task.getCheckpointLock()).thenReturn(lock);
- when(task.getTimerService()).thenReturn(timerService);
- return task;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 9f8d223..4104049 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -43,11 +43,11 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.Preconditions;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -79,9 +79,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
final ExecutionConfig executionConfig;
- final Object checkpointLock;
-
- final TimeServiceProvider timeServiceProvider;
+ final ProcessingTimeService processingTimeService;
StreamTask<?, ?> mockTask;
@@ -105,36 +103,36 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
public OneInputStreamOperatorTestHarness(
OneInputStreamOperator<IN, OUT> operator,
ExecutionConfig executionConfig) {
- this(operator, executionConfig, null);
+ this(operator, executionConfig, new TestProcessingTimeService());
}
public OneInputStreamOperatorTestHarness(
OneInputStreamOperator<IN, OUT> operator,
ExecutionConfig executionConfig,
- TestTimeServiceProvider testTimeProvider) {
- this(operator, executionConfig, new Object(), testTimeProvider);
+ ProcessingTimeService processingTimeService) {
+ this(operator, executionConfig, new Object(), processingTimeService);
}
public OneInputStreamOperatorTestHarness(
OneInputStreamOperator<IN, OUT> operator,
ExecutionConfig executionConfig,
Object checkpointLock,
- TimeServiceProvider testTimeProvider) {
+ ProcessingTimeService processingTimeService) {
+ this.processingTimeService = Preconditions.checkNotNull(processingTimeService);
this.operator = operator;
- this.outputList = new ConcurrentLinkedQueue<Object>();
+ this.outputList = new ConcurrentLinkedQueue<>();
Configuration underlyingConfig = new Configuration();
this.config = new StreamConfig(underlyingConfig);
this.config.setCheckpointingEnabled(true);
this.executionConfig = executionConfig;
- this.checkpointLock = checkpointLock;
this.closableRegistry = new ClosableRegistry();
final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0);
mockTask = mock(StreamTask.class);
when(mockTask.getName()).thenReturn("Mock Task");
- when(mockTask.getCheckpointLock()).thenReturn(this.checkpointLock);
+ when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
when(mockTask.getConfiguration()).thenReturn(config);
when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
when(mockTask.getEnvironment()).thenReturn(env);
@@ -183,15 +181,12 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
throw new RuntimeException(e.getMessage(), e);
}
- timeServiceProvider = testTimeProvider != null ? testTimeProvider :
- new DefaultTimeServiceProvider(mockTask, this.checkpointLock);
-
- doAnswer(new Answer<TimeServiceProvider>() {
+ doAnswer(new Answer<ProcessingTimeService>() {
@Override
- public TimeServiceProvider answer(InvocationOnMock invocation) throws Throwable {
- return timeServiceProvider;
+ public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable {
+ return OneInputStreamOperatorTestHarness.this.processingTimeService;
}
- }).when(mockTask).getTimerService();
+ }).when(mockTask).getProcessingTimeService();
}
public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
@@ -219,9 +214,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
}
/**
- * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
- * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
- * to extract only the StreamRecords.
+ * Get all the output from the task. This contains StreamRecords and Events interleaved.
*/
public ConcurrentLinkedQueue<Object> getOutput() {
return outputList;
@@ -316,8 +309,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
public void close() throws Exception {
operator.close();
operator.dispose();
- if (timeServiceProvider != null) {
- timeServiceProvider.shutdownService();
+ if (processingTimeService != null) {
+ processingTimeService.shutdownService();
}
setupCalled = false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index a4e26f0..ed9a7cd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
@@ -50,7 +50,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
public class WindowingTestHarness<K, IN, W extends Window> {
- private final TestTimeServiceProvider timeServiceProvider;
+ private final TestProcessingTimeService timeServiceProvider;
private final OneInputStreamOperatorTestHarness<IN, IN> testHarness;
@@ -80,7 +80,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
trigger,
allowedLateness);
- timeServiceProvider = new TestTimeServiceProvider();
+ timeServiceProvider = new TestProcessingTimeService();
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, timeServiceProvider, keySelector, keyType);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index 707ce0f..e7f62fd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -192,7 +192,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
}
if (first) {
- getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
+ getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, this);
first = false;
}
numElements++;
@@ -209,7 +209,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
try {
numTimers++;
throwIfDone();
- getTimerService().registerTimer(System.currentTimeMillis() + 1, this);
+ getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1, this);
} finally {
semaphore.release();
}
@@ -251,7 +251,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
}
if (first) {
- getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
+ getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, this);
first = false;
}
numElements++;
@@ -266,7 +266,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
}
if (first) {
- getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
+ getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, this);
first = false;
}
numElements++;
@@ -284,7 +284,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
try {
numTimers++;
throwIfDone();
- getTimerService().registerTimer(System.currentTimeMillis() + 1, this);
+ getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1, this);
} finally {
semaphore.release();
}