Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/04 16:21:53 UTC
[flink] 04/06: [hotfix] Consolidated all S3 accesses under the S3AccessHelper.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit cf793009b5f16976efa0698f2d6ca5f4a4139c63
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 22 10:46:10 2018 +0100
[hotfix] Consolidated all S3 accesses under the S3AccessHelper.
---
.../writer/RecoverableMultiPartUploadImpl.java | 2 +-
.../flink/fs/s3/common/writer/S3AccessHelper.java | 21 +++++++++---
.../S3RecoverableMultipartUploadFactory.java | 37 ++++++----------------
.../writer/RecoverableMultiPartUploadImplTest.java | 11 +++++--
.../flink/fs/s3hadoop/HadoopS3AccessHelper.java | 25 ++++++++++++++-
5 files changed, 58 insertions(+), 38 deletions(-)
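
For orientation before the per-file diffs: after this commit, every S3 access the
recoverable writer performs goes through the S3AccessHelper interface. An abridged
sketch of that interface, reconstructed from the hunks below (only the methods
visible in this diff are listed; the full interface may declare more):

    public interface S3AccessHelper {
        // Uploads one part of an ongoing multi-part upload (MPU).
        UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException;
        // Uploads a standalone object, not tied to any MPU.
        PutObjectResult putObject(String key, InputStream file, long length) throws IOException;
        // Finalizes an MPU from its collected part ETags.
        CompleteMultipartUploadResult commitMultiPartUpload(String key, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException;
        // Downloads the object under key into a local file; returns bytes read.
        long getObject(String key, File targetLocation) throws IOException;
        // Fetches the metadata associated with a given key.
        ObjectMetadata getObjectMetadata(String key) throws IOException;
    }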
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index fe2a4cd..9f0a811 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -179,7 +179,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
// they do not fall under the user's global TTL on S3.
// Figure out a way to clean them.
- s3AccessHelper.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
+ s3AccessHelper.putObject(incompletePartObjectName, inputStream, file.getPos());
}
finally {
file.release();
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index 57920a5..dbc099a 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -26,6 +26,7 @@ import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartResult;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
@@ -66,10 +67,9 @@ public interface S3AccessHelper {
UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException;
/**
- * Uploads a part and associates it with the MPU with the provided {@code uploadId}.
- *
- * <p>Contrary to the {@link #uploadIncompletePart(String, InputStream, long)}, this part can
- * be smaller than the minimum part size imposed by S3.
+ * Uploads an object to S3. Unlike the {@link #uploadPart(String, String, int, InputStream, long)} method,
+ * this object is not associated with any MPU and, as such, it is not subject to the garbage
+ * collection policies specified for your S3 bucket.
*
* @param key the key used to identify this part.
* @param file the (local) file holding the data to be uploaded.
@@ -77,7 +77,7 @@ public interface S3AccessHelper {
* @return The {@link PutObjectResult result} of the attempt to stage the incomplete part.
* @throws IOException
*/
- PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException;
+ PutObjectResult putObject(String key, InputStream file, long length) throws IOException;
/**
* Finalizes a Multi-Part Upload.
@@ -93,6 +93,17 @@ public interface S3AccessHelper {
CompleteMultipartUploadResult commitMultiPartUpload(String key, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException;
/**
+ * Gets the object associated with the provided {@code key} from S3 and
+ * puts it in the provided {@code targetLocation}.
+ *
+ * @param key the key of the object to fetch.
+ * @param targetLocation the local file to write the object to.
+ * @return The number of bytes read.
+ * @throws IOException
+ */
+ long getObject(String key, File targetLocation) throws IOException;
+
+ /**
* Fetches the metadata associated with a given key on S3.
*
* @param key the key.
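
A minimal usage sketch of the new contract (not part of the commit; the method,
key, and files below are assumed): putObject stages a small in-progress part as a
standalone object, and getObject later materializes it into a local file during
recovery.

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Hypothetical helper illustrating the putObject/getObject round trip.
    static void roundTrip(S3AccessHelper helper, String key, File localPart, File restoreTarget) throws IOException {
        // Stage the in-progress part as a standalone object; unlike uploadPart,
        // this is not tied to any multi-part upload.
        try (InputStream in = new FileInputStream(localPart)) {
            helper.putObject(key, in, localPart.length());
        }
        // On recovery, fetch the object back into a local file; the returned
        // byte count should match the staged length.
        long numBytes = helper.getObject(key, restoreTarget);
        if (numBytes != localPart.length()) {
            throw new IOException("Unexpected length for " + key + ": " + numBytes);
        }
    }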
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index 9a171ae..ddb09ab 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -19,10 +19,8 @@
package org.apache.flink.fs.s3.common.writer;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor;
-import org.apache.flink.fs.s3.common.utils.OffsetAwareOutputStream;
import org.apache.flink.fs.s3.common.utils.RefCountedFile;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.util.Preconditions;
@@ -74,7 +72,7 @@ final class S3RecoverableMultipartUploadFactory {
}
RecoverableMultiPartUpload recoverRecoverableUpload(S3Recoverable recoverable) throws IOException {
- final Optional<File> incompletePart = downloadLastDataChunk(recoverable);
+ final Optional<File> incompletePart = recoverInProgressPart(recoverable);
return RecoverableMultiPartUploadImpl.recoverUpload(
s3AccessHelper,
@@ -86,36 +84,20 @@ final class S3RecoverableMultipartUploadFactory {
incompletePart);
}
- @VisibleForTesting
- Optional<File> downloadLastDataChunk(S3Recoverable recoverable) throws IOException {
+ private Optional<File> recoverInProgressPart(S3Recoverable recoverable) throws IOException {
- final String objectName = recoverable.incompleteObjectName();
- if (objectName == null) {
+ final String objectKey = recoverable.incompleteObjectName();
+ if (objectKey == null) {
return Optional.empty();
}
// download the file (simple way)
- final RefCountedFile fileAndStream = tmpFileSupplier.apply(null);
- final File file = fileAndStream.getFile();
-
- long numBytes = 0L;
-
- try (
- final OffsetAwareOutputStream outStream = fileAndStream.getStream();
- final org.apache.hadoop.fs.FSDataInputStream inStream =
- fs.open(new org.apache.hadoop.fs.Path('/' + objectName))
- ) {
- final byte[] buffer = new byte[32 * 1024];
-
- int numRead;
- while ((numRead = inStream.read(buffer)) > 0) {
- outStream.write(buffer, 0, numRead);
- numBytes += numRead;
- }
- }
+ final RefCountedFile refCountedFile = tmpFileSupplier.apply(null);
+ final File file = refCountedFile.getFile();
+ final long numBytes = s3AccessHelper.getObject(objectKey, file);
// some sanity checks
- if (numBytes != file.length() || numBytes != fileAndStream.getStream().getLength()) {
+ if (numBytes != file.length()) {
throw new IOException(String.format("Error recovering writer: " +
"Downloading the last data chunk file gives incorrect length. " +
"File=%d bytes, Stream=%d bytes",
@@ -132,8 +114,7 @@ final class S3RecoverableMultipartUploadFactory {
return Optional.of(file);
}
- @VisibleForTesting
- String pathToObjectName(final Path path) {
+ private String pathToObjectName(final Path path) {
org.apache.hadoop.fs.Path hadoopPath = HadoopFileSystem.toHadoopPath(path);
if (!hadoopPath.isAbsolute()) {
hadoopPath = new org.apache.hadoop.fs.Path(fs.getWorkingDirectory(), hadoopPath);
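
For readability, the recovery method after the hunks above resolves to roughly the
following (reconstructed from the diff; the surrounding fields are as in the file,
and the exception message is abridged):

    private Optional<File> recoverInProgressPart(S3Recoverable recoverable) throws IOException {
        final String objectKey = recoverable.incompleteObjectName();
        if (objectKey == null) {
            return Optional.empty();
        }
        // Delegate the download to the access helper instead of streaming
        // through the Hadoop FileSystem directly.
        final RefCountedFile refCountedFile = tmpFileSupplier.apply(null);
        final File file = refCountedFile.getFile();
        final long numBytes = s3AccessHelper.getObject(objectKey, file);
        // Sanity check: the reported byte count must match the local file length.
        if (numBytes != file.length()) {
            // message abridged; see the hunk above for the actual text
            throw new IOException("Error recovering writer: downloaded length mismatch");
        }
        return Optional.of(file);
    }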
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index 4c2f147..a986111 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -347,11 +347,11 @@ public class RecoverableMultiPartUploadImplTest {
private final List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> completePartsUploaded = new ArrayList<>();
private final List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> incompletePartsUploaded = new ArrayList<>();
- public List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> getCompletePartsUploaded() {
+ List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> getCompletePartsUploaded() {
return completePartsUploaded;
}
- public List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> getIncompletePartsUploaded() {
+ List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> getIncompletePartsUploaded() {
return incompletePartsUploaded;
}
@@ -367,12 +367,17 @@ public class RecoverableMultiPartUploadImplTest {
}
@Override
- public PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException {
+ public PutObjectResult putObject(String key, InputStream file, long length) throws IOException {
final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length));
return storeAndGetPutObjectResult(key, content);
}
@Override
+ public long getObject(String key, File targetLocation) throws IOException {
+ return 0;
+ }
+
+ @Override
public CompleteMultipartUploadResult commitMultiPartUpload(
String key,
String uploadId,
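
The stub above returns 0 and never touches targetLocation, which is enough for the
current tests. A fuller stub (an assumption, not in this commit; the getKey() and
getContent() accessors on TestPutObjectResult are hypothetical) could replay the
bytes recorded by putObject so that recovery paths can be exercised end-to-end:

    @Override
    public long getObject(String key, File targetLocation) throws IOException {
        // Replay the content recorded by putObject for this key, if any.
        for (TestPutObjectResult result : incompletePartsUploaded) {
            if (key.equals(result.getKey())) {            // hypothetical accessor
                byte[] content = result.getContent();     // hypothetical accessor
                java.nio.file.Files.write(targetLocation.toPath(), content);
                return content.length;
            }
        }
        return 0L;
    }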
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index f833471..473439c 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -35,8 +35,11 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -72,7 +75,7 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
}
@Override
- public PutObjectResult uploadIncompletePart(String key, InputStream inputStream, long length) throws IOException {
+ public PutObjectResult putObject(String key, InputStream inputStream, long length) throws IOException {
final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length);
return s3accessHelper.putObject(putRequest);
}
@@ -83,6 +86,26 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
}
@Override
+ public long getObject(String key, File targetLocation) throws IOException {
+ long numBytes = 0L;
+ try (
+ final OutputStream outStream = new FileOutputStream(targetLocation);
+ final org.apache.hadoop.fs.FSDataInputStream inStream =
+ s3a.open(new org.apache.hadoop.fs.Path('/' + key))
+ ) {
+ final byte[] buffer = new byte[32 * 1024];
+
+ int numRead;
+ while ((numRead = inStream.read(buffer)) > 0) {
+ outStream.write(buffer, 0, numRead);
+ numBytes += numRead;
+ }
+ }
+
+ return numBytes;
+ }
+
+ @Override
public ObjectMetadata getObjectMetadata(String key) throws IOException {
try {
return s3a.getObjectMetadata(new Path('/' + key));
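
A short usage sketch of the new method (an illustration, not from the commit; the
constructor arguments and the key are assumptions): the returned byte count can be
checked against the local file, mirroring the sanity check in the factory.

    // Assumes an already-configured S3AFileSystem `s3aFs` and its Hadoop
    // Configuration `conf`; the key below is a placeholder.
    S3AccessHelper helper = new HadoopS3AccessHelper(s3aFs, conf);
    File target = File.createTempFile("incomplete-part", null);
    long numBytes = helper.getObject("path/to/incomplete-part-object", target);
    if (numBytes != target.length()) {
        throw new IOException("Short read for incomplete part object");
    }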