You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/02/16 17:26:26 UTC

[flink] 04/06: [FLINK-25583][connectors/filesystem] Add bucketId and compactedFileToCleanup in FileSinkCommittable, delete compactedFileToCleanup in FileCommitter.

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 824752c82729b5fd6aab5c6f205476ae63c8aff5
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jan 25 18:31:17 2022 +0800

    [FLINK-25583][connectors/filesystem] Add bucketId and compactedFileToCleanup in FileSinkCommittable, delete compactedFileToCleanup in FileCommitter.
---
 .../connector/file/sink/FileSinkCommittable.java   | 74 +++++++++++++++++++++-
 .../file/sink/FileSinkCommittableSerializer.java   | 46 ++++++++++++--
 .../file/sink/committer/FileCommitter.java         | 21 ++++++
 .../file/sink/writer/FileWriterBucket.java         |  5 +-
 .../file/sink/FileCommittableSerializerTest.java   | 25 +++++++-
 .../file/sink/committer/FileCommitterTest.java     | 14 ++--
 6 files changed, 167 insertions(+), 18 deletions(-)

diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java
index 2c5e8e5..7ea5b1d 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittable.java
@@ -19,11 +19,13 @@
 package org.apache.flink.connector.file.sink;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -34,26 +36,51 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class FileSinkCommittable implements Serializable {
 
+    private final String bucketId;
+
     @Nullable private final InProgressFileWriter.PendingFileRecoverable pendingFile;
 
     @Nullable private final InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup;
 
-    public FileSinkCommittable(InProgressFileWriter.PendingFileRecoverable pendingFile) {
+    @Nullable private final Path compactedFileToCleanup;
+
+    public FileSinkCommittable(
+            String bucketId, InProgressFileWriter.PendingFileRecoverable pendingFile) {
+        this.bucketId = bucketId;
         this.pendingFile = checkNotNull(pendingFile);
         this.inProgressFileToCleanup = null;
+        this.compactedFileToCleanup = null;
     }
 
     public FileSinkCommittable(
+            String bucketId,
             InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup) {
+        this.bucketId = bucketId;
         this.pendingFile = null;
         this.inProgressFileToCleanup = checkNotNull(inProgressFileToCleanup);
+        this.compactedFileToCleanup = null;
+    }
+
+    public FileSinkCommittable(String bucketId, Path compactedFileToCleanup) {
+        this.bucketId = bucketId;
+        this.pendingFile = null;
+        this.inProgressFileToCleanup = null;
+        this.compactedFileToCleanup = checkNotNull(compactedFileToCleanup);
     }
 
     FileSinkCommittable(
+            String bucketId,
             @Nullable InProgressFileWriter.PendingFileRecoverable pendingFile,
-            @Nullable InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup) {
+            @Nullable InProgressFileWriter.InProgressFileRecoverable inProgressFileToCleanup,
+            @Nullable Path compactedFileToCleanup) {
+        this.bucketId = bucketId;
         this.pendingFile = pendingFile;
         this.inProgressFileToCleanup = inProgressFileToCleanup;
+        this.compactedFileToCleanup = compactedFileToCleanup;
+    }
+
+    public String getBucketId() {
+        return bucketId;
     }
 
     public boolean hasPendingFile() {
@@ -73,4 +100,47 @@ public class FileSinkCommittable implements Serializable {
     public InProgressFileWriter.InProgressFileRecoverable getInProgressFileToCleanup() {
         return inProgressFileToCleanup;
     }
+
+    public boolean hasCompactedFileToCleanup() {
+        return compactedFileToCleanup != null;
+    }
+
+    @Nullable
+    public Path getCompactedFileToCleanup() {
+        return compactedFileToCleanup;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        FileSinkCommittable that = (FileSinkCommittable) o;
+        return Objects.equals(bucketId, that.bucketId)
+                && Objects.equals(pendingFile, that.pendingFile)
+                && Objects.equals(inProgressFileToCleanup, that.inProgressFileToCleanup)
+                && Objects.equals(compactedFileToCleanup, that.compactedFileToCleanup);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(bucketId, pendingFile, inProgressFileToCleanup, compactedFileToCleanup);
+    }
+
+    /**
+     * Returns a debug string with the bucket id (single-quoted) and the three optional file
+     * references; references that are not set print as {@code null}.
+     */
+    @Override
+    public String toString() {
+        return "FileSinkCommittable{"
+                + "bucketId='" + bucketId + '\''
+                + ", pendingFile=" + pendingFile
+                + ", inProgressFileToCleanup=" + inProgressFileToCleanup
+                + ", compactedFileToCleanup=" + compactedFileToCleanup
+                + '}';
+    }
 }
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java
index 99ef42f..febc9a5 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSinkCommittableSerializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.sink;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -55,14 +56,14 @@ public class FileSinkCommittableSerializer
 
     @Override
     public int getVersion() {
-        return 1;
+        return 2;
     }
 
     @Override
     public byte[] serialize(FileSinkCommittable committable) throws IOException {
         DataOutputSerializer out = new DataOutputSerializer(256);
         out.writeInt(MAGIC_NUMBER);
-        serializeV1(committable, out);
+        serializeV2(committable, out);
         return out.getCopyOfBuffer();
     }
 
@@ -74,14 +75,17 @@ public class FileSinkCommittableSerializer
             case 1:
                 validateMagicNumber(in);
                 return deserializeV1(in);
+            case 2:
+                validateMagicNumber(in);
+                return deserializeV2(in);
             default:
                 throw new IOException("Unrecognized version or corrupt state: " + version);
         }
     }
 
-    private void serializeV1(FileSinkCommittable committable, DataOutputView dataOutputView)
+    private void serializeV2(FileSinkCommittable committable, DataOutputView dataOutputView)
             throws IOException {
-
+        dataOutputView.writeUTF(committable.getBucketId());
         if (committable.hasPendingFile()) {
             dataOutputView.writeBoolean(true);
             SimpleVersionedSerialization.writeVersionAndSerialize(
@@ -99,6 +103,13 @@ public class FileSinkCommittableSerializer
         } else {
             dataOutputView.writeBoolean(false);
         }
+
+        if (committable.hasCompactedFileToCleanup()) {
+            dataOutputView.writeBoolean(true);
+            dataOutputView.writeUTF(committable.getCompactedFileToCleanup().toUri().toString());
+        } else {
+            dataOutputView.writeBoolean(false);
+        }
     }
 
     private FileSinkCommittable deserializeV1(DataInputView dataInputView) throws IOException {
@@ -116,7 +127,32 @@ public class FileSinkCommittableSerializer
                             inProgressFileSerializer, dataInputView);
         }
 
-        return new FileSinkCommittable(pendingFile, inProgressFileToCleanup);
+        return new FileSinkCommittable("", pendingFile, inProgressFileToCleanup, null);
+    }
+
+    /** Reads the version-2 layout: bucket id, then three presence-flagged optional sections. */
+    private FileSinkCommittable deserializeV2(DataInputView dataInputView) throws IOException {
+        String bucketId = dataInputView.readUTF();
+        // Each optional section below is preceded by a boolean presence flag.
+        InProgressFileWriter.PendingFileRecoverable pending = null;
+        if (dataInputView.readBoolean()) {
+            pending =
+                    SimpleVersionedSerialization.readVersionAndDeSerialize(
+                            pendingFileSerializer, dataInputView);
+        }
+
+        InProgressFileWriter.InProgressFileRecoverable inProgressToCleanup = null;
+        if (dataInputView.readBoolean()) {
+            inProgressToCleanup =
+                    SimpleVersionedSerialization.readVersionAndDeSerialize(
+                            inProgressFileSerializer, dataInputView);
+        }
+
+        // The compacted file, when present, is stored as a UTF string and rebuilt as a Path.
+        Path compactedFileToCleanup =
+                dataInputView.readBoolean() ? new Path(dataInputView.readUTF()) : null;
+        return new FileSinkCommittable(
+                bucketId, pending, inProgressToCleanup, compactedFileToCleanup);
+    }
 
     private static void validateMagicNumber(DataInputView in) throws IOException {
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
index c72b399..7590178 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
@@ -22,8 +22,12 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.Collection;
 
@@ -40,6 +44,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class FileCommitter implements Committer<FileSinkCommittable> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FileCommitter.class);
+
     private final BucketWriter<?, ?> bucketWriter;
 
     public FileCommitter(BucketWriter<?, ?> bucketWriter) {
@@ -60,6 +66,21 @@ public class FileCommitter implements Committer<FileSinkCommittable> {
                 bucketWriter.cleanupInProgressFileRecoverable(
                         committable.getInProgressFileToCleanup());
             }
+
+            if (committable.hasCompactedFileToCleanup()) {
+                Path committedFileToCleanup = committable.getCompactedFileToCleanup();
+                try {
+                    committedFileToCleanup.getFileSystem().delete(committedFileToCleanup, false);
+                } catch (Exception e) {
+                    // Try best to cleanup compacting files, skip if failed.
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(
+                                "Failed to cleanup a compacted file, the file will be remained and should not be visible: {}",
+                                committedFileToCleanup,
+                                e);
+                    }
+                }
+            }
         }
     }
 
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
index 3f5f5f9..8385fe9 100644
--- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
@@ -202,11 +202,12 @@ class FileWriterBucket<IN> {
         }
 
         List<FileSinkCommittable> committables = new ArrayList<>();
