You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@flink.apache.org by kk...@apache.org on 2019/09/03 13:28:42 UTC
[flink] branch master updated: [FLINK-13941][fs-connector] Do not delete partial part files from S3 upon restore.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0147cf6 [FLINK-13941][fs-connector] Do not delete partial part files from S3 upon restore.
0147cf6 is described below
commit 0147cf601701c87dd330898f68b47939f2ef1226
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Sep 2 14:35:57 2019 +0200
[FLINK-13941][fs-connector] Do not delete partial part files from S3 upon restore.
---
.../api/functions/sink/filesystem/Bucket.java | 19 +++++++++++--------
.../api/functions/sink/filesystem/BucketTest.java | 22 ----------------------
2 files changed, 11 insertions(+), 30 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 5fe535d..4a996e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -150,10 +150,6 @@ public class Bucket<IN, BucketID> {
fsWriter.recoverForCommit(resumable).commitAfterRecovery();
}
-
- if (fsWriter.requiresCleanupOfRecoverableState()) {
- fsWriter.cleanupRecoverableState(resumable);
- }
}
private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
@@ -316,12 +312,19 @@ public class Bucket<IN, BucketID> {
while (it.hasNext()) {
final ResumeRecoverable recoverable = it.next().getValue();
- final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
- it.remove();
- if (LOG.isDebugEnabled() && successfullyDeleted) {
- LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
+ // this check is redundant, as we only put entries in the resumablesPerCheckpoint map
+ // list when the requiresCleanupOfRecoverableState() returns true, but having it makes
+ // the code more readable.
+
+ if (fsWriter.requiresCleanupOfRecoverableState()) {
+ final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
+
+ if (LOG.isDebugEnabled() && successfullyDeleted) {
+ LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
+ }
}
+ it.remove();
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index 546a08c..583bacf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -99,28 +99,6 @@ public class BucketTest {
}
@Test
- public void shouldCleanupResumableAfterRestoring() throws Exception {
- final File outDir = TEMP_FOLDER.newFolder();
- final Path path = new Path(outDir.toURI());
-
- final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
- final Bucket<String, String> bucketUnderTest =
- createBucket(recoverableWriter, path, 0, 0, new PartFileConfig());
-
- bucketUnderTest.write("test-element", 0L);
-
- final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
- assertThat(state, hasActiveInProgressFile());
-
- bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
-
- final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(path);
- restoreBucket(newRecoverableWriter, 0, 1, state, new PartFileConfig());
-
- assertThat(newRecoverableWriter, hasCalledDiscard(1)); // that is for checkpoints 0 and 1
- }
-
- @Test
public void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
final File outDir = TEMP_FOLDER.newFolder();
final Path path = new Path(outDir.toURI());