Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/06 00:04:09 UTC
[02/27] hadoop git commit: HADOOP-11183. Memory-based S3AOutputstream. (Thomas Demoor via stevel)
HADOOP-11183. Memory-based S3AOutputstream. (Thomas Demoor via stevel)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/15b7076a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/15b7076a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/15b7076a
Branch: refs/heads/YARN-2928
Commit: 15b7076ad5f2ae92d231140b2f8cebc392a92c87
Parents: e17e5ba
Author: Steve Loughran <st...@apache.org>
Authored: Tue Mar 3 16:18:39 2015 -0800
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Mar 3 16:18:51 2015 -0800
----------------------------------------------------------------------
hadoop-common-project/hadoop-common/CHANGES.txt | 2 +
.../src/main/resources/core-default.xml | 20 +-
.../org/apache/hadoop/fs/s3a/Constants.java | 8 +
.../hadoop/fs/s3a/S3AFastOutputStream.java | 413 +++++++++++++++++++
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 24 +-
.../src/site/markdown/tools/hadoop-aws/index.md | 46 ++-
.../hadoop/fs/s3a/TestS3AFastOutputStream.java | 74 ++++
7 files changed, 570 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-common-project/hadoop-common/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 11785f2..cb5cd4d 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -667,6 +667,8 @@ Release 2.7.0 - UNRELEASED
HADOOP-11620. Add support for load balancing across a group of KMS for HA.
(Arun Suresh via wang)
+ HADOOP-11183. Memory-based S3AOutputstream. (Thomas Demoor via stevel)
+
BUG FIXES
HADOOP-11512. Use getTrimmedStrings when reading serialization keys
http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 80dd15b..74390d8 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -763,13 +763,13 @@ for ldap providers in the same way as above does.
<property>
<name>fs.s3a.connection.establish.timeout</name>
<value>5000</value>
- <description>Socket connection setup timeout in seconds.</description>
+ <description>Socket connection setup timeout in milliseconds.</description>
</property>
<property>
<name>fs.s3a.connection.timeout</name>
<value>50000</value>
- <description>Socket connection timeout in seconds.</description>
+ <description>Socket connection timeout in milliseconds.</description>
</property>
<property>
@@ -846,6 +846,22 @@ for ldap providers in the same way as above does.
</property>
<property>
+ <name>fs.s3a.fast.upload</name>
+ <value>false</value>
+ <description>Upload directly from memory instead of buffering to
+ disk first. Memory usage and parallelism can be controlled as up to
+ fs.s3a.multipart.size memory is consumed for each (part)upload actively
+ uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
+</property>
+
+ <property>
+ <name>fs.s3a.fast.buffer.size</name>
+ <value>1048576</value>
+ <description>Size of initial memory buffer in bytes allocated for an
+ upload. No effect if fs.s3a.fast.upload is false.</description>
+</property>
+
+<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
<description>The implementation class of the S3A Filesystem</description>
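
For reference, the two keys added above can also be set programmatically on a Hadoop Configuration. A minimal, purely illustrative sketch (the 8 MB buffer size is an example, not a recommendation):

    import org.apache.hadoop.conf.Configuration;

    // Illustrative only: turn on memory-based uploads and start with an 8 MB buffer.
    // Property names match the core-default.xml entries added in this patch.
    Configuration conf = new Configuration();
    conf.setBoolean("fs.s3a.fast.upload", true);              // default: false
    conf.setInt("fs.s3a.fast.buffer.size", 8 * 1024 * 1024);  // default: 1048576 (1 MB)
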
http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 1d4f67b..e7462dc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -83,6 +83,14 @@ public class Constants {
// comma separated list of directories
public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
+ // should we upload directly from memory rather than using a file buffer
+ public static final String FAST_UPLOAD = "fs.s3a.fast.upload";
+ public static final boolean DEFAULT_FAST_UPLOAD = false;
+
+ //initial size of memory buffer for a fast upload
+ public static final String FAST_BUFFER_SIZE = "fs.s3a.fast.buffer.size";
+ public static final int DEFAULT_FAST_BUFFER_SIZE = 1048576; //1MB
+
// private | public-read | public-read-write | authenticated-read |
// log-delivery-write | bucket-owner-read | bucket-owner-full-control
public static final String CANNED_ACL = "fs.s3a.acl.default";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
new file mode 100644
index 0000000..a29c47b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+
+/**
+ * Upload files/parts asap directly from a memory buffer (instead of buffering
+ * to a file).
+ * <p/>
+ * Uploads are managed low-level rather than through the AWS TransferManager.
+ * This allows for uploading each part of a multi-part upload as soon as
+ * the bytes are in memory, rather than waiting until the file is closed.
+ * <p/>
+ * Unstable: statistics and error handling might evolve
+ */
+@InterfaceStability.Unstable
+public class S3AFastOutputStream extends OutputStream {
+
+ private static final Logger LOG = S3AFileSystem.LOG;
+ private final String key;
+ private final String bucket;
+ private final AmazonS3Client client;
+ private final int partSize;
+ private final int multiPartThreshold;
+ private final S3AFileSystem fs;
+ private final CannedAccessControlList cannedACL;
+ private final FileSystem.Statistics statistics;
+ private final String serverSideEncryptionAlgorithm;
+ private final ProgressListener progressListener;
+ private final ListeningExecutorService executorService;
+ private MultiPartUpload multiPartUpload;
+ private boolean closed;
+ private ByteArrayOutputStream buffer;
+ private int bufferLimit;
+
+
+ /**
+ * Creates a fast OutputStream that uploads to S3 from memory.
+ * For MultiPartUploads, as soon as sufficient bytes have been written to
+ * the stream a part is uploaded immediately (by using the low-level
+ * multi-part upload API on the AmazonS3Client).
+ *
+ * @param client AmazonS3Client used for S3 calls
+ * @param fs S3AFilesystem
+ * @param bucket S3 bucket name
+ * @param key S3 key name
+ * @param progress report progress in order to prevent timeouts
+ * @param statistics track FileSystem.Statistics on the performed operations
+ * @param cannedACL used CannedAccessControlList
+ * @param serverSideEncryptionAlgorithm algorithm for server side encryption
+ * @param partSize size of a single part in a multi-part upload (except
+ * last part)
+ * @param multiPartThreshold files at least this size use multi-part upload
+ * @throws IOException
+ */
+ public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs,
+ String bucket, String key, Progressable progress,
+ FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
+ String serverSideEncryptionAlgorithm, long partSize,
+ long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor)
+ throws IOException {
+ this.bucket = bucket;
+ this.key = key;
+ this.client = client;
+ this.fs = fs;
+ this.cannedACL = cannedACL;
+ this.statistics = statistics;
+ this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
+ //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE
+ if (partSize > Integer.MAX_VALUE) {
+ this.partSize = Integer.MAX_VALUE;
+ LOG.warn("s3a: MULTIPART_SIZE capped to ~2.14GB (maximum allowed size " +
+ "when using 'FAST_UPLOAD = true')");
+ } else {
+ this.partSize = (int) partSize;
+ }
+ if (multiPartThreshold > Integer.MAX_VALUE) {
+ this.multiPartThreshold = Integer.MAX_VALUE;
+ LOG.warn("s3a: MIN_MULTIPART_THRESHOLD capped to ~2.14GB (maximum " +
+ "allowed size when using 'FAST_UPLOAD = true')");
+ } else {
+ this.multiPartThreshold = (int) multiPartThreshold;
+ }
+ this.bufferLimit = this.multiPartThreshold;
+ this.closed = false;
+ int initialBufferSize = this.fs.getConf()
+ .getInt(Constants.FAST_BUFFER_SIZE, Constants.DEFAULT_FAST_BUFFER_SIZE);
+ if (initialBufferSize < 0) {
+ LOG.warn("s3a: FAST_BUFFER_SIZE should be a positive number. Using " +
+ "default value");
+ initialBufferSize = Constants.DEFAULT_FAST_BUFFER_SIZE;
+ } else if (initialBufferSize > this.bufferLimit) {
+ LOG.warn("s3a: automatically adjusting FAST_BUFFER_SIZE to not " +
+ "exceed MIN_MULTIPART_THRESHOLD");
+ initialBufferSize = this.bufferLimit;
+ }
+ this.buffer = new ByteArrayOutputStream(initialBufferSize);
+ this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
+ this.multiPartUpload = null;
+ this.progressListener = new ProgressableListener(progress);
+ if (LOG.isDebugEnabled()){
+ LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
+ bucket, key);
+ }
+ }
+
+ /**
+ * Writes a byte to the memory buffer. If this causes the buffer to reach
+ * its limit, the actual upload is submitted to the threadpool.
+ * @param b the int of which the lowest byte is written
+ * @throws IOException
+ */
+ @Override
+ public synchronized void write(int b) throws IOException {
+ buffer.write(b);
+ if (buffer.size() == bufferLimit) {
+ uploadBuffer();
+ }
+ }
+
+ /**
+ * Writes a range of bytes to the memory buffer. If this causes the
+ * buffer to reach its limit, the actual upload is submitted to the
+ * threadpool and the remainder of the array is written to memory
+ * (recursively).
+ * @param b byte array containing the bytes to write
+ * @param off offset in array where to start
+ * @param len number of bytes to be written
+ * @throws IOException
+ */
+ @Override
+ public synchronized void write(byte b[], int off, int len)
+ throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ } else if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return;
+ }
+ if (buffer.size() + len < bufferLimit) {
+ buffer.write(b, off, len);
+ } else {
+ int firstPart = bufferLimit - buffer.size();
+ buffer.write(b, off, firstPart);
+ uploadBuffer();
+ this.write(b, off + firstPart, len - firstPart);
+ }
+ }
+
+ private synchronized void uploadBuffer() throws IOException {
+ if (multiPartUpload == null) {
+ multiPartUpload = initiateMultiPartUpload();
+ /* Upload the existing buffer if it exceeds partSize. This possibly
+ requires multiple parts! */
+ final byte[] allBytes = buffer.toByteArray();
+ buffer = null; //earlier gc?
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Total length of initial buffer: {}", allBytes.length);
+ }
+ int processedPos = 0;
+ while ((multiPartThreshold - processedPos) >= partSize) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initial buffer: processing from byte {} to byte {}",
+ processedPos, (processedPos + partSize - 1));
+ }
+ multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes,
+ processedPos, partSize), partSize);
+ processedPos += partSize;
+ }
+ //resize and reset stream
+ bufferLimit = partSize;
+ buffer = new ByteArrayOutputStream(bufferLimit);
+ buffer.write(allBytes, processedPos, multiPartThreshold - processedPos);
+ } else {
+ //upload next part
+ multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
+ .toByteArray()), partSize);
+ buffer.reset();
+ }
+ }
+
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ try {
+ if (multiPartUpload == null) {
+ putObject();
+ } else {
+ if (buffer.size() > 0) {
+ //send last part
+ multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
+ .toByteArray()), buffer.size());
+ }
+ final List<PartETag> partETags = multiPartUpload
+ .waitForAllPartUploads();
+ multiPartUpload.complete(partETags);
+ }
+ statistics.incrementWriteOps(1);
+ // This will delete unnecessary fake parent directories
+ fs.finishedWrite(key);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
+ }
+ } finally {
+ buffer = null;
+ super.close();
+ }
+ }
+
+ private ObjectMetadata createDefaultMetadata() {
+ ObjectMetadata om = new ObjectMetadata();
+ if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+ om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+ }
+ return om;
+ }
+
+ private MultiPartUpload initiateMultiPartUpload() throws IOException {
+ final ObjectMetadata om = createDefaultMetadata();
+ final InitiateMultipartUploadRequest initiateMPURequest =
+ new InitiateMultipartUploadRequest(bucket, key, om);
+ initiateMPURequest.setCannedACL(cannedACL);
+ try {
+ return new MultiPartUpload(
+ client.initiateMultipartUpload(initiateMPURequest).getUploadId());
+ } catch (AmazonServiceException ase) {
+ throw new IOException("Unable to initiate MultiPartUpload (server side)" +
+ ": " + ase, ase);
+ } catch (AmazonClientException ace) {
+ throw new IOException("Unable to initiate MultiPartUpload (client side)" +
+ ": " + ace, ace);
+ }
+ }
+
+ private void putObject() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Executing regular upload for bucket '{}' key '{}'", bucket,
+ key);
+ }
+ final ObjectMetadata om = createDefaultMetadata();
+ om.setContentLength(buffer.size());
+ final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
+ new ByteArrayInputStream(buffer.toByteArray()), om);
+ putObjectRequest.setCannedAcl(cannedACL);
+ putObjectRequest.setGeneralProgressListener(progressListener);
+ ListenableFuture<PutObjectResult> putObjectResult =
+ executorService.submit(new Callable<PutObjectResult>() {
+ @Override
+ public PutObjectResult call() throws Exception {
+ return client.putObject(putObjectRequest);
+ }
+ });
+ //wait for completion
+ try {
+ putObjectResult.get();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted object upload:" + ie, ie);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException ee) {
+ throw new IOException("Regular upload failed", ee.getCause());
+ }
+ }
+
+ private class MultiPartUpload {
+ private final String uploadId;
+ private final List<ListenableFuture<PartETag>> partETagsFutures;
+
+ public MultiPartUpload(String uploadId) {
+ this.uploadId = uploadId;
+ this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
+ "id '{}'", bucket, key, uploadId);
+ }
+ }
+
+ public void uploadPartAsync(ByteArrayInputStream inputStream,
+ int partSize) {
+ final int currentPartNumber = partETagsFutures.size() + 1;
+ final UploadPartRequest request =
+ new UploadPartRequest().withBucketName(bucket).withKey(key)
+ .withUploadId(uploadId).withInputStream(inputStream)
+ .withPartNumber(currentPartNumber).withPartSize(partSize);
+ request.setGeneralProgressListener(progressListener);
+ ListenableFuture<PartETag> partETagFuture =
+ executorService.submit(new Callable<PartETag>() {
+ @Override
+ public PartETag call() throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
+ uploadId);
+ }
+ return client.uploadPart(request).getPartETag();
+ }
+ });
+ partETagsFutures.add(partETagFuture);
+ }
+
+ public List<PartETag> waitForAllPartUploads() throws IOException {
+ try {
+ return Futures.allAsList(partETagsFutures).get();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted partUpload:" + ie, ie);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException ee) {
+ //there is no way of recovering so abort
+ //cancel all partUploads
+ for (ListenableFuture<PartETag> future : partETagsFutures) {
+ future.cancel(true);
+ }
+ //abort multipartupload
+ this.abort();
+ throw new IOException("Part upload failed in multi-part upload with " +
+ "id '" +uploadId + "':" + ee, ee);
+ }
+ //should not happen?
+ return null;
+ }
+
+ public void complete(List<PartETag> partETags) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Completing multi-part upload for key '{}', id '{}'", key,
+ uploadId);
+ }
+ final CompleteMultipartUploadRequest completeRequest =
+ new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
+ client.completeMultipartUpload(completeRequest);
+
+ }
+
+ public void abort() {
+ LOG.warn("Aborting multi-part upload with id '{}'", uploadId);
+ try {
+ client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket,
+ key, uploadId));
+ } catch (Exception e2) {
+ LOG.warn("Unable to abort multipart upload, you may need to purge " +
+ "uploaded parts: " + e2, e2);
+ }
+ }
+ }
+
+ private static class ProgressableListener implements ProgressListener {
+ private final Progressable progress;
+
+ public ProgressableListener(Progressable progress) {
+ this.progress = progress;
+ }
+
+ public void progressChanged(ProgressEvent progressEvent) {
+ if (progress != null) {
+ progress.progress();
+ }
+ }
+ }
+}
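
For readers skimming the patch: stripped of threading and error handling, the low-level pattern S3AFastOutputStream follows (initiate, upload each part as its buffer fills, complete) is roughly the sketch below. This is a hypothetical condensation using only AWS SDK calls that appear in the class above; the real code submits each part upload to the shared executor and aborts the upload on failure.

    // Imports as in S3AFastOutputStream above. Assumed in scope: AmazonS3Client client,
    // String bucket, String key, and a List<byte[]> parts with the buffered part contents.
    String uploadId = client.initiateMultipartUpload(
        new InitiateMultipartUploadRequest(bucket, key)).getUploadId();
    List<PartETag> partETags = new ArrayList<PartETag>();
    int partNumber = 1;
    for (byte[] part : parts) {
      UploadPartRequest request = new UploadPartRequest()
          .withBucketName(bucket).withKey(key).withUploadId(uploadId)
          .withPartNumber(partNumber++)
          .withInputStream(new ByteArrayInputStream(part))
          .withPartSize(part.length);
      // S3AFastOutputStream does this asynchronously and collects futures instead
      partETags.add(client.uploadPart(request).getPartETag());
    }
    client.completeMultipartUpload(
        new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
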
http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 2373e7e..1a30d6f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -88,7 +88,8 @@ public class S3AFileSystem extends FileSystem {
private int maxKeys;
private long partSize;
private TransferManager transfers;
- private int partSizeThreshold;
+ private ThreadPoolExecutor threadPoolExecutor;
+ private int multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private CannedAccessControlList cannedACL;
private String serverSideEncryptionAlgorithm;
@@ -237,7 +238,7 @@ public class S3AFileSystem extends FileSystem {
maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
- partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD,
+ multiPartThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD,
DEFAULT_MIN_MULTIPART_THRESHOLD);
if (partSize < 5 * 1024 * 1024) {
@@ -245,9 +246,9 @@ public class S3AFileSystem extends FileSystem {
partSize = 5 * 1024 * 1024;
}
- if (partSizeThreshold < 5 * 1024 * 1024) {
+ if (multiPartThreshold < 5 * 1024 * 1024) {
LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
- partSizeThreshold = 5 * 1024 * 1024;
+ multiPartThreshold = 5 * 1024 * 1024;
}
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
@@ -262,20 +263,20 @@ public class S3AFileSystem extends FileSystem {
LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(maxThreads *
conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
- ThreadPoolExecutor tpe = new ThreadPoolExecutor(
+ threadPoolExecutor = new ThreadPoolExecutor(
coreThreads,
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
newDaemonThreadFactory("s3a-transfer-shared-"));
- tpe.allowCoreThreadTimeOut(true);
+ threadPoolExecutor.allowCoreThreadTimeOut(true);
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
transferConfiguration.setMinimumUploadPartSize(partSize);
- transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
+ transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);
- transfers = new TransferManager(s3, tpe);
+ transfers = new TransferManager(s3, threadPoolExecutor);
transfers.setConfiguration(transferConfiguration);
String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
@@ -391,7 +392,12 @@ public class S3AFileSystem extends FileSystem {
if (!overwrite && exists(f)) {
throw new FileAlreadyExistsException(f + " already exists");
}
-
+ if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
+ return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
+ key, progress, statistics, cannedACL,
+ serverSideEncryptionAlgorithm, partSize, (long)multiPartThreshold,
+ threadPoolExecutor), statistics);
+ }
// We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
bucket, key, progress, cannedACL, statistics,
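
From the caller's point of view, the change above means a plain FileSystem.create() transparently returns the memory-buffered stream once the flag is set. A hedged usage sketch (bucket and object names are invented for illustration):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    Configuration conf = new Configuration();
    conf.setBoolean("fs.s3a.fast.upload", true);
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    // With the flag enabled, this stream is backed by S3AFastOutputStream:
    // bytes are buffered in memory and uploaded in parts as the buffer fills.
    FSDataOutputStream out = fs.create(new Path("s3a://example-bucket/data/blob"), true);
    try {
      out.write(new byte[1024 * 1024]);
    } finally {
      out.close();  // completes (or performs) the upload
    }
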
http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 8e80b92..bf62634 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -213,13 +213,13 @@ If you do any of these: change your credentials immediately!
<property>
<name>fs.s3a.connection.establish.timeout</name>
<value>5000</value>
- <description>Socket connection setup timeout in seconds.</description>
+ <description>Socket connection setup timeout in milliseconds.</description>
</property>
<property>
<name>fs.s3a.connection.timeout</name>
<value>50000</value>
- <description>Socket connection timeout in seconds.</description>
+ <description>Socket connection timeout in milliseconds.</description>
</property>
<property>
@@ -292,7 +292,7 @@ If you do any of these: change your credentials immediately!
<name>fs.s3a.buffer.dir</name>
<value>${hadoop.tmp.dir}/s3a</value>
<description>Comma separated list of directories that will be used to buffer file
- uploads to.</description>
+ uploads to. No effect if fs.s3a.fast.upload is true.</description>
</property>
<property>
@@ -301,6 +301,40 @@ If you do any of these: change your credentials immediately!
<description>The implementation class of the S3A Filesystem</description>
</property>
+### S3AFastOutputStream
+ **Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk**
+
+ <property>
+ <name>fs.s3a.fast.upload</name>
+ <value>false</value>
+ <description>Upload directly from memory instead of buffering to
+ disk first. Memory usage and parallelism can be controlled as up to
+ fs.s3a.multipart.size memory is consumed for each (part)upload actively
+ uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
+ </property>
+
+ <property>
+ <name>fs.s3a.fast.buffer.size</name>
+ <value>1048576</value>
+ <description>Size (in bytes) of initial memory buffer allocated for an
+ upload. No effect if fs.s3a.fast.upload is false.</description>
+ </property>
+
+Writes are buffered in memory instead of to a file on local disk. This
+removes the throughput bottleneck of the local disk write and read cycle
+before starting the actual upload. Furthermore, it allows handling files that
+are larger than the remaining local disk space.
+
+However, non-trivial memory tuning is needed for optimal results and careless
+settings could cause memory overflow. Up to `fs.s3a.threads.max` parallel
+(part)uploads are active. Furthermore, up to `fs.s3a.max.total.tasks`
+additional (part)uploads can be waiting (and thus memory buffers are created).
+The memory buffer is uploaded as a single upload if it is not larger than
+`fs.s3a.multipart.threshold`. Otherwise, a multi-part upload is initiated and
+parts of size `fs.s3a.multipart.size` are used to protect against overflowing
+the available memory. These settings should be tuned to the envisioned
+workflow (some large files, many small ones, ...) and the physical
+limitations of the machine and cluster (memory, network bandwidth).
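
To make that guidance concrete, a rough worst-case estimate of the buffer memory a single busy stream could pin, following the description above (all values are illustrative, not defaults):

    // Illustrative estimate only: active part uploads (fs.s3a.threads.max) plus
    // queued ones (fs.s3a.max.total.tasks) may each hold up to fs.s3a.multipart.size.
    long multipartSize = 100L * 1024 * 1024;   // fs.s3a.multipart.size: 100 MB (example)
    int maxThreads = 10;                       // fs.s3a.threads.max (example)
    int maxTotalTasks = 5;                     // fs.s3a.max.total.tasks (example)
    long worstCaseBytes = (maxThreads + maxTotalTasks) * multipartSize;
    // => 15 * 100 MB, i.e. roughly 1.5 GB of heap in buffers at peak
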
## Testing the S3 filesystem clients
@@ -334,7 +368,7 @@ each filesystem for its testing.
The contents of each bucket will be destroyed during the test process:
do not use the bucket for any purpose other than testing. Furthermore, for
s3a, all in-progress multi-part uploads to the bucket will be aborted at the
-start of a test (by forcing fs.s3a.multipart.purge=true) to clean up the
+start of a test (by forcing `fs.s3a.multipart.purge=true`) to clean up the
temporary state of previously failed tests.
Example:
@@ -392,14 +426,14 @@ Example:
## File `contract-test-options.xml`
The file `hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml`
-must be created and configured for the test fileystems.
+must be created and configured for the test filesystems.
If a specific file `fs.contract.test.fs.*` test path is not defined for
any of the filesystems, those tests will be skipped.
The standard S3 authentication details must also be provided. This can be
through copy-and-paste of the `auth-keys.xml` credentials, or it can be
-through direct XInclude inclustion.
+through direct XInclude inclusion.
#### s3://
http://git-wip-us.apache.org/repos/asf/hadoop/blob/15b7076a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java
new file mode 100644
index 0000000..e507cf6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * Tests regular and multi-part upload functionality for S3AFastOutputStream.
+ * File sizes are kept small to reduce test duration on slow connections
+ */
+public class TestS3AFastOutputStream {
+ private FileSystem fs;
+
+
+ @Rule
+ public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+ @Before
+ public void setUp() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
+ conf.setInt(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
+ conf.setBoolean(Constants.FAST_UPLOAD, true);
+ fs = S3ATestUtils.createTestFileSystem(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (fs != null) {
+ fs.delete(getTestPath(), true);
+ }
+ }
+
+ protected Path getTestPath() {
+ return new Path("/tests3a");
+ }
+
+ @Test
+ public void testRegularUpload() throws IOException {
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
+ }
+
+ @Test
+ public void testMultiPartUpload() throws IOException {
+ ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 *
+ 1024);
+ }
+}