-        pendingFiles.forEach(pendingFile -> committables.add(new FileSinkCommittable(pendingFile)));
+        pendingFiles.forEach(
+                pendingFile -> committables.add(new FileSinkCommittable(bucketId, pendingFile)));
         pendingFiles.clear();
 
         if (inProgressFileToCleanup != null) {
-            committables.add(new FileSinkCommittable(inProgressFileToCleanup));
+            committables.add(new FileSinkCommittable(bucketId, inProgressFileToCleanup));
             inProgressFileToCleanup = null;
         }
 
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java
index 523fde0..ffda5dd 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileCommittableSerializerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.file.sink;
 
 import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
+import org.apache.flink.core.fs.Path;
 
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -36,23 +37,43 @@ public class FileCommittableSerializerTest {
     @Test
     public void testCommittableWithPendingFile() throws IOException {
         FileSinkCommittable committable =
-                new FileSinkCommittable(new FileSinkTestUtils.TestPendingFileRecoverable());
+                new FileSinkCommittable("0", new FileSinkTestUtils.TestPendingFileRecoverable());
         FileSinkCommittable deserialized = serializeAndDeserialize(committable);
+        assertEquals(committable.getBucketId(), deserialized.getBucketId());
         assertEquals(committable.getPendingFile(), deserialized.getPendingFile());
         assertEquals(
                 committable.getInProgressFileToCleanup(),
                 deserialized.getInProgressFileToCleanup());
+        assertEquals(
+                committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup());
     }
 
     @Test
     public void testCommittableWithInProgressFileToCleanup() throws IOException {
         FileSinkCommittable committable =
-                new FileSinkCommittable(new FileSinkTestUtils.TestInProgressFileRecoverable());
+                new FileSinkCommittable("0", new FileSinkTestUtils.TestInProgressFileRecoverable());
+        FileSinkCommittable deserialized = serializeAndDeserialize(committable);
+        assertEquals(committable.getBucketId(), deserialized.getBucketId());
+        assertEquals(committable.getPendingFile(), deserialized.getPendingFile());
+        assertEquals(
+                committable.getInProgressFileToCleanup(),
+                deserialized.getInProgressFileToCleanup());
+        assertEquals(
+                committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup());
+    }
+
+    @Test
+    public void testCommittableWithCompactedFileToCleanup() throws IOException {
+        FileSinkCommittable committable =
+                new FileSinkCommittable("0", new Path("/tmp/mock_path_to_cleanup"));
         FileSinkCommittable deserialized = serializeAndDeserialize(committable);
+        assertEquals(committable.getBucketId(), deserialized.getBucketId());
         assertEquals(committable.getPendingFile(), deserialized.getPendingFile());
         assertEquals(
                 committable.getInProgressFileToCleanup(),
                 deserialized.getInProgressFileToCleanup());
+        assertEquals(
+                committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup());
     }
 
     private FileSinkCommittable serializeAndDeserialize(FileSinkCommittable committable)
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
index 0b07370..5b7e21b 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/committer/FileCommitterTest.java
@@ -50,7 +50,7 @@ public class FileCommitterTest {
         MockCommitRequest<FileSinkCommittable> fileSinkCommittable =
                 new MockCommitRequest<>(
                         new FileSinkCommittable(
-                                new FileSinkTestUtils.TestPendingFileRecoverable()));
+                                "0", new FileSinkTestUtils.TestPendingFileRecoverable()));
         fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
 
         assertEquals(1, stubBucketWriter.getRecoveredPendingFiles().size());
