You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/02/16 17:26:26 UTC
[flink] 04/06: [FLINK-25583][connectors/filesystem] Add bucketId and compactedFileToCleanup in FileSinkCommittable, delete compactedFileToCleanup in FileCommitter.
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 824752c82729b5fd6aab5c6f205476ae63c8aff5
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jan 25 18:31:17 2022 +0800
[FLINK-25583][connectors/filesystem] Add bucketId and compactedFileToCleanup in FileSinkCommittable, delete compactedFileToCleanup in FileCommitter.
---
.../connector/file/sink/FileSinkCommittable.java | 74 +++++++++++++++++++++-
.../file/sink/FileSinkCommittableSerializer.java | 46 ++++++++++++--
.../file/sink/committer/FileCommitter.java | 21 ++++++
.../file/sink/writer/FileWriterBucket.java | 5 +-
.../file/sink/FileCommittableSerializerTest.java | 25 +++++++-
.../file/sink/committer/FileCommitterTest.java | 14 ++--
6 files changed, 167 insertions(+), 18 deletions(-)
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java
index 2c5e8e5..7ea5b1d 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java
@@ -19,11 +19,13 @@
package org.apache.flink.connector.file.sink;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import javax.annotation.Nullable;
import java.io.Serializable;
+import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -34,26 +36,51 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class FileSinkCommittable implements Serializable {
+ /** Id of the bucket this committable belongs to; never null. */
+ private final String bucketId;
+
@Nullable private final InProgressFileWriter.PendingFileRecoverable pendingFile;
@Nullable private final InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup;
- public FileSinkCommittable(InProgressFileWriter.PendingFileRecoverable pendingFile) {
+ /** A compacted file that the committer deletes on a best-effort basis, or null. */
+ @Nullable private final Path compactedFileToCleanup;
+
+ /**
+ * Creates a committable carrying a pending file of the given bucket.
+ *
+ * @param bucketId id of the bucket, must not be null
+ * @param pendingFile the pending file, must not be null
+ */
+ public FileSinkCommittable(
+ String bucketId, InProgressFileWriter.PendingFileRecoverable pendingFile) {
+ // Fail fast here instead of with an NPE later in the serializer's writeUTF(bucketId).
+ this.bucketId = checkNotNull(bucketId);
this.pendingFile = checkNotNull(pendingFile);
this.inProgressFileToCleanup = null;
+ this.compactedFileToCleanup = null;
}
+ /**
+ * Creates a committable carrying an in-progress file that should be cleaned up.
+ *
+ * @param bucketId id of the bucket, must not be null
+ * @param inProgressFileToCleanup the in-progress file to clean up, must not be null
+ */
public FileSinkCommittable(
+ String bucketId,
InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup) {
+ this.bucketId = checkNotNull(bucketId);
this.pendingFile = null;
this.inProgressFileToCleanup = checkNotNull(inProgressFileToCleanup);
+ this.compactedFileToCleanup = null;
+ }
+
+ /**
+ * Creates a committable carrying a compacted file that should be deleted on commit.
+ *
+ * @param bucketId id of the bucket, must not be null
+ * @param compactedFileToCleanup path of the compacted file, must not be null
+ */
+ public FileSinkCommittable(String bucketId, Path compactedFileToCleanup) {
+ this.bucketId = checkNotNull(bucketId);
+ this.pendingFile = null;
+ this.inProgressFileToCleanup = null;
+ this.compactedFileToCleanup = checkNotNull(compactedFileToCleanup);
}
+ /**
+ * Creates a committable from already-decoded parts. It is used by the serializer, which reads
+ * each nullable part independently, so any combination of them may be present.
+ */
FileSinkCommittable(
+ String bucketId,
@Nullable InProgressFileWriter.PendingFileRecoverable pendingFile,
- @Nullable InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup) {
+ @Nullable InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup,
+ @Nullable Path compactedFileToCleanup) {
+ this.bucketId = checkNotNull(bucketId);
this.pendingFile = pendingFile;
this.inProgressFileToCleanup = inProgressFileToCleanup;
+ this.compactedFileToCleanup = compactedFileToCleanup;
+ }
+
+ /** Returns the id of the bucket this committable belongs to. */
+ public String getBucketId() {
+ return bucketId;
}
public boolean hasPendingFile() {
@@ -73,4 +100,47 @@ public class FileSinkCommittable implements Serializable {
public InProgressFileWriter.InProgressFileRecoverable getInProgressFileToCleanup() {
return inProgressFileToCleanup;
}
+
+ /** Returns whether this committable carries a compacted file to delete. */
+ public boolean hasCompactedFileToCleanup() {
+ return this.compactedFileToCleanup != null;
+ }
+
+ /** Returns the compacted file to delete, or {@code null} if there is none. */
+ @Nullable
+ public Path getCompactedFileToCleanup() {
+ return this.compactedFileToCleanup;
+ }
+
+ /** Two committables are equal when their bucket ids and all three file references match. */
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || o.getClass() != getClass()) {
+ return false;
+ }
+ final FileSinkCommittable other = (FileSinkCommittable) o;
+ if (!Objects.equals(bucketId, other.bucketId)) {
+ return false;
+ }
+ return Objects.equals(pendingFile, other.pendingFile)
+ && Objects.equals(inProgressFileToCleanup, other.inProgressFileToCleanup)
+ && Objects.equals(compactedFileToCleanup, other.compactedFileToCleanup);
+ }
+
+ /** Hashes exactly the fields compared by {@link #equals(Object)}. */
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ this.bucketId,
+ this.pendingFile,
+ this.inProgressFileToCleanup,
+ this.compactedFileToCleanup);
+ }
+
+ /** Returns a debug description listing the bucket id and all three file references. */
+ @Override
+ public String toString() {
+ return "FileSinkCommittable{"
+ + "bucketId='"
+ + bucketId
+ + "', pendingFile="
+ + pendingFile
+ + ", inProgressFileToCleanup="
+ + inProgressFileToCleanup
+ + ", compactedFileToCleanup="
+ + compactedFileToCleanup
+ + '}';
+ }
}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java
index 99ef42f..febc9a5 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java
@@ -19,6 +19,7 @@
package org.apache.flink.connector.file.sink;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
@@ -55,14 +56,14 @@ public class FileSinkCommittableSerializer
@Override
public int getVersion() {
- return 1;
+ // Version 2 adds the bucket id and the optional compacted file; version 1 is still readable.
+ return 2;
}
@Override
public byte[] serialize(FileSinkCommittable committable) throws IOException {
DataOutputSerializer out = new DataOutputSerializer(256);
out.writeInt(MAGIC_NUMBER);
- serializeV1(committable, out);
+ // Always write the latest layout; deserialize() dispatches on the version when reading.
+ serializeV2(committable, out);
return out.getCopyOfBuffer();
}
@@ -74,14 +75,17 @@ public class FileSinkCommittableSerializer
case 1:
validateMagicNumber(in);
return deserializeV1(in);
+ case 2:
+ validateMagicNumber(in);
+ return deserializeV2(in);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
- private void serializeV1(FileSinkCommittable committable, DataOutputView dataOutputView)
+ private void serializeV2(FileSinkCommittable committable, DataOutputView dataOutputView)
throws IOException {
-
+ dataOutputView.writeUTF(committable.getBucketId());
if (committable.hasPendingFile()) {
dataOutputView.writeBoolean(true);
SimpleVersionedSerialization.writeVersionAndSerialize(
@@ -99,6 +103,13 @@ public class FileSinkCommittableSerializer
} else {
dataOutputView.writeBoolean(false);
}
+
+ if (committable.hasCompactedFileToCleanup()) {
+ dataOutputView.writeBoolean(true);
+ dataOutputView.writeUTF(committable.getCompactedFileToCleanup().toUri().toString());
+ } else {
+ dataOutputView.writeBoolean(false);
+ }
}
private FileSinkCommittable deserializeV1(DataInputView dataInputView) throws IOException {
@@ -116,7 +127,32 @@ public class FileSinkCommittableSerializer
inProgressFileSerializer, dataInputView);
}
- return new FileSinkCommittable(pendingFile, inProgressFileToCleanup);
+ return new FileSinkCommittable("", pendingFile, inProgressFileToCleanup, null);
+ }
+
+ /**
+ * Reads the version 2 layout. It consists of the bucket id followed by three optional parts,
+ * each preceded by a presence flag: the pending file, the in-progress file to clean up, and the
+ * compacted file path.
+ */
+ private FileSinkCommittable deserializeV2(DataInputView dataInputView) throws IOException {
+ final String bucketId = dataInputView.readUTF();
+
+ // Each optional part is read in stream order: presence flag first, then the payload.
+ final InProgressFileWriter.PendingFileRecoverable pendingFile =
+ dataInputView.readBoolean()
+ ? SimpleVersionedSerialization.readVersionAndDeSerialize(
+ pendingFileSerializer, dataInputView)
+ : null;
+
+ final InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup =
+ dataInputView.readBoolean()
+ ? SimpleVersionedSerialization.readVersionAndDeSerialize(
+ inProgressFileSerializer, dataInputView)
+ : null;
+
+ // The serializer writes the compacted file as its URI string.
+ final Path compactedFileToCleanup =
+ dataInputView.readBoolean() ? new Path(dataInputView.readUTF()) : null;
+
+ return new FileSinkCommittable(
+ bucketId, pendingFile, inProgressFileToCleanup, compactedFileToCleanup);
+ }
private static void validateMagicNumber(DataInputView in) throws IOException {
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
index c72b399..7590178 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
@@ -22,8 +22,12 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.Collection;
@@ -40,6 +44,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class FileCommitter implements Committer<FileSinkCommittable> {
+ private static final Logger LOG = LoggerFactory.getLogger(FileCommitter.class);
+
private final BucketWriter<?, ?> bucketWriter;
public FileCommitter(BucketWriter<?, ?> bucketWriter) {
@@ -60,6 +66,21 @@ public class FileCommitter implements Committer<FileSinkCommittable> {
bucketWriter.cleanupInProgressFileRecoverable(
committable.getInProgressFileToCleanup());
}
+
+ if (committable.hasCompactedFileToCleanup()) {
+ Path committedFileToCleanup = committable.getCompactedFileToCleanup();
+ try {
+ committedFileToCleanup.getFileSystem().delete(committedFileToCleanup, false);
+ } catch (Exception e) {
+ // Try best to cleanup compacting files, skip if failed.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Failed to cleanup a compacted file, the file will be remained and should not be visible: {}",
+ committedFileToCleanup,
+ e);
+ }
+ }
+ }
}
}
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
index 3f5f5f9..8385fe9 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
@@ -202,11 +202,12 @@ class FileWriterBucket<IN> {
}
List<FileSinkCommittable> committables = new ArrayList<>();
- pendingFiles.forEach(pendingFile -> committables.add(new FileSinkCommittable(pendingFile)));
+ pendingFiles.forEach(
+ pendingFile -> committables.add(new FileSinkCommittable(bucketId, pendingFile)));
pendingFiles.clear();
if (inProgressFileToCleanup != null) {
- committables.add(new FileSinkCommittable(inProgressFileToCleanup));
+ committables.add(new FileSinkCommittable(bucketId, inProgressFileToCleanup));
inProgressFileToCleanup = null;
}
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java
index 523fde0..ffda5dd 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.connector.file.sink;
import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
+import org.apache.flink.core.fs.Path;
import org.junit.ClassRule;
import org.junit.Test;
@@ -36,23 +37,43 @@ public class FileCommittableSerializerTest {
+ /** Round-trips a committable carrying only a pending file and compares every part. */
@Test
public void testCommittableWithPendingFile() throws IOException {
FileSinkCommittable committable =
- new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable());
+ new FileSinkCommittable("0", new FileSinkTestUtils.TestPendingFileRecoverable());
FileSinkCommittable deserialized = serializeAndDeserialize(committable);
+ assertEquals(committable.getBucketId(), deserialized.getBucketId());
assertEquals(committable.getPendingFile(), deserialized.getPendingFile());
assertEquals(
committable.getInProgressFileToCleanup(),
deserialized.getInProgressFileToCleanup());
+ assertEquals(
+ committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup());
}
+ /** Round-trips a committable carrying only an in-progress file to clean up. */
@Test
public void testCommittableWithInProgressFileToCleanup() throws IOException {
FileSinkCommittable committable =
- new FileSinkCommittable(new FileSinkTestUtils.TestInProgressFileRecoverable());
+ new FileSinkCommittable("0", new FileSinkTestUtils.TestInProgressFileRecoverable());
+ FileSinkCommittable deserialized = serializeAndDeserialize(committable);
+ assertEquals(committable.getBucketId(), deserialized.getBucketId());
+ assertEquals(committable.getPendingFile(), deserialized.getPendingFile());
+ assertEquals(
+ committable.getInProgressFileToCleanup(),
+ deserialized.getInProgressFileToCleanup());
+ assertEquals(
+ committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup());
+ }
+
+ /** Round-trips a committable carrying only a compacted file path to clean up. */
+ @Test
+ public void testCommittableWithCompactedFileToCleanup() throws IOException {
+ FileSinkCommittable committable =
+ new FileSinkCommittable("0", new Path("/tmp/mock_path_to_cleanup"));
FileSinkCommittable deserialized = serializeAndDeserialize(committable);
+ assertEquals(committable.getBucketId(), deserialized.getBucketId());
assertEquals(committable.getPendingFile(), deserialized.getPendingFile());
assertEquals(
committable.getInProgressFileToCleanup(),
deserialized.getInProgressFileToCleanup());
+ assertEquals(
+ committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup());
}
private FileSinkCommittable serializeAndDeserialize(FileSinkCommittable committable)
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
index 0b07370..5b7e21b 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
@@ -50,7 +50,7 @@ public class FileCommitterTest {
MockCommitRequest<FileSinkCommittable> fileSinkCommittable =
new MockCommitRequest<>(
new FileSinkCommittable(
- new FileSinkTestUtils.TestPendingFileRecoverable()));
+ "0", new FileSinkTestUtils.TestPendingFileRecoverable()));
fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
assertEquals(1, stubBucketWriter.getRecoveredPendingFiles().size());
@@ -67,7 +67,7 @@ public class FileCommitterTest {
MockCommitRequest<FileSinkCommittable> fileSinkCommittable =
new MockCommitRequest<>(
new FileSinkCommittable(
- new FileSinkTestUtils.TestInProgressFileRecoverable()));
+ "0", new FileSinkTestUtils.TestInProgressFileRecoverable()));
fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
assertEquals(0, stubBucketWriter.getRecoveredPendingFiles().size());
@@ -83,15 +83,15 @@ public class FileCommitterTest {
Collection<CommitRequest<FileSinkCommittable>> committables =
Stream.of(
new FileSinkCommittable(
- new FileSinkTestUtils.TestPendingFileRecoverable()),
+ "0", new FileSinkTestUtils.TestPendingFileRecoverable()),
new FileSinkCommittable(
- new FileSinkTestUtils.TestPendingFileRecoverable()),
+ "0", new FileSinkTestUtils.TestPendingFileRecoverable()),
new FileSinkCommittable(
- new FileSinkTestUtils.TestInProgressFileRecoverable()),
+ "0", new FileSinkTestUtils.TestInProgressFileRecoverable()),
new FileSinkCommittable(
- new FileSinkTestUtils.TestPendingFileRecoverable()),
+ "0", new FileSinkTestUtils.TestPendingFileRecoverable()),
new FileSinkCommittable(
- new FileSinkTestUtils.TestInProgressFileRecoverable()))
+ "0", new FileSinkTestUtils.TestInProgressFileRecoverable()))
.map(MockCommitRequest::new)
.collect(Collectors.toList());
fileCommitter.commit(committables);