You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2022/06/20 04:31:25 UTC

[flink-ml] branch master updated: [FLINK-27096] Flush buffer at epoch watermark

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new 454f7d1  [FLINK-27096] Flush buffer at epoch watermark
454f7d1 is described below

commit 454f7d13a1a490fb7f0a559bb2a13ae4b0788c29
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Mon Jun 20 12:31:20 2022 +0800

    [FLINK-27096] Flush buffer at epoch watermark
    
    This closes #112.
---
 .../iteration/broadcast/RecordWriterBroadcastOutput.java     | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java
index fc99865..c11e6c8 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java
@@ -18,6 +18,7 @@
 package org.apache.flink.iteration.broadcast;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.iteration.IterationRecord;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -42,5 +43,16 @@ public class RecordWriterBroadcastOutput<OUT> implements BroadcastOutput<OUT> {
     public void broadcastEmit(StreamRecord<OUT> record) throws IOException {
         serializationDelegate.setInstance(record);
         recordWriter.broadcastEmit(serializationDelegate);
+        if (isIterationEpochWatermark(record)) {
+            recordWriter.flushAll();
+        }
+    }
+
+    private static <T> boolean isIterationEpochWatermark(StreamRecord<T> record) {
+        if (!(record.getValue() instanceof IterationRecord)) {
+            return false;
+        }
+        IterationRecord<?> iterationRecord = (IterationRecord<?>) record.getValue();
+        return iterationRecord.getType().equals(IterationRecord.Type.EPOCH_WATERMARK);
     }
 }