You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/02/16 17:26:24 UTC

[flink] 02/06: [FLINK-25583][connectors/filesystem] Introduce CompactingFileWriter, implement in implementations of InProgressFileWriter.

This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 51adf0fcbdc63520a58246100b5a585b9f563ba6
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jan 25 18:23:27 2022 +0800

    [FLINK-25583][connectors/filesystem] Introduce CompactingFileWriter, implement in implementations of InProgressFileWriter.
---
 .../7602816f-5c01-4b7a-9e3e-235dfedec245           |  1 -
 .../functions/sink/filesystem/BucketWriter.java    | 26 ++++++++++++
 .../functions/sink/filesystem/BulkPartWriter.java  |  1 +
 ...ssFileWriter.java => CompactingFileWriter.java} | 47 +++++++++-------------
 .../sink/filesystem/InProgressFileWriter.java      | 10 ++++-
 ... => OutputStreamBasedCompactingFileWriter.java} | 35 ++++++----------
 .../OutputStreamBasedPartFileWriter.java           | 36 ++++++++++++++++-
 ...er.java => RecordWiseCompactingFileWriter.java} | 33 +++++----------
 .../sink/filesystem/RowWisePartWriter.java         |  8 ++--
 9 files changed, 117 insertions(+), 80 deletions(-)

diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
index 4bfb283..f61d7d9 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
+++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
@@ -225,7 +225,6 @@ org.apache.flink.streaming.api.functions.sink.filesystem.AbstractPartFileWriter
 org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter$PendingFile does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
-org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
 org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedInProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
index 88ad598..00c7890 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
@@ -40,6 +40,32 @@ public interface BucketWriter<IN, BucketID> {
             final BucketID bucketID, final Path path, final long creationTime) throws IOException;
 
     /**
+     * Used to create a new {@link CompactingFileWriter} of the requested type. Requesting a writer
+     * of an unsupported type will result in an UnsupportedOperationException. By default, only the
+     * RECORD_WISE type is supported, for which an {@link InProgressFileWriter} will be created.
+     *
+     * @param type the type of this writer.
+     * @param bucketID the id of the bucket this writer is writing to.
+     * @param path the path this writer will write to.
+     * @param creationTime the creation time of the file.
+     * @return the new {@link CompactingFileWriter}
+     * @throws IOException Thrown if creating a writer fails.
+     * @throws UnsupportedOperationException Thrown if the bucket writer doesn't support the
+     *     requested type.
+     */
+    default CompactingFileWriter openNewCompactingFile(
+            final CompactingFileWriter.Type type,
+            final BucketID bucketID,
+            final Path path,
+            final long creationTime)
+            throws IOException {
+        if (type == CompactingFileWriter.Type.RECORD_WISE) {
+            return openNewInProgressFile(bucketID, path, creationTime);
+        }
+        throw new UnsupportedOperationException();
+    }
+
+    /**
      * Used to resume a {@link InProgressFileWriter} from a {@link
      * InProgressFileWriter.InProgressFileRecoverable}.
      *
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index b5a12de..758296d 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -45,6 +45,7 @@ final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter
 
     @Override
     public void write(IN element, long currentTime) throws IOException {
+        ensureWriteType(Type.RECORD_WISE);
         writer.addElement(element);
         markWrite(currentTime);
     }
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
similarity index 51%
copy from flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
copy to flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
index c316254..23033aa 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
@@ -18,44 +18,35 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
 
 import java.io.IOException;
 
-/** The {@link Bucket} uses the {@link InProgressFileWriter} to write element to a part file. */
-@Internal
-public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
+/**
+ * The file sink compactors use the {@link CompactingFileWriter} to write a compacting file.
+ *
+ * <p>A class should not implement {@link CompactingFileWriter} directly, but should implement
+ * {@link RecordWiseCompactingFileWriter}, or {@link OutputStreamBasedCompactingFileWriter}, or
+ * both. If a class implements both interfaces, once the write method of either interface is
+ * called, the write method in the other one should be disabled.
+ */
+@PublicEvolving
+public interface CompactingFileWriter {
 
     /**
-     * Write an element to the part file.
+     * Closes the writer and gets the {@link PendingFileRecoverable} of the written compacting file.
      *
-     * @param element the element to be written.
-     * @param currentTime the writing time.
-     * @throws IOException Thrown if writing the element fails.
-     */
-    void write(final IN element, final long currentTime) throws IOException;
-
-    /**
-     * @return The state of the current part file.
-     * @throws IOException Thrown if persisting the part file fails.
-     */
-    InProgressFileRecoverable persist() throws IOException;
-
-    /**
      * @return The state of the pending part file. {@link Bucket} uses this to commit the pending
      *     file.
      * @throws IOException Thrown if an I/O error occurs.
      */
     PendingFileRecoverable closeForCommit() throws IOException;
 
-    /** Dispose the part file. */
-    void dispose();
-
-    // ------------------------------------------------------------------------
-
-    /** A handle can be used to recover in-progress file.. */
-    interface InProgressFileRecoverable extends PendingFileRecoverable {}
-
-    /** The handle can be used to recover pending file. */
-    interface PendingFileRecoverable {}
+    /** Enum defining the types of {@link CompactingFileWriter}. */
+    @PublicEvolving
+    enum Type {
+        RECORD_WISE,
+        OUTPUT_STREAM
+    }
 }
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
index c316254..f633023 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
@@ -19,12 +19,14 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.IOException;
 
 /** The {@link Bucket} uses the {@link InProgressFileWriter} to write element to a part file. */
 @Internal
