You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2022/06/20 04:31:25 UTC
[flink-ml] branch master updated: [FLINK-27096] Flush buffer at epoch watermark
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new 454f7d1 [FLINK-27096] Flush buffer at epoch watermark
454f7d1 is described below
commit 454f7d13a1a490fb7f0a559bb2a13ae4b0788c29
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Mon Jun 20 12:31:20 2022 +0800
[FLINK-27096] Flush buffer at epoch watermark
This closes #112.
---
.../iteration/broadcast/RecordWriterBroadcastOutput.java | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java
index fc99865..c11e6c8 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/RecordWriterBroadcastOutput.java
@@ -18,6 +18,7 @@
package org.apache.flink.iteration.broadcast;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -42,5 +43,16 @@ public class RecordWriterBroadcastOutput<OUT> implements BroadcastOutput<OUT> {
public void broadcastEmit(StreamRecord<OUT> record) throws IOException { // Emits the record via recordWriter.broadcastEmit, then flushes when the record is an iteration epoch watermark.
serializationDelegate.setInstance(record); // Set this record as the delegate's current instance before handing the delegate to the writer.
recordWriter.broadcastEmit(serializationDelegate);
+ if (isIterationEpochWatermark(record)) { // Added by this commit (FLINK-27096): flush at epoch watermarks instead of leaving them buffered.
+ recordWriter.flushAll();
+ }
+ }
+
+ private static <T> boolean isIterationEpochWatermark(StreamRecord<T> record) { // Returns true only when the record's value is an IterationRecord whose type is EPOCH_WATERMARK.
+ if (!(record.getValue() instanceof IterationRecord)) { // Any other value (including null, for which instanceof is false) is not an epoch watermark.
+ return false;
+ }
+ IterationRecord<?> iterationRecord = (IterationRecord<?>) record.getValue(); // Cast follows the instanceof check above; getValue() is read a second time here.
+ return iterationRecord.getType().equals(IterationRecord.Type.EPOCH_WATERMARK); // NOTE(review): would throw NullPointerException if getType() returned null; confirm it never does.
}
}