You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/27 08:28:00 UTC

[flink] 01/02: [FLINK-17934][fs-connector] StreamingFileWriter should set chainingStrategy

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cdfb0304c9e982795fa4c839559ca0283db9b424
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed May 27 12:32:27 2020 +0800

    [FLINK-17934][fs-connector] StreamingFileWriter should set chainingStrategy
---
 .../org/apache/flink/table/filesystem/stream/StreamingFileWriter.java   | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index e02ba3a..842f833 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -73,6 +74,7 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
 		this.bucketCheckInterval = bucketCheckInterval;
 		this.bucketsBuilder = bucketsBuilder;
 		this.listener = listener;
+		setChainingStrategy(ChainingStrategy.ALWAYS);
 	}
 
 	@Override