You are viewing a plain-text version of this content. (The link to the canonical version was lost when this copy was extracted.)
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/27 15:50:35 UTC
[2/4] flink git commit: [FLINK-3647] Change StreamSource to use Processing-Time Clock Service
[FLINK-3647] Change StreamSource to use Processing-Time Clock Service
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/819fb270
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/819fb270
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/819fb270
Branch: refs/heads/master
Commit: 819fb2706028d9528bb1e424fa81ef85fdc26f48
Parents: 4b5a789
Author: kl0u <kk...@gmail.com>
Authored: Thu Jun 9 21:23:38 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jun 27 17:16:12 2016 +0200
----------------------------------------------------------------------
.../api/operators/AbstractStreamOperator.java | 4 +-
.../streaming/api/operators/StreamSource.java | 82 ++++++++------
.../operators/StreamSourceOperatorTest.java | 108 +++++++++++++++++--
3 files changed, 149 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/819fb270/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 05fc158..7755347 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -250,8 +250,8 @@ public abstract class AbstractStreamOperator<OUT>
}
/**
- * Register a timer callback. At the specified time the {@link Triggerable} will be invoked.
- * This call is guaranteed to not happen concurrently with method calls on the operator.
+ * Register a timer callback. At the specified time the provided {@link Triggerable} will
+ * be invoked. This call is guaranteed to not happen concurrently with method calls on the operator.
*
* @param time The absolute time in milliseconds.
* @param target The target to be triggered.
http://git-wip-us.apache.org/repos/asf/flink/blob/819fb270/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 68c623e..38c948b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -21,12 +21,10 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
/**
* {@link StreamOperator} for streaming sources.
@@ -195,7 +193,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
private final Output<StreamRecord<T>> output;
private final StreamRecord<T> reuse;
- private final ScheduledExecutorService scheduleExecutor;
private final ScheduledFuture<?> watermarkTimer;
private final long watermarkInterval;
@@ -216,29 +213,10 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
this.output = outputParam;
this.watermarkInterval = watermarkInterval;
this.reuse = new StreamRecord<T>(null);
-
- this.scheduleExecutor = Executors.newScheduledThreadPool(1);
-
- this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- final long currentTime = System.currentTimeMillis();
-
- if (currentTime > nextWatermarkTime) {
- // align the watermarks across all machines. this will ensure that we
- // don't have watermarks that creep along at different intervals because
- // the machine clocks are out of sync
- final long watermarkTime = currentTime - (currentTime % watermarkInterval);
-
- synchronized (lockingObjectParam) {
- if (currentTime > nextWatermarkTime) {
- outputParam.emitWatermark(new Watermark(watermarkTime));
- nextWatermarkTime += watermarkInterval;
- }
- }
- }
- }
- }, 0, watermarkInterval, TimeUnit.MILLISECONDS);
+
+ long now = owner.getCurrentProcessingTime();
+ this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
+ new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
}
@Override
@@ -246,14 +224,21 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
owner.checkAsyncException();
synchronized (lockingObject) {
- final long currentTime = System.currentTimeMillis();
+ final long currentTime = owner.getCurrentProcessingTime();
output.collect(reuse.replace(element, currentTime));
-
+
+ // emit the watermark here, while we already hold the lockingObject,
+ // instead of waiting for the watermark emission task to fire; this
+ // avoids contention on that lock.
+
if (currentTime > nextWatermarkTime) {
// in case we jumped some watermarks, recompute the next watermark time
final long watermarkTime = currentTime - (currentTime % watermarkInterval);
nextWatermarkTime = watermarkTime + watermarkInterval;
output.emitWatermark(new Watermark(watermarkTime));
+
+ // we do not need to register another timer here
+ // because the emitting task will do so.
}
}
}
@@ -276,7 +261,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
// we can shutdown the timer now, no watermarks will be needed any more
watermarkTimer.cancel(true);
- scheduleExecutor.shutdownNow();
}
}
@@ -288,13 +272,47 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
@Override
public void close() {
watermarkTimer.cancel(true);
- scheduleExecutor.shutdownNow();
+ }
+
+ private class WatermarkEmittingTask implements Triggerable {
+
+ private final StreamSource<?, ?> owner;
+ private final Object lockingObject;
+ private final Output<StreamRecord<T>> output;
+
+ private WatermarkEmittingTask(StreamSource<?, ?> src, Object lock, Output<StreamRecord<T>> output) {
+ this.owner = src;
+ this.lockingObject = lock;
+ this.output = output;
+ }
+
+ @Override
+ public void trigger(long timestamp) {
+ final long currentTime = owner.getCurrentProcessingTime();
+
+ if (currentTime > nextWatermarkTime) {
+ // align the watermarks across all machines. this will ensure that we
+ // don't have watermarks that creep along at different intervals because
+ // the machine clocks are out of sync
+ final long watermarkTime = currentTime - (currentTime % watermarkInterval);
+
+ synchronized (lockingObject) {
+ if (currentTime > nextWatermarkTime) {
+ output.emitWatermark(new Watermark(watermarkTime));
+ nextWatermarkTime += watermarkInterval;
+ }
+ }
+ }
+
+ owner.registerTimer(owner.getCurrentProcessingTime() + watermarkInterval,
+ new WatermarkEmittingTask(owner, lockingObject, output));
+ }
}
}
/**
* A SourceContext for event time. Sources may directly attach timestamps and generate
- * watermarks, but if records are emitted without timestamps, no timetamps are automatically
+ * watermarks, but if records are emitted without timestamps, no timestamps are automatically
* generated and attached. The records will simply have no timestamp in that case.
*
* Streaming topologies can use timestamp assigner functions to override the timestamps
http://git-wip-us.apache.org/repos/asf/flink/blob/819fb270/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index b368019..0e18166 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -35,14 +35,22 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.ScheduledFuture;
import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -58,7 +66,7 @@ public class StreamSourceOperatorTest {
final List<StreamElement> output = new ArrayList<>();
- setupSourceOperator(operator);
+ setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
operator.run(new Object(), new CollectorOutput<String>(output));
assertEquals(1, output.size());
@@ -75,7 +83,7 @@ public class StreamSourceOperatorTest {
new StreamSource<>(new InfiniteSource<String>());
- setupSourceOperator(operator);
+ setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
operator.cancel();
// run and exit
@@ -95,7 +103,7 @@ public class StreamSourceOperatorTest {
new StreamSource<>(new InfiniteSource<String>());
- setupSourceOperator(operator);
+ setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
// trigger an async cancel in a bit
new Thread("canceler") {
@@ -128,7 +136,7 @@ public class StreamSourceOperatorTest {
new StoppableStreamSource<>(new InfiniteSource<String>());
- setupSourceOperator(operator);
+ setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
operator.stop();
// run and stop
@@ -147,7 +155,7 @@ public class StreamSourceOperatorTest {
new StoppableStreamSource<>(new InfiniteSource<String>());
- setupSourceOperator(operator);
+ setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
// trigger an async cancel in a bit
new Thread("canceler") {
@@ -166,18 +174,61 @@ public class StreamSourceOperatorTest {
assertTrue(output.isEmpty());
}
-
+ @Test
+ public void testAutomaticWatermarkContext() throws Exception {
+
+ // regular stream source operator
+ final StoppableStreamSource<String, InfiniteSource<String>> operator =
+ new StoppableStreamSource<>(new InfiniteSource<String>());
+
+ long watermarkInterval = 10;
+ TestTimeServiceProvider timeProvider = new TestTimeServiceProvider();
+ setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, timeProvider);
+
+ final List<StreamElement> output = new ArrayList<>();
+
+ StreamSource.AutomaticWatermarkContext<String> ctx =
+ new StreamSource.AutomaticWatermarkContext<>(
+ operator,
+ operator.getContainingTask().getCheckpointLock(),
+ new CollectorOutput<String>(output),
+ operator.getExecutionConfig().getAutoWatermarkInterval());
+
+ // periodically emit the watermarks;
+ // even though we start from 1, the watermarks are still
+ // going to be aligned with the watermark interval.
+
+ for (long i = 1; i < 100; i += watermarkInterval) {
+ timeProvider.setCurrentTime(i);
+ }
+
+ assertTrue(output.size() == 9);
+
+ long nextWatermark = 0;
+ for (StreamElement el : output) {
+ nextWatermark += watermarkInterval;
+ Watermark wm = (Watermark) el;
+ assertTrue(wm.getTimestamp() == nextWatermark);
+ }
+ }
+
// ------------------------------------------------------------------------
@SuppressWarnings("unchecked")
- private static <T> void setupSourceOperator(StreamSource<T, ?> operator) {
+ private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
+ TimeCharacteristic timeChar,
+ long watermarkInterval,
+ final TimeServiceProvider timeProvider) {
+
ExecutionConfig executionConfig = new ExecutionConfig();
+ executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
StreamConfig cfg = new StreamConfig(new Configuration());
- cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+ cfg.setTimeCharacteristic(timeChar);
Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
-
+
StreamTask<?, ?> mockTask = mock(StreamTask.class);
when(mockTask.getName()).thenReturn("Mock Task");
when(mockTask.getCheckpointLock()).thenReturn(new Object());
@@ -186,9 +237,44 @@ public class StreamSourceOperatorTest {
when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
- operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) mock(Output.class));
+ doAnswer(new Answer<ScheduledFuture>() {
+ @Override
+ public ScheduledFuture answer(InvocationOnMock invocation) throws Throwable {
+ final long execTime = (Long) invocation.getArguments()[0];
+ final Triggerable target = (Triggerable) invocation.getArguments()[1];
+
+ if (timeProvider == null) {
+ throw new RuntimeException("The time provider is null");
+ }
+
+ timeProvider.registerTimer(execTime, new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ target.trigger(execTime);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ return null;
+ }
+ }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
+
+ doAnswer(new Answer<Long>() {
+ @Override
+ public Long answer(InvocationOnMock invocation) throws Throwable {
+ if (timeProvider == null) {
+ throw new RuntimeException("The time provider is null");
+ }
+ return timeProvider.getCurrentProcessingTime();
+ }
+ }).when(mockTask).getCurrentProcessingTime();
+
+ operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
}
-
+
// ------------------------------------------------------------------------
private static final class FiniteSource<T> implements SourceFunction<T>, StoppableFunction {