You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/27 08:28:01 UTC

[flink] 02/02: [FLINK-17934][fs-connector] Add listener to Buckets and remove listener for BucketsBuilder

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5a9fe5d431cc7722a0af0ab2d6a95460e7e38e77
Author: Gao Yun <ga...@gmail.com>
AuthorDate: Wed May 27 10:46:46 2020 +0800

    [FLINK-17934][fs-connector] Add listener to Buckets and remove listener for BucketsBuilder
---
 .../flink/connectors/hive/HiveTableSink.java       |  7 +---
 .../HadoopPathBasedBulkFormatBuilder.java          | 13 ------
 .../sink/filesystem/BucketLifeCycleListener.java   |  4 +-
 .../api/functions/sink/filesystem/Buckets.java     | 12 +++---
 .../sink/filesystem/StreamingFileSink.java         | 18 ---------
 .../sink/filesystem/BucketAssignerITCases.java     |  1 -
 .../api/functions/sink/filesystem/BucketsTest.java | 17 ++++----
 .../sink/filesystem/RollingPolicyTest.java         |  1 -
 .../table/filesystem/FileSystemTableSink.java      | 12 ++----
 .../filesystem/stream/InactiveBucketListener.java  | 46 ----------------------
 .../filesystem/stream/StreamingFileWriter.java     | 28 ++++++++-----
 11 files changed, 39 insertions(+), 120 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index aa83d7a..35e355c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -46,7 +46,6 @@ import org.apache.flink.table.filesystem.FileSystemOutputFormat;
 import org.apache.flink.table.filesystem.FileSystemTableSink;
 import org.apache.flink.table.filesystem.FileSystemTableSink.TableBucketAssigner;
 import org.apache.flink.table.filesystem.FileSystemTableSink.TableRollingPolicy;
-import org.apache.flink.table.filesystem.stream.InactiveBucketListener;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.OverwritableTableSink;
 import org.apache.flink.table.sinks.PartitionableTableSink;
@@ -185,16 +184,14 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 						true,
 						conf.get(SINK_ROLLING_POLICY_FILE_SIZE),
 						conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL).toMillis());
-				InactiveBucketListener listener = new InactiveBucketListener();
 
 				Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
