You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/01/16 12:34:39 UTC

[flink] branch release-1.7 updated: [FLINK-11187] [s3] Use file over stream for writes

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new d5506b9  [FLINK-11187] [s3] Use file over stream for writes
d5506b9 is described below

commit d5506b9191f0af0346e10561df783815e7cd0565
Author: Addison Higham <ah...@instructure.com>
AuthorDate: Tue Jan 8 15:32:28 2019 -0700

    [FLINK-11187] [s3] Use file over stream for writes
    
    This changes the S3AccessHelper API to take a file instead of an input
    stream.
    
    This allows the S3 client to restart a failed write by re-reading the
    file, instead of trying to reset an input stream that may not support it.
    
    This fixes an issue where the underlying s3 implementation has an
    intermittent failure, tries to reset the stream, fails to do so, and
    results in hung requests with delayed errors.
---
 .../fs/s3/common/utils/RefCountedBufferingFileStream.java  |  7 ++-----
 .../flink/fs/s3/common/utils/RefCountedFSOutputStream.java |  8 ++++----
 .../s3/common/writer/RecoverableMultiPartUploadImpl.java   |  9 ++++-----
 .../apache/flink/fs/s3/common/writer/S3AccessHelper.java   | 12 +++++-------
 .../s3/common/utils/RefCountedBufferingFileStreamTest.java |  3 ++-
 .../common/writer/RecoverableMultiPartUploadImplTest.java  | 14 +++++++-------
 .../common/writer/S3RecoverableFsDataOutputStreamTest.java |  9 ++++-----
 .../org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java |  9 ++++-----
 8 files changed, 32 insertions(+), 39 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
index 8f3aff8..29f2590 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStream.java
@@ -24,9 +24,6 @@ import org.apache.flink.util.function.FunctionWithException;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.StandardOpenOption;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,8 +61,8 @@ public class RefCountedBufferingFileStream extends RefCountedFSOutputStream {
 	}
 
 	@Override
