You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/04 16:21:52 UTC

[flink] 03/06: [hotfix][s3-connector] Renamed S3MultiPartUploader to S3AccessHelper.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0c791cff57aa6ca0167c4967d2e69fdfca988fd4
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Wed Nov 21 11:11:10 2018 +0100

    [hotfix][s3-connector] Renamed S3MultiPartUploader to S3AccessHelper.
---
 .../fs/s3/common/AbstractS3FileSystemFactory.java  |  6 ++---
 .../flink/fs/s3/common/FlinkS3FileSystem.java      | 12 ++++-----
 .../writer/RecoverableMultiPartUploadImpl.java     | 30 +++++++++++-----------
 ...3MultiPartUploader.java => S3AccessHelper.java} |  2 +-
 .../flink/fs/s3/common/writer/S3Committer.java     | 12 ++++-----
 .../S3RecoverableMultipartUploadFactory.java       | 10 ++++----
 .../fs/s3/common/writer/S3RecoverableWriter.java   |  4 +--
 .../flink/fs/s3/common/S3EntropyFsFactoryTest.java |  4 +--
 .../writer/RecoverableMultiPartUploadImplTest.java |  4 +--
 ...PartUploader.java => HadoopS3AccessHelper.java} | 26 +++++++++----------
 .../flink/fs/s3hadoop/S3FileSystemFactory.java     |  6 ++---
 .../flink/fs/s3presto/S3FileSystemFactory.java     |  4 +--
 12 files changed, 60 insertions(+), 60 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 318fd39..6ccdeae 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,7 +141,7 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
 			final String localTmpDirectory = flinkConfig.getString(CoreOptions.TMP_DIRS);
 			final long s3minPartSize = flinkConfig.getLong(PART_UPLOAD_MIN_SIZE);
 			final int maxConcurrentUploads = flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS);
-			final S3MultiPartUploader s3AccessHelper = getS3AccessHelper(fs);
+			final S3AccessHelper s3AccessHelper = getS3AccessHelper(fs);
 
 			return new FlinkS3FileSystem(
 					fs,
@@ -166,6 +166,6 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
 		URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig);
 
 	@Nullable
-	protected abstract S3MultiPartUploader getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
+	protected abstract S3AccessHelper getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
 }
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index 553edde..5248e06 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
 import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.util.Preconditions;
@@ -60,7 +60,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
 	private final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator;
 
 	@Nullable
-	private final S3MultiPartUploader s3UploadHelper;
+	private final S3AccessHelper s3AccessHelper;
 
 	private final Executor uploadThreadPool;
 
@@ -83,7 +83,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
 			String localTmpDirectory,
 			@Nullable String entropyInjectionKey,
 			int entropyLength,
