You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/25 02:28:44 UTC

[flink] branch release-1.11 updated: [FLINK-17878][fs-connector] Transient watermark attribute should be initialized at runtime in streaming file operators

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new ff3ea32  [FLINK-17878][fs-connector] Transient watermark attribute should be initialized at runtime in streaming file operators
ff3ea32 is described below

commit ff3ea323c8d98e075a742afad6b08ec18a830046
Author: Xiaogang Zhou <zh...@163.com>
AuthorDate: Mon May 25 10:28:14 2020 +0800

    [FLINK-17878][fs-connector] Transient watermark attribute should be initialized at runtime in streaming file operators
    
    
    This closes #12299
---
 .../apache/flink/table/filesystem/stream/StreamingFileCommitter.java   | 3 ++-
 .../org/apache/flink/table/filesystem/stream/StreamingFileWriter.java  | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
index 449f572..2ae1829 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileCommitter.java
@@ -78,7 +78,7 @@ public class StreamingFileCommitter extends AbstractStreamOperator<Void>
 
 	private transient TaskTracker taskTracker;
 
-	private transient long currentWatermark = Long.MIN_VALUE;
+	private transient long currentWatermark;
 
 	private transient List<PartitionCommitPolicy> policies;
 
@@ -98,6 +98,7 @@ public class StreamingFileCommitter extends AbstractStreamOperator<Void>
 	@Override
 	public void initializeState(StateInitializationContext context) throws Exception {
 		super.initializeState(context);
+		currentWatermark = Long.MIN_VALUE;
 		FileSystem fileSystem = locationPath.getFileSystem();
 		this.trigger = PartitionCommitTrigger.create(
 				context.isRestored(),
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index ed55d3d..e02ba3a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -61,7 +61,7 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
 
 	private transient StreamingFileSinkHelper<RowData> helper;
 
-	private transient long currentWatermark = Long.MIN_VALUE;
+	private transient long currentWatermark;
 
 	private transient Set<String> inactivePartitions;
 
@@ -87,6 +87,7 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
 				bucketCheckInterval);
 
 		inactivePartitions = new HashSet<>();
+		currentWatermark = Long.MIN_VALUE;
 		listener.setInactiveConsumer(b -> inactivePartitions.add(b));
 	}