Posted to commits@flink.apache.org by tr...@apache.org on 2018/12/11 10:59:48 UTC

[flink] branch master updated: [FLINK-11041][test] ReinterpretDataStreamAsKeyedStreamITCase source should hold checkpointing lock

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 70b2029  [FLINK-11041][test] ReinterpretDataStreamAsKeyedStreamITCase source should hold checkpointing lock
70b2029 is described below

commit 70b2029f8a3d4ca2d3cb7bd7fddac9bb5b3e8f07
Author: Stefan Richter <s....@data-artisans.com>
AuthorDate: Mon Dec 10 16:52:02 2018 +0100

    [FLINK-11041][test] ReinterpretDataStreamAsKeyedStreamITCase source should hold checkpointing lock
---
 .../api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java      | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java
index 6a1b9ee..ff28dff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java
@@ -122,7 +122,9 @@ public class ReinterpretDataStreamAsKeyedStreamITCase {
 		public void run(SourceContext<Tuple2<Integer, Integer>> out) throws Exception {
 			Random random = new Random(42);
 			while (--remainingEvents >= 0) {
-				out.collect(new Tuple2<>(random.nextInt(numKeys), 1));
+				synchronized (out.getCheckpointLock()) {
+					out.collect(new Tuple2<>(random.nextInt(numKeys), 1));
+				}
 			}
 		}
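
A note on why the lock matters: in a source like this one, the emit and the source's own bookkeeping have to look like a single atomic step to anything that takes a snapshot while holding the same lock. The following is a minimal, self-contained sketch of that idea and does not use Flink at all. MiniSourceContext, CountingContext, BoundedSource and countInconsistentSnapshots are hypothetical names made up for this illustration; only collect() and getCheckpointLock() mirror the calls that appear in the diff. Unlike the patched test, the sketch also updates the counter inside the synchronized block so that the invariant can be checked exactly; that is a choice of the sketch, not something the commit does.

```java
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class CheckpointLockSketch {

    /** Hypothetical stand-in for a source context; not the Flink interface. */
    interface MiniSourceContext<T> {
        void collect(T element);
        Object getCheckpointLock();
    }

    /** Counts emitted elements; the counter is only touched under the lock. */
    static final class CountingContext implements MiniSourceContext<Integer> {
        private final Object lock = new Object();
        long emitted = 0;

        @Override
        public void collect(Integer element) {
            emitted++;
        }

        @Override
        public Object getCheckpointLock() {
            return lock;
        }
    }

    /** Bounded source that emits and updates its state as one locked step. */
    static final class BoundedSource {
        int remainingEvents;
        final int numKeys;

        BoundedSource(int remainingEvents, int numKeys) {
            this.remainingEvents = remainingEvents;
            this.numKeys = numKeys;
        }

        void run(MiniSourceContext<Integer> out) {
            Random random = new Random(42);
            while (true) {
                synchronized (out.getCheckpointLock()) {
                    if (remainingEvents <= 0) {
                        return;
                    }
                    // State update and emit happen together under the lock.
                    remainingEvents--;
                    out.collect(random.nextInt(numKeys));
                }
            }
        }
    }

    /**
     * Runs the source while a second thread takes "snapshots" under the same
     * lock, and returns how many snapshots saw emitted + remaining != total.
     */
    static int countInconsistentSnapshots(int totalEvents, int numKeys) {
        CountingContext ctx = new CountingContext();
        BoundedSource source = new BoundedSource(totalEvents, numKeys);
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicInteger violations = new AtomicInteger();

        Thread snapshotter = new Thread(() -> {
            while (running.get()) {
                synchronized (ctx.getCheckpointLock()) {
                    if (source.remainingEvents + ctx.emitted != totalEvents) {
                        violations.incrementAndGet();
                    }
                }
            }
        });
        snapshotter.start();
        source.run(ctx);
        running.set(false);
        try {
            snapshotter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return violations.get();
    }

    public static void main(String[] args) {
        System.out.println("inconsistent snapshots: "
            + countInconsistentSnapshots(100_000, 10));
    }
}
```

Because both threads only read or write the counters while holding the same monitor, every snapshot in the sketch sees a consistent pair of values. Removing the synchronized block from run() would make the snapshot racy, which is the kind of problem the commit subject points at.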