You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/22 08:43:43 UTC

[07/17] flink git commit: [hotfix] [e2e-tests] Make SequenceGeneratorSource usable for 0-size key ranges

[hotfix] [e2e-tests] Make SequenceGeneratorSource usable for 0-size key ranges


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c34ff1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c34ff1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c34ff1f

Branch: refs/heads/master
Commit: 5c34ff1f2b6f8b7c35885646be5b60701cc10348
Parents: f41b00e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Apr 30 18:04:43 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 15:50:54 2018 +0800

----------------------------------------------------------------------
 .../tests/SequenceGeneratorSource.java          | 28 ++++++++++++++++++++
 1 file changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c34ff1f/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
index e641551..40c0db5 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
@@ -91,7 +91,14 @@ public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> i
 
 	@Override
 	public void run(SourceContext<Event> ctx) throws Exception {
+		if (keyRanges.size() > 0) { // at least one key range present: delegate to runActive
+			runActive(ctx);
+		} else { // 0-size key ranges: delegate to runIdle instead
+			runIdle(ctx);
+		}
+	}
 
+	private void runActive(SourceContext<Event> ctx) throws Exception {
 		Random random = new Random();
 
 		// this holds the current event time, from which generated events can deviate by up to +/- (maxOutOfOrder).
@@ -133,6 +140,27 @@ public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> i
 		}
 	}
 
+	private void runIdle(SourceContext<Event> ctx) throws Exception {
+		ctx.markAsTemporarilyIdle();
+
+		// emit nothing; just block until 'running' turns false (i.e. until this source is canceled)
+		final Object waitLock = new Object(); // local and never handed out, so no other thread can notify() it
+		while (running) {
+			try {
+				//noinspection SynchronizationOnLocalVariableOrMethodParameter
+				synchronized (waitLock) {
+					waitLock.wait(); // only an interrupt or a spurious wakeup ends this wait; the loop then re-checks 'running'
+				}
+			}
+			catch (InterruptedException e) {
+				if (!running) {
+					// restore the interrupted state, and fall through the loop
+					Thread.currentThread().interrupt();
+				} // if still running, the interrupt is not restored and the loop waits again
+			}
+		}
+	}
+
 	private long generateEventTimeWithOutOfOrderness(Random random, long correctTime) { // for maxOutOfOrder > 0 the result lies in [correctTime - maxOutOfOrder, correctTime + maxOutOfOrder)
 		return correctTime - maxOutOfOrder + ((random.nextLong() & Long.MAX_VALUE) % (2 * maxOutOfOrder)); // '& Long.MAX_VALUE' clears the sign bit; NOTE(review): '%' throws ArithmeticException if maxOutOfOrder == 0 — confirm it is always non-zero
 	}