Posted to commits@flink.apache.org by ga...@apache.org on 2022/02/09 08:17:58 UTC
[flink] 02/02: [FLINK-25572][connectors/filesystem] Update File Sink to use decomposed interfaces
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 890b83ca2c5d64f6707e83952270cfca05159227
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Wed Feb 9 15:23:15 2022 +0800
[FLINK-25572][connectors/filesystem] Update File Sink to use decomposed interfaces
This closes #18642.
---
.../apache/flink/connector/file/sink/FileSink.java | 51 ++++++++--------
.../file/sink/committer/FileCommitter.java | 14 ++---
.../connector/file/sink/writer/FileWriter.java | 34 +++++++----
.../file/sink/writer/FileWriterBucket.java | 4 +-
.../file/sink/committer/FileCommitterTest.java | 55 ++++++++++-------
...leWriterBucketStateSerializerMigrationTest.java | 13 ++--
.../connector/file/sink/writer/FileWriterTest.java | 36 +++++------
.../connector/kafka/sink/KafkaCommitterTest.java | 55 +++--------------
.../connector/sink2/mocks/MockCommitRequest.java | 69 ++++++++++++++++++++++
.../table/planner/factories/TestFileFactory.java | 6 +-
10 files changed, 196 insertions(+), 141 deletions(-)
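
At a glance, the migration replaces the monolithic Sink V1 interface with the decomposed Sink V2 mix-ins. A condensed before/after of the class declaration, taken from the FileSink.java hunks below (the V1 Void parameter was the global-committable type, whose GlobalCommitter methods are dropped entirely):

    // Sink V1: one interface carries writer, committer and global committer
    public class FileSink<IN>
            implements Sink<IN, FileSinkCommittable, FileWriterBucketState, Void> { ... }

    // Sink V2: each capability is its own interface; unused capabilities are
    // simply not implemented, so the Optional-returning accessors disappear
    public class FileSink<IN>
            implements StatefulSink<IN, FileWriterBucketState>,
                    TwoPhaseCommittingSink<IN, FileSinkCommittable>,
                    WithCompatibleState { ... }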
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
index 920a11b..566037f 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
@@ -23,10 +23,10 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.StatefulSink.WithCompatibleState;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.connector.file.sink.committer.FileCommitter;
import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
import org.apache.flink.connector.file.sink.writer.FileWriter;
@@ -52,8 +52,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -108,7 +106,10 @@ import static org.apache.flink.util.Preconditions.checkState;
* written to its output
*/
@Experimental
-public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBucketState, Void> {
+public class FileSink<IN>
+ implements StatefulSink<IN, FileWriterBucketState>,
+ TwoPhaseCommittingSink<IN, FileSinkCommittable>,
+ WithCompatibleState {
private final BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuilder;
@@ -117,17 +118,23 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc
}
@Override
- public SinkWriter<IN, FileSinkCommittable, FileWriterBucketState> createWriter(
- InitContext context, List<FileWriterBucketState> states) throws IOException {
+ public FileWriter<IN> createWriter(InitContext context) throws IOException {
+ return bucketsBuilder.createWriter(context);
+ }
+
+ @Override
+ public StatefulSinkWriter<IN, FileWriterBucketState> restoreWriter(
+ InitContext context, Collection<FileWriterBucketState> recoveredState)
+ throws IOException {
FileWriter<IN> writer = bucketsBuilder.createWriter(context);
- writer.initializeState(states);
+ writer.initializeState(recoveredState);
return writer;
}
@Override
- public Optional<SimpleVersionedSerializer<FileWriterBucketState>> getWriterStateSerializer() {
+ public SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer() {
try {
- return Optional.of(bucketsBuilder.getWriterStateSerializer());
+ return bucketsBuilder.getWriterStateSerializer();
} catch (IOException e) {
// it's not optimal that we have to do this but creating the serializers for the
// FileSink requires (among other things) a call to FileSystem.get() which declares
@@ -137,14 +144,14 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc
}
@Override
- public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
- return Optional.of(bucketsBuilder.createCommitter());
+ public Committer<FileSinkCommittable> createCommitter() throws IOException {
+ return bucketsBuilder.createCommitter();
}
@Override
- public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer() {
+ public SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() {
try {
- return Optional.of(bucketsBuilder.getCommittableSerializer());
+ return bucketsBuilder.getCommittableSerializer();
} catch (IOException e) {
// it's not optimal that we have to do this but creating the serializers for the
// FileSink requires (among other things) a call to FileSystem.get() which declares
@@ -154,17 +161,7 @@ public class FileSink<IN> implements Sink<IN, FileSinkCommittable, FileWriterBuc
}
@Override
- public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
- return Optional.empty();
- }
-
- @Override
- public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
- return Optional.empty();
- }
-
- @Override
- public Collection<String> getCompatibleStateNames() {
+ public Collection<String> getCompatibleWriterStateNames() {
// StreamingFileSink
return Collections.singleton("bucket-states");
}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
index ccd3018..c72b399 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
@@ -19,14 +19,13 @@
package org.apache.flink.connector.file.sink.committer;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
+import java.util.Collection;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -48,9 +47,10 @@ public class FileCommitter implements Committer<FileSinkCommittable> {
}
@Override
- public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables)
- throws IOException {
- for (FileSinkCommittable committable : committables) {
+ public void commit(Collection<CommitRequest<FileSinkCommittable>> requests)
+ throws IOException, InterruptedException {
+ for (CommitRequest<FileSinkCommittable> request : requests) {
+ FileSinkCommittable committable = request.getCommittable();
if (committable.hasPendingFile()) {
// We should always use commitAfterRecovery which contains additional checks.
bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery();
@@ -61,8 +61,6 @@ public class FileCommitter implements Committer<FileSinkCommittable> {
committable.getInProgressFileToCleanup());
}
}
-
- return Collections.emptyList();
}
@Override
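
The V1 contract returned a List of committables to retry; under sink2, commit receives each committable wrapped in a CommitRequest and signals outcomes on the request itself. FileCommitter above commits unconditionally and signals nothing; a hedged sketch of how a committer that wanted retries could use the new contract (doCommit is a hypothetical helper, not part of this commit):

    @Override
    public void commit(Collection<CommitRequest<FileSinkCommittable>> requests)
            throws IOException, InterruptedException {
        for (CommitRequest<FileSinkCommittable> request : requests) {
            try {
                doCommit(request.getCommittable()); // hypothetical commit step
            } catch (IOException e) {
                // replaces "return committablesToRetry": ask the framework
                // to redeliver this request on the next attempt
                request.retryLater();
            }
        }
    }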
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
index 15aebd0..fad3b50 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
@@ -20,8 +20,10 @@ package org.apache.flink.connector.file.sink.writer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSink.StatefulSinkWriter;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.core.fs.Path;
@@ -39,6 +41,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -58,8 +61,10 @@ import static org.apache.flink.util.Preconditions.checkState;
*/
@Internal
public class FileWriter<IN>
- implements SinkWriter<IN, FileSinkCommittable, FileWriterBucketState>,
- Sink.ProcessingTimeService.ProcessingTimeCallback {
+ implements StatefulSinkWriter<IN, FileWriterBucketState>,
+ TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, FileSinkCommittable>,
+ SinkWriter<IN>,
+ ProcessingTimeService.ProcessingTimeCallback {
private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
@@ -75,7 +80,7 @@ public class FileWriter<IN>
private final RollingPolicy<IN, String> rollingPolicy;
- private final Sink.ProcessingTimeService processingTimeService;
+ private final ProcessingTimeService processingTimeService;
private final long bucketCheckInterval;
@@ -89,6 +94,8 @@ public class FileWriter<IN>
private final Counter recordsOutCounter;
+ private boolean endOfInput;
+
/**
* A constructor creating a new empty bucket manager.
*
@@ -107,7 +114,7 @@ public class FileWriter<IN>
final BucketWriter<IN, String> bucketWriter,
final RollingPolicy<IN, String> rollingPolicy,
final OutputFileConfig outputFileConfig,
- final Sink.ProcessingTimeService processingTimeService,
+ final ProcessingTimeService processingTimeService,
final long bucketCheckInterval) {
this.basePath = checkNotNull(basePath);
@@ -148,7 +155,7 @@ public class FileWriter<IN>
* @throws IOException if anything goes wrong during retrieving the state or
* restoring/committing of any in-progress/pending part files
*/
- public void initializeState(List<FileWriterBucketState> bucketStates) throws IOException {
+ public void initializeState(Collection<FileWriterBucketState> bucketStates) throws IOException {
checkNotNull(bucketStates, "The retrieved state was null.");
for (FileWriterBucketState state : bucketStates) {
@@ -179,7 +186,7 @@ public class FileWriter<IN>
}
@Override
- public void write(IN element, Context context) throws IOException {
+ public void write(IN element, Context context) throws IOException, InterruptedException {
// setting the values in the bucketer context
bucketerContext.update(
context.timestamp(),
@@ -193,7 +200,12 @@ public class FileWriter<IN>
}
@Override
- public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+ public void flush(boolean endOfInput) throws IOException, InterruptedException {
+ this.endOfInput = endOfInput;
+ }
+
+ @Override
+ public Collection<FileSinkCommittable> prepareCommit() throws IOException {
List<FileSinkCommittable> committables = new ArrayList<>();
// Every time before we prepare commit, we first check and remove the inactive
@@ -206,7 +218,7 @@ public class FileWriter<IN>
if (!entry.getValue().isActive()) {
activeBucketIt.remove();
} else {
- committables.addAll(entry.getValue().prepareCommit(flush));
+ committables.addAll(entry.getValue().prepareCommit(endOfInput));
}
}
@@ -263,7 +275,7 @@ public class FileWriter<IN>
private void registerNextBucketInspectionTimer() {
final long nextInspectionTime =
processingTimeService.getCurrentProcessingTime() + bucketCheckInterval;
- processingTimeService.registerProcessingTimer(nextInspectionTime, this);
+ processingTimeService.registerTimer(nextInspectionTime, this);
}
/**
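
prepareCommit(boolean flush) is likewise split: flush(endOfInput) only latches the flag, and the no-arg prepareCommit() consults it when deciding whether to roll in-progress parts. An illustrative driver showing the call order the writer now expects per checkpoint (this loop lives in the framework, not in the commit; imports as in the diff above):

    // illustrative only: exercise FileWriter through the decomposed lifecycle
    static Collection<FileSinkCommittable> checkpoint(
            FileWriter<String> writer, long checkpointId, boolean endOfInput)
            throws IOException, InterruptedException {
        writer.flush(endOfInput);           // latches this.endOfInput
        Collection<FileSinkCommittable> committables = writer.prepareCommit();
        writer.snapshotState(checkpointId); // bucket states for recovery
        return committables;
    }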
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
index cec8648..3f5f5f9 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
@@ -191,9 +191,9 @@ class FileWriterBucket<IN> {
inProgressPart.write(element, currentTime);
}
- List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+ List<FileSinkCommittable> prepareCommit(boolean endOfInput) throws IOException {
if (inProgressPart != null
- && (rollingPolicy.shouldRollOnCheckpoint(inProgressPart) || flush)) {
+ && (rollingPolicy.shouldRollOnCheckpoint(inProgressPart) || endOfInput)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Closing in-progress part file for bucket id={} on checkpoint.", bucketId);
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
index 935d170..0b07370 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.connector.file.sink.committer;
+import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
+import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
import org.apache.flink.connector.file.sink.utils.NoOpBucketWriter;
@@ -28,9 +30,11 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -43,15 +47,16 @@ public class FileCommitterTest {
StubBucketWriter stubBucketWriter = new StubBucketWriter();
FileCommitter fileCommitter = new FileCommitter(stubBucketWriter);
- FileSinkCommittable fileSinkCommittable =
- new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable());
- List<FileSinkCommittable> toRetry =
- fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
+ MockCommitRequest<FileSinkCommittable> fileSinkCommittable =
+ new MockCommitRequest<>(
+ new FileSinkCommittable(
+ new FileSinkTestUtils.TestPendingFileRecoverable()));
+ fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
assertEquals(1, stubBucketWriter.getRecoveredPendingFiles().size());
assertEquals(0, stubBucketWriter.getNumCleanUp());
assertTrue(stubBucketWriter.getRecoveredPendingFiles().get(0).isCommitted());
- assertEquals(0, toRetry.size());
+ assertEquals(0, fileSinkCommittable.getNumberOfRetries());
}
@Test
@@ -59,14 +64,15 @@ public class FileCommitterTest {
StubBucketWriter stubBucketWriter = new StubBucketWriter();
FileCommitter fileCommitter = new FileCommitter(stubBucketWriter);
- FileSinkCommittable fileSinkCommittable =
- new FileSinkCommittable(new FileSinkTestUtils.TestInProgressFileRecoverable());
- List<FileSinkCommittable> toRetry =
- fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
+ MockCommitRequest<FileSinkCommittable> fileSinkCommittable =
+ new MockCommitRequest<>(
+ new FileSinkCommittable(
+ new FileSinkTestUtils.TestInProgressFileRecoverable()));
+ fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
assertEquals(0, stubBucketWriter.getRecoveredPendingFiles().size());
assertEquals(1, stubBucketWriter.getNumCleanUp());
- assertEquals(0, toRetry.size());
+ assertEquals(0, fileSinkCommittable.getNumberOfRetries());
}
@Test
@@ -74,23 +80,28 @@ public class FileCommitterTest {
StubBucketWriter stubBucketWriter = new StubBucketWriter();
FileCommitter fileCommitter = new FileCommitter(stubBucketWriter);
- List<FileSinkCommittable> committables =
- Arrays.asList(
- new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()),
- new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()),
- new FileSinkCommittable(
- new FileSinkTestUtils.TestInProgressFileRecoverable()),
- new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable()),
- new FileSinkCommittable(
- new FileSinkTestUtils.TestInProgressFileRecoverable()));
- List<FileSinkCommittable> toRetry = fileCommitter.commit(committables);
+ Collection<CommitRequest<FileSinkCommittable>> committables =
+ Stream.of(
+ new FileSinkCommittable(
+ new FileSinkTestUtils.TestPendingFileRecoverable()),
+ new FileSinkCommittable(
+ new FileSinkTestUtils.TestPendingFileRecoverable()),
+ new FileSinkCommittable(
+ new FileSinkTestUtils.TestInProgressFileRecoverable()),
+ new FileSinkCommittable(
+ new FileSinkTestUtils.TestPendingFileRecoverable()),
+ new FileSinkCommittable(
+ new FileSinkTestUtils.TestInProgressFileRecoverable()))
+ .map(MockCommitRequest::new)
+ .collect(Collectors.toList());
+ fileCommitter.commit(committables);
assertEquals(3, stubBucketWriter.getRecoveredPendingFiles().size());
assertEquals(2, stubBucketWriter.getNumCleanUp());
stubBucketWriter
.getRecoveredPendingFiles()
.forEach(pendingFile -> assertTrue(pendingFile.isCommitted()));
- assertEquals(0, toRetry.size());
+ assertTrue(committables.stream().allMatch(c -> c.getNumberOfRetries() == 0));
}
// ------------------------------- Mock Classes --------------------------------
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
index 65f15f6..228b9c9 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterBucketStateSerializerMigrationTest.java
@@ -19,6 +19,8 @@
package org.apache.flink.connector.file.sink.writer;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.connector.sink2.Committer.CommitRequest;
+import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.committer.FileCommitter;
@@ -169,7 +171,7 @@ public class FileWriterBucketStateSerializerMigrationTest {
}
@Test
- public void testSerializationFull() throws IOException {
+ public void testSerializationFull() throws IOException, InterruptedException {
testDeserializationFull(true, "full");
}
@@ -180,12 +182,12 @@ public class FileWriterBucketStateSerializerMigrationTest {
}
@Test
- public void testSerializationNullInProgress() throws IOException {
+ public void testSerializationNullInProgress() throws IOException, InterruptedException {
testDeserializationFull(false, "full-no-in-progress");
}
private void testDeserializationFull(final boolean withInProgress, final String scenarioName)
- throws IOException {
+ throws IOException, InterruptedException {
final BucketStatePathResolver pathResolver =
new BucketStatePathResolver(BASE_PATH, previousVersion);
@@ -221,7 +223,10 @@ public class FileWriterBucketStateSerializerMigrationTest {
// simulates committing the recovered pending files on the first checkpoint
bucket.snapshotState();
- List<FileSinkCommittable> committables = bucket.prepareCommit(false);
+ Collection<CommitRequest<FileSinkCommittable>> committables =
+ bucket.prepareCommit(false).stream()
+ .map(MockCommitRequest::new)
+ .collect(Collectors.toList());
FileCommitter committer = new FileCommitter(createBucketWriter());
committer.commit(committables);
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
index a8fe439..e521f61 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java
@@ -18,9 +18,9 @@
package org.apache.flink.connector.file.sink.writer;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
@@ -53,12 +53,14 @@ import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
+import java.util.concurrent.ScheduledFuture;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -92,7 +94,7 @@ public class FileWriterTest {
fileWriter.write("test2", new ContextImpl());
fileWriter.write("test3", new ContextImpl());
- List<FileSinkCommittable> committables = fileWriter.prepareCommit(false);
+ Collection<FileSinkCommittable> committables = fileWriter.prepareCommit();
assertEquals(3, committables.size());
}
@@ -112,7 +114,7 @@ public class FileWriterTest {
fileWriter.write("test3", new ContextImpl());
assertEquals(3, fileWriter.getActiveBuckets().size());
- fileWriter.prepareCommit(false);
+ fileWriter.prepareCommit();
List<FileWriterBucketState> states = fileWriter.snapshotState(1L);
assertEquals(3, states.size());
@@ -145,7 +147,7 @@ public class FileWriterTest {
firstFileWriter.write("test2", new ContextImpl());
firstFileWriter.write("test3", new ContextImpl());
- firstFileWriter.prepareCommit(false);
+ firstFileWriter.prepareCommit();
List<FileWriterBucketState> firstState = firstFileWriter.snapshotState(1L);
FileWriter<String> secondFileWriter =
@@ -157,7 +159,7 @@ public class FileWriterTest {
secondFileWriter.write("test1", new ContextImpl());
secondFileWriter.write("test2", new ContextImpl());
- secondFileWriter.prepareCommit(false);
+ secondFileWriter.prepareCommit();
List<FileWriterBucketState> secondState = secondFileWriter.snapshotState(1L);
List<FileWriterBucketState> mergedState = new ArrayList<>();
@@ -197,17 +199,17 @@ public class FileWriterTest {
path, OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", ""));
fileWriter.write("test", new ContextImpl());
- fileWriter.prepareCommit(false);
+ fileWriter.prepareCommit();
fileWriter.snapshotState(1L);
// No more records and another call to prepareCommit will make it inactive
- fileWriter.prepareCommit(false);
+ fileWriter.prepareCommit();
assertTrue(fileWriter.getActiveBuckets().isEmpty());
}
@Test
- public void testOnProcessingTime() throws IOException, InterruptedException {
+ public void testOnProcessingTime() throws Exception {
File outDir = TEMP_FOLDER.newFolder();
Path path = new Path(outDir.toURI());
@@ -247,7 +249,7 @@ public class FileWriterTest {
// Close, pre-commit & clear all the pending records.
processingTimeService.advanceTo(30);
- fileWriter.prepareCommit(false);
+ fileWriter.prepareCommit();
// Test timer re-registration.
fileWriter.write("test1", new ContextImpl());
@@ -278,7 +280,7 @@ public class FileWriterTest {
}
@Test
- public void testNumberRecordsOutCounter() throws IOException {
+ public void testNumberRecordsOutCounter() throws IOException, InterruptedException {
final OperatorIOMetricGroup operatorIOMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
File outDir = TEMP_FOLDER.newFolder();
@@ -350,8 +352,7 @@ public class FileWriterTest {
}
}
- private static class ManuallyTriggeredProcessingTimeService
- implements Sink.ProcessingTimeService {
+ private static class ManuallyTriggeredProcessingTimeService implements ProcessingTimeService {
private long now;
@@ -364,20 +365,21 @@ public class FileWriterTest {
}
@Override
- public void registerProcessingTimer(
+ public ScheduledFuture<?> registerTimer(
long time, ProcessingTimeCallback processingTimeCallback) {
if (time <= now) {
try {
processingTimeCallback.onProcessingTime(now);
- } catch (IOException | InterruptedException e) {
+ } catch (Exception e) {
ExceptionUtils.rethrow(e);
}
} else {
timers.add(new Tuple2<>(time, processingTimeCallback));
}
+ return null;
}
- public void advanceTo(long time) throws IOException, InterruptedException {
+ public void advanceTo(long time) throws Exception {
if (time > now) {
now = time;
@@ -464,7 +466,7 @@ public class FileWriterTest {
BucketAssigner<String, String> bucketAssigner,
RollingPolicy<String, String> rollingPolicy,
OutputFileConfig outputFileConfig,
- Sink.ProcessingTimeService processingTimeService,
+ ProcessingTimeService processingTimeService,
long bucketCheckInterval)
throws IOException {
return new FileWriter<>(
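
The test's timer service tracks an API change as well: Sink.ProcessingTimeService.registerProcessingTimer(...) becomes the common ProcessingTimeService.registerTimer(...), which returns a ScheduledFuture<?>. Returning null is acceptable here because FileWriter never cancels its inspection timer; a stricter test double could return a no-op future instead, e.g. (a sketch, not part of the commit; requires java.util.concurrent.Delayed and TimeUnit):

    private static final class NoOpScheduledFuture implements ScheduledFuture<Object> {
        @Override public long getDelay(TimeUnit unit) { return 0; }
        @Override public int compareTo(Delayed other) { return 0; }
        @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; }
        @Override public boolean isCancelled() { return false; }
        @Override public boolean isDone() { return true; }
        @Override public Object get() { return null; }
        @Override public Object get(long timeout, TimeUnit unit) { return null; }
    }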
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
index 157699f..30bbdca7 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
@@ -17,7 +17,7 @@
package org.apache.flink.connector.kafka.sink;
-import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.mocks.MockCommitRequest;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.kafka.clients.CommonClientConfigs;
@@ -50,8 +50,8 @@ public class KafkaCommitterTest {
new FlinkKafkaInternalProducer<>(properties, TRANSACTIONAL_ID);
Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
new Recyclable<>(producer, p -> {})) {
- final MockCommitRequest request =
- new MockCommitRequest(
+ final MockCommitRequest<KafkaCommittable> request =
+ new MockCommitRequest<>(
new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
producer.resumeTransaction(PRODUCER_ID, EPOCH);
@@ -73,12 +73,13 @@ public class KafkaCommitterTest {
Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
new Recyclable<>(producer, p -> {})) {
// will fail because transaction not started
- final MockCommitRequest request =
- new MockCommitRequest(
+ final MockCommitRequest<KafkaCommittable> request =
+ new MockCommitRequest<>(
new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
committer.commit(Collections.singletonList(request));
- assertThat(request.failedWithUnknownReason).isInstanceOf(IllegalStateException.class);
- assertThat(request.failedWithUnknownReason.getMessage())
+ assertThat(request.getFailedWithUnknownReason())
+ .isInstanceOf(IllegalStateException.class);
+ assertThat(request.getFailedWithUnknownReason().getMessage())
.contains("Transaction was not started");
assertThat(recyclable.isRecycled()).isTrue();
}
@@ -93,44 +94,4 @@ public class KafkaCommitterTest {
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return properties;
}
-
- private static class MockCommitRequest implements Committer.CommitRequest<KafkaCommittable> {
-
- private final KafkaCommittable committable;
- private int retries = 0;
- Throwable failedWithUnknownReason;
-
- MockCommitRequest(KafkaCommittable committable) {
- this.committable = committable;
- }
-
- @Override
- public KafkaCommittable getCommittable() {
- return committable;
- }
-
- @Override
- public int getNumberOfRetries() {
- return retries;
- }
-
- @Override
- public void signalFailedWithKnownReason(Throwable t) {}
-
- @Override
- public void signalFailedWithUnknownReason(Throwable t) {
- failedWithUnknownReason = t;
- }
-
- @Override
- public void retryLater() {
- retries++;
- }
-
- @Override
- public void updateAndRetryLater(KafkaCommittable committable) {}
-
- @Override
- public void signalAlreadyCommitted() {}
- }
}
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java b/flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java
new file mode 100644
index 0000000..75a652a
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/sink2/mocks/MockCommitRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.sink2.mocks;
+
+import org.apache.flink.api.connector.sink2.Committer;
+
+/**
+ * A simple {@link Committer.CommitRequest} used for testing.
+ *
+ * @param <CommT> committable type
+ */
+public class MockCommitRequest<CommT> implements Committer.CommitRequest<CommT> {
+
+ private final CommT committable;
+ private int retries = 0;
+ private Throwable failedWithUnknownReason;
+
+ public MockCommitRequest(CommT committable) {
+ this.committable = committable;
+ }
+
+ @Override
+ public CommT getCommittable() {
+ return committable;
+ }
+
+ @Override
+ public int getNumberOfRetries() {
+ return retries;
+ }
+
+ @Override
+ public void signalFailedWithKnownReason(Throwable t) {}
+
+ @Override
+ public void signalFailedWithUnknownReason(Throwable t) {
+ failedWithUnknownReason = t;
+ }
+
+ @Override
+ public void retryLater() {
+ retries++;
+ }
+
+ @Override
+ public void updateAndRetryLater(CommT committable) {}
+
+ @Override
+ public void signalAlreadyCommitted() {}
+
+ public Throwable getFailedWithUnknownReason() {
+ return failedWithUnknownReason;
+ }
+}
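
Promoting the mock out of KafkaCommitterTest into flink-core's test scope means any sink2 committer test can reuse it. A minimal usage sketch (assumes some Committer<String> named committer and JUnit asserts; the payload type is arbitrary):

    MockCommitRequest<String> request = new MockCommitRequest<>("payload");
    committer.commit(Collections.singletonList(request));
    // the request records what the committer signaled
    assertEquals(0, request.getNumberOfRetries());
    assertNull(request.getFailedWithUnknownReason());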
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java
index 8949160..626a395 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.SinkProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -56,7 +56,7 @@ import java.util.Set;
/**
* Test implementation of {@link DynamicTableSourceFactory} and {@link DynamicTableSinkFactory} that
- * creates a file source and sink based on {@link SourceProvider} and {@link SinkProvider}.
+ * creates a file source and sink based on {@link SourceProvider} and {@link SinkV2Provider}.
*/
public class TestFileFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
@@ -158,7 +158,7 @@ public class TestFileFactory implements DynamicTableSourceFactory, DynamicTableS
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final FileSink<RowData> fileSink =
FileSink.forRowFormat(path, new RowDataEncoder()).build();
- return SinkProvider.of(fileSink);
+ return SinkV2Provider.of(fileSink);
}
@Override
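
On the table planner side, the only change needed is the provider wrapper: SinkProvider (V1) gives way to SinkV2Provider. The swap, as in the hunk above:

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        final FileSink<RowData> fileSink =
                FileSink.forRowFormat(path, new RowDataEncoder()).build();
        return SinkV2Provider.of(fileSink); // was: SinkProvider.of(fileSink)
    }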