You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/05/27 08:28:01 UTC
[flink] 02/02: [FLINK-17934][fs-connector] Add listener to Buckets
and remove listener from BucketsBuilder
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5a9fe5d431cc7722a0af0ab2d6a95460e7e38e77
Author: Gao Yun <ga...@gmail.com>
AuthorDate: Wed May 27 10:46:46 2020 +0800
[FLINK-17934][fs-connector] Add listener to Buckets and remove listener from BucketsBuilder
---
.../flink/connectors/hive/HiveTableSink.java | 7 +---
.../HadoopPathBasedBulkFormatBuilder.java | 13 ------
.../sink/filesystem/BucketLifeCycleListener.java | 4 +-
.../api/functions/sink/filesystem/Buckets.java | 12 +++---
.../sink/filesystem/StreamingFileSink.java | 18 ---------
.../sink/filesystem/BucketAssignerITCases.java | 1 -
.../api/functions/sink/filesystem/BucketsTest.java | 17 ++++----
.../sink/filesystem/RollingPolicyTest.java | 1 -
.../table/filesystem/FileSystemTableSink.java | 12 ++----
.../filesystem/stream/InactiveBucketListener.java | 46 ----------------------
.../filesystem/stream/StreamingFileWriter.java | 28 ++++++++-----
11 files changed, 39 insertions(+), 120 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index aa83d7a..35e355c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -46,7 +46,6 @@ import org.apache.flink.table.filesystem.FileSystemOutputFormat;
import org.apache.flink.table.filesystem.FileSystemTableSink;
import org.apache.flink.table.filesystem.FileSystemTableSink.TableBucketAssigner;
import org.apache.flink.table.filesystem.FileSystemTableSink.TableRollingPolicy;
-import org.apache.flink.table.filesystem.stream.InactiveBucketListener;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;
@@ -185,16 +184,14 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
true,
conf.get(SINK_ROLLING_POLICY_FILE_SIZE),
conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL).toMillis());
- InactiveBucketListener listener = new InactiveBucketListener();
Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
- BucketsBuilder<RowData, ?, ? extends BucketsBuilder<RowData, ?, ?>> builder;
+ BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> builder;
if (userMrWriter || !bulkFactory.isPresent()) {
HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory);
builder = new HadoopPathBasedBulkFormatBuilder<>(
new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner)
.withRollingPolicy(rollingPolicy)
- .withBucketLifeCycleListener(listener)
.withOutputFileConfig(outputFileConfig);
LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer.");
} else {
@@ -202,7 +199,6 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
new org.apache.flink.core.fs.Path(sd.getLocation()),
new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer))
.withBucketAssigner(assigner)
- .withBucketLifeCycleListener(listener)
.withRollingPolicy(rollingPolicy)
.withOutputFileConfig(outputFileConfig);
LOG.info("Hive streaming sink: Use native parquet&orc writer.");
@@ -215,7 +211,6 @@ public class HiveTableSink implements AppendStreamTableSink, PartitionableTableS
overwrite,
dataStream,
builder,
- listener,
msFactory);
}
} catch (TException e) {
diff --git a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
index df51ff4..6ec8651 100644
--- a/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
+++ b/flink-formats/flink-hadoop-bulk/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
-import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.hadoop.bulk.DefaultHadoopFileCommitterFactory;
import org.apache.flink.formats.hadoop.bulk.HadoopFileCommitterFactory;
@@ -30,8 +29,6 @@ import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
-import javax.annotation.Nullable;
-
import java.io.IOException;
/**
@@ -54,9 +51,6 @@ public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPath
private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
- @Nullable
- private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
-
private BucketFactory<IN, BucketID> bucketFactory;
private OutputFileConfig outputFileConfig;
@@ -108,12 +102,6 @@ public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPath
return self();
}
- @Internal
- public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, BucketID> listener) {
- this.bucketLifeCycleListener = Preconditions.checkNotNull(listener);
- return self();
- }
-
public T withBucketFactory(BucketFactory<IN, BucketID> factory) {
this.bucketFactory = Preconditions.checkNotNull(factory);
return self();
@@ -140,7 +128,6 @@ public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPath
writerFactory,
fileCommitterFactory),
rollingPolicy,
- bucketLifeCycleListener,
subtaskIndex,
outputFileConfig);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java
index f90f2a8..6667196 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketLifeCycleListener.java
@@ -20,13 +20,11 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
-import java.io.Serializable;
-
/**
* Listener about the status of {@link Bucket}.
*/
@Internal
-public interface BucketLifeCycleListener<IN, BucketID> extends Serializable {
+public interface BucketLifeCycleListener<IN, BucketID> {
/**
* Notifies a new bucket has been created.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index 0c9b73f..39acc29 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -63,9 +63,6 @@ public class Buckets<IN, BucketID> {
private final RollingPolicy<IN, BucketID> rollingPolicy;
- @Nullable
- private final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
-
// --------------------------- runtime fields -----------------------------
private final int subtaskIndex;
@@ -78,6 +75,9 @@ public class Buckets<IN, BucketID> {
private final OutputFileConfig outputFileConfig;
+ @Nullable
+ private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
+
// --------------------------- State Related Fields -----------------------------
private final BucketStateSerializer<BucketID> bucketStateSerializer;
@@ -97,7 +97,6 @@ public class Buckets<IN, BucketID> {
final BucketFactory<IN, BucketID> bucketFactory,
final BucketWriter<IN, BucketID> bucketWriter,
final RollingPolicy<IN, BucketID> rollingPolicy,
- @Nullable final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener,
final int subtaskIndex,
final OutputFileConfig outputFileConfig) {
@@ -106,7 +105,6 @@ public class Buckets<IN, BucketID> {
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
this.bucketWriter = Preconditions.checkNotNull(bucketWriter);
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
- this.bucketLifeCycleListener = bucketLifeCycleListener;
this.subtaskIndex = subtaskIndex;
this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
@@ -121,6 +119,10 @@ public class Buckets<IN, BucketID> {
this.maxPartCounter = 0L;
}
+ public void setBucketLifeCycleListener(BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener) {
+ this.bucketLifeCycleListener = Preconditions.checkNotNull(bucketLifeCycleListener);
+ }
+
/**
* Initializes the state after recovery from a failure.
*
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index 0962799..407420d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -189,8 +189,6 @@ public class StreamingFileSink<IN>
private OutputFileConfig outputFileConfig;
- private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
-
protected RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build());
}
@@ -231,12 +229,6 @@ public class StreamingFileSink<IN>
return self();
}
- @Internal
- public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, BucketID> listener) {
- this.bucketLifeCycleListener = Preconditions.checkNotNull(listener);
- return self();
- }
-
public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
this.outputFileConfig = outputFileConfig;
return self();
@@ -267,7 +259,6 @@ public class StreamingFileSink<IN>
bucketFactory,
new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder),
rollingPolicy,
- bucketLifeCycleListener,
subtaskIndex,
outputFileConfig);
}
@@ -303,8 +294,6 @@ public class StreamingFileSink<IN>
private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;
- private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;
-
private BucketFactory<IN, BucketID> bucketFactory;
private OutputFileConfig outputFileConfig;
@@ -350,12 +339,6 @@ public class StreamingFileSink<IN>
return self();
}
- @Internal
- public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, BucketID> listener) {
- this.bucketLifeCycleListener = Preconditions.checkNotNull(listener);
- return self();
- }
-
@VisibleForTesting
T withBucketFactory(final BucketFactory<IN, BucketID> factory) {
this.bucketFactory = Preconditions.checkNotNull(factory);
@@ -387,7 +370,6 @@ public class StreamingFileSink<IN>
bucketFactory,
new BulkBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), writerFactory),
rollingPolicy,
- bucketLifeCycleListener,
subtaskIndex,
outputFileConfig);
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index ff2cc5a..e51043e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -57,7 +57,6 @@ public class BucketAssignerITCases {
new DefaultBucketFactoryImpl<>(),
new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
rollingPolicy,
- null,
0,
OutputFileConfig.builder().build()
);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index 8e2117a..9707ec7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -36,8 +36,6 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import javax.annotation.Nullable;
-
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -324,7 +322,6 @@ public class BucketsTest {
new DefaultBucketFactoryImpl<>(),
new RowWiseBucketWriter<>(FileSystem.get(path.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
DefaultRollingPolicy.builder().build(),
- null,
2,
OutputFileConfig.builder().build()
);
@@ -452,19 +449,23 @@ public class BucketsTest {
private static Buckets<String, String> createBuckets(
final Path basePath,
final RollingPolicy<String, String> rollingPolicy,
- @Nullable final BucketLifeCycleListener<String, String> bucketLifeCycleListener,
+ final BucketLifeCycleListener<String, String> bucketLifeCycleListener,
final int subtaskIdx,
final OutputFileConfig outputFileConfig) throws IOException {
- return new Buckets<>(
+ Buckets<String, String> buckets = new Buckets<>(
basePath,
new TestUtils.StringIdentityBucketAssigner(),
new DefaultBucketFactoryImpl<>(),
new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
rollingPolicy,
- bucketLifeCycleListener,
subtaskIdx,
- outputFileConfig
- );
+ outputFileConfig);
+
+ if (bucketLifeCycleListener != null) {
+ buckets.setBucketLifeCycleListener(bucketLifeCycleListener);
+ }
+
+ return buckets;
}
private static Buckets<String, String> restoreBuckets(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index 2a4da34..1dbd30f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -204,7 +204,6 @@ public class RollingPolicyTest {
new DefaultBucketFactoryImpl<>(),
new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
rollingPolicyToTest,
- null,
0,
OutputFileConfig.builder().build()
);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 8ac6c0b..6408145 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -46,7 +46,6 @@ import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FileSystemFormatFactory;
-import org.apache.flink.table.filesystem.stream.InactiveBucketListener;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter;
import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
import org.apache.flink.table.filesystem.stream.StreamingFileWriter;
@@ -150,21 +149,18 @@ public class FileSystemTableSink implements
conf.get(SINK_ROLLING_POLICY_FILE_SIZE),
conf.get(SINK_ROLLING_POLICY_TIME_INTERVAL).toMillis());
- BucketsBuilder<RowData, ?, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
- InactiveBucketListener listener = new InactiveBucketListener();
+ BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
if (writer instanceof Encoder) {
//noinspection unchecked
bucketsBuilder = StreamingFileSink.forRowFormat(
path, new ProjectionEncoder((Encoder<RowData>) writer, computer))
.withBucketAssigner(assigner)
- .withBucketLifeCycleListener(listener)
.withRollingPolicy(rollingPolicy);
} else {
//noinspection unchecked
bucketsBuilder = StreamingFileSink.forBulkFormat(
path, new ProjectionBulkFactory((BulkWriter.Factory<RowData>) writer, computer))
.withBucketAssigner(assigner)
- .withBucketLifeCycleListener(listener)
.withRollingPolicy(rollingPolicy);
}
return createStreamingSink(
@@ -175,7 +171,6 @@ public class FileSystemTableSink implements
overwrite,
dataStream,
bucketsBuilder,
- listener,
metaStoreFactory);
}
}
@@ -187,15 +182,14 @@ public class FileSystemTableSink implements
ObjectIdentifier tableIdentifier,
boolean overwrite,
DataStream<RowData> inputStream,
- BucketsBuilder<RowData, ?, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
- InactiveBucketListener listener,
+ BucketsBuilder<RowData, String, ? extends BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
TableMetaStoreFactory msFactory) {
if (overwrite) {
throw new IllegalStateException("Streaming mode not support overwrite.");
}
StreamingFileWriter fileWriter = new StreamingFileWriter(
- BucketsBuilder.DEFAULT_BUCKET_CHECK_INTERVAL, bucketsBuilder, listener);
+ BucketsBuilder.DEFAULT_BUCKET_CHECK_INTERVAL, bucketsBuilder);
DataStream<CommitMessage> writerStream = inputStream.transform(
StreamingFileWriter.class.getSimpleName(),
TypeExtractor.createTypeInfo(CommitMessage.class),
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/InactiveBucketListener.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/InactiveBucketListener.java
deleted file mode 100644
index 69ab147..0000000
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/InactiveBucketListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.filesystem.stream;
-
-import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
-import org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener;
-import org.apache.flink.table.data.RowData;
-
-import java.util.function.Consumer;
-
-/**
- * Inactive {@link BucketLifeCycleListener} to obtain inactive buckets to consumer.
- */
-public class InactiveBucketListener implements BucketLifeCycleListener<RowData, String> {
-
- private transient Consumer<String> inactiveConsumer;
-
- public void setInactiveConsumer(Consumer<String> inactiveConsumer) {
- this.inactiveConsumer = inactiveConsumer;
- }
-
- @Override
- public void bucketCreated(Bucket<RowData, String> bucket) {
- }
-
- @Override
- public void bucketInactive(Bucket<RowData, String> bucket) {
- inactiveConsumer.accept(bucket.getBucketId());
- }
-}
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
index 842f833..c2186bf 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/stream/StreamingFileWriter.java
@@ -20,6 +20,8 @@ package org.apache.flink.table.filesystem.stream;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Bucket;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener;
import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper;
@@ -51,14 +53,12 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
private final long bucketCheckInterval;
- private final StreamingFileSink.BucketsBuilder<RowData, ?, ? extends
- StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder;
-
- private final InactiveBucketListener listener;
+ private final StreamingFileSink.BucketsBuilder<RowData, String, ? extends
+ StreamingFileSink.BucketsBuilder<RowData, String, ?>> bucketsBuilder;
// --------------------------- runtime fields -----------------------------
- private transient Buckets<RowData, ?> buckets;
+ private transient Buckets<RowData, String> buckets;
private transient StreamingFileSinkHelper<RowData> helper;
@@ -68,12 +68,10 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
public StreamingFileWriter(
long bucketCheckInterval,
- StreamingFileSink.BucketsBuilder<RowData, ?, ? extends
- StreamingFileSink.BucketsBuilder<RowData, ?, ?>> bucketsBuilder,
- InactiveBucketListener listener) {
+ StreamingFileSink.BucketsBuilder<RowData, String, ? extends
+ StreamingFileSink.BucketsBuilder<RowData, String, ?>> bucketsBuilder) {
this.bucketCheckInterval = bucketCheckInterval;
this.bucketsBuilder = bucketsBuilder;
- this.listener = listener;
setChainingStrategy(ChainingStrategy.ALWAYS);
}
@@ -90,7 +88,17 @@ public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage>
inactivePartitions = new HashSet<>();
currentWatermark = Long.MIN_VALUE;
- listener.setInactiveConsumer(b -> inactivePartitions.add(b));
+ buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() {
+
+ @Override
+ public void bucketCreated(Bucket<RowData, String> bucket) {
+ }
+
+ @Override
+ public void bucketInactive(Bucket<RowData, String> bucket) {
+ inactivePartitions.add(bucket.getBucketId());
+ }
+ });
}
@Override