You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/27 08:28:00 UTC
[flink] 01/02: [FLINK-17934][fs-connector] StreamingFileWriter should set chainingStrategy
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cdfb0304c9e982795fa4c839559ca0283db9b424
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Wed May 27 12:32:27 2020 +0800
[FLINK-17934][fs-connector] StreamingFileWriter should set chainingStrategy
---
.../org/apache/flink/table/filesystem/stream/StreamingFileWriter.java | 2 ++
1 file changed, 2 insertions(+)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index e02ba3a..842f833 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -73,6 +74,7 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
this.bucketCheckInterval = bucketCheckInterval;
this.bucketsBuilder = bucketsBuilder;
this.listener = listener;
+ setChainingStrategy(ChainingStrategy.ALWAYS);
}
@Override