You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/08 02:13:53 UTC
[flink] branch release-1.11 updated: [FLINK-18110][fs-connector]
StreamingFileSink notifies for buckets detected to be inactive on restoring
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 26d7a12 [FLINK-18110][fs-connector] StreamingFileSink notifies for buckets detected to be inactive on restoring
26d7a12 is described below
commit 26d7a124a52758305478dc71af5f8746c36a34cb
Author: Yun Gao <ga...@gmail.com>
AuthorDate: Mon Jun 8 10:09:50 2020 +0800
[FLINK-18110][fs-connector] StreamingFileSink notifies for buckets detected to be inactive on restoring
This closes #12496
---
.../api/functions/sink/filesystem/Buckets.java | 23 +++++++----
.../api/functions/sink/filesystem/BucketsTest.java | 48 +++++++++++++++++++++-
2 files changed, 62 insertions(+), 9 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 39acc29..f936348 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -187,6 +187,7 @@ public class Buckets<IN, BucketID> {
private void updateActiveBucketId(final BucketID bucketId, final Bucket<IN, BucketID> restoredBucket) throws IOException {
if (!restoredBucket.isActive()) {
+ notifyBucketInactive(restoredBucket);
return;
}
@@ -212,10 +213,7 @@ public class Buckets<IN, BucketID> {
// We've dealt with all the pending files and the writer for this bucket is not currently open.
// Therefore this bucket is currently inactive and we can remove it from our state.
activeBucketIt.remove();
-
- if (bucketLifeCycleListener != null) {
- bucketLifeCycleListener.bucketInactive(bucket);
- }
+ notifyBucketInactive(bucket);
}
}
}
@@ -304,10 +302,7 @@ public class Buckets<IN, BucketID> {
rollingPolicy,
outputFileConfig);
activeBuckets.put(bucketId, bucket);
-
- if (bucketLifeCycleListener != null) {
- bucketLifeCycleListener.bucketCreated(bucket);
- }
+ notifyBucketCreate(bucket);
}
return bucket;
}
@@ -332,6 +327,18 @@ public class Buckets<IN, BucketID> {
return new Path(basePath, child);
}
+ private void notifyBucketCreate(Bucket<IN, BucketID> bucket) {
+ if (bucketLifeCycleListener != null) {
+ bucketLifeCycleListener.bucketCreated(bucket);
+ }
+ }
+
+ private void notifyBucketInactive(Bucket<IN, BucketID> bucket) {
+ if (bucketLifeCycleListener != null) {
+ bucketLifeCycleListener.bucketInactive(bucket);
+ }
+ }
+
/**
* The {@link BucketAssigner.Context} exposed to the
* {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index 9707ec7..f982ced 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -40,6 +40,8 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -409,6 +411,48 @@ public class BucketsTest {
Assert.assertEquals(expectedEvents, bucketLifeCycleListener.getEvents());
}
+ @Test
+ public void testBucketLifeCycleListenerOnRestoring() throws Exception {
+ File outDir = TEMP_FOLDER.newFolder();
+ Path path = new Path(outDir.toURI());
+ OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy =
+ new OnProcessingTimePolicy<>(2L);
+ RecordBucketLifeCycleListener bucketLifeCycleListener = new RecordBucketLifeCycleListener();
+ Buckets<String, String> buckets = createBuckets(
+ path,
+ rollOnProcessingTimeCountingPolicy,
+ bucketLifeCycleListener,
+ 0,
+ OutputFileConfig.builder().build());
+ ListState<byte[]> bucketStateContainer = new MockListState<>();
+ ListState<Long> partCounterContainer = new MockListState<>();
+
+ buckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
+ buckets.onElement("test2", new TestUtils.MockSinkContext(null, 1L, 3L));
+
+ // Will close the part file writer of the bucket "test1". Now bucket "test1" has only
+ // one pending file while bucket "test2" has an on-writing in-progress file.
+ buckets.onProcessingTime(4);
+ buckets.snapshotState(0, bucketStateContainer, partCounterContainer);
+
+ // On restoring, the bucket "test1" will commit its pending file and become inactive.
+ buckets = restoreBuckets(
+ path,
+ rollOnProcessingTimeCountingPolicy,
+ bucketLifeCycleListener,
+ 0,
+ bucketStateContainer,
+ partCounterContainer,
+ OutputFileConfig.builder().build());
+
+ Assert.assertEquals(new HashSet<>(Collections.singletonList("test2")), buckets.getActiveBuckets().keySet());
+ List<Tuple2<RecordBucketLifeCycleListener.EventType, String>> expectedEvents = Arrays.asList(
+ new Tuple2<>(RecordBucketLifeCycleListener.EventType.CREATED, "test1"),
+ new Tuple2<>(RecordBucketLifeCycleListener.EventType.CREATED, "test2"),
+ new Tuple2<>(RecordBucketLifeCycleListener.EventType.INACTIVE, "test1"));
+ Assert.assertEquals(expectedEvents, bucketLifeCycleListener.getEvents());
+ }
+
private static class RecordBucketLifeCycleListener implements BucketLifeCycleListener<String, String> {
public enum EventType {
CREATED,
@@ -477,6 +521,7 @@ public class BucketsTest {
return restoreBuckets(
basePath,
rollingPolicy,
+ null,
subtaskIdx,
bucketState,
partCounterState,
@@ -486,6 +531,7 @@ public class BucketsTest {
private static Buckets<String, String> restoreBuckets(
final Path basePath,
final RollingPolicy<String, String> rollingPolicy,
+ final BucketLifeCycleListener<String, String> bucketLifeCycleListener,
final int subtaskIdx,
final ListState<byte[]> bucketState,
final ListState<Long> partCounterState,
@@ -493,7 +539,7 @@ public class BucketsTest {
final Buckets<String, String> restoredBuckets = createBuckets(
basePath,
rollingPolicy,
- null,
+ bucketLifeCycleListener,
subtaskIdx,
outputFileConfig);
restoredBuckets.initializeState(bucketState, partCounterState);