-			@Nullable S3MultiPartUploader s3UploadHelper,
+			@Nullable S3AccessHelper s3UploadHelper,
 			long s3uploadPartSize,
 			int maxConcurrentUploadsPerStream) {
 
@@ -99,7 +99,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
 		// recoverable writer parameter configuration initialization
 		this.localTmpDir = Preconditions.checkNotNull(localTmpDirectory);
 		this.tmpFileCreator = RefCountedTmpFileCreator.inDirectories(new File(localTmpDirectory));
-		this.s3UploadHelper = s3UploadHelper;
+		this.s3AccessHelper = s3UploadHelper;
 		this.uploadThreadPool = Executors.newCachedThreadPool();
 
 		Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
@@ -131,7 +131,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
 
 	@Override
 	public RecoverableWriter createRecoverableWriter() throws IOException {
-		if (s3UploadHelper == null) {
+		if (s3AccessHelper == null) {
 			// this is the case for Presto
 			throw new UnsupportedOperationException("This s3 file system implementation does not support recoverable writers.");
 		}
@@ -139,7 +139,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
 		return S3RecoverableWriter.writer(
 				getHadoopFileSystem(),
 				tmpFileCreator,
-				s3UploadHelper,
+				s3AccessHelper,
 				uploadThreadPool,
 				s3uploadPartSize,
 				maxConcurrentUploadsPerStream);
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index 80042ce..fe2a4cd 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -58,7 +58,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 @NotThreadSafe
 final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload {
 
-	private final S3MultiPartUploader s3MPUploader;
+	private final S3AccessHelper s3AccessHelper;
 
 	private final Executor uploadThreadPool;
 
@@ -71,7 +71,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 	// ------------------------------------------------------------------------
 
 	private RecoverableMultiPartUploadImpl(
-			S3MultiPartUploader s3uploader,
+			S3AccessHelper s3AccessHelper,
 			Executor uploadThreadPool,
 			String uploadId,
 			String objectName,
@@ -81,7 +81,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 	) {
 		checkArgument(numBytes >= 0L);
 
-		this.s3MPUploader = checkNotNull(s3uploader);
+		this.s3AccessHelper = checkNotNull(s3AccessHelper);
 		this.uploadThreadPool = checkNotNull(uploadThreadPool);
 		this.currentUploadInfo = new MultiPartUploadInfo(objectName, uploadId, partsSoFar, numBytes, incompletePart);
 		this.namePrefixForTempObjects = incompleteObjectNamePrefix(objectName);
@@ -111,7 +111,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		currentUploadInfo.registerNewPart(partLength);
 
 		file.retain(); // keep the file while the async upload still runs
-		uploadThreadPool.execute(new UploadTask(s3MPUploader, currentUploadInfo, file, future));
+		uploadThreadPool.execute(new UploadTask(s3AccessHelper, currentUploadInfo, file, future));
 	}
 
 	@Override
@@ -124,7 +124,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		final S3Recoverable snapshot = snapshotAndGetRecoverable(null);
 
 		return new S3Committer(
-				s3MPUploader,
+				s3AccessHelper,
 				snapshot.getObjectName(),
 				snapshot.uploadId(),
 				snapshot.parts(),
@@ -179,7 +179,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 			// they do not fall under the user's global TTL on S3.
 			// Figure out a way to clean them.
 
-			s3MPUploader.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
+			s3AccessHelper.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
 		}
 		finally {
 			file.release();
@@ -244,14 +244,14 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 	// ------------------------------------------------------------------------
 
 	public static RecoverableMultiPartUploadImpl newUpload(
-			final S3MultiPartUploader s3uploader,
+			final S3AccessHelper s3AccessHelper,
 			final Executor uploadThreadPool,
 			final String objectName) throws IOException {
 
-		final String multiPartUploadId = s3uploader.startMultiPartUpload(objectName);
+		final String multiPartUploadId = s3AccessHelper.startMultiPartUpload(objectName);
 
 		return new RecoverableMultiPartUploadImpl(
-				s3uploader,
+				s3AccessHelper,
 				uploadThreadPool,
 				multiPartUploadId,
 				objectName,
@@ -261,7 +261,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 	}
 
 	public static RecoverableMultiPartUploadImpl recoverUpload(
-			final S3MultiPartUploader s3uploader,
+			final S3AccessHelper s3AccessHelper,
 			final Executor uploadThreadPool,
 			final String multipartUploadId,
 			final String objectName,
@@ -270,7 +270,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 			final Optional<File> incompletePart) {
 
 		return new RecoverableMultiPartUploadImpl(
-				s3uploader,
+				s3AccessHelper,
 				uploadThreadPool,
 				multipartUploadId,
 				objectName,
@@ -286,7 +286,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 
 	private static class UploadTask implements Runnable {
 
-		private final S3MultiPartUploader s3uploader;
+		private final S3AccessHelper s3AccessHelper;
 
 		private final String objectName;
 
@@ -299,7 +299,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		private final CompletableFuture<PartETag> future;
 
 		UploadTask(
-				final S3MultiPartUploader s3uploader,
+				final S3AccessHelper s3AccessHelper,
 				final MultiPartUploadInfo currentUpload,
 				final RefCountedFSOutputStream file,
 				final CompletableFuture<PartETag> future) {
@@ -313,7 +313,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 			// these are limits put by Amazon
 			checkArgument(partNumber >= 1  && partNumber <= 10_000);
 
-			this.s3uploader = checkNotNull(s3uploader);
+			this.s3AccessHelper = checkNotNull(s3AccessHelper);
 			this.file = checkNotNull(file);
 			this.future = checkNotNull(future);
 		}
@@ -321,7 +321,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		@Override
 		public void run() {
 			try (final InputStream inputStream = file.getInputStream()) {
-				final UploadPartResult result = s3uploader.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
+				final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
 				future.complete(new PartETag(result.getPartNumber(), result.getETag()));
 				file.release();
 			}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
similarity index 99%
rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
rename to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index da227a4..57920a5 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * the upload with all its parts will be either committed or discarded.
  */
 @Internal
-public interface S3MultiPartUploader {
+public interface S3AccessHelper {
 
 	/**
 	 * Initializes a Multi-Part Upload.
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
index 1fc8bf1..5fbc5bb 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
@@ -40,7 +40,7 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe
 
 	private static final Logger LOG = LoggerFactory.getLogger(S3Committer.class);
 
-	private final S3MultiPartUploader s3uploader;
+	private final S3AccessHelper s3AccessHelper;
 
 	private final String uploadId;
 
@@ -50,8 +50,8 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe
 
 	private final long totalLength;
 
-	S3Committer(S3MultiPartUploader s3uploader, String objectName, String uploadId, List<PartETag> parts, long totalLength) {
-		this.s3uploader = checkNotNull(s3uploader);
+	S3Committer(S3AccessHelper s3AccessHelper, String objectName, String uploadId, List<PartETag> parts, long totalLength) {
+		this.s3AccessHelper = checkNotNull(s3AccessHelper);
 		this.objectName = checkNotNull(objectName);
 		this.uploadId = checkNotNull(uploadId);
 		this.parts = checkNotNull(parts);
@@ -64,7 +64,7 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe
 			LOG.info("Committing {} with MPU ID {}", objectName, uploadId);
 
 			final AtomicInteger errorCount = new AtomicInteger();
-			s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount);
+			s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount);
 
 			if (errorCount.get() == 0) {
 				LOG.debug("Successfully committed {} with MPU ID {}", objectName, uploadId);
@@ -82,14 +82,14 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe
 			LOG.info("Trying to commit after recovery {} with MPU ID {}", objectName, uploadId);
 
 			try {
-				s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger());
+				s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger());
 			} catch (IOException e) {
 				LOG.info("Failed to commit after recovery {} with MPU ID {}. " +
 						"Checking if file was committed before...", objectName, uploadId);
 				LOG.trace("Exception when committing:", e);
 
 				try {
-					ObjectMetadata metadata = s3uploader.getObjectMetadata(objectName);
+					ObjectMetadata metadata = s3AccessHelper.getObjectMetadata(objectName);
 					if (totalLength != metadata.getContentLength()) {
 						String message = String.format("Inconsistent result for object %s: conflicting lengths. " +
 										"Recovered committer for upload %s indicates %s bytes, present object is %s bytes",
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index b201981..9a171ae 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -43,7 +43,7 @@ final class S3RecoverableMultipartUploadFactory {
 
 	private final org.apache.hadoop.fs.FileSystem fs;
 
-	private final S3MultiPartUploader twoPhaseUploader;
+	private final S3AccessHelper s3AccessHelper;
 
 	private final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier;
 
@@ -53,7 +53,7 @@ final class S3RecoverableMultipartUploadFactory {
 
 	S3RecoverableMultipartUploadFactory(
 			final FileSystem fs,
-			final S3MultiPartUploader twoPhaseUploader,
+			final S3AccessHelper s3AccessHelper,
 			final int maxConcurrentUploadsPerStream,
 			final Executor executor,
 			final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier) {
@@ -61,14 +61,14 @@ final class S3RecoverableMultipartUploadFactory {
 		this.fs = Preconditions.checkNotNull(fs);
 		this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
 		this.executor = executor;
-		this.twoPhaseUploader = twoPhaseUploader;
+		this.s3AccessHelper = s3AccessHelper;
 		this.tmpFileSupplier = tmpFileSupplier;
 	}
 
 	RecoverableMultiPartUpload getNewRecoverableUpload(Path path) throws IOException {
 
 		return RecoverableMultiPartUploadImpl.newUpload(
-				twoPhaseUploader,
+				s3AccessHelper,
 				limitedExecutor(),
 				pathToObjectName(path));
 	}
@@ -77,7 +77,7 @@ final class S3RecoverableMultipartUploadFactory {
 		final Optional<File> incompletePart = downloadLastDataChunk(recoverable);
 
 		return RecoverableMultiPartUploadImpl.recoverUpload(
-				twoPhaseUploader,
+				s3AccessHelper,
 				limitedExecutor(),
 				recoverable.uploadId(),
 				recoverable.getObjectName(),
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
index 2a84308..698f65f 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
@@ -129,7 +129,7 @@ public class S3RecoverableWriter implements RecoverableWriter {
 	public static S3RecoverableWriter writer(
 			final FileSystem fs,
 			final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
-			final S3MultiPartUploader twoPhaseUploader,
+			final S3AccessHelper s3AccessHelper,
 			final Executor uploadThreadPool,
 			final long userDefinedMinPartSize,
 			final int maxConcurrentUploadsPerStream) {
@@ -139,7 +139,7 @@ public class S3RecoverableWriter implements RecoverableWriter {
 		final S3RecoverableMultipartUploadFactory uploadFactory =
 				new S3RecoverableMultipartUploadFactory(
 						fs,
-						twoPhaseUploader,
+						s3AccessHelper,
 						maxConcurrentUploadsPerStream,
 						uploadThreadPool,
 						tempFileCreator);
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
index d3d25c3..5b15652 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.fs.s3.common;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -78,7 +78,7 @@ public class S3EntropyFsFactoryTest extends TestLogger {
 
 		@Nullable
 		@Override
-		protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+		protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
 			return null;
 		}
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index 72554e1..4c2f147 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -339,10 +339,10 @@ public class RecoverableMultiPartUploadImplTest {
 	}
 
 	/**
-	 * A {@link S3MultiPartUploader} that simulates uploading part files to S3 by
+	 * A {@link S3AccessHelper} that simulates uploading part files to S3 by
 	 * simply putting complete and incomplete part files in lists for further validation.
 	 */
-	private static class StubMultiPartUploader implements S3MultiPartUploader {
+	private static class StubMultiPartUploader implements S3AccessHelper {
 
 		private final List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> completePartsUploaded = new ArrayList<>();
 		private final List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> incompletePartsUploaded = new ArrayList<>();
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
similarity index 77%
rename from flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
rename to flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index f446f70..f833471 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.fs.s3hadoop;
 
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.util.MathUtils;
 
 import com.amazonaws.SdkBaseException;
@@ -43,16 +43,16 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * An implementation of the {@link S3MultiPartUploader} for the Hadoop S3A filesystem.
+ * An implementation of the {@link S3AccessHelper} for the Hadoop S3A filesystem.
  */
-public class HadoopS3MultiPartUploader implements S3MultiPartUploader {
+public class HadoopS3AccessHelper implements S3AccessHelper {
 
 	private final S3AFileSystem s3a;
 
-	private final InternalWriteOperationHelper s3uploader;
+	private final InternalWriteOperationHelper s3accessHelper;
 
-	public HadoopS3MultiPartUploader(S3AFileSystem s3a, Configuration conf) {
-		this.s3uploader = new InternalWriteOperationHelper(
+	public HadoopS3AccessHelper(S3AFileSystem s3a, Configuration conf) {
+		this.s3accessHelper = new InternalWriteOperationHelper(
 				checkNotNull(s3a),
 				checkNotNull(conf)
 		);
@@ -61,25 +61,25 @@ public class HadoopS3MultiPartUploader implements S3MultiPartUploader {
 
 	@Override
 	public String startMultiPartUpload(String key) throws IOException {
-		return s3uploader.initiateMultiPartUpload(key);
+		return s3accessHelper.initiateMultiPartUpload(key);
 	}
 
 	@Override
 	public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream inputStream, long length) throws IOException {
-		final UploadPartRequest uploadRequest = s3uploader.newUploadPartRequest(
+		final UploadPartRequest uploadRequest = s3accessHelper.newUploadPartRequest(
 				key, uploadId, partNumber, MathUtils.checkedDownCast(length), inputStream, null, 0L);
-		return s3uploader.uploadPart(uploadRequest);
+		return s3accessHelper.uploadPart(uploadRequest);
 	}
 
 	@Override
 	public PutObjectResult uploadIncompletePart(String key, InputStream inputStream, long length) throws IOException {
-		final PutObjectRequest putRequest = s3uploader.createPutObjectRequest(key, inputStream, length);
-		return s3uploader.putObject(putRequest);
+		final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length);
+		return s3accessHelper.putObject(putRequest);
 	}
 
 	@Override
 	public CompleteMultipartUploadResult commitMultiPartUpload(String destKey, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException {
-		return s3uploader.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount);
+		return s3accessHelper.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount);
 	}
 
 	@Override
@@ -94,7 +94,7 @@ public class HadoopS3MultiPartUploader implements S3MultiPartUploader {
 
 	/**
 	 * Internal {@link WriteOperationHelper} that is wrapped so that it only exposes
-	 * the functionality we need for the {@link S3MultiPartUploader}.
+	 * the functionality we need for the {@link S3AccessHelper}.
 	 */
 	private static final class InternalWriteOperationHelper extends WriteOperationHelper {
 
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index 897629f..2637e7b 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3hadoop;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
 import org.apache.flink.fs.s3.common.HadoopConfigLoader;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
@@ -96,8 +96,8 @@ public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
 
 	@Nullable
 	@Override
-	protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+	protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
 		final S3AFileSystem s3Afs = (S3AFileSystem) fs;
-		return new HadoopS3MultiPartUploader(s3Afs, s3Afs.getConf());
+		return new HadoopS3AccessHelper(s3Afs, s3Afs.getConf());
 	}
 }
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index b579d6e..0fb2857 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3presto;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
 import org.apache.flink.fs.s3.common.HadoopConfigLoader;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
@@ -92,7 +92,7 @@ public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
 
 	@Nullable
 	@Override
-	protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+	protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
 		return null;
 	}