@@ -67,7 +67,7 @@ public class FileCommitterTest {
         MockCommitRequest<FileSinkCommittable> fileSinkCommittable =
                 new MockCommitRequest<>(
                         new FileSinkCommittable(
-                                new FileSinkTestUtils.TestInProgressFileRecoverable()));
+                                "0", new FileSinkTestUtils.TestInProgressFileRecoverable()));
         fileCommitter.commit(Collections.singletonList(fileSinkCommittable));
 
         assertEquals(0, stubBucketWriter.getRecoveredPendingFiles().size());
@@ -83,15 +83,15 @@ public class FileCommitterTest {
         Collection<CommitRequest<FileSinkCommittable>> committables =
                 Stream.of(
                                 new FileSinkCommittable(
-                                        new FileSinkTestUtils.TestPendingFileRecoverable()),
+                                        "0", new FileSinkTestUtils.TestPendingFileRecoverable()),
                                 new FileSinkCommittable(
-                                        new FileSinkTestUtils.TestPendingFileRecoverable()),
+                                        "0", new FileSinkTestUtils.TestPendingFileRecoverable()),
                                 new FileSinkCommittable(
-                                        new FileSinkTestUtils.TestInProgressFileRecoverable()),
+                                        "0", new FileSinkTestUtils.TestInProgressFileRecoverable()),
                                 new FileSinkCommittable(
-                                        new FileSinkTestUtils.TestPendingFileRecoverable()),
+                                        "0", new FileSinkTestUtils.TestPendingFileRecoverable()),
                                 new FileSinkCommittable(
-                                        new FileSinkTestUtils.TestInProgressFileRecoverable()))
+                                        "0", new FileSinkTestUtils.TestInProgressFileRecoverable()))
                         .map(MockCommitRequest::new)
                         .collect(Collectors.toList());
         fileCommitter.commit(committables);