You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2020/05/18 14:12:37 UTC
[flink] 03/04: [FLINK-17593][Connectors/FileSystem] Support
arbitrary recovery mechanism for PartFileWriter
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 220f7dabf24fc1e6fe77b83a00a7de47d75d5b18
Author: GuoWei Ma <gu...@gmail.com>
AuthorDate: Wed May 13 21:15:03 2020 +0800
[FLINK-17593][Connectors/FileSystem] Support arbitrary recovery mechanism for PartFileWriter
This change includes two things:
1. Make the PartFileWriter generic and decouple it from the
RecoverableFsDataOutputStream. This makes it possible to add different types
of PartFileWriter, each with its own pre-commit / commit mechanism.
2. Make Bucket/Buckets depend on the BucketWriter instead of the
RecoverableWriter.
---
.../apache/flink/core/fs/RecoverableWriter.java | 6 +-
.../core/fs/local/LocalRecoverableWriter.java | 2 +-
.../runtime/fs/hdfs/HadoopRecoverableWriter.java | 2 +-
.../sink/filesystem/AbstractPartFileWriter.java | 58 ++++
.../api/functions/sink/filesystem/Bucket.java | 166 +++++-------
.../functions/sink/filesystem/BucketFactory.java | 7 +-
.../api/functions/sink/filesystem/BucketState.java | 32 +--
.../sink/filesystem/BucketStateSerializer.java | 146 ++++++----
.../functions/sink/filesystem/BucketWriter.java | 109 ++++++++
.../api/functions/sink/filesystem/Buckets.java | 37 +--
.../sink/filesystem/BulkBucketWriter.java | 72 +++++
.../functions/sink/filesystem/BulkPartWriter.java | 56 +---
.../sink/filesystem/DefaultBucketFactoryImpl.java | 13 +-
.../sink/filesystem/InProgressFileWriter.java | 70 +++++
.../OutputStreamBasedPartFileWriter.java | 296 +++++++++++++++++++++
.../functions/sink/filesystem/PartFileWriter.java | 141 ----------
.../sink/filesystem/RowWiseBucketWriter.java | 68 +++++
.../sink/filesystem/RowWisePartWriter.java | 50 +---
.../sink/filesystem/StreamingFileSink.java | 4 +-
.../sink/filesystem/WriterProperties.java | 67 +++++
.../sink/filesystem/BucketAssignerITCases.java | 3 +-
.../sink/filesystem/BucketStateSerializerTest.java | 35 +--
.../api/functions/sink/filesystem/BucketTest.java | 58 ++--
.../api/functions/sink/filesystem/BucketsTest.java | 19 +-
.../sink/filesystem/RollingPolicyTest.java | 3 +-
.../api/functions/sink/filesystem/TestUtils.java | 7 +-
.../filesystem/utils/NoOpRecoverableWriter.java | 2 +-
27 files changed, 1033 insertions(+), 496 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
index 7d54b11..b92da88 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
@@ -138,10 +138,8 @@ public interface RecoverableWriter {
* recover from a (potential) failure. These can be temporary files that were written
* to the filesystem or objects that were uploaded to S3.
*
- * <p><b>NOTE:</b> This operation should not throw an exception if the resumable has already
- * been cleaned up and the resources have been freed. But the contract is that it will throw
- * an {@link UnsupportedOperationException} if it is called for a {@code RecoverableWriter}
- * whose {@link #requiresCleanupOfRecoverableState()} returns {@code false}.
+ * <p><b>NOTE:</b> This operation should not throw an exception, but return false if the cleanup did not
+ * happen for any reason.
*
* @param resumable The {@link ResumeRecoverable} whose state we want to clean-up.
* @return {@code true} if the resources were successfully freed, {@code false} otherwise
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
index bae7314..2a97b85 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
@@ -77,7 +77,7 @@ public class LocalRecoverableWriter implements RecoverableWriter {
@Override
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
- throw new UnsupportedOperationException();
+ return false;
}
@Override
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
index d325f2c..91d76c6 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -95,7 +95,7 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
@Override
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
- throw new UnsupportedOperationException();
+ return false;
}
@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/AbstractPartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/AbstractPartFileWriter.java
new file mode 100644
index 0000000..0350a8f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/AbstractPartFileWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+/**
+ * An abstract writer for the currently open part file in a specific {@link Bucket}.
+ * @param <IN> the element type.
+ * @param <BucketID> the bucket id type.
+ */
+public abstract class AbstractPartFileWriter<IN, BucketID> implements InProgressFileWriter<IN, BucketID> {
+
+ private final BucketID bucketID;
+
+ private final long creationTime;
+
+ private long lastUpdateTime;
+
+ public AbstractPartFileWriter(final BucketID bucketID, final long createTime) {
+ this.bucketID = bucketID;
+ this.creationTime = createTime;
+ this.lastUpdateTime = createTime;
+ }
+
+ @Override
+ public BucketID getBucketId() {
+ return bucketID;
+ }
+
+ @Override
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ @Override
+ public long getLastUpdateTime() {
+ return lastUpdateTime;
+ }
+
+ void markWrite(long now) {
+ this.lastUpdateTime = now;
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index e7abd37..5e9a72b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -21,10 +21,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
-import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,48 +56,44 @@ public class Bucket<IN, BucketID> {
private final int subtaskIndex;
- private final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory;
-
- private final RecoverableWriter fsWriter;
+ private final BucketWriter<IN, BucketID> bucketWriter;
private final RollingPolicy<IN, BucketID> rollingPolicy;
- private final NavigableMap<Long, ResumeRecoverable> resumablesPerCheckpoint;
+ private final NavigableMap<Long, InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverablesPerCheckpoint;
- private final NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint;
+ private final NavigableMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint;
private final OutputFileConfig outputFileConfig;
private long partCounter;
@Nullable
- private PartFileWriter<IN, BucketID> inProgressPart;
+ private InProgressFileWriter<IN, BucketID> inProgressPart;
- private List<CommitRecoverable> pendingPartsForCurrentCheckpoint;
+ private List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverablesForCurrentCheckpoint;
/**
* Constructor to create a new empty bucket.
*/
private Bucket(
- final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+ final BucketWriter<IN, BucketID> bucketWriter,
final RollingPolicy<IN, BucketID> rollingPolicy,
final OutputFileConfig outputFileConfig) {
- this.fsWriter = checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
this.bucketId = checkNotNull(bucketId);
this.bucketPath = checkNotNull(bucketPath);
this.partCounter = initialPartCounter;
- this.partFileFactory = checkNotNull(partFileFactory);
+ this.bucketWriter = checkNotNull(bucketWriter);
this.rollingPolicy = checkNotNull(rollingPolicy);
- this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
- this.pendingPartsPerCheckpoint = new TreeMap<>();
- this.resumablesPerCheckpoint = new TreeMap<>();
+ this.pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
+ this.pendingFileRecoverablesPerCheckpoint = new TreeMap<>();
+ this.inProgressFileRecoverablesPerCheckpoint = new TreeMap<>();
this.outputFileConfig = checkNotNull(outputFileConfig);
}
@@ -110,16 +102,14 @@ public class Bucket<IN, BucketID> {
* Constructor to restore a bucket from checkpointed state.
*/
private Bucket(
- final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+ final BucketWriter<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
this(
- fsWriter,
subtaskIndex,
bucketState.getBucketId(),
bucketState.getBucketPath(),
@@ -133,31 +123,29 @@ public class Bucket<IN, BucketID> {
}
private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
- if (!state.hasInProgressResumableFile()) {
+ if (!state.hasInProgressFileRecoverable()) {
return;
}
// we try to resume the previous in-progress file
- final ResumeRecoverable resumable = state.getInProgressResumableFile();
+ final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
- if (fsWriter.supportsResume()) {
- final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
- inProgressPart = partFileFactory.resumeFrom(
- bucketId, stream, resumable, state.getInProgressFileCreationTime());
+ if (bucketWriter.getProperties().supportsResume()) {
+ inProgressPart = bucketWriter.resumeInProgressFileFrom(
+ bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
} else {
// if the writer does not support resume, then we close the
// in-progress part and commit it, as done in the case of pending files.
-
- fsWriter.recoverForCommit(resumable).commitAfterRecovery();
+ bucketWriter.recoverPendingFile(inProgressFileRecoverable).commitAfterRecovery();
}
}
private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
// we commit pending files for checkpoints that precess the last successful one, from which we are recovering
- for (List<CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
- for (CommitRecoverable committable: committables) {
- fsWriter.recoverForCommit(committable).commitAfterRecovery();
+ for (List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables: state.getPendingFileRecoverablesPerCheckpoint().values()) {
+ for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable: pendingFileRecoverables) {
+ bucketWriter.recoverPendingFile(pendingFileRecoverable).commitAfterRecovery();
}
}
}
@@ -175,7 +163,7 @@ public class Bucket<IN, BucketID> {
}
boolean isActive() {
- return inProgressPart != null || !pendingPartsForCurrentCheckpoint.isEmpty() || !pendingPartsPerCheckpoint.isEmpty();
+ return inProgressPart != null || !pendingFileRecoverablesForCurrentCheckpoint.isEmpty() || !pendingFileRecoverablesPerCheckpoint.isEmpty();
}
void merge(final Bucket<IN, BucketID> bucket) throws IOException {
@@ -184,16 +172,16 @@ public class Bucket<IN, BucketID> {
// There should be no pending files in the "to-merge" states.
// The reason is that:
- // 1) the pendingPartsForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()).
- // So a snapshot, including the one we are recovering from, will never contain such files.
- // 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
+ // 1) the pendingFileRecoverablesForCurrentCheckpoint is emptied whenever we take a snapshot (see prepareBucketForCheckpointing()).
+ // So a snapshot, including the one we are recovering from, will never contain such files.
+ // 2) the files in pendingFileRecoverablesPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
- checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
- checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
+ checkState(bucket.pendingFileRecoverablesForCurrentCheckpoint.isEmpty());
+ checkState(bucket.pendingFileRecoverablesPerCheckpoint.isEmpty());
- CommitRecoverable committable = bucket.closePartFile();
- if (committable != null) {
- pendingPartsForCurrentCheckpoint.add(committable);
+ InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = bucket.closePartFile();
+ if (pendingFileRecoverable != null) {
+ pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
}
if (LOG.isDebugEnabled()) {
@@ -218,8 +206,7 @@ public class Bucket<IN, BucketID> {
closePartFile();
final Path partFilePath = assembleNewPartPath();
- final RecoverableFsDataOutputStream stream = fsWriter.open(partFilePath);
- inProgressPart = partFileFactory.openNew(bucketId, stream, partFilePath, currentTime);
+ inProgressPart = bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
if (LOG.isDebugEnabled()) {
LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
@@ -233,14 +220,14 @@ public class Bucket<IN, BucketID> {
return new Path(bucketPath, outputFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + outputFileConfig.getPartSuffix());
}
- private CommitRecoverable closePartFile() throws IOException {
- CommitRecoverable committable = null;
+ private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
+ InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
if (inProgressPart != null) {
- committable = inProgressPart.closeForCommit();
- pendingPartsForCurrentCheckpoint.add(committable);
+ pendingFileRecoverable = inProgressPart.closeForCommit();
+ pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
inProgressPart = null;
}
- return committable;
+ return pendingFileRecoverable;
}
void disposePartFile() {
@@ -252,24 +239,16 @@ public class Bucket<IN, BucketID> {
BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
prepareBucketForCheckpointing(checkpointId);
- ResumeRecoverable inProgressResumable = null;
+ InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
long inProgressFileCreationTime = Long.MAX_VALUE;
if (inProgressPart != null) {
- inProgressResumable = inProgressPart.persist();
+ inProgressFileRecoverable = inProgressPart.persist();
inProgressFileCreationTime = inProgressPart.getCreationTime();
-
- // the following is an optimization so that writers that do not
- // require cleanup, they do not have to keep track of resumables
- // and later iterate over the active buckets.
- // (see onSuccessfulCompletionOfCheckpoint())
-
- if (fsWriter.requiresCleanupOfRecoverableState()) {
- this.resumablesPerCheckpoint.put(checkpointId, inProgressResumable);
- }
+ this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
}
- return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
+ return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, pendingFileRecoverablesPerCheckpoint);
}
private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
@@ -280,49 +259,46 @@ public class Bucket<IN, BucketID> {
closePartFile();
}
- if (!pendingPartsForCurrentCheckpoint.isEmpty()) {
- pendingPartsPerCheckpoint.put(checkpointId, pendingPartsForCurrentCheckpoint);
- pendingPartsForCurrentCheckpoint = new ArrayList<>();
+ if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
+ pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
+ pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
}
}
void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
- checkNotNull(fsWriter);
+ checkNotNull(bucketWriter);
- Iterator<Map.Entry<Long, List<CommitRecoverable>>> it =
- pendingPartsPerCheckpoint.headMap(checkpointId, true)
+ Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
+ pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
.entrySet().iterator();
while (it.hasNext()) {
- Map.Entry<Long, List<CommitRecoverable>> entry = it.next();
+ Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();
- for (CommitRecoverable committable : entry.getValue()) {
- fsWriter.recoverForCommit(committable).commit();
+ for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
+ bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
}
it.remove();
}
- cleanupOutdatedResumables(checkpointId);
+ cleanupInProgressFileRecoverables(checkpointId);
}
- private void cleanupOutdatedResumables(long checkpointId) throws IOException {
- Iterator<Map.Entry<Long, ResumeRecoverable>> it =
- resumablesPerCheckpoint.headMap(checkpointId, false)
+ private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
+ Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
+ inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
.entrySet().iterator();
while (it.hasNext()) {
- final ResumeRecoverable recoverable = it.next().getValue();
+ final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = it.next().getValue();
- // this check is redundant, as we only put entries in the resumablesPerCheckpoint map
- // list when the requiresCleanupOfRecoverableState() returns true, but having it makes
+ // entries are now recorded in inProgressFileRecoverablesPerCheckpoint for every checkpoint with an open
+ // in-progress file; whether anything is actually deleted is left to the BucketWriter, which keeps
// the code more readable.
- if (fsWriter.requiresCleanupOfRecoverableState()) {
- final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
-
- if (LOG.isDebugEnabled() && successfullyDeleted) {
- LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
- }
+ final boolean successfullyDeleted = bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
+ if (LOG.isDebugEnabled() && successfullyDeleted) {
+ LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
}
it.remove();
}
@@ -342,54 +318,51 @@ public class Bucket<IN, BucketID> {
// --------------------------- Testing Methods -----------------------------
@VisibleForTesting
- Map<Long, List<CommitRecoverable>> getPendingPartsPerCheckpoint() {
- return pendingPartsPerCheckpoint;
+ Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> getPendingFileRecoverablesPerCheckpoint() {
+ return pendingFileRecoverablesPerCheckpoint;
}
@Nullable
@VisibleForTesting
- PartFileWriter<IN, BucketID> getInProgressPart() {
+ InProgressFileWriter<IN, BucketID> getInProgressPart() {
return inProgressPart;
}
@VisibleForTesting
- List<CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
- return pendingPartsForCurrentCheckpoint;
+ List<InProgressFileWriter.PendingFileRecoverable> getPendingFileRecoverablesForCurrentCheckpoint() {
+ return pendingFileRecoverablesForCurrentCheckpoint;
}
// --------------------------- Static Factory Methods -----------------------------
/**
* Creates a new empty {@code Bucket}.
- * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
* @param subtaskIndex the index of the subtask creating the bucket.
* @param bucketId the identifier of the bucket, as returned by the {@link BucketAssigner}.
* @param bucketPath the path to where the part files for the bucket will be written to.
* @param initialPartCounter the initial counter for the part files of the bucket.
- * @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
+ * @param bucketWriter the {@link BucketWriter} used to write part files in the bucket.
* @param <IN> the type of input elements to the sink.
* @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
* @param outputFileConfig the part file configuration.
* @return The new Bucket.
*/
static <IN, BucketID> Bucket<IN, BucketID> getNew(
- final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+ final BucketWriter<IN, BucketID> bucketWriter,
final RollingPolicy<IN, BucketID> rollingPolicy,
final OutputFileConfig outputFileConfig) {
- return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, outputFileConfig);
+ return new Bucket<>(subtaskIndex, bucketId, bucketPath, initialPartCounter, bucketWriter, rollingPolicy, outputFileConfig);
}
/**
* Restores a {@code Bucket} from the state included in the provided {@link BucketState}.
- * @param fsWriter the filesystem-specific {@link RecoverableWriter}.
* @param subtaskIndex the index of the subtask creating the bucket.
* @param initialPartCounter the initial counter for the part files of the bucket.
- * @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
+ * @param bucketWriter the {@link BucketWriter} used to write part files in the bucket.
* @param bucketState the initial state of the restored bucket.
* @param <IN> the type of input elements to the sink.
* @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
@@ -397,13 +370,12 @@ public class Bucket<IN, BucketID> {
* @return The restored Bucket.
*/
static <IN, BucketID> Bucket<IN, BucketID> restore(
- final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
+ final BucketWriter<IN, BucketID> bucketWriter,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
- return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, outputFileConfig);
+ return new Bucket<>(subtaskIndex, initialPartCounter, bucketWriter, rollingPolicy, bucketState, outputFileConfig);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
index 260e82c..6423627 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketFactory.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
import java.io.Serializable;
@@ -32,20 +31,18 @@ import java.io.Serializable;
interface BucketFactory<IN, BucketID> extends Serializable {
Bucket<IN, BucketID> getNewBucket(
- final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+ final BucketWriter<IN, BucketID> bucketWriter,
final RollingPolicy<IN, BucketID> rollingPolicy,
final OutputFileConfig outputFileConfig) throws IOException;
Bucket<IN, BucketID> restoreBucket(
- final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+ final BucketWriter<IN, BucketID> bucketWriter,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
index 1829381..75c00b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketState.java
@@ -46,30 +46,30 @@ class BucketState<BucketID> {
private final long inProgressFileCreationTime;
/**
- * A {@link RecoverableWriter.ResumeRecoverable} for the currently open
+ * A {@link InProgressFileWriter.InProgressFileRecoverable} for the currently open
* part file, or null if there is no currently open part file.
*/
@Nullable
- private final RecoverableWriter.ResumeRecoverable inProgressResumableFile;
+ private final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable;
/**
* The {@link RecoverableWriter.CommitRecoverable files} pending to be
* committed, organized by checkpoint id.
*/
- private final Map<Long, List<RecoverableWriter.CommitRecoverable>> committableFilesPerCheckpoint;
+ private final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint;
BucketState(
final BucketID bucketId,
final Path bucketPath,
final long inProgressFileCreationTime,
- @Nullable final RecoverableWriter.ResumeRecoverable inProgressResumableFile,
- final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint
+ @Nullable final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable,
+ final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint
) {
this.bucketId = Preconditions.checkNotNull(bucketId);
this.bucketPath = Preconditions.checkNotNull(bucketPath);
this.inProgressFileCreationTime = inProgressFileCreationTime;
- this.inProgressResumableFile = inProgressResumableFile;
- this.committableFilesPerCheckpoint = Preconditions.checkNotNull(pendingCommittablesPerCheckpoint);
+ this.inProgressFileRecoverable = inProgressFileRecoverable;
+ this.pendingFileRecoverablesPerCheckpoint = Preconditions.checkNotNull(pendingFileRecoverablesPerCheckpoint);
}
BucketID getBucketId() {
@@ -84,17 +84,17 @@ class BucketState<BucketID> {
return inProgressFileCreationTime;
}
- boolean hasInProgressResumableFile() {
- return inProgressResumableFile != null;
+ boolean hasInProgressFileRecoverable() {
+ return inProgressFileRecoverable != null;
}
@Nullable
- RecoverableWriter.ResumeRecoverable getInProgressResumableFile() {
- return inProgressResumableFile;
+ InProgressFileWriter.InProgressFileRecoverable getInProgressFileRecoverable() {
+ return inProgressFileRecoverable;
}
- Map<Long, List<RecoverableWriter.CommitRecoverable>> getCommittableFilesPerCheckpoint() {
- return committableFilesPerCheckpoint;
+ Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> getPendingFileRecoverablesPerCheckpoint() {
+ return pendingFileRecoverablesPerCheckpoint;
}
@Override
@@ -105,13 +105,13 @@ class BucketState<BucketID> {
.append("BucketState for bucketId=").append(bucketId)
.append(" and bucketPath=").append(bucketPath);
- if (hasInProgressResumableFile()) {
+ if (hasInProgressFileRecoverable()) {
strBuilder.append(", has open part file created @ ").append(inProgressFileCreationTime);
}
- if (!committableFilesPerCheckpoint.isEmpty()) {
+ if (!pendingFileRecoverablesPerCheckpoint.isEmpty()) {
strBuilder.append(", has pending files for checkpoints: {");
- for (long checkpointId: committableFilesPerCheckpoint.keySet()) {
+ for (long checkpointId: pendingFileRecoverablesPerCheckpoint.keySet()) {
strBuilder.append(checkpointId).append(' ');
}
strBuilder.append('}');
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
index 04de246..5863a03 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializer.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
@@ -45,119 +44,172 @@ class BucketStateSerializer<BucketID> implements SimpleVersionedSerializer<Bucke
private static final int MAGIC_NUMBER = 0x1e764b79;
- private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer;
+ private final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer;
- private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer;
+ private final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer;
private final SimpleVersionedSerializer<BucketID> bucketIdSerializer;
BucketStateSerializer(
- final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer,
- final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer,
+ final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer,
+ final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer,
final SimpleVersionedSerializer<BucketID> bucketIdSerializer
) {
- this.resumableSerializer = Preconditions.checkNotNull(resumableSerializer);
- this.commitableSerializer = Preconditions.checkNotNull(commitableSerializer);
+ this.inProgressFileRecoverableSerializer = Preconditions.checkNotNull(inProgressFileRecoverableSerializer);
+ this.pendingFileRecoverableSerializer = Preconditions.checkNotNull(pendingFileRecoverableSerializer);
this.bucketIdSerializer = Preconditions.checkNotNull(bucketIdSerializer);
}
@Override
public int getVersion() {
- return 1;
+ return 2;
}
@Override
public byte[] serialize(BucketState<BucketID> state) throws IOException {
DataOutputSerializer out = new DataOutputSerializer(256);
out.writeInt(MAGIC_NUMBER);
- serializeV1(state, out);
+ serializeV2(state, out);
return out.getCopyOfBuffer();
}
@Override
public BucketState<BucketID> deserialize(int version, byte[] serialized) throws IOException {
+ final DataInputDeserializer in = new DataInputDeserializer(serialized);
+
switch (version) {
case 1:
- DataInputDeserializer in = new DataInputDeserializer(serialized);
validateMagicNumber(in);
return deserializeV1(in);
+ case 2:
+ validateMagicNumber(in);
+ return deserializeV2(in);
default:
throw new IOException("Unrecognized version or corrupt state: " + version);
}
}
- @VisibleForTesting
- void serializeV1(BucketState<BucketID> state, DataOutputView out) throws IOException {
- SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), out);
- out.writeUTF(state.getBucketPath().toString());
- out.writeLong(state.getInProgressFileCreationTime());
+ private void serializeV2(BucketState<BucketID> state, DataOutputView dataOutputView) throws IOException {
+ SimpleVersionedSerialization.writeVersionAndSerialize(bucketIdSerializer, state.getBucketId(), dataOutputView);
+ dataOutputView.writeUTF(state.getBucketPath().toString());
+ dataOutputView.writeLong(state.getInProgressFileCreationTime());
// put the current open part file
- if (state.hasInProgressResumableFile()) {
- final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
- out.writeBoolean(true);
- SimpleVersionedSerialization.writeVersionAndSerialize(resumableSerializer, resumable, out);
- }
- else {
- out.writeBoolean(false);
+ if (state.hasInProgressFileRecoverable()) {
+ final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
+ dataOutputView.writeBoolean(true);
+ SimpleVersionedSerialization.writeVersionAndSerialize(inProgressFileRecoverableSerializer, inProgressFileRecoverable, dataOutputView);
+ } else {
+ dataOutputView.writeBoolean(false);
}
// put the map of pending files per checkpoint
- final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommitters = state.getCommittableFilesPerCheckpoint();
+ final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverables = state.getPendingFileRecoverablesPerCheckpoint();
- // manually keep the version here to safe some bytes
- out.writeInt(commitableSerializer.getVersion());
+ dataOutputView.writeInt(pendingFileRecoverableSerializer.getVersion());
- out.writeInt(pendingCommitters.size());
- for (Entry<Long, List<RecoverableWriter.CommitRecoverable>> resumablesForCheckpoint : pendingCommitters.entrySet()) {
- List<RecoverableWriter.CommitRecoverable> resumables = resumablesForCheckpoint.getValue();
+ dataOutputView.writeInt(pendingFileRecoverables.size());
- out.writeLong(resumablesForCheckpoint.getKey());
- out.writeInt(resumables.size());
+ for (Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFilesForCheckpoint : pendingFileRecoverables.entrySet()) {
+ final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableList = pendingFilesForCheckpoint.getValue();
- for (RecoverableWriter.CommitRecoverable resumable : resumables) {
- byte[] serialized = commitableSerializer.serialize(resumable);
- out.writeInt(serialized.length);
- out.write(serialized);
+ dataOutputView.writeLong(pendingFilesForCheckpoint.getKey());
+ dataOutputView.writeInt(pendingFileRecoverableList.size());
+
+ for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : pendingFileRecoverableList) {
+ byte[] serialized = pendingFileRecoverableSerializer.serialize(pendingFileRecoverable);
+ dataOutputView.writeInt(serialized.length);
+ dataOutputView.write(serialized);
}
}
}
- @VisibleForTesting
- BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
+ private BucketState<BucketID> deserializeV1(DataInputView in) throws IOException {
+
+ final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer = getCommitableSerializer();
+ final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer = getResumableSerializer();
+
final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, in);
final String bucketPathStr = in.readUTF();
final long creationTime = in.readLong();
// then get the current resumable stream
- RecoverableWriter.ResumeRecoverable current = null;
+ InProgressFileWriter.InProgressFileRecoverable current = null;
if (in.readBoolean()) {
- current = SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in);
+ current =
+ new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(
+ SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, in));
}
final int committableVersion = in.readInt();
final int numCheckpoints = in.readInt();
- final HashMap<Long, List<RecoverableWriter.CommitRecoverable>> resumablesPerCheckpoint = new HashMap<>(numCheckpoints);
+ final HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablePerCheckpoint = new HashMap<>(numCheckpoints);
for (int i = 0; i < numCheckpoints; i++) {
final long checkpointId = in.readLong();
final int noOfResumables = in.readInt();
- final List<RecoverableWriter.CommitRecoverable> resumables = new ArrayList<>(noOfResumables);
+ final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new ArrayList<>(noOfResumables);
for (int j = 0; j < noOfResumables; j++) {
final byte[] bytes = new byte[in.readInt()];
in.readFully(bytes);
- resumables.add(commitableSerializer.deserialize(committableVersion, bytes));
+ pendingFileRecoverables.add(
+ new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(commitableSerializer.deserialize(committableVersion, bytes)));
}
- resumablesPerCheckpoint.put(checkpointId, resumables);
+ pendingFileRecoverablePerCheckpoint.put(checkpointId, pendingFileRecoverables);
}
return new BucketState<>(
- bucketId,
- new Path(bucketPathStr),
- creationTime,
- current,
- resumablesPerCheckpoint);
+ bucketId,
+ new Path(bucketPathStr),
+ creationTime,
+ current,
+ pendingFileRecoverablePerCheckpoint);
+ }
+
+ private BucketState<BucketID> deserializeV2(DataInputView dataInputView) throws IOException {
+ final BucketID bucketId = SimpleVersionedSerialization.readVersionAndDeSerialize(bucketIdSerializer, dataInputView);
+ final String bucketPathStr = dataInputView.readUTF();
+ final long creationTime = dataInputView.readLong();
+
+ // then get the current in-progress file recoverable (if any)
+ InProgressFileWriter.InProgressFileRecoverable current = null;
+ if (dataInputView.readBoolean()) {
+ current = SimpleVersionedSerialization.readVersionAndDeSerialize(inProgressFileRecoverableSerializer, dataInputView);
+ }
+
+ final int pendingFileRecoverableSerializerVersion = dataInputView.readInt();
+ final int numCheckpoints = dataInputView.readInt();
+ final HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint = new HashMap<>(numCheckpoints);
+
+ for (int i = 0; i < numCheckpoints; i++) {
+ final long checkpointId = dataInputView.readLong();
+ final int numOfPendingFileRecoverables = dataInputView.readInt();
+
+ final List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new ArrayList<>(numOfPendingFileRecoverables);
+ for (int j = 0; j < numOfPendingFileRecoverables; j++) {
+ final byte[] bytes = new byte[dataInputView.readInt()];
+ dataInputView.readFully(bytes);
+ pendingFileRecoverables.add(pendingFileRecoverableSerializer.deserialize(pendingFileRecoverableSerializerVersion, bytes));
+ }
+ pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverables);
+ }
+
+ return new BucketState<>(bucketId, new Path(bucketPathStr), creationTime, current, pendingFileRecoverablesPerCheckpoint);
+ }
+
+ private SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumableSerializer() {
+ final OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer
+ outputStreamBasedInProgressFileRecoverableSerializer =
+ (OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer) inProgressFileRecoverableSerializer;
+ return outputStreamBasedInProgressFileRecoverableSerializer.getResumeSerializer();
+ }
+
+ private SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitableSerializer() {
+ final OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer
+ outputStreamBasedPendingFileRecoverableSerializer =
+ (OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer) pendingFileRecoverableSerializer;
+ return outputStreamBasedPendingFileRecoverableSerializer.getCommitSerializer();
}
private static void validateMagicNumber(DataInputView in) throws IOException {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
new file mode 100644
index 0000000..ed3a0e2
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * An interface for factories that create the different {@link InProgressFileWriter writers}.
+ */
+@Internal
+interface BucketWriter<IN, BucketID> {
+
+ /**
+ * Used to create a new {@link InProgressFileWriter}.
+ * @param bucketID the id of the bucket this writer is writing to.
+ * @param path the path this writer will write to.
+ * @param creationTime the creation time of the file.
+ * @return the new {@link InProgressFileWriter}
+ * @throws IOException Thrown if creating a writer fails.
+ */
+ InProgressFileWriter<IN, BucketID> openNewInProgressFile(
+ final BucketID bucketID,
+ final Path path,
+ final long creationTime) throws IOException;
+
+ /**
+ * Used to resume a {@link InProgressFileWriter} from a {@link InProgressFileWriter.InProgressFileRecoverable}.
+ * @param bucketID the id of the bucket this writer is writing to.
+ * @param inProgressFileSnapshot the state of the part file.
+ * @param creationTime the creation time of the file.
+ * @return the resumed {@link InProgressFileWriter}
+ * @throws IOException Thrown if resuming a writer fails.
+ */
+ InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(
+ final BucketID bucketID,
+ final InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot,
+ final long creationTime) throws IOException;
+
+ /**
+ * @return the properties of the {@link BucketWriter}
+ */
+ WriterProperties getProperties();
+
+ /**
+ * Recovers a pending file for finalizing and committing.
+ * @param pendingFileRecoverable The handle with the recovery information.
+ * @return A pending file
+ * @throws IOException Thrown if recovering a pending file fails.
+ */
+ PendingFile recoverPendingFile(final InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable) throws IOException;
+
+ /**
+ * Frees up any resources that were previously occupied in order to be able to
+ * recover from a (potential) failure.
+ *
+ * <p><b>NOTE:</b> This operation should not throw an exception, but return false if the cleanup did not
+ * happen for any reason.
+ *
+ * @param inProgressFileRecoverable the {@link InProgressFileWriter.InProgressFileRecoverable} whose state we want to clean-up.
+ * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+ * (e.g. the file to be deleted was not there for any reason - already deleted or never created).
+ * @throws IOException if an I/O error occurs
+ */
+ boolean cleanupInProgressFileRecoverable(final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
+
+ /**
+ * This represents a file that can no longer be written to and is waiting to be committed.
+ */
+ interface PendingFile {
+ /**
+ * Commits the pending file, making it visible. The file will contain the exact data
+ * as when the pending file was created.
+ *
+ * @throws IOException Thrown if committing fails.
+ */
+ void commit() throws IOException;
+
+ /**
+ * Commits the pending file, making it visible. The file will contain the exact data
+ * as when the pending file was created.
+ *
+ * <p>This method tolerates situations where the file was already committed and
+ * will not raise an exception in that case. This is important for idempotent
+ * commit retries as they need to happen after recovery.
+ *
+ * @throws IOException Thrown if committing fails.
+ */
+ void commitAfterRecovery() throws IOException;
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
index f055798..0c9b73f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java
@@ -21,9 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Preconditions;
@@ -61,7 +59,7 @@ public class Buckets<IN, BucketID> {
private final BucketAssigner<IN, BucketID> bucketAssigner;
- private final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory;
+ private final BucketWriter<IN, BucketID> bucketWriter;
private final RollingPolicy<IN, BucketID> rollingPolicy;
@@ -78,8 +76,6 @@ public class Buckets<IN, BucketID> {
private long maxPartCounter;
- private final RecoverableWriter fsWriter;
-
private final OutputFileConfig outputFileConfig;
// --------------------------- State Related Fields -----------------------------
@@ -92,23 +88,23 @@ public class Buckets<IN, BucketID> {
* @param basePath The base path for our buckets.
* @param bucketAssigner The {@link BucketAssigner} provided by the user.
* @param bucketFactory The {@link BucketFactory} to be used to create buckets.
- * @param partFileWriterFactory The {@link PartFileWriter.PartFileFactory} to be used when writing data.
+ * @param bucketWriter The {@link BucketWriter} to be used when writing data.
* @param rollingPolicy The {@link RollingPolicy} as specified by the user.
*/
Buckets(
final Path basePath,
final BucketAssigner<IN, BucketID> bucketAssigner,
final BucketFactory<IN, BucketID> bucketFactory,
- final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+ final BucketWriter<IN, BucketID> bucketWriter,
final RollingPolicy<IN, BucketID> rollingPolicy,
@Nullable final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener,
final int subtaskIndex,
- final OutputFileConfig outputFileConfig) throws IOException {
+ final OutputFileConfig outputFileConfig) {
this.basePath = Preconditions.checkNotNull(basePath);
this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner);
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
- this.partFileWriterFactory = Preconditions.checkNotNull(partFileWriterFactory);
+ this.bucketWriter = Preconditions.checkNotNull(bucketWriter);
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
this.bucketLifeCycleListener = bucketLifeCycleListener;
this.subtaskIndex = subtaskIndex;
@@ -118,19 +114,10 @@ public class Buckets<IN, BucketID> {
this.activeBuckets = new HashMap<>();
this.bucketerContext = new Buckets.BucketerContext();
- try {
- this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
- } catch (IOException e) {
- LOG.error("Unable to create filesystem for path: {}", basePath);
- throw e;
- }
-
this.bucketStateSerializer = new BucketStateSerializer<>(
- fsWriter.getResumeRecoverableSerializer(),
- fsWriter.getCommitRecoverableSerializer(),
- bucketAssigner.getSerializer()
- );
-
+ bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+ bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
+ bucketAssigner.getSerializer());
this.maxPartCounter = 0L;
}
@@ -185,10 +172,9 @@ public class Buckets<IN, BucketID> {
final Bucket<IN, BucketID> restoredBucket = bucketFactory
.restoreBucket(
- fsWriter,
subtaskIndex,
maxPartCounter,
- partFileWriterFactory,
+ bucketWriter,
rollingPolicy,
recoveredState,
outputFileConfig
@@ -238,7 +224,7 @@ public class Buckets<IN, BucketID> {
final ListState<Long> partCounterStateContainer) throws Exception {
Preconditions.checkState(
- fsWriter != null && bucketStateSerializer != null,
+ bucketWriter != null && bucketStateSerializer != null,
"sink has not been initialized");
LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
@@ -308,12 +294,11 @@ public class Buckets<IN, BucketID> {
if (bucket == null) {
final Path bucketPath = assembleBucketPath(bucketId);
bucket = bucketFactory.getNewBucket(
- fsWriter,
subtaskIndex,
bucketId,
bucketPath,
maxPartCounter,
- partFileWriterFactory,
+ bucketWriter,
rollingPolicy,
outputFileConfig);
activeBuckets.put(bucketId, bucket);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
new file mode 100644
index 0000000..0f3cb9c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkBucketWriter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A factory that creates {@link BulkPartWriter BulkPartWriters}.
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
+ */
+@Internal
+class BulkBucketWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {
+
+ private final BulkWriter.Factory<IN> writerFactory;
+
+ BulkBucketWriter(final RecoverableWriter recoverableWriter, BulkWriter.Factory<IN> writerFactory) throws IOException {
+ super(recoverableWriter);
+ this.writerFactory = writerFactory;
+ }
+
+ @Override
+ public InProgressFileWriter<IN, BucketID> resumeFrom(
+ final BucketID bucketId,
+ final RecoverableFsDataOutputStream stream,
+ final RecoverableWriter.ResumeRecoverable resumable,
+ final long creationTime) throws IOException {
+
+ Preconditions.checkNotNull(stream);
+ Preconditions.checkNotNull(resumable);
+
+ final BulkWriter<IN> writer = writerFactory.create(stream);
+ return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+ }
+
+ @Override
+ public InProgressFileWriter<IN, BucketID> openNew(
+ final BucketID bucketId,
+ final RecoverableFsDataOutputStream stream,
+ final Path path,
+ final long creationTime) throws IOException {
+
+ Preconditions.checkNotNull(stream);
+ Preconditions.checkNotNull(path);
+
+ final BulkWriter<IN> writer = writerFactory.create(stream);
+ return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index a44b0e8..b1b7864 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -20,23 +20,21 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
- * A {@link PartFileWriter} for bulk-encoding formats that use an {@link BulkPartWriter}.
+ * An {@link InProgressFileWriter} for bulk-encoding formats that use a {@link BulkWriter}.
* This also implements the {@link PartFileInfo}.
*/
@Internal
-final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
+final class BulkPartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
private final BulkWriter<IN> writer;
- private BulkPartWriter(
+ BulkPartWriter(
final BucketID bucketId,
final RecoverableFsDataOutputStream currentPartStream,
final BulkWriter<IN> writer,
@@ -46,62 +44,20 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
}
@Override
- void write(IN element, long currentTime) throws IOException {
+ public void write(IN element, long currentTime) throws IOException {
writer.addElement(element);
markWrite(currentTime);
}
@Override
- RecoverableWriter.ResumeRecoverable persist() {
+ public InProgressFileRecoverable persist() {
throw new UnsupportedOperationException("Bulk Part Writers do not support \"pause and resume\" operations.");
}
@Override
- RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
+ public PendingFileRecoverable closeForCommit() throws IOException {
writer.flush();
writer.finish();
return super.closeForCommit();
}
-
- /**
- * A factory that creates {@link BulkPartWriter BulkPartWriters}.
- * @param <IN> The type of input elements.
- * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
- */
- static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
-
- private final BulkWriter.Factory<IN> writerFactory;
-
- Factory(BulkWriter.Factory<IN> writerFactory) {
- this.writerFactory = writerFactory;
- }
-
- @Override
- public PartFileWriter<IN, BucketID> resumeFrom(
- final BucketID bucketId,
- final RecoverableFsDataOutputStream stream,
- final RecoverableWriter.ResumeRecoverable resumable,
- final long creationTime) throws IOException {
-
- Preconditions.checkNotNull(stream);
- Preconditions.checkNotNull(resumable);
-
- final BulkWriter<IN> writer = writerFactory.create(stream);
- return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
- }
-
- @Override
- public PartFileWriter<IN, BucketID> openNew(
- final BucketID bucketId,
- final RecoverableFsDataOutputStream stream,
- final Path path,
- final long creationTime) throws IOException {
-
- Preconditions.checkNotNull(stream);
- Preconditions.checkNotNull(path);
-
- final BulkWriter<IN> writer = writerFactory.create(stream);
- return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
- }
- }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
index 529b93a..bb20b97 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/DefaultBucketFactoryImpl.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
import java.io.IOException;
@@ -34,41 +33,37 @@ class DefaultBucketFactoryImpl<IN, BucketID> implements BucketFactory<IN, Bucket
@Override
public Bucket<IN, BucketID> getNewBucket(
- final RecoverableWriter fsWriter,
final int subtaskIndex,
final BucketID bucketId,
final Path bucketPath,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+ final BucketWriter<IN, BucketID> bucketWriter,
final RollingPolicy<IN, BucketID> rollingPolicy,
final OutputFileConfig outputFileConfig) {
return Bucket.getNew(
- fsWriter,
subtaskIndex,
bucketId,
bucketPath,
initialPartCounter,
- partFileWriterFactory,
+ bucketWriter,
rollingPolicy,
outputFileConfig);
}
@Override
public Bucket<IN, BucketID> restoreBucket(
- final RecoverableWriter fsWriter,
final int subtaskIndex,
final long initialPartCounter,
- final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
+ final BucketWriter<IN, BucketID> bucketWriter,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final OutputFileConfig outputFileConfig) throws IOException {
return Bucket.restore(
- fsWriter,
subtaskIndex,
initialPartCounter,
- partFileWriterFactory,
+ bucketWriter,
rollingPolicy,
bucketState,
outputFileConfig);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
new file mode 100644
index 0000000..60798d1
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/InProgressFileWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.IOException;
+
+/**
+ * The {@link Bucket} uses the {@link InProgressFileWriter} to write elements to a part file.
+ */
+@Internal
+interface InProgressFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
+
+ /**
+ * Writes an element to the part file.
+ * @param element the element to be written.
+ * @param currentTime the writing time.
+ * @throws IOException Thrown if writing the element fails.
+ */
+ void write(final IN element, final long currentTime) throws IOException;
+
+ /**
+ * @return The state of the current part file.
+ * @throws IOException Thrown if persisting the part file fails.
+ */
+ InProgressFileRecoverable persist() throws IOException;
+
+
+ /**
+ * @return The state of the pending part file. {@link Bucket} uses this to commit the pending file.
+ * @throws IOException Thrown if an I/O error occurs.
+ */
+ PendingFileRecoverable closeForCommit() throws IOException;
+
+ /**
+ * Disposes the part file.
+ */
+ void dispose();
+
+ // ------------------------------------------------------------------------
+
+
+ /**
+ * A handle that can be used to recover an in-progress file.
+ */
+ interface InProgressFileRecoverable extends PendingFileRecoverable {}
+
+
+ /**
+ * A handle that can be used to recover a pending file.
+ */
+ interface PendingFileRecoverable {}
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
new file mode 100644
index 0000000..2d8c423
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedPartFileWriter.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * The base class for all the part file writers that use {@link org.apache.flink.core.fs.RecoverableFsDataOutputStream}.
+ *
+ * <p>Persisting, closing for commit, disposing and size reporting are delegated to the underlying
+ * stream; concrete subclasses implement how elements are written to it.
+ *
+ * @param <IN> the element type
+ * @param <BucketID> the bucket type
+ */
+public abstract class OutputStreamBasedPartFileWriter<IN, BucketID> extends AbstractPartFileWriter<IN, BucketID> {
+
+ final RecoverableFsDataOutputStream currentPartStream;
+
+ OutputStreamBasedPartFileWriter(
+ final BucketID bucketID,
+ final RecoverableFsDataOutputStream recoverableFsDataOutputStream,
+ final long createTime) {
+ super(bucketID, createTime);
+ // fail fast on a missing stream rather than on the first write or persist
+ this.currentPartStream = Preconditions.checkNotNull(recoverableFsDataOutputStream);
+ }
+
+ /**
+ * Persists the current part stream.
+ *
+ * @return a handle wrapping the stream's resume recoverable.
+ */
+ @Override
+ public InProgressFileRecoverable persist() throws IOException {
+ return new OutputStreamBasedInProgressFileRecoverable(currentPartStream.persist());
+ }
+
+ /**
+ * Closes the current part stream for commit.
+ *
+ * @return a handle wrapping the stream's commit recoverable.
+ */
+ @Override
+ public PendingFileRecoverable closeForCommit() throws IOException {
+ return new OutputStreamBasedPendingFileRecoverable(currentPartStream.closeForCommit().getRecoverable());
+ }
+
+ @Override
+ public void dispose() {
+ // we can suppress exceptions here, because we do not rely on close() to
+ // flush or persist any data
+ IOUtils.closeQuietly(currentPartStream);
+ }
+
+ /**
+ * @return the current position of the part stream, reported as the size of the part file.
+ */
+ @Override
+ public long getSize() throws IOException {
+ return currentPartStream.getPos();
+ }
+
+ /**
+ * A {@link BucketWriter} that opens, resumes and recovers part files through a {@link RecoverableWriter}.
+ * Subclasses decide which {@link InProgressFileWriter} wraps the resulting stream.
+ */
+ abstract static class OutputStreamBasedBucketWriter<IN, BucketID> implements BucketWriter<IN, BucketID> {
+
+ private final RecoverableWriter recoverableWriter;
+
+ OutputStreamBasedBucketWriter(final RecoverableWriter recoverableWriter) {
+ this.recoverableWriter = Preconditions.checkNotNull(recoverableWriter);
+ }
+
+ @Override
+ public InProgressFileWriter<IN, BucketID> openNewInProgressFile(final BucketID bucketID, final Path path, final long creationTime) throws IOException {
+ return openNew(bucketID, recoverableWriter.open(path), path, creationTime);
+ }
+
+ @Override
+ public InProgressFileWriter<IN, BucketID> resumeInProgressFileFrom(final BucketID bucketID, final InProgressFileRecoverable inProgressFileRecoverable, final long creationTime) throws IOException {
+ final RecoverableWriter.ResumeRecoverable resumeRecoverable =
+ ((OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable).getResumeRecoverable();
+ return resumeFrom(
+ bucketID,
+ recoverableWriter.recover(resumeRecoverable),
+ resumeRecoverable,
+ creationTime);
+ }
+
+ /**
+ * Recovers a pending file so that it can be committed. Both pending handles and in-progress
+ * handles produced by this writer are accepted.
+ *
+ * @throws IllegalArgumentException if the handle was not produced by an output-stream based writer.
+ */
+ @Override
+ public PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable) throws IOException {
+ final RecoverableWriter.CommitRecoverable commitRecoverable;
+
+ if (pendingFileRecoverable instanceof OutputStreamBasedPendingFileRecoverable) {
+ commitRecoverable = ((OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable).getCommitRecoverable();
+ } else if (pendingFileRecoverable instanceof OutputStreamBasedInProgressFileRecoverable) {
+ // for an in-progress handle, its resume recoverable is used as the commit recoverable
+ commitRecoverable = ((OutputStreamBasedInProgressFileRecoverable) pendingFileRecoverable).getResumeRecoverable();
+ } else {
+ throw new IllegalArgumentException("can not recover from the pendingFileRecoverable: " + pendingFileRecoverable);
+ }
+ return new OutputStreamBasedPendingFile(recoverableWriter.recoverForCommit(commitRecoverable));
+ }
+
+ @Override
+ public boolean cleanupInProgressFileRecoverable(InProgressFileRecoverable inProgressFileRecoverable) throws IOException {
+ final RecoverableWriter.ResumeRecoverable resumeRecoverable =
+ ((OutputStreamBasedInProgressFileRecoverable) inProgressFileRecoverable).getResumeRecoverable();
+ return recoverableWriter.cleanupRecoverableState(resumeRecoverable);
+ }
+
+ /**
+ * The returned serializers wrap the {@link RecoverableWriter}'s resume and commit recoverable
+ * serializers; resume support is taken from {@link RecoverableWriter#supportsResume()}.
+ */
+ @Override
+ public WriterProperties getProperties() {
+ return new WriterProperties(
+ new OutputStreamBasedInProgressFileRecoverableSerializer(recoverableWriter.getResumeRecoverableSerializer()),
+ new OutputStreamBasedPendingFileRecoverableSerializer(recoverableWriter.getCommitRecoverableSerializer()),
+ recoverableWriter.supportsResume());
+ }
+
+ /**
+ * Used to create a new {@link InProgressFileWriter writer}.
+ * @param bucketId the id of the bucket this writer is writing to.
+ * @param stream the filesystem-specific output stream to use when writing to the filesystem.
+ * @param path the path of the part file this writer will write to.
+ * @param creationTime the creation time of the stream.
+ * @return the new {@link InProgressFileWriter writer}.
+ * @throws IOException Thrown if the writer cannot be created.
+ */
+ public abstract InProgressFileWriter<IN, BucketID> openNew(
+ final BucketID bucketId,
+ final RecoverableFsDataOutputStream stream,
+ final Path path,
+ final long creationTime) throws IOException;
+
+ /**
+ * Used upon recovery from a failure to recover an {@link InProgressFileWriter writer}.
+ * @param bucketId the id of the bucket this writer is writing to.
+ * @param stream the filesystem-specific output stream to use when writing to the filesystem.
+ * @param resumable the state of the stream we are resurrecting.
+ * @param creationTime the creation time of the stream.
+ * @return the recovered {@link InProgressFileWriter writer}.
+ * @throws IOException Thrown if the writer cannot be recovered.
+ */
+ public abstract InProgressFileWriter<IN, BucketID> resumeFrom(
+ final BucketID bucketId,
+ final RecoverableFsDataOutputStream stream,
+ final RecoverableWriter.ResumeRecoverable resumable,
+ final long creationTime) throws IOException;
+ }
+
+ /**
+ * A {@link PendingFileRecoverable} that wraps a {@link RecoverableWriter.CommitRecoverable}.
+ */
+ static final class OutputStreamBasedPendingFileRecoverable implements PendingFileRecoverable {
+
+ private final RecoverableWriter.CommitRecoverable commitRecoverable;
+
+ OutputStreamBasedPendingFileRecoverable(final RecoverableWriter.CommitRecoverable commitRecoverable) {
+ this.commitRecoverable = commitRecoverable;
+ }
+
+ RecoverableWriter.CommitRecoverable getCommitRecoverable() {
+ return commitRecoverable;
+ }
+ }
+
+ /**
+ * An {@link InProgressFileRecoverable} that wraps a {@link RecoverableWriter.ResumeRecoverable}.
+ */
+ static final class OutputStreamBasedInProgressFileRecoverable implements InProgressFileRecoverable {
+
+ private final RecoverableWriter.ResumeRecoverable resumeRecoverable;
+
+ OutputStreamBasedInProgressFileRecoverable(final RecoverableWriter.ResumeRecoverable resumeRecoverable) {
+ this.resumeRecoverable = resumeRecoverable;
+ }
+
+ RecoverableWriter.ResumeRecoverable getResumeRecoverable() {
+ return resumeRecoverable;
+ }
+ }
+
+ /**
+ * A {@link BucketWriter.PendingFile} that delegates both commit paths to a
+ * {@link RecoverableFsDataOutputStream.Committer}.
+ */
+ static final class OutputStreamBasedPendingFile implements BucketWriter.PendingFile {
+
+ private final RecoverableFsDataOutputStream.Committer committer;
+
+ OutputStreamBasedPendingFile(final RecoverableFsDataOutputStream.Committer committer) {
+ this.committer = committer;
+ }
+
+ @Override
+ public void commit() throws IOException {
+ committer.commit();
+ }
+
+ @Override
+ public void commitAfterRecovery() throws IOException {
+ committer.commitAfterRecovery();
+ }
+ }
+
+ /**
+ * Serializer for {@link OutputStreamBasedInProgressFileRecoverable}. Version 1 layout: a magic
+ * number followed by the resume recoverable written with
+ * {@link SimpleVersionedSerialization#writeVersionAndSerialize}.
+ */
+ static class OutputStreamBasedInProgressFileRecoverableSerializer implements SimpleVersionedSerializer<InProgressFileRecoverable> {
+
+ private static final int MAGIC_NUMBER = 0xb3a4073d;
+
+ private final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumeSerializer;
+
+ OutputStreamBasedInProgressFileRecoverableSerializer(SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumeSerializer) {
+ this.resumeSerializer = resumeSerializer;
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(InProgressFileRecoverable inProgressRecoverable) throws IOException {
+ OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable = (OutputStreamBasedInProgressFileRecoverable) inProgressRecoverable;
+ DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
+ dataOutputSerializer.writeInt(MAGIC_NUMBER);
+ serializeV1(outputStreamBasedInProgressRecoverable, dataOutputSerializer);
+ return dataOutputSerializer.getCopyOfBuffer();
+ }
+
+ @Override
+ public InProgressFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
+ switch (version) {
+ case 1:
+ DataInputView dataInputView = new DataInputDeserializer(serialized);
+ validateMagicNumber(dataInputView);
+ return deserializeV1(dataInputView);
+ default:
+ throw new IOException("Unrecognized version or corrupt state: " + version);
+ }
+ }
+
+ SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeSerializer() {
+ return resumeSerializer;
+ }
+
+ private void serializeV1(final OutputStreamBasedInProgressFileRecoverable outputStreamBasedInProgressRecoverable, final DataOutputView dataOutputView) throws IOException {
+ SimpleVersionedSerialization.writeVersionAndSerialize(resumeSerializer, outputStreamBasedInProgressRecoverable.getResumeRecoverable(), dataOutputView);
+ }
+
+ private OutputStreamBasedInProgressFileRecoverable deserializeV1(final DataInputView dataInputView) throws IOException {
+ return new OutputStreamBasedInProgressFileRecoverable(SimpleVersionedSerialization.readVersionAndDeSerialize(resumeSerializer, dataInputView));
+ }
+
+ private static void validateMagicNumber(final DataInputView dataInputView) throws IOException {
+ final int magicNumber = dataInputView.readInt();
+ if (magicNumber != MAGIC_NUMBER) {
+ throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
+ }
+ }
+ }
+
+ /**
+ * Serializer for {@link OutputStreamBasedPendingFileRecoverable}. Version 1 layout: a magic
+ * number followed by the commit recoverable written with
+ * {@link SimpleVersionedSerialization#writeVersionAndSerialize}.
+ */
+ static class OutputStreamBasedPendingFileRecoverableSerializer implements SimpleVersionedSerializer<PendingFileRecoverable> {
+
+ private static final int MAGIC_NUMBER = 0x2c853c89;
+
+ private final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitSerializer;
+
+ OutputStreamBasedPendingFileRecoverableSerializer(final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitSerializer) {
+ this.commitSerializer = commitSerializer;
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+ public byte[] serialize(PendingFileRecoverable pendingFileRecoverable) throws IOException {
+ OutputStreamBasedPendingFileRecoverable outputStreamBasedPendingFileRecoverable = (OutputStreamBasedPendingFileRecoverable) pendingFileRecoverable;
+ DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
+ dataOutputSerializer.writeInt(MAGIC_NUMBER);
+ serializeV1(outputStreamBasedPendingFileRecoverable, dataOutputSerializer);
+ return dataOutputSerializer.getCopyOfBuffer();
+ }
+
+ @Override
+ public PendingFileRecoverable deserialize(int version, byte[] serialized) throws IOException {
+ switch (version) {
+ case 1:
+ DataInputDeserializer in = new DataInputDeserializer(serialized);
+ validateMagicNumber(in);
+ return deserializeV1(in);
+
+ default:
+ throw new IOException("Unrecognized version or corrupt state: " + version);
+ }
+ }
+
+ SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitSerializer() {
+ return this.commitSerializer;
+ }
+
+ private void serializeV1(final OutputStreamBasedPendingFileRecoverable outputStreamBasedPendingFileRecoverable, final DataOutputView dataOutputView) throws IOException {
+ SimpleVersionedSerialization.writeVersionAndSerialize(commitSerializer, outputStreamBasedPendingFileRecoverable.getCommitRecoverable(), dataOutputView);
+ }
+
+ private OutputStreamBasedPendingFileRecoverable deserializeV1(final DataInputView dataInputView) throws IOException {
+ return new OutputStreamBasedPendingFileRecoverable(SimpleVersionedSerialization.readVersionAndDeSerialize(commitSerializer, dataInputView));
+ }
+
+ private static void validateMagicNumber(final DataInputView dataInputView) throws IOException {
+ final int magicNumber = dataInputView.readInt();
+ if (magicNumber != MAGIC_NUMBER) {
+ throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
+ }
+ }
+ }
+
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
deleted file mode 100644
index 95a2978a..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.sink.filesystem;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-
-/**
- * An abstract writer for the currently open part file in a specific {@link Bucket}.
- *
- * <p>Currently, there are two subclasses, of this class:
- * <ol>
- * <li>One for row-wise formats: the {@link RowWisePartWriter}.</li>
- * <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li>
- * </ol>
- *
- * <p>This also implements the {@link PartFileInfo}.
- */
-@Internal
-abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
-
- private final BucketID bucketId;
-
- private final long creationTime;
-
- protected final RecoverableFsDataOutputStream currentPartStream;
-
- private long lastUpdateTime;
-
- protected PartFileWriter(
- final BucketID bucketId,
- final RecoverableFsDataOutputStream currentPartStream,
- final long creationTime) {
-
- Preconditions.checkArgument(creationTime >= 0L);
- this.bucketId = Preconditions.checkNotNull(bucketId);
- this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
- this.creationTime = creationTime;
- this.lastUpdateTime = creationTime;
- }
-
- abstract void write(IN element, long currentTime) throws IOException;
-
- RecoverableWriter.ResumeRecoverable persist() throws IOException {
- return currentPartStream.persist();
- }
-
- RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
- return currentPartStream.closeForCommit().getRecoverable();
- }
-
- void dispose() {
- // we can suppress exceptions here, because we do not rely on close() to
- // flush or persist any data
- IOUtils.closeQuietly(currentPartStream);
- }
-
- void markWrite(long now) {
- this.lastUpdateTime = now;
- }
-
- @Override
- public BucketID getBucketId() {
- return bucketId;
- }
-
- @Override
- public long getCreationTime() {
- return creationTime;
- }
-
- @Override
- public long getSize() throws IOException {
- return currentPartStream.getPos();
- }
-
- @Override
- public long getLastUpdateTime() {
- return lastUpdateTime;
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * An interface for factories that create the different {@link PartFileWriter writers}.
- */
- interface PartFileFactory<IN, BucketID> {
-
- /**
- * Used upon recovery from a failure to recover a {@link PartFileWriter writer}.
- * @param bucketId the id of the bucket this writer is writing to.
- * @param stream the filesystem-specific output stream to use when writing to the filesystem.
- * @param resumable the state of the stream we are resurrecting.
- * @param creationTime the creation time of the stream.
- * @return the recovered {@link PartFileWriter writer}.
- * @throws IOException
- */
- PartFileWriter<IN, BucketID> resumeFrom(
- final BucketID bucketId,
- final RecoverableFsDataOutputStream stream,
- final RecoverableWriter.ResumeRecoverable resumable,
- final long creationTime) throws IOException;
-
- /**
- * Used to create a new {@link PartFileWriter writer}.
- * @param bucketId the id of the bucket this writer is writing to.
- * @param stream the filesystem-specific output stream to use when writing to the filesystem.
- * @param path the part this writer will write to.
- * @param creationTime the creation time of the stream.
- * @return the new {@link PartFileWriter writer}.
- * @throws IOException
- */
- PartFileWriter<IN, BucketID> openNew(
- final BucketID bucketId,
- final RecoverableFsDataOutputStream stream,
- final Path path,
- final long creationTime) throws IOException;
- }
-}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
new file mode 100644
index 0000000..784f8be
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWiseBucketWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link BucketWriter} for row-wise formats: every stream opened or recovered through the
+ * {@link RecoverableWriter} is wrapped in a {@link RowWisePartWriter} that writes elements with
+ * the given {@link Encoder}.
+ * @param <IN> The type of input elements.
+ * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
+ */
+@Internal
+class RowWiseBucketWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter.OutputStreamBasedBucketWriter<IN, BucketID> {
+
+ private final Encoder<IN> encoder;
+
+ RowWiseBucketWriter(final RecoverableWriter recoverableWriter, final Encoder<IN> encoder) {
+ super(recoverableWriter);
+ // fail fast: a null encoder would otherwise only surface on the first write
+ this.encoder = Preconditions.checkNotNull(encoder);
+ }
+
+ @Override
+ public InProgressFileWriter<IN, BucketID> resumeFrom(
+ final BucketID bucketId,
+ final RecoverableFsDataOutputStream stream,
+ final RecoverableWriter.ResumeRecoverable resumable,
+ final long creationTime) {
+
+ Preconditions.checkNotNull(stream);
+ // the resumable is only validated here; the writer is built from the recovered stream alone
+ Preconditions.checkNotNull(resumable);
+
+ return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
+ }
+
+ @Override
+ public InProgressFileWriter<IN, BucketID> openNew(
+ final BucketID bucketId,
+ final RecoverableFsDataOutputStream stream,
+ final Path path,
+ final long creationTime) {
+
+ Preconditions.checkNotNull(stream);
+ Preconditions.checkNotNull(path);
+
+ return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index 05c160c..bed9ec7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -20,23 +20,21 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
/**
- * A {@link PartFileWriter} for row-wise formats that use an {@link Encoder}.
+ * An {@link InProgressFileWriter} for row-wise formats that use an {@link Encoder}.
* This also implements the {@link PartFileInfo}.
*/
@Internal
-final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
+final class RowWisePartWriter<IN, BucketID> extends OutputStreamBasedPartFileWriter<IN, BucketID> {
private final Encoder<IN> encoder;
- private RowWisePartWriter(
+ RowWisePartWriter(
final BucketID bucketId,
final RecoverableFsDataOutputStream currentPartStream,
final Encoder<IN> encoder,
@@ -46,48 +44,8 @@ final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID>
}
@Override
- void write(IN element, long currentTime) throws IOException {
+ public void write(final IN element, final long currentTime) throws IOException {
encoder.encode(element, currentPartStream);
markWrite(currentTime);
}
-
- /**
- * A factory that creates {@link RowWisePartWriter RowWisePartWriters}.
- * @param <IN> The type of input elements.
- * @param <BucketID> The type of ids for the buckets, as returned by the {@link BucketAssigner}.
- */
- static class Factory<IN, BucketID> implements PartFileWriter.PartFileFactory<IN, BucketID> {
-
- private final Encoder<IN> encoder;
-
- Factory(Encoder<IN> encoder) {
- this.encoder = encoder;
- }
-
- @Override
- public PartFileWriter<IN, BucketID> resumeFrom(
- final BucketID bucketId,
- final RecoverableFsDataOutputStream stream,
- final RecoverableWriter.ResumeRecoverable resumable,
- final long creationTime) throws IOException {
-
- Preconditions.checkNotNull(stream);
- Preconditions.checkNotNull(resumable);
-
- return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
- }
-
- @Override
- public PartFileWriter<IN, BucketID> openNew(
- final BucketID bucketId,
- final RecoverableFsDataOutputStream stream,
- final Path path,
- final long creationTime) throws IOException {
-
- Preconditions.checkNotNull(stream);
- Preconditions.checkNotNull(path);
-
- return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
- }
- }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
index cb58529..64e4418 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
@@ -277,7 +277,7 @@ public class StreamingFileSink<IN>
basePath,
bucketAssigner,
bucketFactory,
- new RowWisePartWriter.Factory<>(encoder),
+ new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder),
rollingPolicy,
bucketLifeCycleListener,
subtaskIndex,
@@ -397,7 +397,7 @@ public class StreamingFileSink<IN>
basePath,
bucketAssigner,
bucketFactory,
- new BulkPartWriter.Factory<>(writerFactory),
+ new BulkBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), writerFactory),
rollingPolicy,
bucketLifeCycleListener,
subtaskIndex,
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/WriterProperties.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/WriterProperties.java
new file mode 100644
index 0000000..4fee03c
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/WriterProperties.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Describes the properties of a {@link BucketWriter}: the serializers for its in-progress and
+ * pending file recoverables, and whether it can resume writing to a restored in-progress file.
+ */
+@Internal
+public class WriterProperties {
+
+ private final boolean supportsResume;
+
+ private final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer;
+
+ private final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer;
+
+ WriterProperties(
+ SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressSerializer,
+ SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingSerializer,
+ boolean canResume) {
+ this.supportsResume = canResume;
+ this.inProgressFileRecoverableSerializer = checkNotNull(inProgressSerializer);
+ this.pendingFileRecoverableSerializer = checkNotNull(pendingSerializer);
+ }
+
+ /**
+ * @return the serializer for {@link InProgressFileWriter.InProgressFileRecoverable} handles.
+ */
+ SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> getInProgressFileRecoverableSerializer() {
+ return inProgressFileRecoverableSerializer;
+ }
+
+ /**
+ * @return the serializer for {@link InProgressFileWriter.PendingFileRecoverable} handles.
+ */
+ SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> getPendingFileRecoverableSerializer() {
+ return pendingFileRecoverableSerializer;
+ }
+
+ /**
+ * @return whether the {@link BucketWriter} supports appending data to a restored in-progress file.
+ */
+ boolean supportsResume() {
+ return supportsResume;
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
index f48e467..ff2cc5a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
@@ -54,7 +55,7 @@ public class BucketAssignerITCases {
basePath,
new BasePathBucketAssigner<>(),
new DefaultBucketFactoryImpl<>(),
- new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+ new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
rollingPolicy,
null,
0,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
index 81c5766..cbd18c4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketStateSerializerTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
@@ -136,7 +135,7 @@ public class BucketStateSerializerTest {
Assert.assertEquals(testBucketPath, bucket.getBucketPath());
Assert.assertNull(bucket.getInProgressPart());
- Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
+ Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
}
@Test
@@ -266,8 +265,8 @@ public class BucketStateSerializerTest {
final int noOfPendingCheckpoints = 5;
// there are 5 checkpoint does not complete.
- final Map<Long, List<RecoverableWriter.CommitRecoverable>>
- pendingFileRecoverables = recoveredState.getCommittableFilesPerCheckpoint();
+ final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>>
+ pendingFileRecoverables = recoveredState.getPendingFileRecoverablesPerCheckpoint();
Assert.assertEquals(5L, pendingFileRecoverables.size());
final Set<String> beforeRestorePaths = Files.list(outputPath.resolve(BUCKET_ID))
@@ -283,7 +282,7 @@ public class BucketStateSerializerTest {
// recover and commit
final Bucket bucket = restoreBucket(noOfPendingCheckpoints + 1, recoveredState);
Assert.assertEquals(testBucketPath, bucket.getBucketPath());
- Assert.assertEquals(0, bucket.getPendingPartsPerCheckpoint().size());
+ Assert.assertEquals(0, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
final Set<String> afterRestorePaths = Files.list(outputPath.resolve(BUCKET_ID))
.map(file -> file.getFileName().toString())
@@ -313,27 +312,37 @@ public class BucketStateSerializerTest {
private static Bucket<String, String> createNewBucket(final Path bucketPath) throws IOException {
return Bucket.getNew(
- FileSystem.getLocalFileSystem().createRecoverableWriter(),
0,
BUCKET_ID,
bucketPath,
0,
- new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+ createBucketWriter(),
DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
OutputFileConfig.builder().build());
}
+ /**
+ * Restores a bucket on subtask 0 from {@code bucketState}, continuing part numbering at
+ * {@code initialPartCounter} and using a row-wise bucket writer over the local file system
+ * with a max part size of 10.
+ */
private static Bucket<String, String> restoreBucket(final int initialPartCounter, final BucketState<String> bucketState) throws IOException {
return Bucket.restore(
- FileSystem.getLocalFileSystem().createRecoverableWriter(),
0,
initialPartCounter,
- new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+ createBucketWriter(),
DefaultRollingPolicy.builder().withMaxPartSize(10).build(),
bucketState,
OutputFileConfig.builder().build());
}
+ /**
+ * Creates a row-wise bucket writer backed by the local file system's recoverable writer,
+ * encoding each record with a {@link SimpleStringEncoder}.
+ */
+ private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException {
+ return new RowWiseBucketWriter<>(FileSystem.getLocalFileSystem().createRecoverableWriter(), new SimpleStringEncoder<>());
+ }
+
+ /**
+ * Builds the {@link BucketState} serializer from the in-progress and pending file recoverable
+ * serializers exposed by the bucket writer's properties, plus a string serializer for the
+ * {@code String} bucket type parameter.
+ */
+ private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer() throws IOException {
+ // Keep the writer parameterized: a raw RowWiseBucketWriter makes getProperties() and the
+ // serializers it returns raw as well, turning the construction below into an unchecked call.
+ final RowWiseBucketWriter<String, String> bucketWriter = createBucketWriter();
+ return new BucketStateSerializer<>(
+ bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+ bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
+ SimpleVersionedStringSerializer.INSTANCE);
+ }
+
private static BucketState<String> readBucketState(final String scenarioName, final int version) throws IOException {
byte[] bytes = Files.readAllBytes(getSnapshotPath(scenarioName, version));
return SimpleVersionedSerialization.readVersionAndDeSerialize(bucketStateSerializer(), bytes);
@@ -351,14 +360,6 @@ public class BucketStateSerializerTest {
return readBucketState(scenarioName, version);
}
- private static SimpleVersionedSerializer<BucketState<String>> bucketStateSerializer() throws IOException {
- RecoverableWriter recoverableWriter = FileSystem.getLocalFileSystem().createRecoverableWriter();
- return new BucketStateSerializer<>(
- recoverableWriter.getResumeRecoverableSerializer(),
- recoverableWriter.getCommitRecoverableSerializer(),
- SimpleVersionedStringSerializer.INSTANCE);
- }
-
private static void moveToTemplateDirectory(java.nio.file.Path scenarioPath) throws IOException {
FileUtils.copy(new Path(scenarioPath.toString()), new Path(scenarioPath.toString() + "-template"), false);
FileUtils.deleteDirectory(scenarioPath.toFile());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index ee85e55..a4d9a09 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
+import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -171,7 +172,7 @@ public class BucketTest {
return new TypeSafeMatcher<BucketState<String>>() {
@Override
protected boolean matchesSafely(BucketState<String> state) {
- return state.getInProgressResumableFile() != null;
+ return state.getInProgressFileRecoverable() != null;
}
@Override
@@ -185,7 +186,7 @@ public class BucketTest {
return new TypeSafeMatcher<BucketState<String>>() {
@Override
protected boolean matchesSafely(BucketState<String> state) {
- return state.getInProgressResumableFile() == null;
+ return state.getInProgressFileRecoverable() == null;
}
@Override
@@ -200,7 +201,7 @@ public class BucketTest {
return new TypeSafeMatcher<Bucket<String, String>>() {
@Override
protected boolean matchesSafely(Bucket<String, String> bucket) {
- final PartFileWriter<String, String> inProgressPart = bucket.getInProgressPart();
+ final InProgressFileWriter<String, String> inProgressPart = bucket.getInProgressPart();
return isNull == (inProgressPart == null);
}
@@ -349,23 +350,21 @@ public class BucketTest {
private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder().build();
- private static final PartFileWriter.PartFileFactory<String, String> partFileFactory =
- new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
+ // Shared encoder for the row-wise bucket writers built in this test. It is typed (not raw) so
+ // that `new RowWiseBucketWriter<>(writer, ENCODER)` infers <String, String> without unchecked conversion.
+ private static final Encoder<String> ENCODER = new SimpleStringEncoder<>();
+ /**
+ * Creates a new bucket for {@code bucketId} whose part files are written by a row-wise bucket
+ * writer wrapping the given {@code RecoverableWriter} and the shared string encoder.
+ *
+ * @throws IOException as declared by this helper; the bucket may fail to be created.
+ */
private static Bucket<String, String> createBucket(
final RecoverableWriter writer,
final Path bucketPath,
final int subtaskIdx,
final int initialPartCounter,
- final OutputFileConfig outputFileConfig) {
+ final OutputFileConfig outputFileConfig) throws IOException {
return Bucket.getNew(
- writer,
subtaskIdx,
bucketId,
bucketPath,
initialPartCounter,
- partFileFactory,
+ new RowWiseBucketWriter<>(writer, ENCODER),
rollingPolicy,
outputFileConfig);
}
@@ -378,10 +377,9 @@ public class BucketTest {
final OutputFileConfig outputFileConfig) throws Exception {
return Bucket.restore(
- writer,
subtaskIndex,
initialPartCounter,
- partFileFactory,
+ new RowWiseBucketWriter<>(writer, ENCODER),
rollingPolicy,
bucketState,
outputFileConfig);
@@ -402,24 +400,46 @@ public class BucketTest {
+ /**
+ * Restores a bucket (subtask 0, part counter 1) from a state that holds only an in-progress
+ * file recoverable (a wrapped {@code NoOpRecoverable}) and no pending files.
+ */
private Bucket<String, String> getRestoredBucketWithOnlyInProgressPart(final BaseStubWriter writer) throws IOException {
final BucketState<String> stateWithOnlyInProgressFile =
- new BucketState<>("test", new Path(), 12345L, new NoOpRecoverable(), new HashMap<>());
- return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, stateWithOnlyInProgressFile, OutputFileConfig.builder().build());
+ new BucketState<>(
+ "test",
+ new Path(),
+ 12345L,
+ // the raw NoOpRecoverable is wrapped so it matches the in-progress file recoverable abstraction
+ new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(new NoOpRecoverable()),
+ new HashMap<>());
+
+ return Bucket.restore(
+ 0,
+ 1L,
+ new RowWiseBucketWriter<>(writer, ENCODER),
+ rollingPolicy,
+ stateWithOnlyInProgressFile,
+ OutputFileConfig.builder().build());
}
+ /**
+ * Restores a bucket (subtask 0, part counter 1) from a state that has no in-progress file and
+ * {@code numberOfPendingParts} checkpoints, each holding one pending file recoverable.
+ */
private Bucket<String, String> getRestoredBucketWithOnlyPendingParts(final BaseStubWriter writer, final int numberOfPendingParts) throws IOException {
- final Map<Long, List<RecoverableWriter.CommitRecoverable>> completePartsPerCheckpoint =
+ final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> completePartsPerCheckpoint =
createPendingPartsPerCheckpoint(numberOfPendingParts);
- final BucketState<String> initStateWithOnlyInProgressFile =
- new BucketState<>("test", new Path(), 12345L, null, completePartsPerCheckpoint);
- return Bucket.restore(writer, 0, 1L, partFileFactory, rollingPolicy, initStateWithOnlyInProgressFile, OutputFileConfig.builder().build());
+ // The in-progress file is null here; the state carries only pending parts, so name it accordingly.
+ final BucketState<String> stateWithOnlyPendingParts =
+ new BucketState<>(
+ "test",
+ new Path(),
+ 12345L,
+ null,
+ completePartsPerCheckpoint);
+ return Bucket.restore(
+ 0,
+ 1L,
+ new RowWiseBucketWriter<>(writer, ENCODER),
+ rollingPolicy,
+ stateWithOnlyPendingParts,
+ OutputFileConfig.builder().build());
}
- private Map<Long, List<RecoverableWriter.CommitRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
- final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
+ private Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
+ final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<>();
for (int checkpointId = 0; checkpointId < noOfCheckpoints; checkpointId++) {
- final List<RecoverableWriter.CommitRecoverable> pending = new ArrayList<>();
- pending.add(new NoOpRecoverable());
+ final List<InProgressFileWriter.PendingFileRecoverable> pending = new ArrayList<>();
+ pending.add(new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(new NoOpRecoverable()));
pendingCommittablesPerCheckpoint.put((long) checkpointId, pending);
}
return pendingCommittablesPerCheckpoint;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
index e996444..8e2117a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils.MockListState;
@@ -93,8 +94,8 @@ public class BucketsTest {
return bucket.getBucketId().equals(bucketId) &&
bucket.getBucketPath().equals(new Path(testTmpPath, bucketId)) &&
bucket.getInProgressPart() == null &&
- bucket.getPendingPartsForCurrentCheckpoint().isEmpty() &&
- bucket.getPendingPartsPerCheckpoint().size() == 1;
+ bucket.getPendingFileRecoverablesForCurrentCheckpoint().isEmpty() &&
+ bucket.getPendingFileRecoverablesPerCheckpoint().size() == 1;
}
@Override
@@ -145,7 +146,7 @@ public class BucketsTest {
Assert.assertEquals(2L, bucketsTwo.getMaxPartCounter());
// make sure we have one in-progress file here and a pending
- Assert.assertEquals(1L, bucketsTwo.getActiveBuckets().get("test1").getPendingPartsPerCheckpoint().size());
+ Assert.assertEquals(1L, bucketsTwo.getActiveBuckets().get("test1").getPendingFileRecoverablesPerCheckpoint().size());
Assert.assertNotNull(bucketsTwo.getActiveBuckets().get("test1").getInProgressPart());
final ListState<byte[]> mergedBucketStateContainer = new MockListState<>();
@@ -175,10 +176,10 @@ public class BucketsTest {
// this is due to the Bucket#merge(). The in progress file of one
// of the previous tasks is put in the list of pending files.
- Assert.assertEquals(1L, bucket.getPendingPartsForCurrentCheckpoint().size());
+ Assert.assertEquals(1L, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
// we commit the pending for previous checkpoints
- Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
+ Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
}
@Test
@@ -210,8 +211,8 @@ public class BucketsTest {
Assert.assertEquals("test", bucket.getBucketId());
Assert.assertNull(bucket.getInProgressPart());
- Assert.assertEquals(1L, bucket.getPendingPartsForCurrentCheckpoint().size());
- Assert.assertTrue(bucket.getPendingPartsPerCheckpoint().isEmpty());
+ Assert.assertEquals(1L, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
+ Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
}
@Test
@@ -321,7 +322,7 @@ public class BucketsTest {
path,
new VerifyingBucketAssigner(timestamp, watermark, processingTime),
new DefaultBucketFactoryImpl<>(),
- new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+ new RowWiseBucketWriter<>(FileSystem.get(path.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
DefaultRollingPolicy.builder().build(),
null,
2,
@@ -458,7 +459,7 @@ public class BucketsTest {
basePath,
new TestUtils.StringIdentityBucketAssigner(),
new DefaultBucketFactoryImpl<>(),
- new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+ new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
rollingPolicy,
bucketLifeCycleListener,
subtaskIdx,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
index 59ea627..2a4da34 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/RollingPolicyTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions.sink.filesystem;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
@@ -201,7 +202,7 @@ public class RollingPolicyTest {
basePath,
new TestUtils.StringIdentityBucketAssigner(),
new DefaultBucketFactoryImpl<>(),
- new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
+ new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), new SimpleStringEncoder<>()),
rollingPolicyToTest,
null,
0,
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
index c540dc73..df678c5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/TestUtils.java
@@ -27,7 +27,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -50,6 +49,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy.build;
+
/**
* Utilities for the {@link StreamingFileSink} tests.
*/
@@ -158,7 +159,7 @@ public class TestUtils {
.forBulkFormat(new Path(outDir.toURI()), writer)
.withBucketAssigner(bucketer)
.withBucketCheckInterval(bucketCheckInterval)
- .withRollingPolicy(OnCheckpointRollingPolicy.build())
+ .withRollingPolicy(build())
.withBucketFactory(bucketFactory)
.withOutputFileConfig(outputFileConfig)
.build();
@@ -199,7 +200,7 @@ public class TestUtils {
StreamingFileSink<Tuple2<String, Integer>> sink = StreamingFileSink
.forBulkFormat(new Path(outDir.toURI()), writer)
.withNewBucketAssigner(bucketer)
- .withRollingPolicy(OnCheckpointRollingPolicy.build())
+ .withRollingPolicy(build())
.withBucketCheckInterval(bucketCheckInterval)
.withBucketFactory(bucketFactory)
.withOutputFileConfig(outputFileConfig)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
index e21da2a..6260a2c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/utils/NoOpRecoverableWriter.java
@@ -50,7 +50,7 @@ public class NoOpRecoverableWriter implements RecoverableWriter {
@Override
public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
- throw new UnsupportedOperationException();
+ // The no-op writer holds no recoverable state, so return false instead of throwing.
+ // NOTE(review): presumably false means "nothing was cleaned up"; confirm against the
+ // RecoverableWriter#cleanupRecoverableState contract.
+ return false;
}
@Override