-	public InputStream getInputStream() throws IOException {
-		return Files.newInputStream(currentTmpFile.getFile().toPath(), StandardOpenOption.READ);
+	public File getInputFile() {
+		return currentTmpFile.getFile();
 	}
 
 	@Override
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java
index d4b962e..d51e37e 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/RefCountedFSOutputStream.java
@@ -21,8 +21,8 @@ package org.apache.flink.fs.s3.common.utils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FSDataOutputStream;
 
+import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 
 /**
  * A {@link FSDataOutputStream} with the {@link RefCounted} functionality.
@@ -31,11 +31,11 @@ import java.io.InputStream;
 public abstract class RefCountedFSOutputStream extends FSDataOutputStream implements RefCounted {
 
 	/**
-	 * Gets an {@link InputStream} that allows to read the contents of the file.
+	 * Gets the underlying {@link File} from which the contents of the file can be read.
 	 *
-	 * @return An input stream to the contents of the file.
+	 * @return A handle to the File object.
 	 */
-	public abstract InputStream getInputStream() throws IOException;
+	public abstract File getInputFile();
 
 	/**
 	 * Checks if the file is closed for writes.
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
index 9d88e65..0d0998a 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImpl.java
@@ -30,7 +30,6 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Deque;
@@ -173,8 +172,8 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 		// first, upload the trailing data file. during that time, other in-progress uploads may complete.
 		final String incompletePartObjectName = createIncompletePartObjectName();
 		file.retain();
-		try (InputStream inputStream = file.getInputStream()) {
-			s3AccessHelper.putObject(incompletePartObjectName, inputStream, file.getPos());
+		try {
+			s3AccessHelper.putObject(incompletePartObjectName, file.getInputFile());
 		}
 		finally {
 			file.release();
@@ -315,8 +314,8 @@ final class RecoverableMultiPartUploadImpl implements RecoverableMultiPartUpload
 
 		@Override
 		public void run() {
-			try (final InputStream inputStream = file.getInputStream()) {
-				final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, inputStream, file.getPos());
+			try {
+				final UploadPartResult result = s3AccessHelper.uploadPart(objectName, uploadId, partNumber, file.getInputFile(), file.getPos());
 				future.complete(new PartETag(result.getPartNumber(), result.getETag()));
 				file.release();
 			}
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
index bcdea3c..593d9d3 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3AccessHelper.java
@@ -28,7 +28,6 @@ import com.amazonaws.services.s3.model.UploadPartResult;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -59,25 +58,24 @@ public interface S3AccessHelper {
 	 * @param key the key this MPU is associated with.
 	 * @param uploadId the id of the MPU.
 	 * @param partNumber the number of the part being uploaded (has to be in [1 ... 10000]).
-	 * @param file the (local) file holding the part to be uploaded.
+	 * @param inputFile the (local) file holding the part to be uploaded.
 	 * @param length the length of the part.
 	 * @return The {@link UploadPartResult result} of the attempt to upload the part.
 	 * @throws IOException
 	 */
-	UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException;
+	UploadPartResult uploadPart(String key, String uploadId, int partNumber, File inputFile, long length) throws IOException;
 
 	/**
-	 * Uploads an object to S3. Contrary to the {@link #uploadPart(String, String, int, InputStream, long)} method,
+	 * Uploads an object to S3. Contrary to the {@link #uploadPart(String, String, int, File, long)} method,
 	 * this object is not going to be associated to any MPU and, as such, it is not subject to the garbage collection
 	 * policies specified for your S3 bucket.
 	 *
 	 * @param key the key used to identify this part.
-	 * @param file the (local) file holding the data to be uploaded.
-	 * @param length the size of the data to be uploaded.
+	 * @param inputFile the (local) file holding the data to be uploaded.
 	 * @return The {@link PutObjectResult result} of the attempt to stage the incomplete part.
 	 * @throws IOException
 	 */
-	PutObjectResult putObject(String key, InputStream file, long length) throws IOException;
+	PutObjectResult putObject(String key, File inputFile) throws IOException;
 
 	/**
 	 * Finalizes a Multi-Part Upload.
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
index 5b7d1cc..50ea9bd 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/utils/RefCountedBufferingFileStreamTest.java
@@ -24,6 +24,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
@@ -121,7 +122,7 @@ public class RefCountedBufferingFileStreamTest {
 		Assert.assertEquals(contentToWrite.length, stream.getPos());
 
 		final byte[] contentRead = new byte[contentToWrite.length];
-		stream.getInputStream().read(contentRead, 0, contentRead.length);
+		new FileInputStream(stream.getInputFile()).read(contentRead, 0, contentRead.length);
 		Assert.assertTrue(Arrays.equals(contentToWrite, contentRead));
 
 		stream.release();
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
index 673796d..0194065 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/RecoverableMultiPartUploadImplTest.java
@@ -35,8 +35,8 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -361,14 +361,14 @@ public class RecoverableMultiPartUploadImplTest {
 		}
 
 		@Override
-		public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream file, long length) throws IOException {
-			final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length));
+		public UploadPartResult uploadPart(String key, String uploadId, int partNumber, File inputFile, long length) throws IOException {
+			final byte[] content = getFileContentBytes(inputFile, MathUtils.checkedDownCast(length));
 			return storeAndGetUploadPartResult(key, partNumber, content);
 		}
 
 		@Override
-		public PutObjectResult putObject(String key, InputStream file, long length) throws IOException {
-			final byte[] content = getFileContentBytes(file, MathUtils.checkedDownCast(length));
+		public PutObjectResult putObject(String key, File inputFile) throws IOException {
+			final byte[] content = getFileContentBytes(inputFile, MathUtils.checkedDownCast(inputFile.length()));
 			return storeAndGetPutObjectResult(key, content);
 		}
 
@@ -397,9 +397,9 @@ public class RecoverableMultiPartUploadImplTest {
 			return null;
 		}
 
-		private byte[] getFileContentBytes(InputStream file, int length) throws IOException {
+		private byte[] getFileContentBytes(File file, int length) throws IOException {
 			final byte[] content = new byte[length];
-			file.read(content, 0, length);
+			new FileInputStream(file).read(content, 0, length);
 			return content;
 		}
 
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
index 7a32392..14ed2e2 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/writer/S3RecoverableFsDataOutputStreamTest.java
@@ -39,8 +39,8 @@ import javax.annotation.Nullable;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.FileAlreadyExistsException;
@@ -314,10 +314,9 @@ public class S3RecoverableFsDataOutputStreamTest {
 
 	private static byte[] readFileContents(RefCountedFSOutputStream file) throws IOException {
 		final byte[] content = new byte[MathUtils.checkedDownCast(file.getPos())];
-		try (InputStream inputStream = file.getInputStream()) {
-			int bytesRead = inputStream.read(content, 0, content.length); // TODO: 10/2/18 see if closed in download
-			Assert.assertEquals(file.getPos(), bytesRead);
-		}
+		File inputFile = file.getInputFile();
+		long bytesRead = new FileInputStream(inputFile).read(content, 0,  MathUtils.checkedDownCast(inputFile.length()));
+		Assert.assertEquals(file.getPos(), bytesRead);
 		return content;
 	}
 
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
index b9612ad..4d37ce0 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/HadoopS3AccessHelper.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.fs.s3a.WriteOperationHelper;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -68,15 +67,15 @@ public class HadoopS3AccessHelper implements S3AccessHelper {
 	}
 
 	@Override
-	public UploadPartResult uploadPart(String key, String uploadId, int partNumber, InputStream inputStream, long length) throws IOException {
+	public UploadPartResult uploadPart(String key, String uploadId, int partNumber, File inputFile, long length) throws IOException {
 		final UploadPartRequest uploadRequest = s3accessHelper.newUploadPartRequest(
-				key, uploadId, partNumber, MathUtils.checkedDownCast(length), inputStream, null, 0L);
+			key, uploadId, partNumber, MathUtils.checkedDownCast(length), null, inputFile, 0L);
 		return s3accessHelper.uploadPart(uploadRequest);
 	}
 
 	@Override
-	public PutObjectResult putObject(String key, InputStream inputStream, long length) throws IOException {
-		final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputStream, length);
+	public PutObjectResult putObject(String key, File inputFile) throws IOException {
+		final PutObjectRequest putRequest = s3accessHelper.createPutObjectRequest(key, inputFile);
 		return s3accessHelper.putObject(putRequest);
 	}