You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/12/10 15:02:36 UTC

[flink] branch master updated: [FLINK-9552][iterations] fix not syncing on checkpoint lock before emitting records

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0382830  [FLINK-9552][iterations] fix not syncing on checkpoint lock before emitting records
0382830 is described below

commit 03828308e75a19a777b7c999bd5bc9b09388daa2
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Oct 30 13:05:55 2018 +0100

    [FLINK-9552][iterations] fix not syncing on checkpoint lock before emitting records
    
    We need to make sure that concurrent access to the RecordWriter is protected by
    a lock. It seems that everything but the StreamIterationHead was synchronizing
    on the checkpoint lock and hence we sync here as well.
---
 .../flink/streaming/runtime/tasks/StreamIterationHead.java   | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index c54235c..ecef7f0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -76,8 +76,10 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 
 			// If timestamps are enabled we make sure to remove cyclic watermark dependencies
 			if (isSerializingTimestamps()) {
-				for (RecordWriterOutput<OUT> output : outputs) {
-					output.emitWatermark(new Watermark(Long.MAX_VALUE));
+				synchronized (getCheckpointLock()) {
+					for (RecordWriterOutput<OUT> output : outputs) {
+						output.emitWatermark(new Watermark(Long.MAX_VALUE));
+					}
 				}
 			}
 
@@ -87,8 +89,10 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 					dataChannel.take();
 
 				if (nextRecord != null) {
-					for (RecordWriterOutput<OUT> output : outputs) {
-						output.collect(nextRecord);
+					synchronized (getCheckpointLock()) {
+						for (RecordWriterOutput<OUT> output : outputs) {
+							output.collect(nextRecord);
+						}
 					}
 				}
 				else {