-				BucketsBuilder<RowData, ?, ? extends BucketsBuilder<RowData, ?, ?>> builder;
+				BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
 				if (userMrWriter || !bulkFactory.isPresent()) {
 					HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
 					builder = new HadoopPathBasedBulkFormatBuilder<>(
 							new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)
 							.withRollingPolicy(rollingPolicy)
-							.withBucketLifeCycleListener(listener)
 							.withOutputFileConfig(outputFileConfig);
 					LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
 				} else {
@@ -202,7 +199,6 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 							new org.apache.flink.core.fs.Path(sd.getLocation()),
 							new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
 							.withBucketAssigner(assigner)
-							.withBucketLifeCycleListener(listener)
 							.withRollingPolicy(rollingPolicy)
 							.withOutputFileConfig(outputFileConfig);
 					LOG.info("Hive streaming sink: Use native parquet&orc writer.");
@@ -215,7 +211,6 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
 						overwrite,
 						dataStream,
 						builder,
-						listener,
 						msFactory);
 			}
 		} catch (TException e) {
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
index df51ff4..6ec8651 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.hadoop.bulk.DefaultHadoopFileCommitterFactory;
 import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitterFactory;
@@ -30,8 +29,6 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 
 /**
@@ -54,9 +51,6 @@ public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPath
 
 	private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
 
-	@Nullable
-	private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
-
 	private BucketFactory<IN, BucketID> bucketFactory;
 
 	private OutputFileConfig outputFileConfig;
@@ -108,12 +102,6 @@ public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPath
 		return self();
 	}
 
-	@Internal
-	public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, BucketID> listener) {
-		this.bucketLifeCycleListener = Preconditions.checkNotNull(listener);
-		return self();
-	}
-
 	public T withBucketFactory(BucketFactory<IN, BucketID> factory) {
 		this.bucketFactory = Preconditions.checkNotNull(factory);
 		return self();
@@ -140,7 +128,6 @@ public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPath
 				writerFactory,
 				fileCommitterFactory),
 			rollingPolicy,
-			bucketLifeCycleListener,
 			subtaskIndex,
 			outputFileConfig);
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java
index f90f2a8..6667196 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java
@@ -20,13 +20,11 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
 
-import java.io.Serializable;
-
 /**
  * Listener about the status of {@link Bucket}.
  */
 @Internal
-public interface BucketLifeCycleListener<IN, BucketID> extends Serializable {
+public interface BucketLifeCycleListener<IN, BucketID> {
 
 	/**
 	 * Notifies a new bucket has been created.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 0c9b73f..39acc29 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -63,9 +63,6 @@ public class Buckets<IN, BucketID> {
 
 	private final RollingPolicy<IN, BucketID> rollingPolicy;
 
-	@Nullable
-	private final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
-
 	// --------------------------- runtime fields -----------------------------
 
 	private final int subtaskIndex;
@@ -78,6 +75,9 @@ public class Buckets<IN, BucketID> {
 
 	private final OutputFileConfig outputFileConfig;
 
+	@Nullable
+	private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
+
 	// --------------------------- State Related Fields -----------------------------
 
 	private final BucketStateSerializer<BucketID> bucketStateSerializer;
@@ -97,7 +97,6 @@ public class Buckets<IN, BucketID> {
 			final BucketFactory<IN, BucketID> bucketFactory,
 			final BucketWriter<IN, BucketID> bucketWriter,
 			final RollingPolicy<IN, BucketID> rollingPolicy,
-			@Nullable final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener,
 			final int subtaskIndex,
 			final OutputFileConfig outputFileConfig) {
 
@@ -106,7 +105,6 @@ public class Buckets<IN, BucketID> {
 		this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
 		this.bucketWriter = Preconditions.checkNotNull(bucketWriter);
 		this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
-		this.bucketLifeCycleListener = bucketLifeCycleListener;
 		this.subtaskIndex = subtaskIndex;
 
 		this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
@@ -121,6 +119,10 @@ public class Buckets<IN, BucketID> {
 		this.maxPartCounter = 0L;
 	}
 
+	public void setBucketLifeCycleListener(BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener) {
+		this.bucketLifeCycleListener = Preconditions.checkNotNull(bucketLifeCycleListener);
+	}
+
 	/**
 	 * Initializes the state after recovery from a failure.
 	 *
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 0962799..407420d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -189,8 +189,6 @@ public class StreamingFileSink<IN>
 
 		private OutputFileConfig outputFileConfig;
 
-		private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
-
 		protected RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
 			this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build());
 		}
@@ -231,12 +229,6 @@ public class StreamingFileSink<IN>
 			return self();
 		}
 
-		@Internal
-		public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, BucketID> listener) {
-			this.bucketLifeCycleListener = Preconditions.checkNotNull(listener);
-			return self();
-		}
-
 		public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
 			this.outputFileConfig = outputFileConfig;
 			return self();
@@ -267,7 +259,6 @@ public class StreamingFileSink<IN>
 					bucketFactory,
 					new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder),
 					rollingPolicy,
-					bucketLifeCycleListener,
 					subtaskIndex,
 					outputFileConfig);
 		}
@@ -303,8 +294,6 @@ public class StreamingFileSink<IN>
 
 		private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
 
-		private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
-
 		private BucketFactory<IN, BucketID> bucketFactory;
 
 		private OutputFileConfig outputFileConfig;
@@ -350,12 +339,6 @@ public class StreamingFileSink<IN>
 			return self();
 		}
 
-		@Internal
-		public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, BucketID> listener) {
-			this.bucketLifeCycleListener = Preconditions.checkNotNull(listener);
-			return self();
-		}
-
 		@VisibleForTesting
 		T withBucketFactory(final BucketFactory<IN, BucketID> factory) {
 			this.bucketFactory = Preconditions.checkNotNull(factory);
@@ -387,7 +370,6 @@ public class StreamingFileSink<IN>
 					bucketFactory,
 					new BulkBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), writerFactory),
 					rollingPolicy,
-					bucketLifeCycleListener,
 					subtaskIndex,
 					outputFileConfig);
 		}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index ff2cc5a..e51043e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -57,7 +57,6 @@ public class BucketAssignerITCases {
 			new DefaultBucketFactoryImpl<>(),
 			new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
 			rollingPolicy,
-			null,
 			0,
 			OutputFileConfig.builder().build()
 		);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index 8e2117a..9707ec7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -36,8 +36,6 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import javax.annotation.Nullable;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -324,7 +322,6 @@ public class BucketsTest {
 				new DefaultBucketFactoryImpl<>(),
 				new RowWiseBucketWriter<>(FileSystem.get(path.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
 				DefaultRollingPolicy.builder().build(),
-				null,
 				2,
 				OutputFileConfig.builder().build()
 		);
@@ -452,19 +449,23 @@ public class BucketsTest {
 	private static Buckets<String, String> createBuckets(
 			final Path basePath,
 			final RollingPolicy<String, String> rollingPolicy,
-			@Nullable final BucketLifeCycleListener<String, String> bucketLifeCycleListener,
+			final BucketLifeCycleListener<String, String> bucketLifeCycleListener,
 			final int subtaskIdx,
 			final OutputFileConfig outputFileConfig) throws IOException {
-		return new Buckets<>(
+		Buckets<String, String> buckets = new Buckets<>(
 				basePath,
 				new TestUtils.StringIdentityBucketAssigner(),
 				new DefaultBucketFactoryImpl<>(),
 				new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
 				rollingPolicy,
-				bucketLifeCycleListener,
 				subtaskIdx,
-				outputFileConfig
-		);
+				outputFileConfig);
+
+		if (bucketLifeCycleListener != null) {
+			buckets.setBucketLifeCycleListener(bucketLifeCycleListener);
+		}
+
+		return buckets;
 	}
 
 	private static Buckets<String, String> restoreBuckets(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index 2a4da34..1dbd30f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -204,7 +204,6 @@ public class RollingPolicyTest {
 				new DefaultBucketFactoryImpl<>(),
 				new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
 				rollingPolicyToTest,
-				null,
 				0,
 				OutputFileConfig.builder().build()
 		);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 8ac6c0b..6408145 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -46,7 +46,6 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
-import org.apache.flink.table.filesystem.stream.InactiveBucketListener;
 import org.apache.flink.table.filesystem.stream.StreamingFileCommitter;
 import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
 import org.apache.flink.table.filesystem.stream.StreamingFileWriter;
@@ -150,21 +149,18 @@ public class FileSystemTableSink implements
 					conf.get(SINK_ROLLING_POLICY_FILE_SIZE),
 					conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL).toMillis());
 
-			BucketsBuilder<RowData, ?, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
-			InactiveBucketListener listener = new InactiveBucketListener();
+			BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
 			if (writer instanceof Encoder) {
 				//noinspection unchecked
 				bucketsBuilder = StreamingFileSink.forRowFormat(
 						path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
 						.withBucketAssigner(assigner)
-						.withBucketLifeCycleListener(listener)
 						.withRollingPolicy(rollingPolicy);
 			} else {
 				//noinspection unchecked
 				bucketsBuilder = StreamingFileSink.forBulkFormat(
 						path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
 						.withBucketAssigner(assigner)
-						.withBucketLifeCycleListener(listener)
 						.withRollingPolicy(rollingPolicy);
 			}
 			return createStreamingSink(
@@ -175,7 +171,6 @@ public class FileSystemTableSink implements
 					overwrite,
 					dataStream,
 					bucketsBuilder,
-					listener,
 					metaStoreFactory);
 		}
 	}
@@ -187,15 +182,14 @@ public class FileSystemTableSink implements
 			ObjectIdentifier tableIdentifier,
 			boolean overwrite,
 			DataStream<RowData> inputStream,
-			BucketsBuilder<RowData, ?, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
-			InactiveBucketListener listener,
+			BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
 			TableMetaStoreFactory msFactory) {
 		if (overwrite) {
 			throw new IllegalStateException("Streaming mode not support overwrite.");
 		}
 
 		StreamingFileWriter fileWriter = new StreamingFileWriter(
-				BucketsBuilder.DEFAULT_BUCKET_CHECK_INTERVAL, bucketsBuilder, listener);
+				BucketsBuilder.DEFAULT_BUCKET_CHECK_INTERVAL, bucketsBuilder);
 		DataStream<CommitMessage> writerStream = inputStream.transform(
 				StreamingFileWriter.class.getSimpleName(),
 				TypeExtractor.createTypeInfo(CommitMessage.class),
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/InactiveBucketListener.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/InactiveBucketListener.java
deleted file mode 100644
index 69ab147..0000000
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/InactiveBucketListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.filesystem.stream;
-
-import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
-import org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener;
-import org.apache.flink.table.data.RowData;
-
-import java.util.function.Consumer;
-
-/**
- * Inactive {@link BucketLifeCycleListener} to obtain inactive buckets to consumer.
- */
-public class InactiveBucketListener implements BucketLifeCycleListener<RowData, String> {
-
-	private transient Consumer<String> inactiveConsumer;
-
-	public void setInactiveConsumer(Consumer<String> inactiveConsumer) {
-		this.inactiveConsumer = inactiveConsumer;
-	}
-
-	@Override
-	public void bucketCreated(Bucket<RowData, String> bucket) {
-	}
-
-	@Override
-	public void bucketInactive(Bucket<RowData, String> bucket) {
-		inactiveConsumer.accept(bucket.getBucketId());
-	}
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index 842f833..c2186bf 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.filesystem.stream;
 
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener;
 import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper;
@@ -51,14 +53,12 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
 
 	private final long bucketCheckInterval;
 
-	private final StreamingFileSink.BucketsBuilder<RowData, ?, ? extends
-			StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
-
-	private final InactiveBucketListener listener;
+	private final StreamingFileSink.BucketsBuilder<RowData, String, ? extends
+			StreamingFileSink.BucketsBuilder<RowData, String, ?>> bucketsBuilder;
 
 	// --------------------------- runtime fields -----------------------------
 
-	private transient Buckets<RowData, ?> buckets;
+	private transient Buckets<RowData, String> buckets;
 
 	private transient StreamingFileSinkHelper<RowData> helper;
 
@@ -68,12 +68,10 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
 
 	public StreamingFileWriter(
 			long bucketCheckInterval,
-			StreamingFileSink.BucketsBuilder<RowData, ?, ? extends
-					StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
-			InactiveBucketListener listener) {
+			StreamingFileSink.BucketsBuilder<RowData, String, ? extends
+					StreamingFileSink.BucketsBuilder<RowData, String, ?>> bucketsBuilder) {
 		this.bucketCheckInterval = bucketCheckInterval;
 		this.bucketsBuilder = bucketsBuilder;
-		this.listener = listener;
 		setChainingStrategy(ChainingStrategy.ALWAYS);
 	}
 
@@ -90,7 +88,17 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
 
 		inactivePartitions = new HashSet<>();
 		currentWatermark = Long.MIN_VALUE;
-		listener.setInactiveConsumer(b -> inactivePartitions.add(b));
+		buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {
+
+			@Override
+			public void bucketCreated(Bucket<RowData, String> bucket) {
+			}
+
+			@Override
+			public void bucketInactive(Bucket<RowData, String> bucket) {
+				inactivePartitions.add(bucket.getBucketId());
+			}
+		});
 	}
 
 	@Override