-public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
+public interface InProgressFileWriter<IN, BucketID>
+        extends PartFileInfo<BucketID>, RecordWiseCompactingFileWriter<IN> {
 
     /**
      * Write an element to the part file.
@@ -51,11 +53,17 @@ public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketI
     /** Dispose the part file. */
     void dispose();
 
+    @Override
+    default void write(IN element) throws IOException {
+        write(element, System.currentTimeMillis());
+    }
+
     // ------------------------------------------------------------------------
 
     /** A handle can be used to recover in-progress file.. */
     interface InProgressFileRecoverable extends PendingFileRecoverable {}
 
     /** The handle can be used to recover pending file. */
+    @PublicEvolving
     interface PendingFileRecoverable {}
 }
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedCompactingFileWriter.java
similarity index 50%
copy from flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
copy to flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedCompactingFileWriter.java
index aeee8ee..82970595 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedCompactingFileWriter.java
@@ -19,33 +19,22 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
+import java.io.OutputStream;
 
 /**
- * A {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}. This also
- * implements the {@link PartFileInfo}.
+ * The compactors use the {@link OutputStreamBasedCompactingFileWriter} to directly write a
+ * compacting file as an {@link OutputStream}.
  */
 @Internal
-final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
-
-    private final Encoder<IN> encoder;
-
-    RowWisePartWriter(
-            final BucketID bucketId,
-            final RecoverableFsDataOutputStream currentPartStream,
-            final Encoder<IN> encoder,
-            final long creationTime) {
-        super(bucketId, currentPartStream, creationTime);
-        this.encoder = Preconditions.checkNotNull(encoder);
-    }
-
-    @Override
-    public void write(final IN element, final long currentTime) throws IOException {
-        encoder.encode(element, currentPartStream);
-        markWrite(currentTime);
-    }
+public interface OutputStreamBasedCompactingFileWriter extends CompactingFileWriter {
+    /**
+     * Gets the output stream underlying the writer. The close method of the returned stream should
+     * never be called.
+     *
+     * @return The output stream to write the compacting file.
+     * @throws IOException Thrown if acquiring the stream fails.
+     */
+    OutputStream asOutputStream() throws IOException;
 }
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
index 666f82b..13ceae5 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
@@ -30,6 +30,8 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.util.IOUtils;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
 
 /**
  * The base class for all the part file writer that use {@link
@@ -39,10 +41,13 @@ import java.io.IOException;
  * @param <BucketID> the bucket type
  */
 public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
