You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/25 02:28:44 UTC
[flink] branch release-1.11 updated: [FLINK-17878][fs-connector]
Transient watermark attribute should be initialized at runtime in streaming
file operators
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new ff3ea32 [FLINK-17878][fs-connector] Transient watermark attribute should be initialized at runtime in streaming file operators
ff3ea32 is described below
commit ff3ea323c8d98e075a742afad6b08ec18a830046
Author: Xiaogang Zhou <zh...@163.com>
AuthorDate: Mon May 25 10:28:14 2020 +0800
[FLINK-17878][fs-connector] Transient watermark attribute should be initialized at runtime in streaming file operators
This closes #12299
---
.../apache/flink/table/filesystem/stream/StreamingFileCommitter.java | 3 ++-
.../org/apache/flink/table/filesystem/stream/StreamingFileWriter.java | 3 ++-
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
index 449f572..2ae1829 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
@@ -78,7 +78,7 @@ public class StreamingFileCommitter extends AbstractStreamOperator<Void>
private transient TaskTracker taskTracker;
- private transient long currentWatermark = Long.MIN_VALUE;
+ private transient long currentWatermark;
private transient List<PartitionCommitPolicy> policies;
@@ -98,6 +98,7 @@ public class StreamingFileCommitter extends AbstractStreamOperator<Void>
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
+ currentWatermark = Long.MIN_VALUE;
FileSystem fileSystem = locationPath.getFileSystem();
this.trigger = PartitionCommitTrigger.create(
context.isRestored(),
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index ed55d3d..e02ba3a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -61,7 +61,7 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
private transient StreamingFileSinkHelper<RowData> helper;
- private transient long currentWatermark = Long.MIN_VALUE;
+ private transient long currentWatermark;
private transient Set<String> inactivePartitions;
@@ -87,6 +87,7 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
bucketCheckInterval);
inactivePartitions = new HashSet<>();
+ currentWatermark = Long.MIN_VALUE;
listener.setInactiveConsumer(b -> inactivePartitions.add(b));
}