Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/04 16:28:05 UTC

[flink] branch release-1.7 updated (1034f3e -> 5de5078)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 1034f3e  [FLINK-11044] [docs] Fix registerTableSink docs
     new 60f21cc  [hotfix][fs-connector] Refactor PartFileWriter to take stream.
     new 1ef6bf8  [hotfix][fs-connector] Refactor Bucket to statically import Preconditions.
     new 3514c52  [hotfix][s3-connector] Renamed S3MultiPartUploader to S3AccessHelper.
     new a029f33  [hotfix] Consolidated all S3 accesses under the S3AccessHelper.
     new e32e233  [hotfix] Method renaming in the RecoverableMultiPartUploadImpl.
     new 5de5078  [FLINK-10963][fs-connector, s3] Cleanup tmp S3 objects uploaded as backups of in-progress files.

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/core/fs/RecoverableWriter.java    |  28 +++
 .../core/fs/local/LocalRecoverableWriter.java      |  10 +
 .../runtime/fs/hdfs/HadoopRecoverableWriter.java   |  10 +
 .../fs/s3/common/AbstractS3FileSystemFactory.java  |   6 +-
 .../flink/fs/s3/common/FlinkS3FileSystem.java      |  12 +-
 .../writer/RecoverableMultiPartUploadImpl.java     |  49 ++--
 ...3MultiPartUploader.java => S3AccessHelper.java} |  33 ++-
 .../flink/fs/s3/common/writer/S3Committer.java     |  12 +-
 .../S3RecoverableMultipartUploadFactory.java       |  53 +----
 .../fs/s3/common/writer/S3RecoverableWriter.java   |  38 ++-
 .../flink/fs/s3/common/S3EntropyFsFactoryTest.java |   4 +-
 .../s3/common/writer/IncompletePartPrefixTest.java |  12 +-
 .../writer/RecoverableMultiPartUploadImplTest.java |  20 +-
 ...PartUploader.java => HadoopS3AccessHelper.java} |  64 +++--
 .../flink/fs/s3hadoop/S3FileSystemFactory.java     |   6 +-
 .../s3hadoop/HadoopS3RecoverableWriterITCase.java  |  46 ++++
 .../flink/fs/s3presto/S3FileSystemFactory.java     |   4 +-
 .../api/functions/sink/filesystem/Bucket.java      | 114 ++++++---
 .../functions/sink/filesystem/BulkPartWriter.java  |  10 +-
 .../functions/sink/filesystem/PartFileWriter.java  |   8 +-
 .../sink/filesystem/RowWisePartWriter.java         |  10 +-
 .../api/functions/sink/filesystem/BucketTest.java  | 263 +++++++++++++++++++++
 22 files changed, 629 insertions(+), 183 deletions(-)
 rename flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/{S3MultiPartUploader.java => S3AccessHelper.java} (78%)
 rename flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/{HadoopS3MultiPartUploader.java => HadoopS3AccessHelper.java} (58%)
 create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java


[flink] 02/06: [hotfix][fs-connector] Refactor Bucket to statically import Preconditions.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1ef6bf87952861c9643a280f4fe106f94a69ffa8
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Wed Nov 21 10:43:55 2018 +0100

    [hotfix][fs-connector] Refactor Bucket to statically import Preconditions.
---
 .../api/functions/sink/filesystem/Bucket.java      | 24 ++++++++++++----------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 042bcda..8ba35b8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -23,7 +23,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +37,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
  *
@@ -84,13 +86,13 @@ public class Bucket<IN, BucketID> {
 			final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
 			final RollingPolicy<IN, BucketID> rollingPolicy) {
 
-		this.fsWriter = Preconditions.checkNotNull(fsWriter);
+		this.fsWriter = checkNotNull(fsWriter);
 		this.subtaskIndex = subtaskIndex;
-		this.bucketId = Preconditions.checkNotNull(bucketId);
-		this.bucketPath = Preconditions.checkNotNull(bucketPath);
+		this.bucketId = checkNotNull(bucketId);
+		this.bucketPath = checkNotNull(bucketPath);
 		this.partCounter = initialPartCounter;
-		this.partFileFactory = Preconditions.checkNotNull(partFileFactory);
-		this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
+		this.partFileFactory = checkNotNull(partFileFactory);
+		this.rollingPolicy = checkNotNull(rollingPolicy);
 
 		this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
 		this.pendingPartsPerCheckpoint = new HashMap<>();
@@ -158,8 +160,8 @@ public class Bucket<IN, BucketID> {
 	}
 
 	void merge(final Bucket<IN, BucketID> bucket) throws IOException {
-		Preconditions.checkNotNull(bucket);
-		Preconditions.checkState(Objects.equals(bucket.bucketPath, bucketPath));
+		checkNotNull(bucket);
+		checkState(Objects.equals(bucket.bucketPath, bucketPath));
 
 		// There should be no pending files in the "to-merge" states.
 		// The reason is that:
@@ -167,8 +169,8 @@ public class Bucket<IN, BucketID> {
 		//    So a snapshot, including the one we are recovering from, will never contain such files.
 		// 2) the files in pendingPartsPerCheckpoint are committed upon recovery (see commitRecoveredPendingFiles()).
 
-		Preconditions.checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
-		Preconditions.checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
+		checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
+		checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
 
 		RecoverableWriter.CommitRecoverable committable = bucket.closePartFile();
 		if (committable != null) {
@@ -257,7 +259,7 @@ public class Bucket<IN, BucketID> {
 	}
 
 	void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
-		Preconditions.checkNotNull(fsWriter);
+		checkNotNull(fsWriter);
 
 		Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
 				pendingPartsPerCheckpoint.entrySet().iterator();


[flink] 05/06: [hotfix] Method renaming in the RecoverableMultiPartUploadImpl.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e32e2338a66ef73e9f237315119325cf8c18fb94
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 22 10:54:05 2018 +0100

    [hotfix] Method renaming in the RecoverableMultiPartUploadImpl.
---
 .../s3/common/writer/RecoverableMultiPartUploadImpl.java   | 14 +++++++-------
 .../fs/s3/common/writer/IncompletePartPrefixTest.java      | 12 ++++++------
 2 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index 9f0a811..787f286 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -84,7 +84,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		this.s3AccessHelper = checkNotNull(s3AccessHelper);
 		this.uploadThreadPool = checkNotNull(uploadThreadPool);
 		this.currentUploadInfo = new MultiPartUploadInfo(objectName, uploadId, partsSoFar, numBytes, incompletePart);
-		this.namePrefixForTempObjects = incompleteObjectNamePrefix(objectName);
+		this.namePrefixForTempObjects = createIncompletePartObjectNamePrefix(objectName);
 		this.uploadsInProgress = new ArrayDeque<>();
 	}
 
@@ -171,7 +171,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		}
 
 		// first, upload the trailing data file. during that time, other in-progress uploads may complete.
-		final String incompletePartObjectName = createTmpObjectName();
+		final String incompletePartObjectName = createIncompletePartObjectName();
 		file.retain();
 		try (InputStream inputStream = file.getInputStream()) {
 
@@ -192,7 +192,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 	// ------------------------------------------------------------------------
 
 	@VisibleForTesting
-	static String incompleteObjectNamePrefix(String objectName) {
+	static String createIncompletePartObjectNamePrefix(String objectName) {
 		checkNotNull(objectName);
 
 		final int lastSlash = objectName.lastIndexOf('/');
@@ -209,6 +209,10 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		return parent + (child.isEmpty() ? "" : '_') + child + "_tmp_";
 	}
 
+	private String createIncompletePartObjectName() {
+		return namePrefixForTempObjects + UUID.randomUUID().toString();
+	}
+
 	private void awaitPendingPartsUpload() throws IOException {
 		checkState(currentUploadInfo.getRemainingParts() == uploadsInProgress.size());
 
@@ -235,10 +239,6 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		return completedUploadEtag;
 	}
 
-	private String createTmpObjectName() {
-		return namePrefixForTempObjects + UUID.randomUUID().toString();
-	}
-
 	// ------------------------------------------------------------------------
 	//  factory methods
 	// ------------------------------------------------------------------------
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/IncompletePartPrefixTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/IncompletePartPrefixTest.java
index a3164f1..9cee040 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/IncompletePartPrefixTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/IncompletePartPrefixTest.java
@@ -22,36 +22,36 @@ import org.junit.Assert;
 import org.junit.Test;
 
 /**
- * Tests for the {@link RecoverableMultiPartUploadImpl#incompleteObjectNamePrefix(String)}.
+ * Tests for the {@link RecoverableMultiPartUploadImpl#createIncompletePartObjectNamePrefix(String)}.
  */
 public class IncompletePartPrefixTest {
 
 	@Test(expected = NullPointerException.class)
 	public void nullObjectNameShouldThroughException() {
-		RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix(null);
+		RecoverableMultiPartUploadImpl.createIncompletePartObjectNamePrefix(null);
 	}
 
 	@Test
 	public void emptyInitialNameShouldSucceed() {
-		String objectNamePrefix = RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix("");
+		String objectNamePrefix = RecoverableMultiPartUploadImpl.createIncompletePartObjectNamePrefix("");
 		Assert.assertEquals("_tmp_", objectNamePrefix);
 	}
 
 	@Test
 	public void nameWithoutSlashShouldSucceed() {
-		String objectNamePrefix = RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix("no_slash_path");
+		String objectNamePrefix = RecoverableMultiPartUploadImpl.createIncompletePartObjectNamePrefix("no_slash_path");
 		Assert.assertEquals("_no_slash_path_tmp_", objectNamePrefix);
 	}
 
 	@Test
 	public void nameWithOnlySlashShouldSucceed() {
-		String objectNamePrefix = RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix("/");
+		String objectNamePrefix = RecoverableMultiPartUploadImpl.createIncompletePartObjectNamePrefix("/");
 		Assert.assertEquals("/_tmp_", objectNamePrefix);
 	}
 
 	@Test
 	public void normalPathShouldSucceed() {
-		String objectNamePrefix = RecoverableMultiPartUploadImpl.incompleteObjectNamePrefix("/root/home/test-file");
+		String objectNamePrefix = RecoverableMultiPartUploadImpl.createIncompletePartObjectNamePrefix("/root/home/test-file");
 		Assert.assertEquals("/root/home/_test-file_tmp_", objectNamePrefix);
 	}
 }
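[Editor's note, not part of the commit] For orientation, the renamed createIncompletePartObjectNamePrefix() builds the prefix under which the trailing (incomplete) part data is staged next to the final object. The following is a minimal sketch of that behavior, reconstructed from the hunks and test expectations above; only the lastIndexOf('/') call and the return expression are visible in the diff, so the parent/child split is an assumption.

	// Illustrative sketch, not the committed Flink source.
	public final class IncompletePartPrefixSketch {

		static String createIncompletePartObjectNamePrefix(String objectName) {
			if (objectName == null) {
				throw new NullPointerException();
			}
			final int lastSlash = objectName.lastIndexOf('/');
			// assumed split: everything up to and including the last slash is the parent
			final String parent = lastSlash >= 0 ? objectName.substring(0, lastSlash + 1) : "";
			final String child = lastSlash >= 0 ? objectName.substring(lastSlash + 1) : objectName;

			// matches the tests above, e.g. "/root/home/test-file" -> "/root/home/_test-file_tmp_"
			return parent + (child.isEmpty() ? "" : '_') + child + "_tmp_";
		}

		public static void main(String[] args) {
			System.out.println(createIncompletePartObjectNamePrefix("/root/home/test-file"));
		}
	}
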


[flink] 03/06: [hotfix][s3-connector] Renamed S3MultiPartUploader to S3AccessHelper.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3514c52d7539dedac30df2d42fdc05b79f0e4551
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Wed Nov 21 11:11:10 2018 +0100

    [hotfix][s3-connector] Renamed S3MultiPartUploader to S3AccessHelper.
---
 .../fs/s3/common/AbstractS3FileSystemFactory.java  |  6 ++---
 .../flink/fs/s3/common/FlinkS3FileSystem.java      | 12 ++++-----
 .../writer/RecoverableMultiPartUploadImpl.java     | 30 +++++++++++-----------
 ...3MultiPartUploader.java => S3AccessHelper.java} |  2 +-
 .../flink/fs/s3/common/writer/S3Committer.java     | 12 ++++-----
 .../S3RecoverableMultipartUploadFactory.java       | 10 ++++----
 .../fs/s3/common/writer/S3RecoverableWriter.java   |  4 +--
 .../flink/fs/s3/common/S3EntropyFsFactoryTest.java |  4 +--
 .../writer/RecoverableMultiPartUploadImplTest.java |  4 +--
 ...PartUploader.java => HadoopS3AccessHelper.java} | 26 +++++++++----------
 .../flink/fs/s3hadoop/S3FileSystemFactory.java     |  6 ++---
 .../flink/fs/s3presto/S3FileSystemFactory.java     |  4 +--
 12 files changed, 60 insertions(+), 60 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 318fd39..6ccdeae 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,7 +141,7 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
 			final String localTmpDirectory = flinkConfig.getString(CoreOptions.TMP_DIRS);
 			final long s3minPartSize = flinkConfig.getLong(PART_UPLOAD_MIN_SIZE);
 			final int maxConcurrentUploads = flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS);
-			final S3MultiPartUploader s3AccessHelper = getS3AccessHelper(fs);
+			final S3AccessHelper s3AccessHelper = getS3AccessHelper(fs);
 
 			return new FlinkS3FileSystem(
 					fs,
@@ -166,6 +166,6 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
 		URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig);
 
 	@Nullable
-	protected abstract S3MultiPartUploader getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
+	protected abstract S3AccessHelper getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
 }
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index 553edde..5248e06 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
 import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.util.Preconditions;
@@ -60,7 +60,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
 	private final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator;
 
 	@Nullable
-	private final S3MultiPartUploader s3UploadHelper;
+	private final S3AccessHelper s3AccessHelper;
 
 	private final Executor uploadThreadPool;
 
@@ -83,7 +83,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
 			String localTmpDirectory,
 			@Nullable String entropyInjectionKey,
 			int entropyLength,
-			@Nullable S3MultiPartUploader s3UploadHelper,
+			@Nullable S3AccessHelper s3UploadHelper,
 			long s3uploadPartSize,
 			int maxConcurrentUploadsPerStream) {
 
@@ -99,7 +99,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
 		// recoverable writer parameter configuration initialization
 		this.localTmpDir = Preconditions.checkNotNull(localTmpDirectory);
 		this.tmpFileCreator = RefCountedTmpFileCreator.inDirectories(new File(localTmpDirectory));
-		this.s3UploadHelper = s3UploadHelper;
+		this.s3AccessHelper = s3UploadHelper;
 		this.uploadThreadPool = Executors.newCachedThreadPool();
 
 		Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
@@ -131,7 +131,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
 
 	@Override
 	public RecoverableWriter createRecoverableWriter() throws IOException {
-		if (s3UploadHelper == null) {
+		if (s3AccessHelper == null) {
 			// this is the case for Presto
 			throw new UnsupportedOperationException("This s3 file system implementation does not support recoverable writers.");
 		}
@@ -139,7 +139,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
 		return S3RecoverableWriter.writer(
 				getHadoopFileSystem(),
 				tmpFileCreator,
-				s3UploadHelper,
+				s3AccessHelper,
 				uploadThreadPool,
 				s3uploadPartSize,
 				maxConcurrentUploadsPerStream);
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index 80042ce..fe2a4cd 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -58,7 +58,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 @NotThreadSafe
 final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload {
 
-	private final S3MultiPartUploader s3MPUploader;
+	private final S3AccessHelper s3AccessHelper;
 
 	private final Executor uploadThreadPool;
 
@@ -71,7 +71,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 	// ------------------------------------------------------------------------
 
 	private RecoverableMultiPartUploadImpl(
-			S3MultiPartUploader s3uploader,
+			S3AccessHelper s3AccessHelper,
 			Executor uploadThreadPool,
 			String uploadId,
 			String objectName,
@@ -81,7 +81,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 	) {
 		checkArgument(numBytes >= 0L);
 
-		this.s3MPUploader = checkNotNull(s3uploader);
+		this.s3AccessHelper = checkNotNull(s3AccessHelper);
 		this.uploadThreadPool = checkNotNull(uploadThreadPool);
 		this.currentUploadInfo = new MultiPartUploadInfo(objectName, uploadId, partsSoFar, numBytes, incompletePart);
 		this.namePrefixForTempObjects = incompleteObjectNamePrefix(objectName);
@@ -111,7 +111,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		currentUploadInfo.registerNewPart(partLength);
 
 		file.retain(); // keep the file while the async upload still runs
-		uploadThreadPool.execute(new UploadTask(s3MPUploader, currentUploadInfo, file, future));
+		uploadThreadPool.execute(new UploadTask(s3AccessHelper, currentUploadInfo, file, future));
 	}
 
 	@Override
@@ -124,7 +124,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		final S3Recoverable snapshot = snapshotAndGetRecoverable(null);
 
 		return new S3Committer(
-				s3MPUploader,
+				s3AccessHelper,
 				snapshot.getObjectName(),
 				snapshot.uploadId(),
 				snapshot.parts(),
@@ -179,7 +179,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 			// they do not fall under the user's global TTL on S3.
 			// Figure out a way to clean them.
 
-			s3MPUploader.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
+			s3AccessHelper.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
 		}
 		finally {
 			file.release();
@@ -244,14 +244,14 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 	// ------------------------------------------------------------------------
 
 	public static RecoverableMultiPartUploadImpl newUpload(
-			final S3MultiPartUploader s3uploader,
+			final S3AccessHelper s3AccessHelper,
 			final Executor uploadThreadPool,
 			final String objectName) throws IOException {
 
-		final String multiPartUploadId = s3uploader.startMultiPartUpload(objectName);
+		final String multiPartUploadId = s3AccessHelper.startMultiPartUpload(objectName);
 
 		return new RecoverableMultiPartUploadImpl(
-				s3uploader,
+				s3AccessHelper,
 				uploadThreadPool,
 				multiPartUploadId,
 				objectName,
@@ -261,7 +261,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 	}
 
 	public static RecoverableMultiPartUploadImpl recoverUpload(
-			final S3MultiPartUploader s3uploader,
+			final S3AccessHelper s3AccessHelper,
 			final Executor uploadThreadPool,
 			final String multipartUploadId,
 			final String objectName,
@@ -270,7 +270,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 			final Optional<File> incompletePart) {
 
 		return new RecoverableMultiPartUploadImpl(
-				s3uploader,
+				s3AccessHelper,
 				uploadThreadPool,
 				multipartUploadId,
 				objectName,
@@ -286,7 +286,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 
 	private static class UploadTask implements Runnable {
 
-		private final S3MultiPartUploader s3uploader;
+		private final S3AccessHelper s3AccessHelper;
 
 		private final String objectName;
 
@@ -299,7 +299,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		private final CompletableFuture<PartETag> future;
 
 		UploadTask(
-				final S3MultiPartUploader s3uploader,
+				final S3AccessHelper s3AccessHelper,
 				final MultiPartUploadInfo currentUpload,
 				final RefCountedFSOutputStream file,
 				final CompletableFuture<PartETag> future) {
@@ -313,7 +313,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 			// these are limits put by Amazon
 			checkArgument(partNumber >= 1  && partNumber <= 10_000);
 
-			this.s3uploader = checkNotNull(s3uploader);
+			this.s3AccessHelper = checkNotNull(s3AccessHelper);
 			this.file = checkNotNull(file);
 			this.future = checkNotNull(future);
 		}
@@ -321,7 +321,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		@Override
 		public void run() {
 			try (final InputStream inputStream = file.getInputStream()) {
-				final UploadPartResult result = s3uploader.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
+				final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
 				future.complete(new PartETag(result.getPartNumber(), result.getETag()));
 				file.release();
 			}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
similarity index 99%
rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
rename to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index da227a4..57920a5 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * the upload with all its parts will be either committed or discarded.
  */
 @Internal
-public interface S3MultiPartUploader {
+public interface S3AccessHelper {
 
 	/**
 	 * Initializes a Multi-Part Upload.
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
index 1fc8bf1..5fbc5bb 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
@@ -40,7 +40,7 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe
 
 	private static final Logger LOG = LoggerFactory.getLogger(S3Committer.class);
 
-	private final S3MultiPartUploader s3uploader;
+	private final S3AccessHelper s3AccessHelper;
 
 	private final String uploadId;
 
@@ -50,8 +50,8 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe
 
 	private final long totalLength;
 
-	S3Committer(S3MultiPartUploader s3uploader, String objectName, String uploadId, List<PartETag> parts, long totalLength) {
-		this.s3uploader = checkNotNull(s3uploader);
+	S3Committer(S3AccessHelper s3AccessHelper, String objectName, String uploadId, List<PartETag> parts, long totalLength) {
+		this.s3AccessHelper = checkNotNull(s3AccessHelper);
 		this.objectName = checkNotNull(objectName);
 		this.uploadId = checkNotNull(uploadId);
 		this.parts = checkNotNull(parts);
@@ -64,7 +64,7 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe
 			LOG.info("Committing {} with MPU ID {}", objectName, uploadId);
 
 			final AtomicInteger errorCount = new AtomicInteger();
-			s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount);
+			s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount);
 
 			if (errorCount.get() == 0) {
 				LOG.debug("Successfully committed {} with MPU ID {}", objectName, uploadId);
@@ -82,14 +82,14 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe
 			LOG.info("Trying to commit after recovery {} with MPU ID {}", objectName, uploadId);
 
 			try {
-				s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger());
+				s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger());
 			} catch (IOException e) {
 				LOG.info("Failed to commit after recovery {} with MPU ID {}. " +
 						"Checking if file was committed before...", objectName, uploadId);
 				LOG.trace("Exception when committing:", e);
 
 				try {
-					ObjectMetadata metadata = s3uploader.getObjectMetadata(objectName);
+					ObjectMetadata metadata = s3AccessHelper.getObjectMetadata(objectName);
 					if (totalLength != metadata.getContentLength()) {
 						String message = String.format("Inconsistent result for object %s: conflicting lengths. " +
 										"Recovered committer for upload %s indicates %s bytes, present object is %s bytes",
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index b201981..9a171ae 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -43,7 +43,7 @@ final class S3RecoverableMultipartUploadFactory {
 
 	private final org.apache.hadoop.fs.FileSystem fs;
 
-	private final S3MultiPartUploader twoPhaseUploader;
+	private final S3AccessHelper s3AccessHelper;
 
 	private final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier;
 
@@ -53,7 +53,7 @@ final class S3RecoverableMultipartUploadFactory {
 
 	S3RecoverableMultipartUploadFactory(
 			final FileSystem fs,
-			final S3MultiPartUploader twoPhaseUploader,
+			final S3AccessHelper s3AccessHelper,
 			final int maxConcurrentUploadsPerStream,
 			final Executor executor,
 			final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier) {
@@ -61,14 +61,14 @@ final class S3RecoverableMultipartUploadFactory {
 		this.fs = Preconditions.checkNotNull(fs);
 		this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
 		this.executor = executor;
-		this.twoPhaseUploader = twoPhaseUploader;
+		this.s3AccessHelper = s3AccessHelper;
 		this.tmpFileSupplier = tmpFileSupplier;
 	}
 
 	RecoverableMultiPartUpload getNewRecoverableUpload(Path path) throws IOException {
 
 		return RecoverableMultiPartUploadImpl.newUpload(
-				twoPhaseUploader,
+				s3AccessHelper,
 				limitedExecutor(),
 				pathToObjectName(path));
 	}
@@ -77,7 +77,7 @@ final class S3RecoverableMultipartUploadFactory {
 		final Optional<File> incompletePart = downloadLastDataChunk(recoverable);
 
 		return RecoverableMultiPartUploadImpl.recoverUpload(
-				twoPhaseUploader,
+				s3AccessHelper,
 				limitedExecutor(),
 				recoverable.uploadId(),
 				recoverable.getObjectName(),
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
index 2a84308..698f65f 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
@@ -129,7 +129,7 @@ public class S3RecoverableWriter implements RecoverableWriter {
 	public static S3RecoverableWriter writer(
 			final FileSystem fs,
 			final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
-			final S3MultiPartUploader twoPhaseUploader,
+			final S3AccessHelper s3AccessHelper,
 			final Executor uploadThreadPool,
 			final long userDefinedMinPartSize,
 			final int maxConcurrentUploadsPerStream) {
@@ -139,7 +139,7 @@ public class S3RecoverableWriter implements RecoverableWriter {
 		final S3RecoverableMultipartUploadFactory uploadFactory =
 				new S3RecoverableMultipartUploadFactory(
 						fs,
-						twoPhaseUploader,
+						s3AccessHelper,
 						maxConcurrentUploadsPerStream,
 						uploadThreadPool,
 						tempFileCreator);
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
index d3d25c3..5b15652 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.fs.s3.common;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -78,7 +78,7 @@ public class S3EntropyFsFactoryTest extends TestLogger {
 
 		@Nullable
 		@Override
-		protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+		protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
 			return null;
 		}
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index 72554e1..4c2f147 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -339,10 +339,10 @@ public class RecoverableMultiPartUploadImplTest {
 	}
 
 	/**
-	 * A {@link S3MultiPartUploader} that simulates uploading part files to S3 by
+	 * A {@link S3AccessHelper} that simulates uploading part files to S3 by
 	 * simply putting complete and incomplete part files in lists for further validation.
 	 */
-	private static class StubMultiPartUploader implements S3MultiPartUploader {
+	private static class StubMultiPartUploader implements S3AccessHelper {
 
 		private final List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> completePartsUploaded = new ArrayList<>();
 		private final List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> incompletePartsUploaded = new ArrayList<>();
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
similarity index 77%
rename from flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
rename to flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index f446f70..f833471 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.fs.s3hadoop;
 
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.util.MathUtils;
 
 import com.amazonaws.SdkBaseException;
@@ -43,16 +43,16 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * An implementation of the {@link S3MultiPartUploader} for the Hadoop S3A filesystem.
+ * An implementation of the {@link S3AccessHelper} for the Hadoop S3A filesystem.
  */
-public class HadoopS3MultiPartUploader implements S3MultiPartUploader {
+public class HadoopS3AccessHelper implements S3AccessHelper {
 
 	private final S3AFileSystem s3a;
 
-	private final InternalWriteOperationHelper s3uploader;
+	private final InternalWriteOperationHelper s3accessHelper;
 
-	public HadoopS3MultiPartUploader(S3AFileSystem s3a, Configuration conf) {
-		this.s3uploader = new InternalWriteOperationHelper(
+	public HadoopS3AccessHelper(S3AFileSystem s3a, Configuration conf) {
+		this.s3accessHelper = new InternalWriteOperationHelper(
 				checkNotNull(s3a),
 				checkNotNull(conf)
 		);
@@ -61,25 +61,25 @@ public class HadoopS3MultiPartUploader implements S3MultiPartUploader {
 
 	@Override
 	public String startMultiPartUpload(String key) throws IOException {
-		return s3uploader.initiateMultiPartUpload(key);
+		return s3accessHelper.initiateMultiPartUpload(key);
 	}
 
 	@Override
 	public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream inputStream, long length) throws IOException {
-		final UploadPartRequest uploadRequest = s3uploader.newUploadPartRequest(
+		final UploadPartRequest uploadRequest = s3accessHelper.newUploadPartRequest(
 				key, uploadId, partNumber, MathUtils.checkedDownCast(length), inputStream, null, 0L);
-		return s3uploader.uploadPart(uploadRequest);
+		return s3accessHelper.uploadPart(uploadRequest);
 	}
 
 	@Override
 	public PutObjectResult uploadIncompletePart(String key, InputStream inputStream, long length) throws IOException {
-		final PutObjectRequest putRequest = s3uploader.createPutObjectRequest(key, inputStream, length);
-		return s3uploader.putObject(putRequest);
+		final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length);
+		return s3accessHelper.putObject(putRequest);
 	}
 
 	@Override
 	public CompleteMultipartUploadResult commitMultiPartUpload(String destKey, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException {
-		return s3uploader.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount);
+		return s3accessHelper.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount);
 	}
 
 	@Override
@@ -94,7 +94,7 @@ public class HadoopS3MultiPartUploader implements S3MultiPartUploader {
 
 	/**
 	 * Internal {@link WriteOperationHelper} that is wrapped so that it only exposes
-	 * the functionality we need for the {@link S3MultiPartUploader}.
+	 * the functionality we need for the {@link S3AccessHelper}.
 	 */
 	private static final class InternalWriteOperationHelper extends WriteOperationHelper {
 
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index 897629f..2637e7b 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3hadoop;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
 import org.apache.flink.fs.s3.common.HadoopConfigLoader;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
@@ -96,8 +96,8 @@ public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
 
 	@Nullable
 	@Override
-	protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+	protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
 		final S3AFileSystem s3Afs = (S3AFileSystem) fs;
-		return new HadoopS3MultiPartUploader(s3Afs, s3Afs.getConf());
+		return new HadoopS3AccessHelper(s3Afs, s3Afs.getConf());
 	}
 }
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index b579d6e..0fb2857 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3presto;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
 import org.apache.flink.fs.s3.common.HadoopConfigLoader;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
@@ -92,7 +92,7 @@ public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
 
 	@Nullable
 	@Override
-	protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+	protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
 		return null;
 	}
 

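[Editor's note, not part of the commit] The rename above is mechanical, but it clarifies that the interface is a general S3 access helper rather than only a multi-part uploader. A minimal caller-side sketch of the multi-part upload flow, using only the method signatures visible in the hunks above (the helper instance and the byte arrays are assumed):

	// Illustrative sketch only: driving a multi-part upload through S3AccessHelper.
	import com.amazonaws.services.s3.model.PartETag;
	import com.amazonaws.services.s3.model.UploadPartResult;
	import org.apache.flink.fs.s3.common.writer.S3AccessHelper;

	import java.io.ByteArrayInputStream;
	import java.io.IOException;
	import java.util.ArrayList;
	import java.util.List;
	import java.util.concurrent.atomic.AtomicInteger;

	final class S3AccessHelperUsageSketch {

		static void uploadInTwoParts(S3AccessHelper helper, String key, byte[] part1, byte[] part2) throws IOException {
			// 1) initiate the MPU and remember the upload id
			final String uploadId = helper.startMultiPartUpload(key);

			final List<PartETag> etags = new ArrayList<>();

			// 2) upload the parts; part numbers are 1-based (Amazon allows at most 10_000, as noted in the diff)
			UploadPartResult r1 = helper.uploadPart(key, uploadId, 1, new ByteArrayInputStream(part1), part1.length);
			etags.add(new PartETag(r1.getPartNumber(), r1.getETag()));

			UploadPartResult r2 = helper.uploadPart(key, uploadId, 2, new ByteArrayInputStream(part2), part2.length);
			etags.add(new PartETag(r2.getPartNumber(), r2.getETag()));

			// 3) commit the upload with all collected ETags
			helper.commitMultiPartUpload(key, uploadId, etags, part1.length + part2.length, new AtomicInteger());
		}
	}
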

[flink] 06/06: [FLINK-10963][fs-connector, s3] Cleanup tmp S3 objects uploaded as backups of in-progress files.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5de507835b2b9a93376820b79a435b8efe53b8a6
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 22 11:03:13 2018 +0100

    [FLINK-10963][fs-connector, s3] Cleanup tmp S3 objects uploaded as backups of in-progress files.
---
 .../apache/flink/core/fs/RecoverableWriter.java    |  28 +++
 .../core/fs/local/LocalRecoverableWriter.java      |  10 +
 .../runtime/fs/hdfs/HadoopRecoverableWriter.java   |  10 +
 .../writer/RecoverableMultiPartUploadImpl.java     |   5 -
 .../flink/fs/s3/common/writer/S3AccessHelper.java  |  10 +
 .../S3RecoverableMultipartUploadFactory.java       |   8 -
 .../fs/s3/common/writer/S3RecoverableWriter.java   |  34 ++-
 .../writer/RecoverableMultiPartUploadImplTest.java |   5 +
 .../flink/fs/s3hadoop/HadoopS3AccessHelper.java    |  15 +-
 .../s3hadoop/HadoopS3RecoverableWriterITCase.java  |  46 ++++
 .../api/functions/sink/filesystem/Bucket.java      |  87 +++++--
 .../api/functions/sink/filesystem/BucketTest.java  | 263 +++++++++++++++++++++
 12 files changed, 473 insertions(+), 48 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
index e5bfdb8..7d54b11 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/RecoverableWriter.java
@@ -122,6 +122,34 @@ public interface RecoverableWriter {
 	RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;
 
 	/**
+	 * Marks if the writer requires to do any additional cleanup/freeing of resources occupied
+	 * as part of a {@link ResumeRecoverable}, e.g. temporarily files created or objects uploaded
+	 * to external systems.
+	 *
+	 * <p>In case cleanup is required, then {@link #cleanupRecoverableState(ResumeRecoverable)} should
+	 * be called.
+	 *
+	 * @return {@code true} if cleanup is required, {@code false} otherwise.
+	 */
+	boolean requiresCleanupOfRecoverableState();
+
+	/**
+	 * Frees up any resources that were previously occupied in order to be able to
+	 * recover from a (potential) failure. These can be temporary files that were written
+	 * to the filesystem or objects that were uploaded to S3.
+	 *
+	 * <p><b>NOTE:</b> This operation should not throw an exception if the resumable has already
+	 * been cleaned up and the resources have been freed. But the contract is that it will throw
+	 * an {@link UnsupportedOperationException} if it is called for a {@code RecoverableWriter}
+	 * whose {@link #requiresCleanupOfRecoverableState()} returns {@code false}.
+	 *
+	 * @param resumable The {@link ResumeRecoverable} whose state we want to clean-up.
+	 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+	 * (e.g. the file to be deleted was not there for any reason - already deleted or never created).
+	 */
+	boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException;
+
+	/**
 	 * Recovers a recoverable stream consistently at the point indicated by the given CommitRecoverable
 	 * for finalizing and committing. This will publish the target file with exactly the data
 	 * that was written up to the point then the CommitRecoverable was created.
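[Editor's note, not part of the commit] The two new RecoverableWriter methods above define the cleanup contract that FLINK-10963 relies on: writers that stage recoverable state externally (like the S3 writer) report it via requiresCleanupOfRecoverableState() and free it via cleanupRecoverableState(), while writers without such state throw UnsupportedOperationException from the latter. A caller-side sketch of how that contract might be honoured (the writer instance and the list of obsolete resumables are assumed):

	// Illustrative sketch only: cleaning up state of resumables that are no longer needed.
	import org.apache.flink.core.fs.RecoverableWriter;
	import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;

	import java.io.IOException;
	import java.util.List;

	final class RecoverableStateCleanupSketch {

		static void discardObsoleteResumables(RecoverableWriter writer, List<ResumeRecoverable> obsolete) throws IOException {
			// Local and HDFS writers return false here and would throw
			// UnsupportedOperationException from cleanupRecoverableState().
			if (!writer.requiresCleanupOfRecoverableState()) {
				return;
			}
			for (ResumeRecoverable resumable : obsolete) {
				// a false return means the backing object was already gone, which is not an error
				writer.cleanupRecoverableState(resumable);
			}
		}
	}
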
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
index a2f0485..a43e0b6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalRecoverableWriter.java
@@ -71,6 +71,16 @@ public class LocalRecoverableWriter implements RecoverableWriter {
 	}
 
 	@Override
+	public boolean requiresCleanupOfRecoverableState() {
+		return false;
+	}
+
+	@Override
+	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
 		if (recoverable instanceof LocalRecoverable) {
 			return new LocalRecoverableFsDataOutputStream.LocalCommitter((LocalRecoverable) recoverable);
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
index 305f8ee..03d741b 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -78,6 +78,16 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
 	}
 
 	@Override
+	public boolean requiresCleanupOfRecoverableState() {
+		return false;
+	}
+
+	@Override
+	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
 	public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
 		if (recoverable instanceof HadoopFsRecoverable) {
 			return new HadoopRecoverableFsDataOutputStream.HadoopFsCommitter(fs, (HadoopFsRecoverable) recoverable);
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index 787f286..9d88e65 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -174,11 +174,6 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		final String incompletePartObjectName = createIncompletePartObjectName();
 		file.retain();
 		try (InputStream inputStream = file.getInputStream()) {
-
-			// TODO: staged incomplete parts are not cleaned up as
-			// they do not fall under the user's global TTL on S3.
-			// Figure out a way to clean them.
-
 			s3AccessHelper.putObject(incompletePartObjectName, inputStream, file.getPos());
 		}
 		finally {
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index dbc099a..bcdea3c 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -93,6 +93,16 @@ public interface S3AccessHelper {
 	CompleteMultipartUploadResult commitMultiPartUpload(String key, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException;
 
 	/**
+	 * Deletes the object associated with the provided key.
+	 *
+	 * @param key The key to be deleted.
+	 * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+	 * (e.g. the file to be deleted was not there).
+	 * @throws IOException
+	 */
+	boolean deleteObject(String key) throws IOException;
+
+	/**
 	 * Gets the object associated with the provided {@code key} from S3 and
 	 * puts it in the provided {@code targetLocation}.
 	 *
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index ddb09ab..3727e25 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -96,14 +96,6 @@ final class S3RecoverableMultipartUploadFactory {
 		final File file = refCountedFile.getFile();
 		final long numBytes = s3AccessHelper.getObject(objectKey, file);
 
-		// some sanity checks
-		if (numBytes != file.length()) {
-			throw new IOException(String.format("Error recovering writer: " +
-							"Downloading the last data chunk file gives incorrect length. " +
-							"File=%d bytes, Stream=%d bytes",
-					file.length(), numBytes));
-		}
-
 		if (numBytes != recoverable.incompleteObjectLength()) {
 			throw new IOException(String.format("Error recovering writer: " +
 							"Downloading the last data chunk file gives incorrect length." +
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
index 698f65f..ddb4443 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
@@ -26,7 +26,6 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream.Committer;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +36,7 @@ import java.util.concurrent.Executor;
 
 import static org.apache.flink.fs.s3.common.FlinkS3FileSystem.S3_MULTIPART_MIN_PART_SIZE;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An implementation of the {@link RecoverableWriter} against S3.
@@ -54,16 +54,20 @@ public class S3RecoverableWriter implements RecoverableWriter {
 
 	private final long userDefinedMinPartSize;
 
+	private final S3AccessHelper s3AccessHelper;
+
 	private final S3RecoverableMultipartUploadFactory uploadFactory;
 
 	@VisibleForTesting
 	S3RecoverableWriter(
+			final S3AccessHelper s3AccessHelper,
 			final S3RecoverableMultipartUploadFactory uploadFactory,
 			final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
 			final long userDefinedMinPartSize) {
 
-		this.uploadFactory = Preconditions.checkNotNull(uploadFactory);
-		this.tempFileCreator = Preconditions.checkNotNull(tempFileCreator);
+		this.s3AccessHelper = checkNotNull(s3AccessHelper);
+		this.uploadFactory = checkNotNull(uploadFactory);
+		this.tempFileCreator = checkNotNull(tempFileCreator);
 		this.userDefinedMinPartSize = userDefinedMinPartSize;
 	}
 
@@ -78,14 +82,14 @@ public class S3RecoverableWriter implements RecoverableWriter {
 	}
 
 	@Override
-	public Committer recoverForCommit(RecoverableWriter.CommitRecoverable recoverable) throws IOException {
+	public Committer recoverForCommit(CommitRecoverable recoverable) throws IOException {
 		final S3Recoverable s3recoverable = castToS3Recoverable(recoverable);
 		final S3RecoverableFsDataOutputStream recovered = recover(s3recoverable);
 		return recovered.closeForCommit();
 	}
 
 	@Override
-	public S3RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable recoverable) throws IOException {
+	public S3RecoverableFsDataOutputStream recover(ResumeRecoverable recoverable) throws IOException {
 		final S3Recoverable s3recoverable = castToS3Recoverable(recoverable);
 
 		final RecoverableMultiPartUpload upload = uploadFactory.recoverRecoverableUpload(s3recoverable);
@@ -98,14 +102,26 @@ public class S3RecoverableWriter implements RecoverableWriter {
 	}
 
 	@Override
+	public boolean requiresCleanupOfRecoverableState() {
+		return true;
+	}
+
+	@Override
+	public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+		final S3Recoverable s3recoverable = castToS3Recoverable(resumable);
+		final String smallPartObjectToDelete = s3recoverable.incompleteObjectName();
+		return smallPartObjectToDelete != null && s3AccessHelper.deleteObject(smallPartObjectToDelete);
+	}
+
+	@Override
 	@SuppressWarnings({"rawtypes", "unchecked"})
-	public SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitRecoverableSerializer() {
+	public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
 		return (SimpleVersionedSerializer) S3RecoverableSerializer.INSTANCE;
 	}
 
 	@Override
 	@SuppressWarnings({"rawtypes", "unchecked"})
-	public SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumeRecoverableSerializer() {
+	public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
 		return (SimpleVersionedSerializer) S3RecoverableSerializer.INSTANCE;
 	}
 
@@ -116,7 +132,7 @@ public class S3RecoverableWriter implements RecoverableWriter {
 
 	// --------------------------- Utils ---------------------------
 
-	private static S3Recoverable castToS3Recoverable(RecoverableWriter.CommitRecoverable recoverable) {
+	private static S3Recoverable castToS3Recoverable(CommitRecoverable recoverable) {
 		if (recoverable instanceof S3Recoverable) {
 			return (S3Recoverable) recoverable;
 		}
@@ -144,6 +160,6 @@ public class S3RecoverableWriter implements RecoverableWriter {
 						uploadThreadPool,
 						tempFileCreator);
 
-		return new S3RecoverableWriter(uploadFactory, tempFileCreator, userDefinedMinPartSize);
+		return new S3RecoverableWriter(s3AccessHelper, uploadFactory, tempFileCreator, userDefinedMinPartSize);
 	}
 }
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index a986111..673796d 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -373,6 +373,11 @@ public class RecoverableMultiPartUploadImplTest {
 		}
 
 		@Override
+		public boolean deleteObject(String key) throws IOException {
+			return false;
+		}
+
+		@Override
 		public long getObject(String key, File targetLocation) throws IOException {
 			return 0;
 		}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index 473439c..b9612ad 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -86,6 +86,11 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
 	}
 
 	@Override
+	public boolean deleteObject(String key) throws IOException {
+		return s3a.delete(new org.apache.hadoop.fs.Path('/' + key), false);
+	}
+
+	@Override
 	public long getObject(String key, File targetLocation) throws IOException {
 		long numBytes = 0L;
 		try (
@@ -96,12 +101,20 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
 			final byte[] buffer = new byte[32 * 1024];
 
 			int numRead;
-			while ((numRead = inStream.read(buffer)) > 0) {
+			while ((numRead = inStream.read(buffer)) != -1) {
 				outStream.write(buffer, 0, numRead);
 				numBytes += numRead;
 			}
 		}
 
+		// some sanity checks
+		if (numBytes != targetLocation.length()) {
+			throw new IOException(String.format("Error recovering writer: " +
+							"Downloading the last data chunk file gives incorrect length. " +
+							"File=%d bytes, Stream=%d bytes",
+					targetLocation.length(), numBytes));
+		}
+
 		return numBytes;
 	}
 
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java
index 17fb02b..6c8619d 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3RecoverableWriterITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
+import org.apache.flink.fs.s3.common.writer.S3Recoverable;
 import org.apache.flink.testutils.s3.S3TestCredentials;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.StringUtils;
@@ -42,6 +43,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.BufferedReader;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
@@ -213,6 +215,50 @@ public class HadoopS3RecoverableWriterITCase extends TestLogger {
 		Assert.assertEquals(testData1 + testData2, getContentsOfFile(path));
 	}
 
+	@Test(expected = FileNotFoundException.class)
+	public void testCleanupRecoverableState() throws Exception {
+		final RecoverableWriter writer = getRecoverableWriter();
+		final Path path = new Path(basePathForTest, "part-0");
+
+		final RecoverableFsDataOutputStream stream = writer.open(path);
+		stream.write(bytesOf(testData1));
+		S3Recoverable recoverable = (S3Recoverable) stream.persist();
+
+		stream.closeForCommit().commit();
+
+	// the data is still there as we have not deleted it from the tmp object
+		final String content = getContentsOfFile(new Path('/' + recoverable.incompleteObjectName()));
+		Assert.assertEquals(testData1, content);
+
+		boolean successfullyDeletedState = writer.cleanupRecoverableState(recoverable);
+		Assert.assertTrue(successfullyDeletedState);
+
+		// this should throw the exception as we deleted the file.
+		getContentsOfFile(new Path('/' + recoverable.incompleteObjectName()));
+	}
+
+	@Test
+	public void testCallingDeleteObjectTwiceDoesNotThrowException() throws Exception {
+		final RecoverableWriter writer = getRecoverableWriter();
+		final Path path = new Path(basePathForTest, "part-0");
+
+		final RecoverableFsDataOutputStream stream = writer.open(path);
+		stream.write(bytesOf(testData1));
+		S3Recoverable recoverable = (S3Recoverable) stream.persist();
+
+		stream.closeForCommit().commit();
+
+	// the data is still there as we have not deleted it from the tmp object
+		final String content = getContentsOfFile(new Path('/' + recoverable.incompleteObjectName()));
+		Assert.assertEquals(testData1, content);
+
+		boolean successfullyDeletedState = writer.cleanupRecoverableState(recoverable);
+		Assert.assertTrue(successfullyDeletedState);
+
+		boolean unsuccessfulDeletion = writer.cleanupRecoverableState(recoverable);
+		Assert.assertFalse(unsuccessfulDeletion);
+	}
+
 	// ----------------------- Test Recovery -----------------------
 
 	@Test
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 8ba35b8..b59c84e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -23,6 +23,8 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,11 +33,12 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.Objects;
+import java.util.TreeMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -65,14 +68,16 @@ public class Bucket<IN, BucketID> {
 
 	private final RollingPolicy<IN, BucketID> rollingPolicy;
 
-	private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPartsPerCheckpoint;
+	private final NavigableMap<Long, ResumeRecoverable> resumablesPerCheckpoint;
+
+	private final NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint;
 
 	private long partCounter;
 
 	@Nullable
 	private PartFileWriter<IN, BucketID> inProgressPart;
 
-	private List<RecoverableWriter.CommitRecoverable> pendingPartsForCurrentCheckpoint;
+	private List<CommitRecoverable> pendingPartsForCurrentCheckpoint;
 
 	/**
 	 * Constructor to create a new empty bucket.
@@ -95,7 +100,8 @@ public class Bucket<IN, BucketID> {
 		this.rollingPolicy = checkNotNull(rollingPolicy);
 
 		this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
-		this.pendingPartsPerCheckpoint = new HashMap<>();
+		this.pendingPartsPerCheckpoint = new TreeMap<>();
+		this.resumablesPerCheckpoint = new TreeMap<>();
 	}
 
 	/**
@@ -123,21 +129,26 @@ public class Bucket<IN, BucketID> {
 	}
 
 	private void restoreInProgressFile(final BucketState<BucketID> state) throws IOException {
+		if (!state.hasInProgressResumableFile()) {
+			return;
+		}
 
 		// we try to resume the previous in-progress file
-		if (state.hasInProgressResumableFile()) {
-			final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
-			final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
-			inProgressPart = partFileFactory.resumeFrom(
-					bucketId, stream, resumable, state.getInProgressFileCreationTime());
+		final ResumeRecoverable resumable = state.getInProgressResumableFile();
+		final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
+		inProgressPart = partFileFactory.resumeFrom(
+				bucketId, stream, resumable, state.getInProgressFileCreationTime());
+
+		if (fsWriter.requiresCleanupOfRecoverableState()) {
+			fsWriter.cleanupRecoverableState(resumable);
 		}
 	}
 
 	private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
 
 		// we commit pending files for checkpoints that precede the last successful one, from which we are recovering
-		for (List<RecoverableWriter.CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
-			for (RecoverableWriter.CommitRecoverable committable: committables) {
+		for (List<CommitRecoverable> committables: state.getCommittableFilesPerCheckpoint().values()) {
+			for (CommitRecoverable committable: committables) {
 				fsWriter.recoverForCommit(committable).commitAfterRecovery();
 			}
 		}
@@ -172,7 +183,7 @@ public class Bucket<IN, BucketID> {
 		checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
 		checkState(bucket.pendingPartsPerCheckpoint.isEmpty());
 
-		RecoverableWriter.CommitRecoverable committable = bucket.closePartFile();
+		CommitRecoverable committable = bucket.closePartFile();
 		if (committable != null) {
 			pendingPartsForCurrentCheckpoint.add(committable);
 		}
@@ -214,8 +225,8 @@ public class Bucket<IN, BucketID> {
 		return new Path(bucketPath, PART_PREFIX + '-' + subtaskIndex + '-' + partCounter);
 	}
 
-	private RecoverableWriter.CommitRecoverable closePartFile() throws IOException {
-		RecoverableWriter.CommitRecoverable committable = null;
+	private CommitRecoverable closePartFile() throws IOException {
+		CommitRecoverable committable = null;
 		if (inProgressPart != null) {
 			committable = inProgressPart.closeForCommit();
 			pendingPartsForCurrentCheckpoint.add(committable);
@@ -233,12 +244,21 @@ public class Bucket<IN, BucketID> {
 	BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
 		prepareBucketForCheckpointing(checkpointId);
 
-		RecoverableWriter.ResumeRecoverable inProgressResumable = null;
+		ResumeRecoverable inProgressResumable = null;
 		long inProgressFileCreationTime = Long.MAX_VALUE;
 
 		if (inProgressPart != null) {
 			inProgressResumable = inProgressPart.persist();
 			inProgressFileCreationTime = inProgressPart.getCreationTime();
+
+			// the following is an optimization so that writers that do not
+			// require cleanup do not have to keep track of resumables
+			// and later iterate over the active buckets.
+			// (see onSuccessfulCompletionOfCheckpoint())
+
+			if (fsWriter.requiresCleanupOfRecoverableState()) {
+				this.resumablesPerCheckpoint.put(checkpointId, inProgressResumable);
+			}
 		}
 
 		return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressResumable, pendingPartsPerCheckpoint);
@@ -261,17 +281,34 @@ public class Bucket<IN, BucketID> {
 	void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
 		checkNotNull(fsWriter);
 
-		Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> it =
-				pendingPartsPerCheckpoint.entrySet().iterator();
+		Iterator<Map.Entry<Long, List<CommitRecoverable>>> it =
+				pendingPartsPerCheckpoint.headMap(checkpointId, true)
+						.entrySet().iterator();
+
+		while (it.hasNext()) {
+			Map.Entry<Long, List<CommitRecoverable>> entry = it.next();
+
+			for (CommitRecoverable committable : entry.getValue()) {
+				fsWriter.recoverForCommit(committable).commit();
+			}
+			it.remove();
+		}
+
+		cleanupOutdatedResumables(checkpointId);
+	}
+
+	private void cleanupOutdatedResumables(long checkpointId) throws IOException {
+		Iterator<Map.Entry<Long, ResumeRecoverable>> it =
+				resumablesPerCheckpoint.headMap(checkpointId, false)
+						.entrySet().iterator();
 
 		while (it.hasNext()) {
-			Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry = it.next();
+			final ResumeRecoverable recoverable = it.next().getValue();
+			final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
+			it.remove();
 
-			if (entry.getKey() <= checkpointId) {
-				for (RecoverableWriter.CommitRecoverable committable : entry.getValue()) {
-					fsWriter.recoverForCommit(committable).commit();
-				}
-				it.remove();
+			if (LOG.isDebugEnabled() && successfullyDeleted) {
+				LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
 			}
 		}
 	}
@@ -290,7 +327,7 @@ public class Bucket<IN, BucketID> {
 	// --------------------------- Testing Methods -----------------------------
 
 	@VisibleForTesting
-	Map<Long, List<RecoverableWriter.CommitRecoverable>> getPendingPartsPerCheckpoint() {
+	Map<Long, List<CommitRecoverable>> getPendingPartsPerCheckpoint() {
 		return pendingPartsPerCheckpoint;
 	}
 
@@ -301,7 +338,7 @@ public class Bucket<IN, BucketID> {
 	}
 
 	@VisibleForTesting
-	List<RecoverableWriter.CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
+	List<CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
 		return pendingPartsForCurrentCheckpoint;
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
new file mode 100644
index 0000000..f328fd7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.core.fs.local.LocalRecoverableWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link Bucket}.
+ */
+public class BucketTest {
+
+	@ClassRule
+	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+	@Test
+	public void shouldNotCleanupResumablesThatArePartOfTheAckedCheckpoint() throws IOException {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0);
+
+		bucketUnderTest.write("test-element", 0L);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasActiveInProgressFile());
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+		assertThat(recoverableWriter, hasCalledDiscard(0)); // it did not discard as this is still valid.
+	}
+
+	@Test
+	public void shouldCleanupOutdatedResumablesOnCheckpointAck() throws IOException {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0);
+
+		bucketUnderTest.write("test-element", 0L);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasActiveInProgressFile());
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+
+		bucketUnderTest.onReceptionOfCheckpoint(1L);
+		bucketUnderTest.onReceptionOfCheckpoint(2L);
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(2L);
+		assertThat(recoverableWriter, hasCalledDiscard(2)); // that is for checkpoints 0 and 1
+	}
+
+	@Test
+	public void shouldCleanupResumableAfterRestoring() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0);
+
+		bucketUnderTest.write("test-element", 0L);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasActiveInProgressFile());
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+
+		final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(path);
+		restoreBucket(newRecoverableWriter, 0, 1, state);
+
+		assertThat(newRecoverableWriter, hasCalledDiscard(1)); // that is for the resumable of the restored in-progress file
+	}
+
+	@Test
+	public void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
+		final File outDir = TEMP_FOLDER.newFolder();
+		final Path path = new Path(outDir.toURI());
+
+		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
+		final Bucket<String, String> bucketUnderTest =
+				createBucket(recoverableWriter, path, 0, 0);
+
+		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
+		assertThat(state, hasNoActiveInProgressFile());
+
+		bucketUnderTest.onReceptionOfCheckpoint(1L);
+		bucketUnderTest.onReceptionOfCheckpoint(2L);
+
+		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(2L);
+		assertThat(recoverableWriter, hasCalledDiscard(0)); // we have no in-progress file.
+	}
+
+	// ------------------------------- Matchers --------------------------------
+
+	private static TypeSafeMatcher<TestRecoverableWriter> hasCalledDiscard(int times) {
+		return new TypeSafeMatcher<TestRecoverableWriter>() {
+			@Override
+			protected boolean matchesSafely(TestRecoverableWriter writer) {
+				return writer.getCleanupCallCounter() == times;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description
+						.appendText("the TestRecoverableWriter to have called cleanupRecoverableState() ")
+						.appendValue(times)
+						.appendText(" times.");
+			}
+		};
+	}
+
+	private static TypeSafeMatcher<BucketState<String>> hasActiveInProgressFile() {
+		return new TypeSafeMatcher<BucketState<String>>() {
+			@Override
+			protected boolean matchesSafely(BucketState<String> state) {
+				return state.getInProgressResumableFile() != null;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a BucketState with active in-progress file.");
+			}
+		};
+	}
+
+	private static TypeSafeMatcher<BucketState<String>> hasNoActiveInProgressFile() {
+		return new TypeSafeMatcher<BucketState<String>>() {
+			@Override
+			protected boolean matchesSafely(BucketState<String> state) {
+				return state.getInProgressResumableFile() == null;
+			}
+
+			@Override
+			public void describeTo(Description description) {
+				description.appendText("a BucketState with no active in-progress file.");
+			}
+		};
+	}
+
+	// ------------------------------- Mock Classes --------------------------------
+
+	private static class TestRecoverableWriter extends LocalRecoverableWriter {
+
+		private int cleanupCallCounter = 0;
+
+		TestRecoverableWriter(LocalFileSystem fs) {
+			super(fs);
+		}
+
+		int getCleanupCallCounter() {
+			return cleanupCallCounter;
+		}
+
+		@Override
+		public boolean requiresCleanupOfRecoverableState() {
+			// here we return true so that the cleanupRecoverableState() is called.
+			return true;
+		}
+
+		@Override
+		public boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException {
+			cleanupCallCounter++;
+			return false;
+		}
+
+		@Override
+		public String toString() {
+			return "TestRecoverableWriter has called cleanupRecoverableState() " + cleanupCallCounter + " times.";
+		}
+	}
+
+	// ------------------------------- Utility Methods --------------------------------
+
+	private static final String bucketId = "testing-bucket";
+
+	private static final RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.create().build();
+
+	private static final PartFileWriter.PartFileFactory<String, String> partFileFactory =
+			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>());
+
+	private static Bucket<String, String> createBucket(
+			final RecoverableWriter writer,
+			final Path bucketPath,
+			final int subtaskIdx,
+			final int initialPartCounter) {
+
+		return Bucket.getNew(
+				writer,
+				subtaskIdx,
+				bucketId,
+				bucketPath,
+				initialPartCounter,
+				partFileFactory,
+				rollingPolicy);
+	}
+
+	private static Bucket<String, String> restoreBucket(
+			final RecoverableWriter writer,
+			final int subtaskIndex,
+			final long initialPartCounter,
+			final BucketState<String> bucketState) throws Exception {
+
+		return Bucket.restore(
+				writer,
+				subtaskIndex,
+				initialPartCounter,
+				partFileFactory,
+				rollingPolicy,
+				bucketState
+		);
+	}
+
+	private static TestRecoverableWriter getRecoverableWriter(Path path) {
+		try {
+			final FileSystem fs = FileSystem.get(path.toUri());
+			if (!(fs instanceof LocalFileSystem)) {
+				fail("Expected Local FS but got a " + fs.getClass().getName() + " for path: " + path);
+			}
+			return new TestRecoverableWriter((LocalFileSystem) fs);
+		} catch (IOException e) {
+			fail();
+		}
+		return null;
+	}
+}


[flink] 04/06: [hotfix] Consolidated all S3 accesses under the S3AccessHelper.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a029f3363b3b15caeba608c88ebafa2d8c8edd87
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 22 10:46:10 2018 +0100

    [hotfix] Consolidated all S3 accesses under the S3AccessHelper.
---
 .../writer/RecoverableMultiPartUploadImpl.java     |  2 +-
 .../flink/fs/s3/common/writer/S3AccessHelper.java  | 21 +++++++++---
 .../S3RecoverableMultipartUploadFactory.java       | 37 ++++++----------------
 .../writer/RecoverableMultiPartUploadImplTest.java | 11 +++++--
 .../flink/fs/s3hadoop/HadoopS3AccessHelper.java    | 25 ++++++++++++++-
 5 files changed, 58 insertions(+), 38 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index fe2a4cd..9f0a811 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -179,7 +179,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 			// they do not fall under the user's global TTL on S3.
 			// Figure out a way to clean them.
 
-			s3AccessHelper.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
+			s3AccessHelper.putObject(incompletePartObjectName, inputStream, file.getPos());
 		}
 		finally {
 			file.release();
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index 57920a5..dbc099a 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -26,6 +26,7 @@ import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.UploadPartResult;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -66,10 +67,9 @@ public interface S3AccessHelper {
 	UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException;
 
 	/**
-	 * Uploads a part and associates it with the MPU with the provided {@code uploadId}.
-	 *
-	 * <p>Contrary to the {@link #uploadIncompletePart(String, InputStream, long)}, this part can
-	 * be smaller than the minimum part size imposed by S3.
+	 * Uploads an object to S3. In contrast to the {@link #uploadPart(String, String, int, InputStream, long)} method,
+	 * this object is not associated with any MPU and, as such, it is not subject to the garbage collection
+	 * policies specified for your S3 bucket.
 	 *
 	 * @param key the key used to identify this part.
 	 * @param file the (local) file holding the data to be uploaded.
@@ -77,7 +77,7 @@ public interface S3AccessHelper {
 	 * @return The {@link PutObjectResult result} of the attempt to stage the incomplete part.
 	 * @throws IOException
 	 */
-	PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException;
+	PutObjectResult putObject(String key, InputStream file, long length) throws IOException;
 
 	/**
 	 * Finalizes a Multi-Part Upload.
@@ -93,6 +93,17 @@ public interface S3AccessHelper {
 	CompleteMultipartUploadResult commitMultiPartUpload(String key, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException;
 
 	/**
+	 * Gets the object associated with the provided {@code key} from S3 and
+	 * puts it in the provided {@code targetLocation}.
+	 *
+	 * @param key the key of the object to fetch.
+	 * @param targetLocation the local file to write the fetched object to.
+	 * @return The number of bytes read.
+	 * @throws IOException
+	 */
+	long getObject(String key, File targetLocation) throws IOException;
+
+	/**
 	 * Fetches the metadata associated with a given key on S3.
 	 *
 	 * @param key the key.
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index 9a171ae..ddb09ab 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -19,10 +19,8 @@
 package org.apache.flink.fs.s3.common.writer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor;
-import org.apache.flink.fs.s3.common.utils.OffsetAwareOutputStream;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.util.Preconditions;
@@ -74,7 +72,7 @@ final class S3RecoverableMultipartUploadFactory {
 	}
 
 	RecoverableMultiPartUpload recoverRecoverableUpload(S3Recoverable recoverable) throws IOException {
-		final Optional<File> incompletePart = downloadLastDataChunk(recoverable);
+		final Optional<File> incompletePart = recoverInProgressPart(recoverable);
 
 		return RecoverableMultiPartUploadImpl.recoverUpload(
 				s3AccessHelper,
@@ -86,36 +84,20 @@ final class S3RecoverableMultipartUploadFactory {
 				incompletePart);
 	}
 
-	@VisibleForTesting
-	Optional<File> downloadLastDataChunk(S3Recoverable recoverable) throws IOException {
+	private Optional<File> recoverInProgressPart(S3Recoverable recoverable) throws IOException {
 
-		final String objectName = recoverable.incompleteObjectName();
-		if (objectName == null) {
+		final String objectKey = recoverable.incompleteObjectName();
+		if (objectKey == null) {
 			return Optional.empty();
 		}
 
 		// download the file (simple way)
-		final RefCountedFile fileAndStream = tmpFileSupplier.apply(null);
-		final File file = fileAndStream.getFile();
-
-		long numBytes = 0L;
-
-		try (
-				final OffsetAwareOutputStream outStream = fileAndStream.getStream();
-				final org.apache.hadoop.fs.FSDataInputStream inStream =
-						fs.open(new org.apache.hadoop.fs.Path('/' + objectName))
-		) {
-			final byte[] buffer = new byte[32 * 1024];
-
-			int numRead;
-			while ((numRead = inStream.read(buffer)) > 0) {
-				outStream.write(buffer, 0, numRead);
-				numBytes += numRead;
-			}
-		}
+		final RefCountedFile refCountedFile = tmpFileSupplier.apply(null);
+		final File file = refCountedFile.getFile();
+		final long numBytes = s3AccessHelper.getObject(objectKey, file);
 
 		// some sanity checks
-		if (numBytes != file.length() || numBytes != fileAndStream.getStream().getLength()) {
+		if (numBytes != file.length()) {
 			throw new IOException(String.format("Error recovering writer: " +
 							"Downloading the last data chunk file gives incorrect length. " +
 							"File=%d bytes, Stream=%d bytes",
@@ -132,8 +114,7 @@ final class S3RecoverableMultipartUploadFactory {
 		return Optional.of(file);
 	}
 
-	@VisibleForTesting
-	String pathToObjectName(final Path path) {
+	private String pathToObjectName(final Path path) {
 		org.apache.hadoop.fs.Path hadoopPath = HadoopFileSystem.toHadoopPath(path);
 		if (!hadoopPath.isAbsolute()) {
 			hadoopPath = new org.apache.hadoop.fs.Path(fs.getWorkingDirectory(), hadoopPath);
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index 4c2f147..a986111 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -347,11 +347,11 @@ public class RecoverableMultiPartUploadImplTest {
 		private final List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> completePartsUploaded = new ArrayList<>();
 		private final List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> incompletePartsUploaded = new ArrayList<>();
 
-		public List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> getCompletePartsUploaded() {
+		List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> getCompletePartsUploaded() {
 			return completePartsUploaded;
 		}
 
-		public List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> getIncompletePartsUploaded() {
+		List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> getIncompletePartsUploaded() {
 			return incompletePartsUploaded;
 		}
 
@@ -367,12 +367,17 @@ public class RecoverableMultiPartUploadImplTest {
 		}
 
 		@Override
-		public PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException {
+		public PutObjectResult putObject(String key, InputStream file, long length) throws IOException {
 			final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length));
 			return storeAndGetPutObjectResult(key, content);
 		}
 
 		@Override
+		public long getObject(String key, File targetLocation) throws IOException {
+			return 0;
+		}
+
+		@Override
 		public CompleteMultipartUploadResult commitMultiPartUpload(
 				String key,
 				String uploadId,
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index f833471..473439c 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -35,8 +35,11 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -72,7 +75,7 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
 	}
 
 	@Override
-	public PutObjectResult uploadIncompletePart(String key, InputStream inputStream, long length) throws IOException {
+	public PutObjectResult putObject(String key, InputStream inputStream, long length) throws IOException {
 		final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length);
 		return s3accessHelper.putObject(putRequest);
 	}
@@ -83,6 +86,26 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
 	}
 
 	@Override
+	public long getObject(String key, File targetLocation) throws IOException {
+		long numBytes = 0L;
+		try (
+				final OutputStream outStream = new FileOutputStream(targetLocation);
+				final org.apache.hadoop.fs.FSDataInputStream inStream =
+						s3a.open(new org.apache.hadoop.fs.Path('/' + key))
+		) {
+			final byte[] buffer = new byte[32 * 1024];
+
+			int numRead;
+			while ((numRead = inStream.read(buffer)) > 0) {
+				outStream.write(buffer, 0, numRead);
+				numBytes += numRead;
+			}
+		}
+
+		return numBytes;
+	}
+
+	@Override
 	public ObjectMetadata getObjectMetadata(String key) throws IOException {
 		try {
 			return s3a.getObjectMetadata(new Path('/' + key));


[flink] 01/06: [hotfix][fs-connector] Refactor PartFileWriter to take stream.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 60f21cc235104b11ee61440d092f3986336f4dc3
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Wed Nov 21 10:35:58 2018 +0100

    [hotfix][fs-connector] Refactor PartFileWriter to take stream.
---
 .../flink/streaming/api/functions/sink/filesystem/Bucket.java  |  7 +++++--
 .../api/functions/sink/filesystem/BulkPartWriter.java          | 10 ++++------
 .../api/functions/sink/filesystem/PartFileWriter.java          |  8 ++++----
 .../api/functions/sink/filesystem/RowWisePartWriter.java       | 10 ++++------
 4 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 65a7628..042bcda 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
 
@@ -124,8 +125,9 @@ public class Bucket<IN, BucketID> {
 		// we try to resume the previous in-progress file
 		if (state.hasInProgressResumableFile()) {
 			final RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
+			final RecoverableFsDataOutputStream stream = fsWriter.recover(resumable);
 			inProgressPart = partFileFactory.resumeFrom(
-					bucketId, fsWriter, resumable, state.getInProgressFileCreationTime());
+					bucketId, stream, resumable, state.getInProgressFileCreationTime());
 		}
 	}
 
@@ -195,7 +197,8 @@ public class Bucket<IN, BucketID> {
 		closePartFile();
 
 		final Path partFilePath = assembleNewPartPath();
-		inProgressPart = partFileFactory.openNew(bucketId, fsWriter, partFilePath, currentTime);
+		final RecoverableFsDataOutputStream stream = fsWriter.open(partFilePath);
+		inProgressPart = partFileFactory.openNew(bucketId, stream, partFilePath, currentTime);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
index 005ae4e..a44b0e8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BulkPartWriter.java
@@ -79,14 +79,13 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
 		@Override
 		public PartFileWriter<IN, BucketID> resumeFrom(
 				final BucketID bucketId,
-				final RecoverableWriter fileSystemWriter,
+				final RecoverableFsDataOutputStream stream,
 				final RecoverableWriter.ResumeRecoverable resumable,
 				final long creationTime) throws IOException {
 
-			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(stream);
 			Preconditions.checkNotNull(resumable);
 
-			final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
 			final BulkWriter<IN> writer = writerFactory.create(stream);
 			return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
 		}
@@ -94,14 +93,13 @@ final class BulkPartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID> {
 		@Override
 		public PartFileWriter<IN, BucketID> openNew(
 				final BucketID bucketId,
-				final RecoverableWriter fileSystemWriter,
+				final RecoverableFsDataOutputStream stream,
 				final Path path,
 				final long creationTime) throws IOException {
 
-			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(stream);
 			Preconditions.checkNotNull(path);
 
-			final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
 			final BulkWriter<IN> writer = writerFactory.create(stream);
 			return new BulkPartWriter<>(bucketId, stream, writer, creationTime);
 		}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
index 662454b..95a2978a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
@@ -111,7 +111,7 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
 		/**
 		 * Used upon recovery from a failure to recover a {@link PartFileWriter writer}.
 		 * @param bucketId the id of the bucket this writer is writing to.
-		 * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem.
+		 * @param stream the filesystem-specific output stream to use when writing to the filesystem.
 		 * @param resumable the state of the stream we are resurrecting.
 		 * @param creationTime the creation time of the stream.
 		 * @return the recovered {@link PartFileWriter writer}.
@@ -119,14 +119,14 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
 		 */
 		PartFileWriter<IN, BucketID> resumeFrom(
 			final BucketID bucketId,
-			final RecoverableWriter fileSystemWriter,
+			final RecoverableFsDataOutputStream stream,
 			final RecoverableWriter.ResumeRecoverable resumable,
 			final long creationTime) throws IOException;
 
 		/**
 		 * Used to create a new {@link PartFileWriter writer}.
 		 * @param bucketId the id of the bucket this writer is writing to.
-		 * @param fileSystemWriter the filesystem-specific writer to use when writing to the filesystem.
+		 * @param stream the filesystem-specific output stream to use when writing to the filesystem.
 		 * @param path the part this writer will write to.
 		 * @param creationTime the creation time of the stream.
 		 * @return the new {@link PartFileWriter writer}.
@@ -134,7 +134,7 @@ abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
 		 */
 		PartFileWriter<IN, BucketID> openNew(
 			final BucketID bucketId,
-			final RecoverableWriter fileSystemWriter,
+			final RecoverableFsDataOutputStream stream,
 			final Path path,
 			final long creationTime) throws IOException;
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
index 2478b79..05c160c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/RowWisePartWriter.java
@@ -67,28 +67,26 @@ final class RowWisePartWriter<IN, BucketID> extends PartFileWriter<IN, BucketID>
 		@Override
 		public PartFileWriter<IN, BucketID> resumeFrom(
 				final BucketID bucketId,
-				final RecoverableWriter fileSystemWriter,
+				final RecoverableFsDataOutputStream stream,
 				final RecoverableWriter.ResumeRecoverable resumable,
 				final long creationTime) throws IOException {
 
-			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(stream);
 			Preconditions.checkNotNull(resumable);
 
-			final RecoverableFsDataOutputStream stream = fileSystemWriter.recover(resumable);
 			return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
 		}
 
 		@Override
 		public PartFileWriter<IN, BucketID> openNew(
 				final BucketID bucketId,
-				final RecoverableWriter fileSystemWriter,
+				final RecoverableFsDataOutputStream stream,
 				final Path path,
 				final long creationTime) throws IOException {
 
-			Preconditions.checkNotNull(fileSystemWriter);
+			Preconditions.checkNotNull(stream);
 			Preconditions.checkNotNull(path);
 
-			final RecoverableFsDataOutputStream stream = fileSystemWriter.open(path);
 			return new RowWisePartWriter<>(bucketId, stream, encoder, creationTime);
 		}
 	}