You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2022/02/16 17:26:24 UTC
[flink] 02/06: [FLINK-25583][connectors/filesystem] Introduce CompactingFileWriter, implement in implementations of InProgressFileWriter.
This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 51adf0fcbdc63520a58246100b5a585b9f563ba6
Author: Gen Luo <lu...@gmail.com>
AuthorDate: Tue Jan 25 18:23:27 2022 +0800
[FLINK-25583][connectors/filesystem] Introduce CompactingFileWriter, implement in implementations of InProgressFileWriter.
---
.../7602816f-5c01-4b7a-9e3e-235dfedec245 | 1 -
.../functions/sink/filesystem/BucketWriter.java | 26 ++++++++++++
.../functions/sink/filesystem/BulkPartWriter.java | 1 +
...ssFileWriter.java => CompactingFileWriter.java} | 47 +++++++++-------------
.../sink/filesystem/InProgressFileWriter.java | 10 ++++-
... => OutputStreamBasedCompactingFileWriter.java} | 35 ++++++----------
.../OutputStreamBasedPartFileWriter.java | 36 ++++++++++++++++-
...er.java => RecordWiseCompactingFileWriter.java} | 33 +++++----------
.../sink/filesystem/RowWisePartWriter.java | 8 ++--
9 files changed, 117 insertions(+), 80 deletions(-)
diff --git a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245 b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
index 4bfb283..f61d7d9 100644
--- a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
+++ b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245
@@ -225,7 +225,6 @@ org.apache.flink.streaming.api.functions.sink.filesystem.AbstractPartFileWriter
org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter$PendingFile does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$InProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
-org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter$PendingFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedInProgressFileRecoverable does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
index 88ad598..00c7890 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
@@ -40,6 +40,32 @@ public interface BucketWriter<IN, BucketID> {
final BucketID bucketID, final Path path, final long creationTime) throws IOException;
/**
+ * Used to create a new {@link CompactingFileWriter} of the requested type. Requesting a writer
+ * of an unsupported type will result in an UnsupportedOperationException. By default, only the
+ * RECORD_WISE type is supported, for which an {@link InProgressFileWriter} will be created.
+ *
+ * @param type the type of this writer.
+ * @param bucketID the id of the bucket this writer is writing to.
+ * @param path the path this writer will write to.
+ * @param creationTime the creation time of the file.
+ * @return the new {@link CompactingFileWriter}
+ * @throws IOException Thrown if creating a writer fails.
+ * @throws UnsupportedOperationException Thrown if the bucket writer doesn't support the
+ * requested type.
+ */
+ default CompactingFileWriter openNewCompactingFile(
+ final CompactingFileWriter.Type type,
+ final BucketID bucketID,
+ final Path path,
+ final long creationTime)
+ throws IOException {
+ if (type == CompactingFileWriter.Type.RECORD_WISE) {
+ return openNewInProgressFile(bucketID, path, creationTime);
+ }
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* Used to resume a {@link InProgressFileWriter} from a {@link
* InProgressFileWriter.InProgressFileRecoverable}.
*
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index b5a12de..758296d 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -45,6 +45,7 @@ final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter
@Override
public void write(IN element, long currentTime) throws IOException {
+ ensureWriteType(Type.RECORD_WISE);
writer.addElement(element);
markWrite(currentTime);
}
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
similarity index 51%
copy from flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
copy to flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
index c316254..23033aa 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.java
@@ -18,44 +18,35 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
import java.io.IOException;
-/** The {@link Bucket} uses the {@link InProgressFileWriter} to write element to a part file. */
-@Internal
-public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
+/**
+ * The file sink compactors use the {@link CompactingFileWriter} to write a compacting file.
+ *
+ * <p>A class should not implement {@link CompactingFileWriter} directly, but should implement
+ * {@link RecordWiseCompactingFileWriter}, {@link OutputStreamBasedCompactingFileWriter}, or
+ * both. If a class implements both interfaces, once the write method of either interface is
+ * called, the write method in the other one should be disabled.
+ */
+@PublicEvolving
+public interface CompactingFileWriter {
/**
- * Write an element to the part file.
+ * Closes the writer and gets the {@link PendingFileRecoverable} of the written compacting file.
*
- * @param element the element to be written.
- * @param currentTime the writing time.
- * @throws IOException Thrown if writing the element fails.
- */
- void write(final IN element, final long currentTime) throws IOException;
-
- /**
- * @return The state of the current part file.
- * @throws IOException Thrown if persisting the part file fails.
- */
- InProgressFileRecoverable persist() throws IOException;
-
- /**
* @return The state of the pending part file. {@link Bucket} uses this to commit the pending
* file.
* @throws IOException Thrown if an I/O error occurs.
*/
PendingFileRecoverable closeForCommit() throws IOException;
- /** Dispose the part file. */
- void dispose();
-
- // ------------------------------------------------------------------------
-
- /** A handle can be used to recover in-progress file.. */
- interface InProgressFileRecoverable extends PendingFileRecoverable {}
-
- /** The handle can be used to recover pending file. */
- interface PendingFileRecoverable {}
+ /** Enum defining the types of {@link CompactingFileWriter}. */
+ @PublicEvolving
+ enum Type {
+ RECORD_WISE,
+ OUTPUT_STREAM
+ }
}
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
index c316254..f633023 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
@@ -19,12 +19,14 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
import java.io.IOException;
/** The {@link Bucket} uses the {@link InProgressFileWriter} to write element to a part file. */
@Internal
-public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
+public interface InProgressFileWriter<IN, BucketID>
+ extends PartFileInfo<BucketID>, RecordWiseCompactingFileWriter<IN> {
/**
* Write an element to the part file.
@@ -51,11 +53,17 @@ public interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketI
/** Dispose the part file. */
void dispose();
+ @Override
+ default void write(IN element) throws IOException {
+ write(element, System.currentTimeMillis());
+ }
+
// ------------------------------------------------------------------------
/** A handle can be used to recover in-progress file.. */
interface InProgressFileRecoverable extends PendingFileRecoverable {}
/** The handle can be used to recover pending file. */
+ @PublicEvolving
interface PendingFileRecoverable {}
}
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedCompactingFileWriter.java
similarity index 50%
copy from flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
copy to flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedCompactingFileWriter.java
index aeee8ee..82970595 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedCompactingFileWriter.java
@@ -19,33 +19,22 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.util.Preconditions;
import java.io.IOException;
+import java.io.OutputStream;
/**
- * A {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}. This also
- * implements the {@link PartFileInfo}.
+ * The compactors use the {@link OutputStreamBasedCompactingFileWriter} to directly write a
+ * compacting file as an {@link OutputStream}.
*/
@Internal
-final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
-
- private final Encoder<IN> encoder;
-
- RowWisePartWriter(
- final BucketID bucketId,
- final RecoverableFsDataOutputStream currentPartStream,
- final Encoder<IN> encoder,
- final long creationTime) {
- super(bucketId, currentPartStream, creationTime);
- this.encoder = Preconditions.checkNotNull(encoder);
- }
-
- @Override
- public void write(final IN element, final long currentTime) throws IOException {
- encoder.encode(element, currentPartStream);
- markWrite(currentTime);
- }
+public interface OutputStreamBasedCompactingFileWriter extends CompactingFileWriter {
+ /**
+ * Gets the output stream underlying the writer. The close method of the returned stream should
+ * never be called.
+ *
+ * @return The output stream to write the compacting file.
+ * @throws IOException Thrown if acquiring the stream fails.
+ */
+ OutputStream asOutputStream() throws IOException;
}
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
index 666f82b..13ceae5 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
@@ -30,6 +30,8 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.IOUtils;
import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
/**
* The base class for all the part file writer that use {@link
@@ -39,10 +41,13 @@ import java.io.IOException;
* @param <BucketID> the bucket type
*/
public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
- extends AbstractPartFileWriter<IN, BucketID> {
+ extends AbstractPartFileWriter<IN, BucketID>
+ implements OutputStreamBasedCompactingFileWriter {
final RecoverableFsDataOutputStream currentPartStream;
+ private CompactingFileWriter.Type writeType = null;
+
OutputStreamBasedPartFileWriter(
final BucketID bucketID,
final RecoverableFsDataOutputStream recoverableFsDataOutputStream,
@@ -74,6 +79,27 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
return currentPartStream.getPos();
}
+ @Override
+ public OutputStream asOutputStream() throws IOException {
+ ensureWriteType(Type.OUTPUT_STREAM);
+ return currentPartStream;
+ }
+
+ protected void ensureWriteType(Type type) {
+ if (type != this.writeType) {
+ if (this.writeType == null) {
+ this.writeType = type;
+ } else {
+ throw new IllegalStateException(
+ "Writer has already been opened as "
+ + writeType
+ + " type, but trying to reopen it as "
+ + type
+ + " type.");
+ }
+ }
+ }
+
abstract static class OutputStreamBasedBucketWriter<IN, BucketID>
implements BucketWriter<IN, BucketID> {
@@ -91,6 +117,14 @@ public abstract class OutputStreamBasedPartFileWriter<IN, BucketID>
}
@Override
+ public CompactingFileWriter openNewCompactingFile(
+ CompactingFileWriter.Type type, BucketID bucketID, Path path, long creationTime)
+ throws IOException {
+ // Both types are supported; override to avoid the default UnsupportedOperationException.
+ return openNewInProgressFile(bucketID, path, creationTime);
+ }
+
+ @Override
public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
final BucketID bucketID,
final InProgressFileRecoverable inProgressFileRecoverable,
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RecordWiseCompactingFileWriter.java
similarity index 50%
copy from flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
copy to flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RecordWiseCompactingFileWriter.java
index aeee8ee..01b48ed 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RecordWiseCompactingFileWriter.java
@@ -19,33 +19,20 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
- * A {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}. This also
- * implements the {@link PartFileInfo}.
+ * The compactors use the {@link RecordWiseCompactingFileWriter} to write elements to a compacting
+ * file.
*/
@Internal
-final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
-
- private final Encoder<IN> encoder;
-
- RowWisePartWriter(
- final BucketID bucketId,
- final RecoverableFsDataOutputStream currentPartStream,
- final Encoder<IN> encoder,
- final long creationTime) {
- super(bucketId, currentPartStream, creationTime);
- this.encoder = Preconditions.checkNotNull(encoder);
- }
-
- @Override
- public void write(final IN element, final long currentTime) throws IOException {
- encoder.encode(element, currentPartStream);
- markWrite(currentTime);
- }
+public interface RecordWiseCompactingFileWriter<IN> extends CompactingFileWriter {
+ /**
+ * Write an element to the compacting file.
+ *
+ * @param element the element to be written.
+ * @throws IOException Thrown if writing the element fails.
+ */
+ void write(IN element) throws IOException;
}
diff --git a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index aeee8ee..f2b473c 100644
--- a/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -27,14 +27,15 @@ import java.io.IOException;
/**
* A {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}. This also
- * implements the {@link PartFileInfo}.
+ * implements the {@link PartFileInfo} and the {@link OutputStreamBasedCompactingFileWriter}.
*/
@Internal
-final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
+public final class RowWisePartWriter<IN, BucketID>
+ extends OutputStreamBasedPartFileWriter<IN, BucketID> {
private final Encoder<IN> encoder;
- RowWisePartWriter(
+ public RowWisePartWriter(
final BucketID bucketId,
final RecoverableFsDataOutputStream currentPartStream,
final Encoder<IN> encoder,
@@ -45,6 +46,7 @@ final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWri
@Override
public void write(final IN element, final long currentTime) throws IOException {
+ ensureWriteType(Type.RECORD_WISE);
encoder.encode(element, currentPartStream);
markWrite(currentTime);
}