You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/09/03 13:20:06 UTC

[flink] branch release-1.8 updated: [FLINK-13941][fs-connector] Do not delete partial part files from S3 upon restore.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 94ca735  [FLINK-13941][fs-connector] Do not delete partial part files from S3 upon restore.
94ca735 is described below

commit 94ca735b9d5a3861ccd7c5deb54c7ca6d67300c3
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Sep 2 14:35:57 2019 +0200

    [FLINK-13941][fs-connector] Do not delete partial part files from S3 upon restore.
---
 .../api/functions/sink/filesystem/Bucket.java      | 19 +++++++++++--------
 .../api/functions/sink/filesystem/BucketTest.java  | 22 ----------------------
 2 files changed, 11 insertions(+), 30 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 3252d9c..cc6726b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -146,10 +146,6 @@ public class Bucket<IN, BucketID> {
 
 			fsWriter.recoverForCommit(resumable).commitAfterRecovery();
 		}
-
-		if (fsWriter.requiresCleanupOfRecoverableState()) {
-			fsWriter.cleanupRecoverableState(resumable);
-		}
 	}
 
 	private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
@@ -312,12 +308,19 @@ public class Bucket<IN, BucketID> {
 
 		while (it.hasNext()) {
 			final ResumeRecoverable recoverable = it.next().getValue();
-			final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
-			it.remove();
 
-			if (LOG.isDebugEnabled() && successfullyDeleted) {
-				LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
+			// This check is redundant, as we only put entries in the resumablesPerCheckpoint
+			// map when requiresCleanupOfRecoverableState() returns true, but having it makes
+			// the code more readable.
+
+			if (fsWriter.requiresCleanupOfRecoverableState()) {
+				final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
+
+				if (LOG.isDebugEnabled() && successfullyDeleted) {
+					LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
+				}
 			}
+			it.remove();
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index 308bc31..a2e4582 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -99,28 +99,6 @@ public class BucketTest {
 	}
 
 	@Test
-	public void shouldCleanupResumableAfterRestoring() throws Exception {
-		final File outDir = TEMP_FOLDER.newFolder();
-		final Path path = new Path(outDir.toURI());
-
-		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
-		final Bucket<String, String> bucketUnderTest =
-				createBucket(recoverableWriter, path, 0, 0);
-
-		bucketUnderTest.write("test-element", 0L);
-
-		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
-		assertThat(state, hasActiveInProgressFile());
-
-		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
-
-		final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(path);
-		restoreBucket(newRecoverableWriter, 0, 1, state);
-
-		assertThat(newRecoverableWriter, hasCalledDiscard(1)); // that is for checkpoints 0 and 1
-	}
-
-	@Test
 	public void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
 		final File outDir = TEMP_FOLDER.newFolder();
 		final Path path = new Path(outDir.toURI());