You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/12/10 15:02:36 UTC
[flink] branch master updated: [FLINK-9552][iterations] fix not
syncing on checkpoint lock before emitting records
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0382830 [FLINK-9552][iterations] fix not syncing on checkpoint lock before emitting records
0382830 is described below
commit 03828308e75a19a777b7c999bd5bc9b09388daa2
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Oct 30 13:05:55 2018 +0100
[FLINK-9552][iterations] fix not syncing on checkpoint lock before emitting records
We need to make sure that concurrent access to the RecordWriter is protected by
a lock. It seems that everything but the StreamIterationHead was already
synchronizing on the checkpoint lock, so StreamIterationHead now also acquires
the checkpoint lock before emitting watermarks and records.
---
.../flink/streaming/runtime/tasks/StreamIterationHead.java | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index c54235c..ecef7f0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -76,8 +76,10 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
// If timestamps are enabled we make sure to remove cyclic watermark dependencies
if (isSerializingTimestamps()) {
- for (RecordWriterOutput<OUT> output : outputs) {
- output.emitWatermark(new Watermark(Long.MAX_VALUE));
+ synchronized (getCheckpointLock()) {
+ for (RecordWriterOutput<OUT> output : outputs) {
+ output.emitWatermark(new Watermark(Long.MAX_VALUE));
+ }
}
}
@@ -87,8 +89,10 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
dataChannel.take();
if (nextRecord != null) {
- for (RecordWriterOutput<OUT> output : outputs) {
- output.collect(nextRecord);
+ synchronized (getCheckpointLock()) {
+ for (RecordWriterOutput<OUT> output : outputs) {
+ output.collect(nextRecord);
+ }
}
}
else {