You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/05/22 08:43:43 UTC
[07/17] flink git commit: [hotfix] [e2e-tests] Make
SequenceGeneratorSource usable for 0-size key ranges
[hotfix] [e2e-tests] Make SequenceGeneratorSource usable for 0-size key ranges
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c34ff1f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c34ff1f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c34ff1f
Branch: refs/heads/master
Commit: 5c34ff1f2b6f8b7c35885646be5b60701cc10348
Parents: f41b00e
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Mon Apr 30 18:04:43 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue May 22 15:50:54 2018 +0800
----------------------------------------------------------------------
.../tests/SequenceGeneratorSource.java | 28 ++++++++++++++++++++
1 file changed, 28 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5c34ff1f/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
index e641551..40c0db5 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
@@ -91,7 +91,14 @@ public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> i
@Override
public void run(SourceContext<Event> ctx) throws Exception {
+ if (keyRanges.size() > 0) {
+ runActive(ctx);
+ } else {
+ runIdle(ctx);
+ }
+ }
+ private void runActive(SourceContext<Event> ctx) throws Exception {
Random random = new Random();
// this holds the current event time, from which generated events can up to +/- (maxOutOfOrder).
@@ -133,6 +140,27 @@ public class SequenceGeneratorSource extends RichParallelSourceFunction<Event> i
}
}
+ private void runIdle(SourceContext<Event> ctx) throws Exception {
+ ctx.markAsTemporarilyIdle();
+
+ // just wait until this source is canceled
+ final Object waitLock = new Object();
+ while (running) {
+ try {
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (waitLock) {
+ waitLock.wait();
+ }
+ }
+ catch (InterruptedException e) {
+ if (!running) {
+ // restore the interrupted state, and fall through the loop
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
private long generateEventTimeWithOutOfOrderness(Random random, long correctTime) {
return correctTime - maxOutOfOrder + ((random.nextLong() & Long.MAX_VALUE) % (2 * maxOutOfOrder));
}