You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/06/27 15:50:35 UTC

[2/4] flink git commit: [FLINK-3647] Change StreamSource to use Processing-Time Clock Service

[FLINK-3647] Change StreamSource to use Processing-Time Clock Service


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/819fb270
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/819fb270
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/819fb270

Branch: refs/heads/master
Commit: 819fb2706028d9528bb1e424fa81ef85fdc26f48
Parents: 4b5a789
Author: kl0u <kk...@gmail.com>
Authored: Thu Jun 9 21:23:38 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jun 27 17:16:12 2016 +0200

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |   4 +-
 .../streaming/api/operators/StreamSource.java   |  82 ++++++++------
 .../operators/StreamSourceOperatorTest.java     | 108 +++++++++++++++++--
 3 files changed, 149 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/819fb270/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 05fc158..7755347 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -250,8 +250,8 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 
 	/**
-	 * Register a timer callback. At the specified time the {@link Triggerable} will be invoked.
-	 * This call is guaranteed to not happen concurrently with method calls on the operator.
+	 * Register a timer callback. At the specified time the provided {@link Triggerable} will
+	 * be invoked. This call is guaranteed to not happen concurrently with method calls on the operator.
 	 *
 	 * @param time The absolute time in milliseconds.
 	 * @param target The target to be triggered.

http://git-wip-us.apache.org/repos/asf/flink/blob/819fb270/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 68c623e..38c948b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -21,12 +21,10 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 /**
  * {@link StreamOperator} for streaming sources.
@@ -195,7 +193,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 		private final Output<StreamRecord<T>> output;
 		private final StreamRecord<T> reuse;
 		
-		private final ScheduledExecutorService scheduleExecutor;
 		private final ScheduledFuture<?> watermarkTimer;
 		private final long watermarkInterval;
 
@@ -216,29 +213,10 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 			this.output = outputParam;
 			this.watermarkInterval = watermarkInterval;
 			this.reuse = new StreamRecord<T>(null);
-			
-			this.scheduleExecutor = Executors.newScheduledThreadPool(1);
-
-			this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
-				@Override
-				public void run() {
-					final long currentTime = System.currentTimeMillis();
-					
-					if (currentTime > nextWatermarkTime) {
-						// align the watermarks across all machines. this will ensure that we
-						// don't have watermarks that creep along at different intervals because
-						// the machine clocks are out of sync
-						final long watermarkTime = currentTime - (currentTime % watermarkInterval);
-						
-						synchronized (lockingObjectParam) {
-							if (currentTime > nextWatermarkTime) {
-								outputParam.emitWatermark(new Watermark(watermarkTime));
-								nextWatermarkTime += watermarkInterval;
-							}
-						}
-					}
-				}
-			}, 0, watermarkInterval, TimeUnit.MILLISECONDS);
+
+			long now = owner.getCurrentProcessingTime();
+			this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
+				new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
 		}
 
 		@Override
@@ -246,14 +224,21 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 			owner.checkAsyncException();
 			
 			synchronized (lockingObject) {
-				final long currentTime = System.currentTimeMillis();
+				final long currentTime = owner.getCurrentProcessingTime();
 				output.collect(reuse.replace(element, currentTime));
-				
+
+				// the watermark is sent here, before the watermark
+				// emission task fires, in order to avoid lock contention
+				// on the lockingObject.
+
 				if (currentTime > nextWatermarkTime) {
 					// in case we jumped some watermarks, recompute the next watermark time
 					final long watermarkTime = currentTime - (currentTime % watermarkInterval);
 					nextWatermarkTime = watermarkTime + watermarkInterval;
 					output.emitWatermark(new Watermark(watermarkTime));
+
+					// we do not need to register another timer here
+					// because the emitting task will do so.
 				}
 			}
 		}
@@ -276,7 +261,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 
 				// we can shutdown the timer now, no watermarks will be needed any more
 				watermarkTimer.cancel(true);
-				scheduleExecutor.shutdownNow();
 			}
 		}
 
@@ -288,13 +272,47 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 		@Override
 		public void close() {
 			watermarkTimer.cancel(true);
-			scheduleExecutor.shutdownNow();
+		}
+
+		private class WatermarkEmittingTask implements Triggerable {
+
+			private final StreamSource<?, ?> owner;
+			private final Object lockingObject;
+			private final Output<StreamRecord<T>> output;
+
+			private WatermarkEmittingTask(StreamSource<?, ?> src, Object lock, Output<StreamRecord<T>> output) {
+				this.owner = src;
+				this.lockingObject = lock;
+				this.output = output;
+			}
+
+			@Override
+			public void trigger(long timestamp) {
+				final long currentTime = owner.getCurrentProcessingTime();
+
+				if (currentTime > nextWatermarkTime) {
+					// align the watermarks across all machines. this will ensure that we
+					// don't have watermarks that creep along at different intervals because
+					// the machine clocks are out of sync
+					final long watermarkTime = currentTime - (currentTime % watermarkInterval);
+
+					synchronized (lockingObject) {
+						if (currentTime > nextWatermarkTime) {
+							output.emitWatermark(new Watermark(watermarkTime));
+							nextWatermarkTime += watermarkInterval;
+						}
+					}
+				}
+
+				owner.registerTimer(owner.getCurrentProcessingTime() + watermarkInterval,
+					new WatermarkEmittingTask(owner, lockingObject, output));
+			}
 		}
 	}
 
 	/**
 	 * A SourceContext for event time. Sources may directly attach timestamps and generate
-	 * watermarks, but if records are emitted without timestamps, no timetamps are automatically
+	 * watermarks, but if records are emitted without timestamps, no timestamps are automatically
 	 * generated and attached. The records will simply have no timestamp in that case.
 	 * 
 	 * Streaming topologies can use timestamp assigner functions to override the timestamps

http://git-wip-us.apache.org/repos/asf/flink/blob/819fb270/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index b368019..0e18166 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -35,14 +35,22 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ScheduledFuture;
 
 import static org.junit.Assert.*;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -58,7 +66,7 @@ public class StreamSourceOperatorTest {
 		
 		final List<StreamElement> output = new ArrayList<>();
 		
-		setupSourceOperator(operator);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
 		operator.run(new Object(), new CollectorOutput<String>(output));
 		
 		assertEquals(1, output.size());
@@ -75,7 +83,7 @@ public class StreamSourceOperatorTest {
 				new StreamSource<>(new InfiniteSource<String>());
 
 
-		setupSourceOperator(operator);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
 		operator.cancel();
 
 		// run and exit
@@ -95,7 +103,7 @@ public class StreamSourceOperatorTest {
 				new StreamSource<>(new InfiniteSource<String>());
 
 		
-		setupSourceOperator(operator);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
 		
 		// trigger an async cancel in a bit
 		new Thread("canceler") {
@@ -128,7 +136,7 @@ public class StreamSourceOperatorTest {
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
 
-		setupSourceOperator(operator);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
 		operator.stop();
 
 		// run and stop
@@ -147,7 +155,7 @@ public class StreamSourceOperatorTest {
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
 
-		setupSourceOperator(operator);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
 
 		// trigger an async cancel in a bit
 		new Thread("canceler") {
@@ -166,18 +174,61 @@ public class StreamSourceOperatorTest {
 		assertTrue(output.isEmpty());
 	}
 	
-	
+	@Test
+	public void testAutomaticWatermarkContext() throws Exception {
+
+		// regular stream source operator
+		final StoppableStreamSource<String, InfiniteSource<String>> operator =
+			new StoppableStreamSource<>(new InfiniteSource<String>());
+
+		long watermarkInterval = 10;
+		TestTimeServiceProvider timeProvider = new TestTimeServiceProvider();
+		setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, timeProvider);
+
+		final List<StreamElement> output = new ArrayList<>();
+
+		StreamSource.AutomaticWatermarkContext<String> ctx =
+			new StreamSource.AutomaticWatermarkContext<>(
+				operator,
+				operator.getContainingTask().getCheckpointLock(),
+				new CollectorOutput<String>(output),
+				operator.getExecutionConfig().getAutoWatermarkInterval());
+
+		// periodically emit the watermarks
+		// even though we start from 1 the watermarks are still
+		// going to be aligned with the watermark interval.
+
+		for (long i = 1; i < 100; i += watermarkInterval)  {
+			timeProvider.setCurrentTime(i);
+		}
+
+		assertTrue(output.size() == 9);
+
+		long nextWatermark = 0;
+		for (StreamElement el : output) {
+			nextWatermark += watermarkInterval;
+			Watermark wm = (Watermark) el;
+			assertTrue(wm.getTimestamp() == nextWatermark);
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	
 	@SuppressWarnings("unchecked")
-	private static <T> void setupSourceOperator(StreamSource<T, ?> operator) {
+	private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
+												TimeCharacteristic timeChar,
+												long watermarkInterval,
+												final TimeServiceProvider timeProvider) {
+
 		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		
-		cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+		cfg.setTimeCharacteristic(timeChar);
 
 		Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
-		
+
 		StreamTask<?, ?> mockTask = mock(StreamTask.class);
 		when(mockTask.getName()).thenReturn("Mock Task");
 		when(mockTask.getCheckpointLock()).thenReturn(new Object());
@@ -186,9 +237,44 @@ public class StreamSourceOperatorTest {
 		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
 		when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
 
-		operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) mock(Output.class));
+		doAnswer(new Answer<ScheduledFuture>() {
+			@Override
+			public ScheduledFuture answer(InvocationOnMock invocation) throws Throwable {
+				final long execTime = (Long) invocation.getArguments()[0];
+				final Triggerable target = (Triggerable) invocation.getArguments()[1];
+
+				if (timeProvider == null) {
+					throw new RuntimeException("The time provider is null");
+				}
+
+				timeProvider.registerTimer(execTime, new Runnable() {
+
+					@Override
+					public void run() {
+						try {
+							target.trigger(execTime);
+						} catch (Exception e) {
+							e.printStackTrace();
+						}
+					}
+				});
+				return null;
+			}
+		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
+
+		doAnswer(new Answer<Long>() {
+			@Override
+			public Long answer(InvocationOnMock invocation) throws Throwable {
+				if (timeProvider == null) {
+					throw new RuntimeException("The time provider is null");
+				}
+				return timeProvider.getCurrentProcessingTime();
+			}
+		}).when(mockTask).getCurrentProcessingTime();
+
+		operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
 	}
-	
+
 	// ------------------------------------------------------------------------
 	
 	private static final class FiniteSource<T> implements SourceFunction<T>, StoppableFunction {