Posted to commits@flink.apache.org by kk...@apache.org on 2018/12/04 16:21:53 UTC

[flink] 04/06: [hotfix] Consolidated all S3 accesses under the S3AccessHelper.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cf793009b5f16976efa0698f2d6ca5f4a4139c63
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 22 10:46:10 2018 +0100

    [hotfix] Consolidated all S3 accesses under the S3AccessHelper.
---
 .../writer/RecoverableMultiPartUploadImpl.java     |  2 +-
 .../flink/fs/s3/common/writer/S3AccessHelper.java  | 21 +++++++++---
 .../S3RecoverableMultipartUploadFactory.java       | 37 ++++++----------------
 .../writer/RecoverableMultiPartUploadImplTest.java | 11 +++++--
 .../flink/fs/s3hadoop/HadoopS3AccessHelper.java    | 25 ++++++++++++++-
 5 files changed, 58 insertions(+), 38 deletions(-)

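In short (an editorial summary; signatures taken from the diff below): the commit renames uploadIncompletePart to the more general putObject and adds a download counterpart, getObject, so that all raw S3 reads and writes go through S3AccessHelper:

    // The S3AccessHelper surface touched by this commit (copied from the diff):
    PutObjectResult putObject(String key, InputStream file, long length) throws IOException; // renamed from uploadIncompletePart
    long getObject(String key, File targetLocation) throws IOException;                      // new: fetch an object into a local file
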
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index fe2a4cd..9f0a811 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -179,7 +179,7 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 			// they do not fall under the user's global TTL on S3.
 			// Figure out a way to clean them.
 
-			s3AccessHelper.uploadIncompletePart(incompletePartObjectName, inputStream, file.getPos());
+			s3AccessHelper.putObject(incompletePartObjectName, inputStream, file.getPos());
 		}
 		finally {
 			file.release();
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index 57920a5..dbc099a 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -26,6 +26,7 @@ import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.UploadPartResult;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
@@ -66,10 +67,9 @@ public interface S3AccessHelper {
 	UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException;
 
 	/**
-	 * Uploads a part and associates it with the MPU with the provided {@code uploadId}.
-	 *
-	 * <p>Contrary to the {@link #uploadIncompletePart(String, InputStream, long)}, this part can
-	 * be smaller than the minimum part size imposed by S3.
+	 * Uploads an object to S3. In contrast to the {@link #uploadPart(String, String, int, InputStream, long)} method,
+	 * this object will not be associated with any MPU and, as such, is not subject to the garbage collection
+	 * policies specified for your S3 bucket.
 	 *
 	 * @param key the key used to identify this part.
 	 * @param file the (local) file holding the data to be uploaded.
@@ -77,7 +77,7 @@ public interface S3AccessHelper {
 	 * @return The {@link PutObjectResult result} of the attempt to stage the incomplete part.
 	 * @throws IOException
 	 */
-	PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException;
+	PutObjectResult putObject(String key, InputStream file, long length) throws IOException;
 
 	/**
 	 * Finalizes a Multi-Part Upload.
@@ -93,6 +93,17 @@ public interface S3AccessHelper {
 	CompleteMultipartUploadResult commitMultiPartUpload(String key, String uploadId, List<PartETag> partETags, long length, AtomicInteger errorCount) throws IOException;
 
 	/**
+	 * Gets the object associated with the provided {@code key} from S3 and
+	 * writes it to the provided {@code targetLocation}.
+	 *
+	 * @param key the key of the object to fetch.
+	 * @param targetLocation the (local) file to write the fetched object to.
+	 * @return The number of bytes read.
+	 * @throws IOException
+	 */
+	long getObject(String key, File targetLocation) throws IOException;
+
+	/**
 	 * Fetches the metadata associated with a given key on S3.
 	 *
 	 * @param key the key.
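
For illustration only (not part of the patch): a rough caller-side sketch of the renamed putObject and the new getObject. The helper instance, the key, and the local files below are made up for the example.

    // Hypothetical usage; assumes an S3AccessHelper named s3AccessHelper
    // plus java.io.File / java.io.FileInputStream imports.
    File staged = new File("/tmp/incomplete-part");
    try (InputStream in = new FileInputStream(staged)) {
        // A plain object PUT: unlike uploadPart, the object is not tied to
        // any MPU, so MPU garbage-collection rules do not apply to it.
        s3AccessHelper.putObject("recovery/incomplete-part", in, staged.length());
    }

    // Later, e.g. during recovery, fetch the object back into a local file.
    File restored = File.createTempFile("restored", ".part");
    long numBytes = s3AccessHelper.getObject("recovery/incomplete-part", restored);
    assert numBytes == restored.length();
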
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
index 9a171ae..ddb09ab 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
@@ -19,10 +19,8 @@
 package org.apache.flink.fs.s3.common.writer;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.fs.s3.common.utils.BackPressuringExecutor;
-import org.apache.flink.fs.s3.common.utils.OffsetAwareOutputStream;
 import org.apache.flink.fs.s3.common.utils.RefCountedFile;
 import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.util.Preconditions;
@@ -74,7 +72,7 @@ final class S3RecoverableMultipartUploadFactory {
 	}
 
 	RecoverableMultiPartUpload recoverRecoverableUpload(S3Recoverable recoverable) throws IOException {
-		final Optional<File> incompletePart = downloadLastDataChunk(recoverable);
+		final Optional<File> incompletePart = recoverInProgressPart(recoverable);
 
 		return RecoverableMultiPartUploadImpl.recoverUpload(
 				s3AccessHelper,
@@ -86,36 +84,20 @@ final class S3RecoverableMultipartUploadFactory {
 				incompletePart);
 	}
 
-	@VisibleForTesting
-	Optional<File> downloadLastDataChunk(S3Recoverable recoverable) throws IOException {
+	private Optional<File> recoverInProgressPart(S3Recoverable recoverable) throws IOException {
 
-		final String objectName = recoverable.incompleteObjectName();
-		if (objectName == null) {
+		final String objectKey = recoverable.incompleteObjectName();
+		if (objectKey == null) {
 			return Optional.empty();
 		}
 
 		// download the file (simple way)
-		final RefCountedFile fileAndStream = tmpFileSupplier.apply(null);
-		final File file = fileAndStream.getFile();
-
-		long numBytes = 0L;
-
-		try (
-				final OffsetAwareOutputStream outStream = fileAndStream.getStream();
-				final org.apache.hadoop.fs.FSDataInputStream inStream =
-						fs.open(new org.apache.hadoop.fs.Path('/' + objectName))
-		) {
-			final byte[] buffer = new byte[32 * 1024];
-
-			int numRead;
-			while ((numRead = inStream.read(buffer)) > 0) {
-				outStream.write(buffer, 0, numRead);
-				numBytes += numRead;
-			}
-		}
+		final RefCountedFile refCountedFile = tmpFileSupplier.apply(null);
+		final File file = refCountedFile.getFile();
+		final long numBytes = s3AccessHelper.getObject(objectKey, file);
 
 		// some sanity checks
-		if (numBytes != file.length() || numBytes != fileAndStream.getStream().getLength()) {
+		if (numBytes != file.length()) {
 			throw new IOException(String.format("Error recovering writer: " +
 							"Downloading the last data chunk file gives incorrect length. " +
 							"File=%d bytes, Stream=%d bytes",
@@ -132,8 +114,7 @@ final class S3RecoverableMultipartUploadFactory {
 		return Optional.of(file);
 	}
 
-	@VisibleForTesting
-	String pathToObjectName(final Path path) {
+	private String pathToObjectName(final Path path) {
 		org.apache.hadoop.fs.Path hadoopPath = HadoopFileSystem.toHadoopPath(path);
 		if (!hadoopPath.isAbsolute()) {
 			hadoopPath = new org.apache.hadoop.fs.Path(fs.getWorkingDirectory(), hadoopPath);
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index 4c2f147..a986111 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -347,11 +347,11 @@ public class RecoverableMultiPartUploadImplTest {
 		private final List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> completePartsUploaded = new ArrayList<>();
 		private final List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> incompletePartsUploaded = new ArrayList<>();
 
-		public List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> getCompletePartsUploaded() {
+		List<RecoverableMultiPartUploadImplTest.TestUploadPartResult> getCompletePartsUploaded() {
 			return completePartsUploaded;
 		}
 
-		public List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> getIncompletePartsUploaded() {
+		List<RecoverableMultiPartUploadImplTest.TestPutObjectResult> getIncompletePartsUploaded() {
 			return incompletePartsUploaded;
 		}
 
@@ -367,12 +367,17 @@ public class RecoverableMultiPartUploadImplTest {
 		}
 
 		@Override
-		public PutObjectResult uploadIncompletePart(String key, InputStream file, long length) throws IOException {
+		public PutObjectResult putObject(String key, InputStream file, long length) throws IOException {
 			final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length));
 			return storeAndGetPutObjectResult(key, content);
 		}
 
 		@Override
+		public long getObject(String key, File targetLocation) throws IOException {
+			return 0;
+		}
+
+		@Override
 		public CompleteMultipartUploadResult commitMultiPartUpload(
 				String key,
 				String uploadId,
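
An aside, not part of this commit: the stubbed getObject above returns 0 and never writes to targetLocation, which is fine while no test exercises recovery through it. A fuller test double might replay the bytes recorded by putObject; a hypothetical sketch (the Map<String, byte[]> named "store" is assumed, not shown in the test):

    @Override
    public long getObject(String key, File targetLocation) throws IOException {
        final byte[] content = store.get(key); // assumed: populated by putObject
        if (content == null) {
            return 0L;
        }
        try (OutputStream out = new FileOutputStream(targetLocation)) {
            out.write(content);
        }
        return content.length;
    }
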
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index f833471..473439c 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -35,8 +35,11 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -72,7 +75,7 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
 	}
 
 	@Override
-	public PutObjectResult uploadIncompletePart(String key, InputStream inputStream, long length) throws IOException {
+	public PutObjectResult putObject(String key, InputStream inputStream, long length) throws IOException {
 		final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length);
 		return s3accessHelper.putObject(putRequest);
 	}
@@ -83,6 +86,26 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
 	}
 
 	@Override
+	public long getObject(String key, File targetLocation) throws IOException {
+		long numBytes = 0L;
+		try (
+				final OutputStream outStream = new FileOutputStream(targetLocation);
+				final org.apache.hadoop.fs.FSDataInputStream inStream =
+						s3a.open(new org.apache.hadoop.fs.Path('/' + key))
+		) {
+			final byte[] buffer = new byte[32 * 1024];
+
+			int numRead;
+			while ((numRead = inStream.read(buffer)) > 0) {
+				outStream.write(buffer, 0, numRead);
+				numBytes += numRead;
+			}
+		}
+
+		return numBytes;
+	}
+
+	@Override
 	public ObjectMetadata getObjectMetadata(String key) throws IOException {
 		try {
 			return s3a.getObjectMetadata(new Path('/' + key));
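
A closing note, not part of the commit: the manual 32 KiB copy loop in getObject could equivalently use java.nio.file.Files.copy, which performs the buffered copy internally and returns the number of bytes copied. A hypothetical alternative body:

    // Hypothetical rewrite of HadoopS3AccessHelper#getObject's body;
    // REPLACE_EXISTING mirrors FileOutputStream's truncate-on-open behavior.
    try (org.apache.hadoop.fs.FSDataInputStream in =
            s3a.open(new org.apache.hadoop.fs.Path('/' + key))) {
        return java.nio.file.Files.copy(
                in, targetLocation.toPath(),
                java.nio.file.StandardCopyOption.REPLACE_EXISTING);
    }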