You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/09/03 13:20:06 UTC
[flink] branch release-1.8 updated: [FLINK-13941][fs-connector] Do
not delete partial part files from S3 upon restore.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
new 94ca735 [FLINK-13941][fs-connector] Do not delete partial part files from S3 upon restore.
94ca735 is described below
commit 94ca735b9d5a3861ccd7c5deb54c7ca6d67300c3
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Sep 2 14:35:57 2019 +0200
[FLINK-13941][fs-connector] Do not delete partial part files from S3 upon restore.
---
.../api/functions/sink/filesystem/Bucket.java | 19 +++++++++++--------
.../api/functions/sink/filesystem/BucketTest.java | 22 ----------------------
2 files changed, 11 insertions(+), 30 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 3252d9c..cc6726b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -146,10 +146,6 @@ public class Bucket<IN, BucketID> {
fsWriter.recoverForCommit(resumable).commitAfterRecovery();
}
-
- if (fsWriter.requiresCleanupOfRecoverableState()) {
- fsWriter.cleanupRecoverableState(resumable);
- }
}
private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
@@ -312,12 +308,19 @@ public class Bucket<IN, BucketID> {
while (it.hasNext()) {
final ResumeRecoverable recoverable = it.next().getValue();
- final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
- it.remove();
- if (LOG.isDebugEnabled() && successfullyDeleted) {
- LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
+ // this check is redundant, as we only put entries in the resumablesPerCheckpoint map
+ // list when the requiresCleanupOfRecoverableState() returns true, but having it makes
+ // the code more readable.
+
+ if (fsWriter.requiresCleanupOfRecoverableState()) {
+ final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
+
+ if (LOG.isDebugEnabled() && successfullyDeleted) {
+ LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
+ }
}
+ it.remove();
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index 308bc31..a2e4582 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -99,28 +99,6 @@ public class BucketTest {
}
@Test
- public void shouldCleanupResumableAfterRestoring() throws Exception {
- final File outDir = TEMP_FOLDER.newFolder();
- final Path path = new Path(outDir.toURI());
-
- final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
- final Bucket<String, String> bucketUnderTest =
- createBucket(recoverableWriter, path, 0, 0);
-
- bucketUnderTest.write("test-element", 0L);
-
- final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
- assertThat(state, hasActiveInProgressFile());
-
- bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
-
- final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(path);
- restoreBucket(newRecoverableWriter, 0, 1, state);
-
- assertThat(newRecoverableWriter, hasCalledDiscard(1)); // that is for checkpoints 0 and 1
- }
-
- @Test
public void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
final Path path = new Path(outDir.toURI());