Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/03 15:06:01 UTC

[GitHub] kl0u commented on a change in pull request #7161: [FLINK-10963][fs-connector, s3] Cleanup tmp S3 objects uploaded as backups of in-progress files.

URL: https://github.com/apache/flink/pull/7161#discussion_r238303737
 
 

 ##########
 File path: flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/writer/S3RecoverableMultipartUploadFactory.java
 ##########
 @@ -86,41 +84,17 @@ RecoverableMultiPartUpload recoverRecoverableUpload(S3Recoverable recoverable) t
 				incompletePart);
 	}
 
-	@VisibleForTesting
-	Optional<File> downloadLastDataChunk(S3Recoverable recoverable) throws IOException {
+	private Optional<File> recoverInProgressPart(S3Recoverable recoverable) throws IOException {
 
-		final String objectName = recoverable.incompleteObjectName();
-		if (objectName == null) {
+		final String objectKey = recoverable.incompleteObjectName();
+		if (objectKey == null) {
 			return Optional.empty();
 		}
 
 		// download the file (simple way)
-		final RefCountedFile fileAndStream = tmpFileSupplier.apply(null);
-		final File file = fileAndStream.getFile();
-
-		long numBytes = 0L;
-
-		try (
-				final OffsetAwareOutputStream outStream = fileAndStream.getStream();
-				final org.apache.hadoop.fs.FSDataInputStream inStream =
-						fs.open(new org.apache.hadoop.fs.Path('/' + objectName))
-		) {
-			final byte[] buffer = new byte[32 * 1024];
-
-			int numRead;
-			while ((numRead = inStream.read(buffer)) > 0) {
-				outStream.write(buffer, 0, numRead);
-				numBytes += numRead;
-			}
-		}
-
-		// some sanity checks
-		if (numBytes != file.length() || numBytes != fileAndStream.getStream().getLength()) {
-			throw new IOException(String.format("Error recovering writer: " +
-							"Downloading the last data chunk file gives incorrect length. " +
-							"File=%d bytes, Stream=%d bytes",
-					file.length(), numBytes));
-		}
+		final RefCountedFile refCountedFile = tmpFileSupplier.apply(null);
+		final File file = refCountedFile.getFile();
+		final long numBytes = s3AccessHelper.getObject(objectKey, file);
 
 Review comment:
   It is ok because eventually we just end up wrapping the file in a new `RefCountedFile`.
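The new code delegates the download to `s3AccessHelper.getObject(objectKey, file)`, which returns the number of bytes written to the local file. The removed lines show what such a helper has to do: stream the object into the temporary file and then check the resulting length. Below is a minimal sketch of that copy-and-verify step. It assumes a plain `java.io.InputStream` in place of Hadoop's `FSDataInputStream`. `DownloadSketch` and `copyToFile` are hypothetical names, not Flink's API, and this is not how `S3AccessHelper` is actually implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
 * Hypothetical sketch (not Flink's S3AccessHelper): copies a stream into a
 * local file and verifies the resulting length, mirroring the loop the diff removes.
 */
final class DownloadSketch {

    // Same 32 KiB buffer size as the removed code.
    private static final int BUFFER_SIZE = 32 * 1024;

    private DownloadSketch() {}

    /** Copies {@code in} into {@code target} (truncating it) and returns the bytes written. */
    static long copyToFile(InputStream in, File target) throws IOException {
        long numBytes = 0L;
        try (InputStream src = in;
             OutputStream out = new FileOutputStream(target)) {
            final byte[] buffer = new byte[BUFFER_SIZE];
            int numRead;
            // read() returns -1 at end of stream.
            while ((numRead = src.read(buffer)) != -1) {
                out.write(buffer, 0, numRead);
                numBytes += numRead;
            }
        }
        // Sanity check once both streams are closed, so the file length is final.
        if (numBytes != target.length()) {
            throw new IOException(String.format(
                    "Downloaded object has unexpected length: copied=%d bytes, file=%d bytes",
                    numBytes, target.length()));
        }
        return numBytes;
    }

    public static void main(String[] args) throws IOException {
        File tmp = File.createTempFile("in-progress-part", ".tmp");
        tmp.deleteOnExit();
        long n = copyToFile(new ByteArrayInputStream(new byte[100_000]), tmp);
        System.out.println(n + " bytes"); // prints "100000 bytes"
    }
}
```

The sketch reports the copied count and the file length under separate labels. In the removed code the format arguments were `file.length()` and `numBytes`, so the value printed as "Stream=" was the copied byte count rather than `getStream().getLength()`.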

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services