You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2020/11/21 18:01:42 UTC
[iceberg] branch master updated: AWS: Add progressive multipart
upload to S3FileIO (#1767)
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5e3f919 AWS: Add progressive multipart upload to S3FileIO (#1767)
5e3f919 is described below
commit 5e3f9198e5675a852df4f0e1c28b4e3cf6630f86
Author: Daniel Weeks <dw...@apache.org>
AuthorDate: Sat Nov 21 10:01:26 2020 -0800
AWS: Add progressive multipart upload to S3FileIO (#1767)
* AWS: Add progressive multipart upload to S3FileIO
* Fix test after rebase
* Simply the complete and ensure parts are in order
* Add sort to stream
* Checkstyle
* Add abort attempt back to complete
* Initialize only once
* Add executor service for async tasks per errorprone
* Address comments
* Refactor setting encryption for requests
* Fix defaults to no-arg AwsProperties
* Address some failure cases and add more testing
* Checkstyle
---
.../java/org/apache/iceberg/aws/AwsProperties.java | 70 ++++++
.../java/org/apache/iceberg/aws/s3/S3FileIO.java | 1 -
.../org/apache/iceberg/aws/s3/S3InputStream.java | 7 +-
.../org/apache/iceberg/aws/s3/S3OutputStream.java | 269 ++++++++++++++++++---
.../org/apache/iceberg/aws/s3/S3RequestUtil.java | 124 ++++++++++
.../apache/iceberg/aws/s3/S3OutputStreamTest.java | 151 +++++++++---
.../main/java/org/apache/iceberg/GuavaClasses.java | 2 +
.../java/org/apache/iceberg/util/PropertyUtil.java | 9 +
8 files changed, 548 insertions(+), 85 deletions(-)
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
index 358a8c9..c2f58be 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -82,9 +82,40 @@ public class AwsProperties {
public static final String GLUE_CATALOG_SKIP_ARCHIVE = "gluecatalog.skip-archive";
public static final boolean GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT = false;
+ /**
+ * Number of threads to use for uploading parts to S3 (shared pool across all output streams).
+ */
+ public static final String S3FILEIO_MULTIPART_UPLOAD_THREADS = "s3fileio.multipart.num-threads";
+
+ /**
+ * The size of a single part for multipart upload requests (default: 32MB).
+ */
+ public static final String S3FILEIO_MULTIPART_SIZE = "s3fileio.multipart.part.size";
+
+ /**
+ * The threshold expressed as a factor times the multipart size at which to
+ * switch from uploading using a single put object request to uploading using multipart upload
+ * (default: 1.5).
+ */
+ public static final String S3FILEIO_MULTIPART_THRESHOLD_FACTOR = "s3fileio.multipart.threshold";
+
+ /**
+ * Location to put staging files for upload to S3.
+ */
+ public static final String S3FILEIO_STAGING_DIRECTORY = "s3fileio.staging.dir";
+
+
+ static final int MIN_MULTIPART_UPLOAD_SIZE = 5 * 1024 * 1024;
+ static final int DEFAULT_MULTIPART_SIZE = 32 * 1024 * 1024;
+ static final double DEFAULT_MULTIPART_THRESHOLD = 1.5;
+
private String s3FileIoSseType;
private String s3FileIoSseKey;
private String s3FileIoSseMd5;
+ private int s3FileIoMultipartUploadThreads;
+ private int s3FileIoMultiPartSize;
+ private double s3FileIoMultipartThresholdFactor;
+ private String s3fileIoStagingDirectory;
private String glueCatalogId;
private boolean glueCatalogSkipArchive;
@@ -94,6 +125,11 @@ public class AwsProperties {
this.s3FileIoSseKey = null;
this.s3FileIoSseMd5 = null;
+ this.s3FileIoMultipartUploadThreads = Runtime.getRuntime().availableProcessors();
+ this.s3FileIoMultiPartSize = DEFAULT_MULTIPART_SIZE;
+ this.s3FileIoMultipartThresholdFactor = DEFAULT_MULTIPART_THRESHOLD;
+ this.s3fileIoStagingDirectory = System.getProperty("java.io.tmpdir");
+
this.glueCatalogId = null;
this.glueCatalogSkipArchive = GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT;
}
@@ -111,6 +147,24 @@ public class AwsProperties {
this.glueCatalogId = properties.get(GLUE_CATALOG_ID);
this.glueCatalogSkipArchive = PropertyUtil.propertyAsBoolean(properties,
AwsProperties.GLUE_CATALOG_SKIP_ARCHIVE, AwsProperties.GLUE_CATALOG_SKIP_ARCHIVE_DEFAULT);
+
+ this.s3FileIoMultipartUploadThreads = PropertyUtil.propertyAsInt(properties, S3FILEIO_MULTIPART_UPLOAD_THREADS,
+ Runtime.getRuntime().availableProcessors());
+
+ this.s3FileIoMultiPartSize = PropertyUtil.propertyAsInt(properties, S3FILEIO_MULTIPART_SIZE,
+ DEFAULT_MULTIPART_SIZE);
+
+ this.s3FileIoMultipartThresholdFactor = PropertyUtil.propertyAsDouble(properties,
+ S3FILEIO_MULTIPART_THRESHOLD_FACTOR, DEFAULT_MULTIPART_THRESHOLD);
+
+ Preconditions.checkArgument(s3FileIoMultipartThresholdFactor >= 1.0,
+ "Multipart threshold factor must be >= to 1.0");
+
+ Preconditions.checkArgument(s3FileIoMultiPartSize >= MIN_MULTIPART_UPLOAD_SIZE,
+ "Minimum multipart upload object size must be larger than 5 MB.");
+
+ this.s3fileIoStagingDirectory = PropertyUtil.propertyAsString(properties, S3FILEIO_STAGING_DIRECTORY,
+ System.getProperty("java.io.tmpdir"));
}
public String s3FileIoSseType() {
@@ -152,4 +206,20 @@ public class AwsProperties {
public void setGlueCatalogSkipArchive(boolean skipArchive) {
this.glueCatalogSkipArchive = skipArchive;
}
+
+ public int s3FileIoMultipartUploadThreads() {
+ return s3FileIoMultipartUploadThreads;
+ }
+
+ public int s3FileIoMultiPartSize() {
+ return s3FileIoMultiPartSize;
+ }
+
+ public double s3FileIOMultipartThresholdFactor() {
+ return s3FileIoMultipartThresholdFactor;
+ }
+
+ public String getS3fileIoStagingDirectory() {
+ return s3fileIoStagingDirectory;
+ }
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 443c58f..edcd87a 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -39,7 +39,6 @@ import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
public class S3FileIO implements FileIO {
private final SerializableSupplier<S3Client> s3;
private AwsProperties awsProperties;
-
private transient S3Client client;
public S3FileIO() {
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
index 2d3962b..7d58fc4 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
-import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
class S3InputStream extends SeekableInputStream {
private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);
@@ -139,11 +138,7 @@ class S3InputStream extends SeekableInputStream {
.key(location.key())
.range(String.format("bytes=%s-", pos));
- if (AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM.equals(awsProperties.s3FileIoSseType())) {
- requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name());
- requestBuilder.sseCustomerKey(awsProperties.s3FileIoSseKey());
- requestBuilder.sseCustomerKeyMD5(awsProperties.s3FileIoSseMd5());
- }
+ S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
closeStream();
stream = s3.getObject(requestBuilder.build(), ResponseTransformer.toInputStream());
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
index 42b9608..15fd19f 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -19,51 +19,98 @@
package org.apache.iceberg.aws.s3;
+import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
import java.util.Arrays;
-import java.util.Locale;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Predicates;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.io.CountingOutputStream;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
class S3OutputStream extends PositionOutputStream {
private static final Logger LOG = LoggerFactory.getLogger(S3OutputStream.class);
+ private static volatile ExecutorService executorService;
+
private final StackTraceElement[] createStack;
private final S3Client s3;
private final S3URI location;
private final AwsProperties awsProperties;
- private final OutputStream stream;
- private final File stagingFile;
- private long pos = 0;
+ private CountingOutputStream stream;
+ private final List<File> stagingFiles = Lists.newArrayList();
+ private final File stagingDirectory;
+ private File currentStagingFile;
+ private String multipartUploadId;
+ private final Map<File, CompletableFuture<CompletedPart>> multiPartMap = Maps.newHashMap();
+ private final int multiPartSize;
+ private final int multiPartThresholdSize;
+ private long pos = 0;
private boolean closed = false;
- S3OutputStream(S3Client s3, S3URI location) throws IOException {
- this(s3, location, new AwsProperties());
- }
-
S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties) throws IOException {
+ if (executorService == null) {
+ synchronized (this) {
+ if (executorService == null) {
+ executorService = MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(
+ awsProperties.s3FileIoMultipartUploadThreads(),
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("iceberg-s3fileio-upload-%d")
+ .build()));
+ }
+ }
+ }
+
this.s3 = s3;
this.location = location;
this.awsProperties = awsProperties;
createStack = Thread.currentThread().getStackTrace();
- stagingFile = File.createTempFile("s3fileio-", ".tmp");
- stream = new BufferedOutputStream(new FileOutputStream(stagingFile));
- stagingFile.deleteOnExit();
+ multiPartSize = awsProperties.s3FileIoMultiPartSize();
+ multiPartThresholdSize = (int) (multiPartSize * awsProperties.s3FileIOMultipartThresholdFactor());
+ stagingDirectory = new File(awsProperties.getS3fileIoStagingDirectory());
+
+ newStream();
}
@Override
@@ -78,14 +125,60 @@ class S3OutputStream extends PositionOutputStream {
@Override
public void write(int b) throws IOException {
+ if (stream.getCount() >= multiPartSize) {
+ newStream();
+ uploadParts();
+ }
+
stream.write(b);
pos += 1;
+
+ // switch to multipart upload
+ if (multipartUploadId == null && pos >= multiPartThresholdSize) {
+ initializeMultiPartUpload();
+ uploadParts();
+ }
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
- stream.write(b, off, len);
+ int remaining = len;
+ int relativeOffset = off;
+
+ // Write the remainder of the part size to the staging file
+ // and continue to write new staging files if the write is
+ // larger than the part size.
+ while (stream.getCount() + remaining > multiPartSize) {
+ int writeSize = multiPartSize - (int) stream.getCount();
+
+ stream.write(b, relativeOffset, writeSize);
+ remaining -= writeSize;
+ relativeOffset += writeSize;
+
+ newStream();
+ uploadParts();
+ }
+
+ stream.write(b, relativeOffset, remaining);
pos += len;
+
+ // switch to multipart upload
+ if (multipartUploadId == null && pos >= multiPartThresholdSize) {
+ initializeMultiPartUpload();
+ uploadParts();
+ }
+ }
+
+ private void newStream() throws IOException {
+ if (stream != null) {
+ stream.close();
+ }
+
+ currentStagingFile = File.createTempFile("s3fileio-", ".tmp", stagingDirectory);
+ currentStagingFile.deleteOnExit();
+ stagingFiles.add(currentStagingFile);
+
+ stream = new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(currentStagingFile)));
}
@Override
@@ -100,42 +193,138 @@ class S3OutputStream extends PositionOutputStream {
try {
stream.close();
- PutObjectRequest.Builder requestBuilder = PutObjectRequest.builder()
- .bucket(location.bucket())
- .key(location.key());
+ completeUploads();
+ } finally {
+ cleanUpStagingFiles();
+ }
+ }
+
+ private void initializeMultiPartUpload() {
+ CreateMultipartUploadRequest.Builder requestBuilder = CreateMultipartUploadRequest.builder()
+ .bucket(location.bucket()).key(location.key());
+ S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
+
+ multipartUploadId = s3.createMultipartUpload(requestBuilder.build()).uploadId();
+ }
- switch (awsProperties.s3FileIoSseType().toLowerCase(Locale.ENGLISH)) {
- case AwsProperties.S3FILEIO_SSE_TYPE_NONE:
- break;
+ private void uploadParts() {
+ // exit if multipart has not been initiated
+ if (multipartUploadId == null) {
+ return;
+ }
- case AwsProperties.S3FILEIO_SSE_TYPE_KMS:
- requestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
- requestBuilder.ssekmsKeyId(awsProperties.s3FileIoSseKey());
- break;
+ stagingFiles.stream()
+ // do not upload the file currently being written
+ .filter(f -> closed || !f.equals(currentStagingFile))
+ // do not upload any files that have already been processed
+ .filter(Predicates.not(multiPartMap::containsKey))
+ .forEach(f -> {
+ UploadPartRequest.Builder requestBuilder = UploadPartRequest.builder()
+ .bucket(location.bucket())
+ .key(location.key())
+ .uploadId(multipartUploadId)
+ .partNumber(stagingFiles.indexOf(f) + 1)
+ .contentLength(f.length());
- case AwsProperties.S3FILEIO_SSE_TYPE_S3:
- requestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
- break;
+ S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
- case AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM:
- requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name());
- requestBuilder.sseCustomerKey(awsProperties.s3FileIoSseKey());
- requestBuilder.sseCustomerKeyMD5(awsProperties.s3FileIoSseMd5());
- break;
+ UploadPartRequest uploadRequest = requestBuilder.build();
- default:
- throw new IllegalArgumentException(
- "Cannot support given S3 encryption type: " + awsProperties.s3FileIoSseType());
- }
+ CompletableFuture<CompletedPart> future = CompletableFuture.supplyAsync(
+ () -> {
+ UploadPartResponse response = s3.uploadPart(uploadRequest, RequestBody.fromFile(f));
+ return CompletedPart.builder().eTag(response.eTag()).partNumber(uploadRequest.partNumber()).build();
+ },
+ executorService
+ ).whenComplete((result, thrown) -> {
+ try {
+ Files.deleteIfExists(f.toPath());
+ } catch (IOException e) {
+ LOG.warn("Failed to delete staging file: {}", f, e);
+ }
- s3.putObject(requestBuilder.build(), RequestBody.fromFile(stagingFile));
- } finally {
- if (!stagingFile.delete()) {
- LOG.warn("Could not delete temporary file: {}", stagingFile);
+ if (thrown != null) {
+ LOG.error("Failed to upload part: {}", uploadRequest, thrown);
+ abortUpload();
+ }
+ });
+
+ multiPartMap.put(f, future);
+ });
+ }
+
+ private void completeMultiPartUpload() {
+ Preconditions.checkState(closed, "Complete upload called on open stream: " + location);
+
+ List<CompletedPart> completedParts =
+ multiPartMap.values()
+ .stream()
+ .map(CompletableFuture::join)
+ .sorted(Comparator.comparing(CompletedPart::partNumber))
+ .collect(Collectors.toList());
+
+ CompleteMultipartUploadRequest request = CompleteMultipartUploadRequest.builder()
+ .bucket(location.bucket()).key(location.key())
+ .uploadId(multipartUploadId)
+ .multipartUpload(CompletedMultipartUpload.builder().parts(completedParts).build()).build();
+
+ Tasks.foreach(request)
+ .noRetry()
+ .onFailure((r, thrown) -> {
+ LOG.error("Failed to complete multipart upload request: {}", r, thrown);
+ abortUpload();
+ })
+ .throwFailureWhenFinished()
+ .run(s3::completeMultipartUpload);
+ }
+
+ private void abortUpload() {
+ if (multipartUploadId != null) {
+ try {
+ s3.abortMultipartUpload(AbortMultipartUploadRequest.builder()
+ .bucket(location.bucket()).key(location.key()).uploadId(multipartUploadId).build());
+ } finally {
+ cleanUpStagingFiles();
}
}
}
+ private void cleanUpStagingFiles() {
+ Tasks.foreach(stagingFiles)
+ .suppressFailureWhenFinished()
+ .onFailure((file, thrown) -> LOG.warn("Failed to delete staging file: {}", file, thrown))
+ .run(File::delete);
+ }
+
+ private void completeUploads() {
+ if (multipartUploadId == null) {
+ long contentLength = stagingFiles.stream().mapToLong(File::length).sum();
+ InputStream contentStream = new BufferedInputStream(stagingFiles.stream()
+ .map(S3OutputStream::uncheckedInputStream)
+ .reduce(SequenceInputStream::new)
+ .orElseGet(() -> new ByteArrayInputStream(new byte[0])));
+
+ PutObjectRequest.Builder requestBuilder = PutObjectRequest.builder()
+ .bucket(location.bucket())
+ .key(location.key());
+
+ S3RequestUtil.configureEncryption(awsProperties, requestBuilder);
+
+ s3.putObject(requestBuilder.build(), RequestBody.fromInputStream(contentStream, contentLength));
+ } else {
+ uploadParts();
+ completeMultiPartUpload();
+ }
+ }
+
+ private static InputStream uncheckedInputStream(File file) {
+ try {
+ return new FileInputStream(file);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
@SuppressWarnings("checkstyle:NoFinalizer")
@Override
protected void finalize() throws Throwable {
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3RequestUtil.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3RequestUtil.java
new file mode 100644
index 0000000..5f9309e
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3RequestUtil.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.s3;
+
+import java.util.Locale;
+import org.apache.iceberg.aws.AwsProperties;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
+
+public class S3RequestUtil {
+
+ private S3RequestUtil() {
+ }
+
+ static void configureEncryption(AwsProperties awsProperties, PutObjectRequest.Builder requestBuilder) {
+ switch (awsProperties.s3FileIoSseType().toLowerCase(Locale.ENGLISH)) {
+ case AwsProperties.S3FILEIO_SSE_TYPE_NONE:
+ break;
+
+ case AwsProperties.S3FILEIO_SSE_TYPE_KMS:
+ requestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
+ requestBuilder.ssekmsKeyId(awsProperties.s3FileIoSseKey());
+ break;
+
+ case AwsProperties.S3FILEIO_SSE_TYPE_S3:
+ requestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
+ break;
+
+ case AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM:
+ requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name());
+ requestBuilder.sseCustomerKey(awsProperties.s3FileIoSseKey());
+ requestBuilder.sseCustomerKeyMD5(awsProperties.s3FileIoSseMd5());
+ break;
+
+ default:
+ throw new IllegalArgumentException(
+ "Cannot support given S3 encryption type: " + awsProperties.s3FileIoSseType());
+ }
+ }
+
+ static void configureEncryption(AwsProperties awsProperties, CreateMultipartUploadRequest.Builder requestBuilder) {
+ switch (awsProperties.s3FileIoSseType().toLowerCase(Locale.ENGLISH)) {
+ case AwsProperties.S3FILEIO_SSE_TYPE_NONE:
+ break;
+
+ case AwsProperties.S3FILEIO_SSE_TYPE_KMS:
+ requestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
+ requestBuilder.ssekmsKeyId(awsProperties.s3FileIoSseKey());
+ break;
+
+ case AwsProperties.S3FILEIO_SSE_TYPE_S3:
+ requestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
+ break;
+
+ case AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM:
+ requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name());
+ requestBuilder.sseCustomerKey(awsProperties.s3FileIoSseKey());
+ requestBuilder.sseCustomerKeyMD5(awsProperties.s3FileIoSseMd5());
+ break;
+
+ default:
+ throw new IllegalArgumentException(
+ "Cannot support given S3 encryption type: " + awsProperties.s3FileIoSseType());
+ }
+ }
+
+ static void configureEncryption(AwsProperties awsProperties, UploadPartRequest.Builder requestBuilder) {
+ switch (awsProperties.s3FileIoSseType().toLowerCase(Locale.ENGLISH)) {
+ case AwsProperties.S3FILEIO_SSE_TYPE_NONE:
+ case AwsProperties.S3FILEIO_SSE_TYPE_KMS:
+ case AwsProperties.S3FILEIO_SSE_TYPE_S3:
+ break;
+
+ case AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM:
+ requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name());
+ requestBuilder.sseCustomerKey(awsProperties.s3FileIoSseKey());
+ requestBuilder.sseCustomerKeyMD5(awsProperties.s3FileIoSseMd5());
+ break;
+
+ default:
+ throw new IllegalArgumentException(
+ "Cannot support given S3 encryption type: " + awsProperties.s3FileIoSseType());
+ }
+ }
+
+ static void configureEncryption(AwsProperties awsProperties, GetObjectRequest.Builder requestBuilder) {
+ switch (awsProperties.s3FileIoSseType().toLowerCase(Locale.ENGLISH)) {
+ case AwsProperties.S3FILEIO_SSE_TYPE_NONE:
+ case AwsProperties.S3FILEIO_SSE_TYPE_KMS:
+ case AwsProperties.S3FILEIO_SSE_TYPE_S3:
+ break;
+
+ case AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM:
+ requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name());
+ requestBuilder.sseCustomerKey(awsProperties.s3FileIoSseKey());
+ requestBuilder.sseCustomerKeyMD5(awsProperties.s3FileIoSseMd5());
+ break;
+
+ default:
+ throw new IllegalArgumentException(
+ "Cannot support given S3 encryption type: " + awsProperties.s3FileIoSseType());
+ }
+ }
+}
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java b/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java
index 3e1b355..b4dc1ec 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/S3OutputStreamTest.java
@@ -21,86 +21,151 @@ package org.apache.iceberg.aws.s3;
import com.adobe.testing.s3mock.junit4.S3MockRule;
import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
public class S3OutputStreamTest {
+ private static final Logger LOG = LoggerFactory.getLogger(S3OutputStreamTest.class);
+ private static final String BUCKET = "test-bucket";
+
@ClassRule
public static final S3MockRule S3_MOCK_RULE = S3MockRule.builder().silent().build();
private final S3Client s3 = S3_MOCK_RULE.createS3ClientV2();
+ private final S3Client s3mock = mock(S3Client.class, delegatesTo(s3));
private final Random random = new Random(1);
+ private final Path tmpDir = Files.createTempDirectory("s3fileio-test-");
+
+ private final AwsProperties properties = new AwsProperties(ImmutableMap.of(
+ AwsProperties.S3FILEIO_MULTIPART_SIZE, Integer.toString(5 * 1024 * 1024),
+ AwsProperties.S3FILEIO_STAGING_DIRECTORY, tmpDir.toString()));
+
+ public S3OutputStreamTest() throws IOException {
+ }
@Before
public void before() {
- s3.createBucket(CreateBucketRequest.builder().bucket("bucket").build());
+ s3.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build());
}
@Test
- public void getPos() throws IOException {
- S3URI uri = new S3URI("s3://bucket/path/to/pos.dat");
- int writeSize = 1024;
-
- try (S3OutputStream stream = new S3OutputStream(s3, uri)) {
- stream.write(new byte[writeSize]);
- assertEquals(writeSize, stream.getPos());
- }
+ public void testWrite() {
+ // Run tests for both byte and array write paths
+ Stream.of(true, false).forEach(arrayWrite -> {
+ // Test small file write (less than multipart threshold)
+ writeAndVerify(s3mock, randomURI(), randomData(1024), arrayWrite);
+ verify(s3mock, times(1)).putObject((PutObjectRequest) any(), (RequestBody) any());
+ reset(s3mock);
+
+ // Test file larger than part size but less than multipart threshold
+ writeAndVerify(s3mock, randomURI(), randomData(6 * 1024 * 1024), arrayWrite);
+ verify(s3mock, times(1)).putObject((PutObjectRequest) any(), (RequestBody) any());
+ reset(s3mock);
+
+ // Test file large enough to trigger multipart upload
+ writeAndVerify(s3mock, randomURI(), randomData(10 * 1024 * 1024), arrayWrite);
+ verify(s3mock, times(2)).uploadPart((UploadPartRequest) any(), (RequestBody) any());
+ reset(s3mock);
+
+ // Test uploading many parts
+ writeAndVerify(s3mock, randomURI(), randomData(22 * 1024 * 1024), arrayWrite);
+ verify(s3mock, times(5)).uploadPart((UploadPartRequest) any(), (RequestBody) any());
+ reset(s3mock);
+ });
}
@Test
- public void testWrite() throws IOException {
- S3URI uri = new S3URI("s3://bucket/path/to/out.dat");
- int size = 5 * 1024 * 1024;
- byte [] expected = new byte[size];
- random.nextBytes(expected);
-
- try (S3OutputStream stream = new S3OutputStream(s3, uri)) {
- for (int i = 0; i < size; i++) {
- stream.write(expected[i]);
- assertEquals(i + 1, stream.getPos());
- }
- }
-
- byte [] actual = readS3Data(uri);
+ public void testAbortAfterFailedPartUpload() {
+ doThrow(new RuntimeException()).when(s3mock).uploadPart((UploadPartRequest) any(), (RequestBody) any());
- assertArrayEquals(expected, actual);
+ try (S3OutputStream stream = new S3OutputStream(s3mock, randomURI(), properties)) {
+ stream.write(randomData(10 * 1024 * 1024));
+ } catch (Exception e) {
+ verify(s3mock, atLeastOnce()).abortMultipartUpload((AbortMultipartUploadRequest) any());
+ }
}
@Test
- public void testWriteArray() throws IOException {
- S3URI uri = new S3URI("s3://bucket/path/to/array-out.dat");
- byte [] expected = new byte[5 * 1024 * 1024];
- random.nextBytes(expected);
-
- try (S3OutputStream stream = new S3OutputStream(s3, uri)) {
- stream.write(expected);
- assertEquals(expected.length, stream.getPos());
- }
-
- byte [] actual = readS3Data(uri);
+ public void testAbortMultipart() {
+ doThrow(new RuntimeException()).when(s3mock).completeMultipartUpload((CompleteMultipartUploadRequest) any());
- assertArrayEquals(expected, actual);
+ try (S3OutputStream stream = new S3OutputStream(s3mock, randomURI(), properties)) {
+ stream.write(randomData(10 * 1024 * 1024));
+ } catch (Exception e) {
+ verify(s3mock).abortMultipartUpload((AbortMultipartUploadRequest) any());
+ }
}
@Test
public void testMultipleClose() throws IOException {
- S3URI uri = new S3URI("s3://bucket/path/to/array-out.dat");
- S3OutputStream stream = new S3OutputStream(s3, uri);
+ S3OutputStream stream = new S3OutputStream(s3, randomURI(), properties);
stream.close();
stream.close();
}
+ private void writeAndVerify(S3Client client, S3URI uri, byte [] data, boolean arrayWrite) {
+ try (S3OutputStream stream = new S3OutputStream(client, uri, properties)) {
+ if (arrayWrite) {
+ stream.write(data);
+ assertEquals(data.length, stream.getPos());
+ } else {
+ for (int i = 0; i < data.length; i++) {
+ stream.write(data[i]);
+ assertEquals(i + 1, stream.getPos());
+ }
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
+ byte[] actual = readS3Data(uri);
+ assertArrayEquals(data, actual);
+
+ // Verify all staging files are cleaned up
+ try {
+ assertEquals(0, Files.list(tmpDir).count());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
private byte[] readS3Data(S3URI uri) {
ResponseBytes<GetObjectResponse> data =
s3.getObject(GetObjectRequest.builder().bucket(uri.bucket()).key(uri.key()).build(),
@@ -108,4 +173,14 @@ public class S3OutputStreamTest {
return data.asByteArray();
}
+
+ private byte[] randomData(int size) {
+ byte [] result = new byte[size];
+ random.nextBytes(result);
+ return result;
+ }
+
+ private S3URI randomURI() {
+ return new S3URI(String.format("s3://%s/data/%s.dat", BUCKET, UUID.randomUUID()));
+ }
}
diff --git a/bundled-guava/src/main/java/org/apache/iceberg/GuavaClasses.java b/bundled-guava/src/main/java/org/apache/iceberg/GuavaClasses.java
index 107e420..81c9351 100644
--- a/bundled-guava/src/main/java/org/apache/iceberg/GuavaClasses.java
+++ b/bundled-guava/src/main/java/org/apache/iceberg/GuavaClasses.java
@@ -46,6 +46,7 @@ import com.google.common.collect.Streams;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
+import com.google.common.io.CountingOutputStream;
import com.google.common.io.Files;
import com.google.common.primitives.Bytes;
import com.google.common.util.concurrent.MoreExecutors;
@@ -90,6 +91,7 @@ public class GuavaClasses {
MoreExecutors.class.getName();
ThreadFactoryBuilder.class.getName();
Iterables.class.getName();
+ CountingOutputStream.class.getName();
}
}
diff --git a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
index 2df88c0..e47eb7b 100644
--- a/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
@@ -35,6 +35,15 @@ public class PropertyUtil {
return defaultValue;
}
+ public static double propertyAsDouble(Map<String, String> properties,
+ String property, double defaultValue) {
+ String value = properties.get(property);
+ if (value != null) {
+ return Double.parseDouble(properties.get(property));
+ }
+ return defaultValue;
+ }
+
public static int propertyAsInt(Map<String, String> properties,
String property, int defaultValue) {
String value = properties.get(property);