You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/04 16:21:52 UTC
[flink] 03/06: [hotfix][s3-connector] Renamed S3MultiPartUploader to S3AccessHelper.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0c791cff57aa6ca0167c4967d2e69fdfca988fd4
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Wed Nov 21 11:11:10 2018 +0100
[hotfix][s3-connector] Renamed S3MultiPartUploader to S3AccessHelper.
---
.../fs/s3/common/AbstractS3FileSystemFactory.java | 6 ++---
.../flink/fs/s3/common/FlinkS3FileSystem.java | 12 ++++-----
.../writer/RecoverableMultiPartUploadImpl.java | 30 +++++++++++-----------
...3MultiPartUploader.java => S3AccessHelper.java} | 2 +-
.../flink/fs/s3/common/writer/S3Committer.java | 12 ++++-----
.../S3RecoverableMultipartUploadFactory.java | 10 ++++----
.../fs/s3/common/writer/S3RecoverableWriter.java | 4 +--
.../flink/fs/s3/common/S3EntropyFsFactoryTest.java | 4 +--
.../writer/RecoverableMultiPartUploadImplTest.java | 4 +--
...PartUploader.java => HadoopS3AccessHelper.java} | 26 +++++++++----------
.../flink/fs/s3hadoop/S3FileSystemFactory.java | 6 ++---
.../flink/fs/s3presto/S3FileSystemFactory.java | 4 +--
12 files changed, 60 insertions(+), 60 deletions(-)
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 318fd39..6ccdeae 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,7 +141,7 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
final String localTmpDirectory = flinkConfig.getString(CoreOptions.TMP_DIRS);
final long s3minPartSize = flinkConfig.getLong(PART_UPLOAD_MIN_SIZE);
final int maxConcurrentUploads = flinkConfig.getInteger(MAX_CONCURRENT_UPLOADS);
- final S3MultiPartUploader s3AccessHelper = getS3AccessHelper(fs);
+ final S3AccessHelper s3AccessHelper = getS3AccessHelper(fs);
return new FlinkS3FileSystem(
fs,
@@ -166,6 +166,6 @@ public abstract class AbstractS3FileSystemFactory implements FileSystemFactory {
URI fsUri, org.apache.hadoop.conf.Configuration hadoopConfig);
@Nullable
- protected abstract S3MultiPartUploader getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
+ protected abstract S3AccessHelper getS3AccessHelper(org.apache.hadoop.fs.FileSystem fs);
}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index 553edde..5248e06 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.fs.s3.common.utils.RefCountedFile;
import org.apache.flink.fs.s3.common.utils.RefCountedTmpFileCreator;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.fs.s3.common.writer.S3RecoverableWriter;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.util.Preconditions;
@@ -60,7 +60,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
private final FunctionWithException<File, RefCountedFile, IOException> tmpFileCreator;
@Nullable
- private final S3MultiPartUploader s3UploadHelper;
+ private final S3AccessHelper s3AccessHelper;
private final Executor uploadThreadPool;
@@ -83,7 +83,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
String localTmpDirectory,
@Nullable String entropyInjectionKey,
int entropyLength,
- @Nullable S3MultiPartUploader s3UploadHelper,
+ @Nullable S3AccessHelper s3UploadHelper,
long s3uploadPartSize,
int maxConcurrentUploadsPerStream) {
@@ -99,7 +99,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
// recoverable writer parameter configuration initialization
this.localTmpDir = Preconditions.checkNotNull(localTmpDirectory);
this.tmpFileCreator = RefCountedTmpFileCreator.inDirectories(new File(localTmpDirectory));
- this.s3UploadHelper = s3UploadHelper;
+ this.s3AccessHelper = s3UploadHelper;
this.uploadThreadPool = Executors.newCachedThreadPool();
Preconditions.checkArgument(s3uploadPartSize >= S3_MULTIPART_MIN_PART_SIZE);
@@ -131,7 +131,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
@Override
public RecoverableWriter createRecoverableWriter() throws IOException {
- if (s3UploadHelper == null) {
+ if (s3AccessHelper == null) {
// this is the case for Presto
throw new UnsupportedOperationException("This s3 file system implementation does not support recoverable writers.");
}
@@ -139,7 +139,7 @@ public class FlinkS3FileSystem extends HadoopFileSystem implements EntropyInject
return S3RecoverableWriter.writer(
getHadoopFileSystem(),
tmpFileCreator,
- s3UploadHelper,
+ s3AccessHelper,
uploadThreadPool,
s3uploadPartSize,
maxConcurrentUploadsPerStream);
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index 80042ce..fe2a4cd 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -58,7 +58,7 @@ import static org.apache.flink.util.Preconditions.checkState;
@NotThreadSafe
final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload {
- private final S3MultiPartUploader s3MPUploader;
+ private final S3AccessHelper s3AccessHelper;
private final Executor uploadThreadPool;
@@ -71,7 +71,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
// ------------------------------------------------------------------------
private RecoverableMultiPartUploadImpl(
- S3MultiPartUploader s3uploader,
+ S3AccessHelper s3AccessHelper,
Executor uploadThreadPool,
String uploadId,
String objectName,
@@ -81,7 +81,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
) {
checkArgument(numBytes >= 0L);
- this.s3MPUploader = checkNotNull(s3uploader);
+ this.s3AccessHelper = checkNotNull(s3AccessHelper);
this.uploadThreadPool = checkNotNull(uploadThreadPool);
this.currentUploadInfo = new MultiPartUploadInfo(objectName, uploadId, partsSoFar, numBytes, incompletePart);
this.namePrefixForTempObjects = incompleteObjectNamePrefix(objectName);
@@ -111,7 +111,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
currentUploadInfo.registerNewPart(partLength);
file.retain(); // keep the file while the async upload still runs
- uploadThreadPool.execute(new UploadTask(s3MPUploader, currentUploadInfo, file, future));
+ uploadThreadPool.execute(new UploadTask(s3AccessHelper, currentUploadInfo, file, future));
}
@Override
@@ -124,7 +124,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
final S3Recoverable snapshot = snapshotAndGetRecoverable(null);
return new S3Committer(
- s3MPUploader,
+ s3AccessHelper,
snapshot.getObjectName(),
snapshot.uploadId(),
snapshot.parts(),
@@ -179,7 +179,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
// they do not fall under the user's global TTL on S3.
// Figure out a way to clean them.
- s3MPUploader.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
+ s3AccessHelper.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
}
finally {
file.release();
@@ -244,14 +244,14 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
// ------------------------------------------------------------------------
public static RecoverableMultiPartUploadImpl newUpload(
- final S3MultiPartUploader s3uploader,
+ final S3AccessHelper s3AccessHelper,
final Executor uploadThreadPool,
final String objectName) throws IOException {
- final String multiPartUploadId = s3uploader.startMultiPartUpload(objectName);
+ final String multiPartUploadId = s3AccessHelper.startMultiPartUpload(objectName);
return new RecoverableMultiPartUploadImpl(
- s3uploader,
+ s3AccessHelper,
uploadThreadPool,
multiPartUploadId,
objectName,
@@ -261,7 +261,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
}
public static RecoverableMultiPartUploadImpl recoverUpload(
- final S3MultiPartUploader s3uploader,
+ final S3AccessHelper s3AccessHelper,
final Executor uploadThreadPool,
final String multipartUploadId,
final String objectName,
@@ -270,7 +270,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
final Optional<File> incompletePart) {
return new RecoverableMultiPartUploadImpl(
- s3uploader,
+ s3AccessHelper,
uploadThreadPool,
multipartUploadId,
objectName,
@@ -286,7 +286,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
private static class UploadTask implements Runnable {
- private final S3MultiPartUploader s3uploader;
+ private final S3AccessHelper s3AccessHelper;
private final String objectName;
@@ -299,7 +299,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
private final CompletableFuture<PartETag> future;
UploadTask(
- final S3MultiPartUploader s3uploader,
+ final S3AccessHelper s3AccessHelper,
final MultiPartUploadInfo currentUpload,
final RefCountedFSOutputStream file,
final CompletableFuture<PartETag> future) {
@@ -313,7 +313,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
// these are limits put by Amazon
checkArgument(partNumber >= 1 && partNumber <= 10_000);
- this.s3uploader = checkNotNull(s3uploader);
+ this.s3AccessHelper = checkNotNull(s3AccessHelper);
this.file = checkNotNull(file);
this.future = checkNotNull(future);
}
@@ -321,7 +321,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
@Override
public void run() {
try (final InputStream inputStream = file.getInputStream()) {
- final UploadPartResult result = s3uploader.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
+ final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
future.complete(new PartETag(result.getPartNumber(), result.getETag()));
file.release();
}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
similarity index 99%
rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
rename to flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index da227a4..57920a5 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3MultiPartUploader.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* the upload with all its parts will be either committed or discarded.
*/
@Internal
-public interface S3MultiPartUploader {
+public interface S3AccessHelper {
/**
* Initializes a Multi-Part Upload.
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
index 1fc8bf1..5fbc5bb 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3Committer.java
@@ -40,7 +40,7 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe
private static final Logger LOG = LoggerFactory.getLogger(S3Committer.class);
- private final S3MultiPartUploader s3uploader;
+ private final S3AccessHelper s3AccessHelper;
private final String uploadId;
@@ -50,8 +50,8 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe
private final long totalLength;
- S3Committer(S3MultiPartUploader s3uploader, String objectName, String uploadId, List<PartETag> parts, long totalLength) {
- this.s3uploader = checkNotNull(s3uploader);
+ S3Committer(S3AccessHelper s3AccessHelper, String objectName, String uploadId, List<PartETag> parts, long totalLength) {
+ this.s3AccessHelper = checkNotNull(s3AccessHelper);
this.objectName = checkNotNull(objectName);
this.uploadId = checkNotNull(uploadId);
this.parts = checkNotNull(parts);
@@ -64,7 +64,7 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe
LOG.info("Committing {} with MPU ID {}", objectName, uploadId);
final AtomicInteger errorCount = new AtomicInteger();
- s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount);
+ s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, errorCount);
if (errorCount.get() == 0) {
LOG.debug("Successfully committed {} with MPU ID {}", objectName, uploadId);
@@ -82,14 +82,14 @@ public final class S3Committer implements RecoverableFsDataOutputStream.Committe
LOG.info("Trying to commit after recovery {} with MPU ID {}", objectName, uploadId);
try {
- s3uploader.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger());
+ s3AccessHelper.commitMultiPartUpload(objectName, uploadId, parts, totalLength, new AtomicInteger());
} catch (IOException e) {
LOG.info("Failed to commit after recovery {} with MPU ID {}. " +
"Checking if file was committed before...", objectName, uploadId);
LOG.trace("Exception when committing:", e);
try {
- ObjectMetadata metadata = s3uploader.getObjectMetadata(objectName);
+ ObjectMetadata metadata = s3AccessHelper.getObjectMetadata(objectName);
if (totalLength != metadata.getContentLength()) {
String message = String.format("Inconsistent result for object %s: conflicting lengths. " +
"Recovered committer for upload %s indicates %s bytes, present object is %s bytes",
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index b201981..9a171ae 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -43,7 +43,7 @@ final class S3RecoverableMultipartUploadFactory {
private final org.apache.hadoop.fs.FileSystem fs;
- private final S3MultiPartUploader twoPhaseUploader;
+ private final S3AccessHelper s3AccessHelper;
private final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier;
@@ -53,7 +53,7 @@ final class S3RecoverableMultipartUploadFactory {
S3RecoverableMultipartUploadFactory(
final FileSystem fs,
- final S3MultiPartUploader twoPhaseUploader,
+ final S3AccessHelper s3AccessHelper,
final int maxConcurrentUploadsPerStream,
final Executor executor,
final FunctionWithException<File, RefCountedFile, IOException> tmpFileSupplier) {
@@ -61,14 +61,14 @@ final class S3RecoverableMultipartUploadFactory {
this.fs = Preconditions.checkNotNull(fs);
this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
this.executor = executor;
- this.twoPhaseUploader = twoPhaseUploader;
+ this.s3AccessHelper = s3AccessHelper;
this.tmpFileSupplier = tmpFileSupplier;
}
RecoverableMultiPartUpload getNewRecoverableUpload(Path path) throws IOException {
return RecoverableMultiPartUploadImpl.newUpload(
- twoPhaseUploader,
+ s3AccessHelper,
limitedExecutor(),
pathToObjectName(path));
}
@@ -77,7 +77,7 @@ final class S3RecoverableMultipartUploadFactory {
final Optional<File> incompletePart = downloadLastDataChunk(recoverable);
return RecoverableMultiPartUploadImpl.recoverUpload(
- twoPhaseUploader,
+ s3AccessHelper,
limitedExecutor(),
recoverable.uploadId(),
recoverable.getObjectName(),
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
index 2a84308..698f65f 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableWriter.java
@@ -129,7 +129,7 @@ public class S3RecoverableWriter implements RecoverableWriter {
public static S3RecoverableWriter writer(
final FileSystem fs,
final FunctionWithException<File, RefCountedFile, IOException> tempFileCreator,
- final S3MultiPartUploader twoPhaseUploader,
+ final S3AccessHelper s3AccessHelper,
final Executor uploadThreadPool,
final long userDefinedMinPartSize,
final int maxConcurrentUploadsPerStream) {
@@ -139,7 +139,7 @@ public class S3RecoverableWriter implements RecoverableWriter {
final S3RecoverableMultipartUploadFactory uploadFactory =
new S3RecoverableMultipartUploadFactory(
fs,
- twoPhaseUploader,
+ s3AccessHelper,
maxConcurrentUploadsPerStream,
uploadThreadPool,
tempFileCreator);
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
index d3d25c3..5b15652 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java
@@ -19,7 +19,7 @@
package org.apache.flink.fs.s3.common;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.fs.FileSystem;
@@ -78,7 +78,7 @@ public class S3EntropyFsFactoryTest extends TestLogger {
@Nullable
@Override
- protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+ protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
return null;
}
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index 72554e1..4c2f147 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -339,10 +339,10 @@ public class RecoverableMultiPartUploadImplTest {
}
/**
- * A {@link S3MultiPartUploader} that simulates uploading part files to S3 by
+ * A {@link S3AccessHelper} that simulates uploading part files to S3 by
* simply putting complete and incomplete part files in lists for further validation.
*/
- private static class StubMultiPartUploader implements S3MultiPartUploader {
+ private static class StubMultiPartUploader implements S3AccessHelper {
private final List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> completePartsUploaded = new ArrayList<>();
private final List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> incompletePartsUploaded = new ArrayList<>();
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
similarity index 77%
rename from flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
rename to flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index f446f70..f833471 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3MultiPartUploader.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -18,7 +18,7 @@
package org.apache.flink.fs.s3hadoop;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.util.MathUtils;
import com.amazonaws.SdkBaseException;
@@ -43,16 +43,16 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
- * An implementation of the {@link S3MultiPartUploader} for the Hadoop S3A filesystem.
+ * An implementation of the {@link S3AccessHelper} for the Hadoop S3A filesystem.
*/
-public class HadoopS3MultiPartUploader implements S3MultiPartUploader {
+public class HadoopS3AccessHelper implements S3AccessHelper {
private final S3AFileSystem s3a;
- private final InternalWriteOperationHelper s3uploader;
+ private final InternalWriteOperationHelper s3accessHelper;
- public HadoopS3MultiPartUploader(S3AFileSystem s3a, Configuration conf) {
- this.s3uploader = new InternalWriteOperationHelper(
+ public HadoopS3AccessHelper(S3AFileSystem s3a, Configuration conf) {
+ this.s3accessHelper = new InternalWriteOperationHelper(
checkNotNull(s3a),
checkNotNull(conf)
);
@@ -61,25 +61,25 @@ public class HadoopS3MultiPartUploader implements S3MultiPartUploader {
@Override
public String startMultiPartUpload(String key) throws IOException {
- return s3uploader.initiateMultiPartUpload(key);
+ return s3accessHelper.initiateMultiPartUpload(key);
}
@Override
public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream inputStream, long length) throws IOException {
- final UploadPartRequest uploadRequest = s3uploader.newUploadPartRequest(
+ final UploadPartRequest uploadRequest = s3accessHelper.newUploadPartRequest(
key, uploadId, partNumber, MathUtils.checkedDownCast(length), inputStream, null, 0L);
- return s3uploader.uploadPart(uploadRequest);
+ return s3accessHelper.uploadPart(uploadRequest);
}
@Override
public PutObjectResult uploadIncompletePart(String key, InputStream inputStream, long length) throws IOException {
- final PutObjectRequest putRequest = s3uploader.createPutObjectRequest(key, inputStream, length);
- return s3uploader.putObject(putRequest);
+ final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length);
+ return s3accessHelper.putObject(putRequest);
}
@Override
public CompleteMultipartUploadResult commitMultiPartUpload(String destKey, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException {
- return s3uploader.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount);
+ return s3accessHelper.completeMPUwithRetries(destKey, uploadId, partETags, length, errorCount);
}
@Override
@@ -94,7 +94,7 @@ public class HadoopS3MultiPartUploader implements S3MultiPartUploader {
/**
* Internal {@link WriteOperationHelper} that is wrapped so that it only exposes
- * the functionality we need for the {@link S3MultiPartUploader}.
+ * the functionality we need for the {@link S3AccessHelper}.
*/
private static final class InternalWriteOperationHelper extends WriteOperationHelper {
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index 897629f..2637e7b 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3hadoop;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
import org.apache.flink.fs.s3.common.HadoopConfigLoader;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
@@ -96,8 +96,8 @@ public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
@Nullable
@Override
- protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+ protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
final S3AFileSystem s3Afs = (S3AFileSystem) fs;
- return new HadoopS3MultiPartUploader(s3Afs, s3Afs.getConf());
+ return new HadoopS3AccessHelper(s3Afs, s3Afs.getConf());
}
}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index b579d6e..0fb2857 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3presto;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
import org.apache.flink.fs.s3.common.HadoopConfigLoader;
-import org.apache.flink.fs.s3.common.writer.S3MultiPartUploader;
+import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.util.FlinkRuntimeException;
import com.facebook.presto.hive.PrestoS3FileSystem;
@@ -92,7 +92,7 @@ public class S3FileSystemFactory extends AbstractS3FileSystemFactory {
@Nullable
@Override
- protected S3MultiPartUploader getS3AccessHelper(FileSystem fs) {
+ protected S3AccessHelper getS3AccessHelper(FileSystem fs) {
return null;
}