You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2019/11/15 12:45:56 UTC
[flink] branch master updated: [FLINK-13850] refactor part file
configurations into a single method
This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 22b2a88 [FLINK-13850] refactor part file configurations into a single method
22b2a88 is described below
commit 22b2a8856307c310b4c75b32eeed33ba66c0206e
Author: Joao Boto <bo...@boto.pro>
AuthorDate: Thu Nov 14 20:14:02 2019 +0100
[FLINK-13850] refactor part file configurations into a single method
Closes #9533
---
docs/dev/connectors/streamfile_sink.md | 14 ++++
.../api/functions/sink/filesystem/Bucket.java | 24 +++---
.../functions/sink/filesystem/BucketFactory.java | 4 +-
.../api/functions/sink/filesystem/Buckets.java | 10 +--
.../sink/filesystem/DefaultBucketFactoryImpl.java | 8 +-
.../sink/filesystem/OutputFileConfig.java | 98 ++++++++++++++++++++++
.../functions/sink/filesystem/PartFileConfig.java | 53 ------------
.../sink/filesystem/StreamingFileSink.java | 58 +++++--------
.../sink/filesystem/BucketAssignerITCases.java | 2 +-
.../api/functions/sink/filesystem/BucketTest.java | 32 +++++--
.../api/functions/sink/filesystem/BucketsTest.java | 6 +-
.../functions/sink/filesystem/BulkWriterTest.java | 3 +-
.../sink/filesystem/RollingPolicyTest.java | 2 +-
.../api/functions/sink/filesystem/TestUtils.java | 9 +-
14 files changed, 192 insertions(+), 131 deletions(-)
diff --git a/docs/dev/connectors/streamfile_sink.md b/docs/dev/connectors/streamfile_sink.md
index c59e646..378a0f4 100644
--- a/docs/dev/connectors/streamfile_sink.md
+++ b/docs/dev/connectors/streamfile_sink.md
@@ -133,6 +133,20 @@ New buckets are created as dictated by the bucketing policy, and this doesn't af
Old buckets can still receive new records as the bucketing policy is evaluated on a per-record basis.
+### Part file configuration
+
+The filenames of the part files can be configured using `OutputFileConfig`. This configuration contains a part prefix and a part suffix,
+which are combined with the parallel subtask index of the sink and a rolling counter.
+For example, with the prefix "prefix" and the suffix ".ext", the sink creates files such as:
+
+```
+└── 2019-08-25--12
+ ├── prefix-0-0.ext
+ ├── .prefix-0-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
+ ├── prefix-1-0.ext
+ └── .prefix-1-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
+```
+
## File Formats
The `StreamingFileSink` supports both row-wise and bulk encoding formats, such as [Apache Parquet](http://parquet.apache.org).
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 4a996e7..7efd2ba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -70,7 +70,7 @@ public class Bucket<IN, BucketID> {
private final NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint;
- private final PartFileConfig partFileConfig;
+ private final OutputFileConfig outputFileConfig;
private long partCounter;
@@ -90,7 +90,7 @@ public class Bucket<IN, BucketID> {
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
- final PartFileConfig partFileConfig) {
+ final OutputFileConfig outputFileConfig) {
this.fsWriter = checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
this.bucketId = checkNotNull(bucketId);
@@ -103,7 +103,7 @@ public class Bucket<IN, BucketID> {
this.pendingPartsPerCheckpoint = new TreeMap<>();
this.resumablesPerCheckpoint = new TreeMap<>();
- this.partFileConfig = checkNotNull(partFileConfig);
+ this.outputFileConfig = checkNotNull(outputFileConfig);
}
/**
@@ -116,7 +116,7 @@ public class Bucket<IN, BucketID> {
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
- final PartFileConfig partFileConfig) throws IOException {
+ final OutputFileConfig outputFileConfig) throws IOException {
this(
fsWriter,
@@ -126,7 +126,7 @@ public class Bucket<IN, BucketID> {
initialPartCounter,
partFileFactory,
rollingPolicy,
- partFileConfig);
+ outputFileConfig);
restoreInProgressFile(bucketState);
commitRecoveredPendingFiles(bucketState);
@@ -230,7 +230,7 @@ public class Bucket<IN, BucketID> {
}
private Path assembleNewPartPath() {
- return new Path(bucketPath, partFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + partFileConfig.getPartSuffix());
+ return new Path(bucketPath, outputFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + outputFileConfig.getPartSuffix());
}
private CommitRecoverable closePartFile() throws IOException {
@@ -369,7 +369,7 @@ public class Bucket<IN, BucketID> {
* @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
* @param <IN> the type of input elements to the sink.
* @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
- * @param partFileConfig the part file configuration.
+ * @param outputFileConfig the part file configuration.
* @return The new Bucket.
*/
static <IN, BucketID> Bucket<IN, BucketID> getNew(
@@ -380,8 +380,8 @@ public class Bucket<IN, BucketID> {
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
- final PartFileConfig partFileConfig) {
- return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, partFileConfig);
+ final OutputFileConfig outputFileConfig) {
+ return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, outputFileConfig);
}
/**
@@ -393,7 +393,7 @@ public class Bucket<IN, BucketID> {
* @param bucketState the initial state of the restored bucket.
* @param <IN> the type of input elements to the sink.
* @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
- * @param partFileConfig the part file configuration.
+ * @param outputFileConfig the part file configuration.
* @return The restored Bucket.
*/
static <IN, BucketID> Bucket<IN, BucketID> restore(
@@ -403,7 +403,7 @@ public class Bucket<IN, BucketID> {
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
- final PartFileConfig partFileConfig) throws IOException {
- return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, partFileConfig);
+ final OutputFileConfig outputFileConfig) throws IOException {
+ return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, outputFileConfig);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index c318ef2..260e82c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -39,7 +39,7 @@ interface BucketFactory<IN, BucketID> extends Serializable {
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
- final PartFileConfig partFileConfig) throws IOException;
+ final OutputFileConfig outputFileConfig) throws IOException;
Bucket<IN, BucketID> restoreBucket(
final RecoverableWriter fsWriter,
@@ -48,5 +48,5 @@ interface BucketFactory<IN, BucketID> extends Serializable {
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
- final PartFileConfig partFileConfig) throws IOException;
+ final OutputFileConfig outputFileConfig) throws IOException;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index fa7a72b..eb61d18 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -77,7 +77,7 @@ public class Buckets<IN, BucketID> {
private final RecoverableWriter fsWriter;
- private final PartFileConfig partFileConfig;
+ private final OutputFileConfig outputFileConfig;
// --------------------------- State Related Fields -----------------------------
@@ -99,7 +99,7 @@ public class Buckets<IN, BucketID> {
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final int subtaskIndex,
- final PartFileConfig partFileConfig) throws IOException {
+ final OutputFileConfig outputFileConfig) throws IOException {
this.basePath = Preconditions.checkNotNull(basePath);
this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner);
@@ -108,7 +108,7 @@ public class Buckets<IN, BucketID> {
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
this.subtaskIndex = subtaskIndex;
- this.partFileConfig = Preconditions.checkNotNull(partFileConfig);
+ this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
this.activeBuckets = new HashMap<>();
this.bucketerContext = new Buckets.BucketerContext();
@@ -186,7 +186,7 @@ public class Buckets<IN, BucketID> {
partFileWriterFactory,
rollingPolicy,
recoveredState,
- partFileConfig
+ outputFileConfig
);
updateActiveBucketId(bucketId, restoredBucket);
@@ -293,7 +293,7 @@ public class Buckets<IN, BucketID> {
maxPartCounter,
partFileWriterFactory,
rollingPolicy,
- partFileConfig);
+ outputFileConfig);
activeBuckets.put(bucketId, bucket);
}
return bucket;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
index dac2a5a..529b93a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
@@ -41,7 +41,7 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
- final PartFileConfig partFileConfig) {
+ final OutputFileConfig outputFileConfig) {
return Bucket.getNew(
fsWriter,
@@ -51,7 +51,7 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
initialPartCounter,
partFileWriterFactory,
rollingPolicy,
- partFileConfig);
+ outputFileConfig);
}
@Override
@@ -62,7 +62,7 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
- final PartFileConfig partFileConfig) throws IOException {
+ final OutputFileConfig outputFileConfig) throws IOException {
return Bucket.restore(
fsWriter,
@@ -71,6 +71,6 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
partFileWriterFactory,
rollingPolicy,
bucketState,
- partFileConfig);
+ outputFileConfig);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java
new file mode 100644
index 0000000..98b84d1
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputFileConfig.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * Configuration for the names of the part files written by the {@link StreamingFileSink}.
+ *
+ * <p>Defines the prefix and the suffix of the part file names. A part file name is assembled as
+ * {@code <prefix>-<subtaskIndex>-<partCounter><suffix>}.
+ */
+@PublicEvolving
+public class OutputFileConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String partPrefix;
+
+ private final String partSuffix;
+
+ /**
+ * Initiates the {@code OutputFileConfig} with values passed as parameters.
+ *
+ * @param partPrefix - the beginning of part file name
+ * @param partSuffix - the ending of part file name
+ */
+ public OutputFileConfig(final String partPrefix, final String partSuffix) {
+ this.partPrefix = Preconditions.checkNotNull(partPrefix);
+ this.partSuffix = Preconditions.checkNotNull(partSuffix);
+ }
+
+ /**
+ * The prefix for the part name.
+ */
+ public String getPartPrefix() {
+ return partPrefix;
+ }
+
+ /**
+ * The suffix for the part name.
+ */
+ public String getPartSuffix() {
+ return partSuffix;
+ }
+
+ /**
+ * Creates a builder initialized with the default prefix ({@code "part"}) and an empty suffix.
+ */
+ public static OutputFileConfigBuilder builder() {
+ return new OutputFileConfigBuilder();
+ }
+
+ /**
+ * A builder to create the part file configuration.
+ */
+ @PublicEvolving
+ public static class OutputFileConfigBuilder {
+
+ private static final String DEFAULT_PART_PREFIX = "part";
+
+ private static final String DEFAULT_PART_SUFFIX = "";
+
+ private String partPrefix;
+
+ private String partSuffix;
+
+ private OutputFileConfigBuilder() {
+ this.partPrefix = DEFAULT_PART_PREFIX;
+ this.partSuffix = DEFAULT_PART_SUFFIX;
+ }
+
+ /** Sets the prefix of the part file names; must not be {@code null}. */
+ public OutputFileConfigBuilder withPartPrefix(String prefix) {
+ this.partPrefix = Preconditions.checkNotNull(prefix);
+ return this;
+ }
+
+ /** Sets the suffix of the part file names; must not be {@code null}. */
+ public OutputFileConfigBuilder withPartSuffix(String suffix) {
+ this.partSuffix = Preconditions.checkNotNull(suffix);
+ return this;
+ }
+
+ /** Builds an immutable {@link OutputFileConfig} from the configured prefix and suffix. */
+ public OutputFileConfig build() {
+ return new OutputFileConfig(partPrefix, partSuffix);
+ }
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileConfig.java
deleted file mode 100644
index f3eaa59..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink.filesystem;
-
-import org.apache.flink.util.Preconditions;
-
-/**
- * Part file name configuration.
- * This allow to define a prefix and a suffix to the part file name.
- */
-class PartFileConfig {
-
- public static final String DEFAULT_PART_PREFIX = "part";
-
- public static final String DEFAULT_PART_SUFFIX = "";
-
- private final String partPrefix;
-
- private final String partSuffix;
-
- PartFileConfig() {
- this(DEFAULT_PART_PREFIX, DEFAULT_PART_SUFFIX);
- }
-
- PartFileConfig(final String partPrefix, final String partSuffix) {
- this.partPrefix = Preconditions.checkNotNull(partPrefix);
- this.partSuffix = Preconditions.checkNotNull(partSuffix);
- }
-
- String getPartPrefix() {
- return partPrefix;
- }
-
- String getPartSuffix() {
- return partSuffix;
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index ed7e926..cd4afc2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -67,9 +67,11 @@ import java.io.Serializable;
* {@link StreamingFileSink#forBulkFormat(Path, BulkWriter.Factory)}.
*
*
- * <p>The filenames of the part files contain the part prefix, "part-", the parallel subtask index of the sink
- * and a rolling counter. For example the file {@code "part-1-17"} contains the data from
- * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask.
+ * <p>The filenames of the part files can be configured using {@link OutputFileConfig}, which contains
+ * a part prefix and a part suffix. These are combined with the parallel subtask index of the sink
+ * and a rolling counter. For example, with the prefix "prefix" and the suffix ".ext", the file
+ * {@code "prefix-1-17.ext"} contains the data from {@code subtask 1} of the sink and is the part file with
+ * rolling counter {@code 17} created by that subtask.
* Part files roll based on the user-specified {@link RollingPolicy}. By default, a {@link DefaultRollingPolicy}
* is used.
*
@@ -217,12 +219,10 @@ public class StreamingFileSink<IN>
private BucketFactory<IN, BucketID> bucketFactory;
- private String partFilePrefix;
-
- private String partFileSuffix;
+ private OutputFileConfig outputFileConfig;
protected RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
- this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), PartFileConfig.DEFAULT_PART_PREFIX, PartFileConfig.DEFAULT_PART_SUFFIX);
+ this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build());
}
protected RowFormatBuilder(
@@ -232,16 +232,14 @@ public class StreamingFileSink<IN>
RollingPolicy<IN, BucketID> policy,
long bucketCheckInterval,
BucketFactory<IN, BucketID> bucketFactory,
- String partFilePrefix,
- String partFileSuffix) {
+ OutputFileConfig outputFileConfig) {
this.basePath = Preconditions.checkNotNull(basePath);
this.encoder = Preconditions.checkNotNull(encoder);
this.bucketAssigner = Preconditions.checkNotNull(assigner);
this.rollingPolicy = Preconditions.checkNotNull(policy);
this.bucketCheckInterval = bucketCheckInterval;
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
- this.partFilePrefix = Preconditions.checkNotNull(partFilePrefix);
- this.partFileSuffix = Preconditions.checkNotNull(partFileSuffix);
+ this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
}
public long getBucketCheckInterval() {
@@ -263,19 +261,14 @@ public class StreamingFileSink<IN>
return self();
}
- public T withPartFilePrefix(final String partPrefix) {
- this.partFilePrefix = partPrefix;
- return self();
- }
-
- public T withPartFileSuffix(final String partSuffix) {
- this.partFileSuffix = partSuffix;
+ public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+ this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig); // fail fast, like the builder constructor
+ return self();
}
public <ID> StreamingFileSink.RowFormatBuilder<IN, ID, ? extends RowFormatBuilder<IN, ID, ?>> withNewBucketAssignerAndPolicy(final BucketAssigner<IN, ID> assigner, final RollingPolicy<IN, ID> policy) {
Preconditions.checkState(bucketFactory.getClass() == DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssignerAndPolicy() cannot be called after specifying a customized bucket factory");
- return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), partFilePrefix, partFileSuffix);
+ return new RowFormatBuilder<>(basePath, encoder, Preconditions.checkNotNull(assigner), Preconditions.checkNotNull(policy), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig);
}
/** Creates the actual sink. */
@@ -298,7 +291,7 @@ public class StreamingFileSink<IN>
new RowWisePartWriter.Factory<>(encoder),
rollingPolicy,
subtaskIndex,
- new PartFileConfig(partFilePrefix, partFileSuffix));
+ outputFileConfig);
}
}
@@ -320,12 +313,10 @@ public class StreamingFileSink<IN>
private BucketFactory<IN, BucketID> bucketFactory;
- private String partFilePrefix;
-
- private String partFileSuffix;
+ private OutputFileConfig outputFileConfig;
protected BulkFormatBuilder(Path basePath, BulkWriter.Factory<IN> writerFactory, BucketAssigner<IN, BucketID> assigner) {
- this(basePath, writerFactory, assigner, DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), PartFileConfig.DEFAULT_PART_PREFIX, PartFileConfig.DEFAULT_PART_SUFFIX);
+ this(basePath, writerFactory, assigner, DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build());
}
protected BulkFormatBuilder(
@@ -334,15 +325,13 @@ public class StreamingFileSink<IN>
BucketAssigner<IN, BucketID> assigner,
long bucketCheckInterval,
BucketFactory<IN, BucketID> bucketFactory,
- String partFilePrefix,
- String partFileSuffix) {
+ OutputFileConfig outputFileConfig) {
this.basePath = Preconditions.checkNotNull(basePath);
this.writerFactory = writerFactory;
this.bucketAssigner = Preconditions.checkNotNull(assigner);
this.bucketCheckInterval = bucketCheckInterval;
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
- this.partFilePrefix = Preconditions.checkNotNull(partFilePrefix);
- this.partFileSuffix = Preconditions.checkNotNull(partFileSuffix);
+ this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
}
public long getBucketCheckInterval() {
@@ -363,21 +352,16 @@ public class StreamingFileSink<IN>
T withBucketFactory(final BucketFactory<IN, BucketID> factory) {
this.bucketFactory = Preconditions.checkNotNull(factory);
return self();
- }
-
- public T withPartFilePrefix(final String partPrefix) {
- this.partFilePrefix = partPrefix;
- return self();
}
- public T withPartFileSuffix(final String partSuffix) {
- this.partFileSuffix = partSuffix;
+ public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+ this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig); // fail fast, like withBucketFactory()
+ return self();
}
public <ID> StreamingFileSink.BulkFormatBuilder<IN, ID, ? extends BulkFormatBuilder<IN, ID, ?>> withNewBucketAssigner(final BucketAssigner<IN, ID> assigner) {
Preconditions.checkState(bucketFactory.getClass() == DefaultBucketFactoryImpl.class, "newBuilderWithBucketAssigner() cannot be called after specifying a customized bucket factory");
- return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(assigner), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), partFilePrefix, partFileSuffix);
+ return new BulkFormatBuilder<>(basePath, writerFactory, Preconditions.checkNotNull(assigner), bucketCheckInterval, new DefaultBucketFactoryImpl<>(), outputFileConfig);
}
/** Creates the actual sink. */
@@ -394,7 +378,7 @@ public class StreamingFileSink<IN>
new BulkPartWriter.Factory<>(writerFactory),
OnCheckpointRollingPolicy.build(),
subtaskIndex,
- new PartFileConfig(partFilePrefix, partFileSuffix));
+ outputFileConfig);
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index cc91a33..02c1264 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -57,7 +57,7 @@ public class BucketAssignerITCases {
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
rollingPolicy,
0,
- new PartFileConfig()
+ OutputFileConfig.builder().build()
);
Bucket<String, String> bucket =
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index 583bacf..2bcb881 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -64,7 +64,7 @@ public class BucketTest {
final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
final Bucket<String, String> bucketUnderTest =
- createBucket(recoverableWriter, path, 0, 0, new PartFileConfig());
+ createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());
bucketUnderTest.write("test-element", 0L);
@@ -82,7 +82,7 @@ public class BucketTest {
final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
final Bucket<String, String> bucketUnderTest =
- createBucket(recoverableWriter, path, 0, 0, new PartFileConfig());
+ createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());
bucketUnderTest.write("test-element", 0L);
@@ -99,13 +99,35 @@ public class BucketTest {
}
@Test
+ public void shouldCleanupResumableAfterRestoring() throws Exception {
+ final File outDir = TEMP_FOLDER.newFolder();
+ final Path path = new Path(outDir.toURI());
+
+ final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+ final Bucket<String, String> bucketUnderTest =
+ createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());
+
+ bucketUnderTest.write("test-element", 0L);
+
+ final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+ assertThat(state, hasActiveInProgressFile());
+
+ bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+
+ final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(path);
+ restoreBucket(newRecoverableWriter, 0, 1, state, OutputFileConfig.builder().build());
+
+ assertThat(newRecoverableWriter, hasCalledDiscard(1)); // that is for checkpoints 0 and 1
+ }
+
+ @Test
public void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
final Path path = new Path(outDir.toURI());
final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
final Bucket<String, String> bucketUnderTest =
- createBucket(recoverableWriter, path, 0, 0, new PartFileConfig());
+ createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build());
final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
assertThat(state, hasNoActiveInProgressFile());
@@ -403,7 +425,7 @@ public class BucketTest {
private Bucket<String, String> getRestoredBucketWithOnlyInProgressPart(final BaseStubWriter writer) throws IOException {
final BucketState<String> stateWithOnlyInProgressFile =
new BucketState<>("test", new Path(), 12345L, new NoOpRecoverable(), new HashMap<>());
- return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, stateWithOnlyInProgressFile, new PartFileConfig());
+ return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, stateWithOnlyInProgressFile, OutputFileConfig.builder().build());
}
private Bucket<String, String> getRestoredBucketWithOnlyPendingParts(final BaseStubWriter writer, final int numberOfPendingParts) throws IOException {
@@ -412,7 +434,7 @@ public class BucketTest {
final BucketState<String> initStateWithOnlyInProgressFile =
new BucketState<>("test", new Path(), 12345L, null, completePartsPerCheckpoint);
- return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, initStateWithOnlyInProgressFile, new PartFileConfig());
+ return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, initStateWithOnlyInProgressFile, OutputFileConfig.builder().build());
}
private Map<Long, List<RecoverableWriter.CommitRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index ac7071d..edbed9b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -318,7 +318,7 @@ public class BucketsTest {
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
DefaultRollingPolicy.builder().build(),
2,
- new PartFileConfig()
+ OutputFileConfig.builder().build()
);
buckets.onElement(
@@ -377,7 +377,7 @@ public class BucketsTest {
basePath,
rollingPolicy,
subtaskIdx,
- new PartFileConfig());
+ OutputFileConfig.builder().build());
}
private static Buckets<String, String> createBuckets(
@@ -408,7 +408,7 @@ public class BucketsTest {
subtaskIdx,
bucketState,
partCounterState,
- new PartFileConfig());
+ OutputFileConfig.builder().build());
}
private static Buckets<String, String> restoreBuckets(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
index bab18aa..f416d8a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkWriterTest.java
@@ -101,8 +101,7 @@ public class BulkWriterTest extends TestLogger {
new TestUtils.TupleToStringBucketer(),
new TestBulkWriterFactory(),
new DefaultBucketFactoryImpl<>(),
- "prefix",
- ".ext")
+ OutputFileConfig.builder().withPartPrefix("prefix").withPartSuffix(".ext").build())
) {
testPartFilesWithStringBucketer(testHarness, outDir, ".prefix-0-0.ext.inprogress", ".prefix-0-1.ext.inprogress");
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index be9db3e..2fe1943 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -204,7 +204,7 @@ public class RollingPolicyTest {
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
rollingPolicyToTest,
0,
- new PartFileConfig()
+ OutputFileConfig.builder().build()
);
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index c17a2d6..8b3ab28 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -140,8 +140,7 @@ public class TestUtils {
bucketer,
writer,
bucketFactory,
- PartFileConfig.DEFAULT_PART_PREFIX,
- PartFileConfig.DEFAULT_PART_SUFFIX);
+ OutputFileConfig.builder().build());
}
static OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Object> createTestSinkWithBulkEncoder(
@@ -152,16 +151,14 @@ public class TestUtils {
final BucketAssigner<Tuple2<String, Integer>, String> bucketer,
final BulkWriter.Factory<Tuple2<String, Integer>> writer,
final BucketFactory<Tuple2<String, Integer>, String> bucketFactory,
- final String partFilePrefix,
- final String partFileSuffix) throws Exception {
+ final OutputFileConfig outputFileConfig) throws Exception {
StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
.forBulkFormat(new Path(outDir.toURI()), writer)
.withBucketAssigner(bucketer)
.withBucketCheckInterval(bucketCheckInterval)
.withBucketFactory(bucketFactory)
- .withPartFilePrefix(partFilePrefix)
- .withPartFileSuffix(partFileSuffix)
+ .withOutputFileConfig(outputFileConfig)
.build();
return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), MAX_PARALLELISM, totalParallelism, taskIdx);