You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/08 02:13:53 UTC

[flink] branch release-1.11 updated: [FLINK-18110][fs-connector] StreamingFileSink notifies for buckets detected to be inactive on restoring

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 26d7a12  [FLINK-18110][fs-connector] StreamingFileSink notifies for buckets detected to be inactive on restoring
26d7a12 is described below

commit 26d7a124a52758305478dc71af5f8746c36a34cb
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Mon Jun 8 10:09:50 2020 +0800

    [FLINK-18110][fs-connector] StreamingFileSink notifies for buckets detected to be inactive on restoring
    
    
    This closes #12496
---
 .../api/functions/sink/filesystem/Buckets.java     | 23 +++++++----
 .../api/functions/sink/filesystem/BucketsTest.java | 48 +++++++++++++++++++++-
 2 files changed, 62 insertions(+), 9 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 39acc29..f936348 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -187,6 +187,7 @@ public class Buckets<IN, BucketID> {
 
 	private void updateActiveBucketId(final BucketID bucketId, final Bucket<IN, BucketID> restoredBucket) throws IOException {
 		if (!restoredBucket.isActive()) {
+			notifyBucketInactive(restoredBucket);
 			return;
 		}
 
@@ -212,10 +213,7 @@ public class Buckets<IN, BucketID> {
 				// We've dealt with all the pending files and the writer for this bucket is not currently open.
 				// Therefore this bucket is currently inactive and we can remove it from our state.
 				activeBucketIt.remove();
-
-				if (bucketLifeCycleListener != null) {
-					bucketLifeCycleListener.bucketInactive(bucket);
-				}
+				notifyBucketInactive(bucket);
 			}
 		}
 	}
@@ -304,10 +302,7 @@ public class Buckets<IN, BucketID> {
 					rollingPolicy,
 					outputFileConfig);
 			activeBuckets.put(bucketId, bucket);
-
-			if (bucketLifeCycleListener != null) {
-				bucketLifeCycleListener.bucketCreated(bucket);
-			}
+			notifyBucketCreate(bucket);
 		}
 		return bucket;
 	}
@@ -332,6 +327,18 @@ public class Buckets<IN, BucketID> {
 		return new Path(basePath, child);
 	}
 
+	private void notifyBucketCreate(Bucket<IN, BucketID> bucket) {
+		if (bucketLifeCycleListener != null) {
+			bucketLifeCycleListener.bucketCreated(bucket);
+		}
+	}
+
+	private void notifyBucketInactive(Bucket<IN, BucketID> bucket) {
+		if (bucketLifeCycleListener != null) {
+			bucketLifeCycleListener.bucketInactive(bucket);
+		}
+	}
+
 	/**
 	 * The {@link BucketAssigner.Context} exposed to the
 	 * {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index 9707ec7..f982ced 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -40,6 +40,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -409,6 +411,48 @@ public class BucketsTest {
 		Assert.assertEquals(expectedEvents, bucketLifeCycleListener.getEvents());
 	}
 
+	@Test
+	public void testBucketLifeCycleListenerOnRestoring() throws Exception {
+		File outDir = TEMP_FOLDER.newFolder();
+		Path path = new Path(outDir.toURI());
+		OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy =
+			new OnProcessingTimePolicy<>(2L);
+		RecordBucketLifeCycleListener bucketLifeCycleListener = new RecordBucketLifeCycleListener();
+		Buckets<String, String> buckets = createBuckets(
+			path,
+			rollOnProcessingTimeCountingPolicy,
+			bucketLifeCycleListener,
+			0,
+			OutputFileConfig.builder().build());
+		ListState<byte[]> bucketStateContainer = new MockListState<>();
+		ListState<Long> partCounterContainer = new MockListState<>();
+
+		buckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
+		buckets.onElement("test2", new TestUtils.MockSinkContext(null, 1L, 3L));
+
+		// Will close the part file writer of the bucket "test1". Now bucket "test1" has only
+		// one pending file, while bucket "test2" still has an in-progress file being written.
+		buckets.onProcessingTime(4);
+		buckets.snapshotState(0, bucketStateContainer, partCounterContainer);
+
+		// On restoring, the bucket "test1" will commit its pending file and become inactive.
+		buckets = restoreBuckets(
+			path,
+			rollOnProcessingTimeCountingPolicy,
+			bucketLifeCycleListener,
+			0,
+			bucketStateContainer,
+			partCounterContainer,
+			OutputFileConfig.builder().build());
+
+		Assert.assertEquals(new HashSet<>(Collections.singletonList("test2")), buckets.getActiveBuckets().keySet());
+		List<Tuple2<RecordBucketLifeCycleListener.EventType, String>> expectedEvents = Arrays.asList(
+			new Tuple2<>(RecordBucketLifeCycleListener.EventType.CREATED, "test1"),
+			new Tuple2<>(RecordBucketLifeCycleListener.EventType.CREATED, "test2"),
+			new Tuple2<>(RecordBucketLifeCycleListener.EventType.INACTIVE, "test1"));
+		Assert.assertEquals(expectedEvents, bucketLifeCycleListener.getEvents());
+	}
+
 	private static class RecordBucketLifeCycleListener implements BucketLifeCycleListener<String, String> {
 		public enum EventType {
 			CREATED,
@@ -477,6 +521,7 @@ public class BucketsTest {
 		return restoreBuckets(
 				basePath,
 				rollingPolicy,
+				null,
 				subtaskIdx,
 				bucketState,
 				partCounterState,
@@ -486,6 +531,7 @@ public class BucketsTest {
 	private static Buckets<String, String> restoreBuckets(
 			final Path basePath,
 			final RollingPolicy<String, String> rollingPolicy,
+			final BucketLifeCycleListener<String, String> bucketLifeCycleListener,
 			final int subtaskIdx,
 			final ListState<byte[]> bucketState,
 			final ListState<Long> partCounterState,
@@ -493,7 +539,7 @@ public class BucketsTest {
 		final Buckets<String, String> restoredBuckets = createBuckets(
 			basePath,
 			rollingPolicy,
-			null,
+			bucketLifeCycleListener,
 			subtaskIdx,
 			outputFileConfig);
 		restoredBuckets.initializeState(bucketState, partCounterState);