-        extends AbstractPartFileWriter<IN, BucketID> {
+        extends AbstractPartFileWriter<IN, BucketID>
+        implements OutputStreamBasedCompactingFileWriter {
 
     final RecoverableFsDataOutputStream currentPartStream;
 
+    private CompactingFileWriter.Type writeType = null;
+
     OutputStreamBasedPartFileWriter(
             final BucketID bucketID,
             final RecoverableFsDataOutputStream recoverableFsDataOutputStream,
@@ -74,6 +79,27 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
         return currentPartStream.getPos();
     }
 
+    @Override
+    public OutputStream asOutputStream() throws IOException {
+        ensureWriteType(Type.OUTPUT_STREAM);
+        return currentPartStream;
+    }
+
+    protected void ensureWriteType(Type type) {
+        if (type != this.writeType) {
+            if (this.writeType == null) {
+                this.writeType = type;
+            } else {
+                throw new IllegalStateException(
+                        "Writer has already been opened as "
+                                + writeType
+                                + " type, but trying to reopen it as "
+                                + type
+                                + " type.");
+            }
+        }
+    }
+
     abstract static class OutputStreamBasedBucketWriter<IN, BucketID>
             implements BucketWriter<IN, BucketID> {
 
@@ -91,6 +117,14 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
         }
 
         @Override
+        public CompactingFileWriter openNewCompactingFile(
+                CompactingFileWriter.Type type, BucketID bucketID, Path path, long creationTime)
+                throws IOException {
+            // Both types are supported, override to avoid UnsupportedOperationException.
+            return openNewInProgressFile(bucketID, path, creationTime);
+        }
+
+        @Override
         public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
                 final BucketID bucketID,
                 final InProgressFileRecoverable inProgressFileRecoverable,
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RecordWiseCompactingFileWriter.java
similarity index 50%
copy from flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
copy to flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RecordWiseCompactingFileWriter.java
index aeee8ee..01b48ed 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RecordWiseCompactingFileWriter.java
@@ -19,33 +19,20 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 
 /**
- * A {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}. This also
- * implements the {@link PartFileInfo}.
+ * The compactors use the {@link RecordWiseCompactingFileWriter} to write elements to a compacting
+ * file.
  */
 @Internal
-final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
-
-    private final Encoder<IN> encoder;
-
-    RowWisePartWriter(
-            final BucketID bucketId,
-            final RecoverableFsDataOutputStream currentPartStream,
-            final Encoder<IN> encoder,
-            final long creationTime) {
-        super(bucketId, currentPartStream, creationTime);
-        this.encoder = Preconditions.checkNotNull(encoder);
-    }
-
-    @Override
-    public void write(final IN element, final long currentTime) throws IOException {
-        encoder.encode(element, currentPartStream);
-        markWrite(currentTime);
-    }
+public interface RecordWiseCompactingFileWriter<IN> extends CompactingFileWriter {
+    /**
+     * Write an element to the compacting file.
+     *
+     * @param element the element to be written.
+     * @throws IOException Thrown if writing the element fails.
+     */
+    void write(IN element) throws IOException;
 }
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index aeee8ee..f2b473c 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -27,14 +27,15 @@ import java.io.IOException;
 
 /**
  * A {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}. This also
- * implements the {@link PartFileInfo}.
+ * implements the {@link PartFileInfo} and the {@link OutputStreamBasedCompactingFileWriter}.
  */
 @Internal
-final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
+public final class RowWisePartWriter<IN, BucketID>
+        extends OutputStreamBasedPartFileWriter<IN, BucketID> {
 
     private final Encoder<IN> encoder;
 
-    RowWisePartWriter(
+    public RowWisePartWriter(
             final BucketID bucketId,
             final RecoverableFsDataOutputStream currentPartStream,
             final Encoder<IN> encoder,
@@ -45,6 +46,7 @@ final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWri
 
     @Override
     public void write(final IN element, final long currentTime) throws IOException {
+        ensureWriteType(Type.RECORD_WISE);
         encoder.encode(element, currentPartStream);
         markWrite(currentTime);
     }