Posted to common-commits@hadoop.apache.org by sj...@apache.org on 2016/10/18 23:45:33 UTC
[47/50] [abbrv] hadoop git commit: HADOOP-13560. S3ABlockOutputStream
to support huge (many GB) file writes. Contributed by Steve Loughran
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
deleted file mode 100644
index c25d0fb..0000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.event.ProgressEvent;
-import com.amazonaws.event.ProgressListener;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CannedAccessControlList;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PartETag;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.model.UploadPartRequest;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.util.Progressable;
-import org.slf4j.Logger;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;
-import static org.apache.hadoop.fs.s3a.Statistic.*;
-
-/**
- * Upload files/parts asap directly from a memory buffer (instead of buffering
- * to a file).
- * <p>
- * Uploads are managed low-level rather than through the AWS TransferManager.
- * This allows for uploading each part of a multi-part upload as soon as
- * the bytes are in memory, rather than waiting until the file is closed.
- * <p>
- * Unstable: statistics and error handling might evolve
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class S3AFastOutputStream extends OutputStream {
-
- private static final Logger LOG = S3AFileSystem.LOG;
- private final String key;
- private final String bucket;
- private final AmazonS3 client;
- private final int partSize;
- private final int multiPartThreshold;
- private final S3AFileSystem fs;
- private final CannedAccessControlList cannedACL;
- private final ProgressListener progressListener;
- private final ListeningExecutorService executorService;
- private MultiPartUpload multiPartUpload;
- private boolean closed;
- private ByteArrayOutputStream buffer;
- private int bufferLimit;
-
-
- /**
- * Creates a fast OutputStream that uploads to S3 from memory.
- * For MultiPartUploads, as soon as sufficient bytes have been written to
- * the stream a part is uploaded immediately (by using the low-level
- * multi-part upload API on the AmazonS3Client).
- *
- * @param client AmazonS3Client used for S3 calls
- * @param fs S3AFilesystem
- * @param bucket S3 bucket name
- * @param key S3 key name
- * @param progress report progress in order to prevent timeouts
- * @param cannedACL used CannedAccessControlList
- * @param partSize size of a single part in a multi-part upload (except
- * last part)
- * @param multiPartThreshold files at least this size use multi-part upload
- * @param threadPoolExecutor thread pool for asynchronous part uploads
- * @throws IOException on any problem
- */
- public S3AFastOutputStream(AmazonS3 client,
- S3AFileSystem fs,
- String bucket,
- String key,
- Progressable progress,
- CannedAccessControlList cannedACL,
- long partSize,
- long multiPartThreshold,
- ExecutorService threadPoolExecutor)
- throws IOException {
- this.bucket = bucket;
- this.key = key;
- this.client = client;
- this.fs = fs;
- this.cannedACL = cannedACL;
- //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE
- if (partSize > Integer.MAX_VALUE) {
- this.partSize = Integer.MAX_VALUE;
- LOG.warn("s3a: MULTIPART_SIZE capped to ~2.14GB (maximum allowed size " +
- "when using 'FAST_UPLOAD = true')");
- } else {
- this.partSize = (int) partSize;
- }
- if (multiPartThreshold > Integer.MAX_VALUE) {
- this.multiPartThreshold = Integer.MAX_VALUE;
- LOG.warn("s3a: MIN_MULTIPART_THRESHOLD capped to ~2.14GB (maximum " +
- "allowed size when using 'FAST_UPLOAD = true')");
- } else {
- this.multiPartThreshold = (int) multiPartThreshold;
- }
- this.bufferLimit = this.multiPartThreshold;
- this.closed = false;
- int initialBufferSize = this.fs.getConf()
- .getInt(Constants.FAST_BUFFER_SIZE, Constants.DEFAULT_FAST_BUFFER_SIZE);
- if (initialBufferSize < 0) {
- LOG.warn("s3a: FAST_BUFFER_SIZE should be a positive number. Using " +
- "default value");
- initialBufferSize = Constants.DEFAULT_FAST_BUFFER_SIZE;
- } else if (initialBufferSize > this.bufferLimit) {
- LOG.warn("s3a: automatically adjusting FAST_BUFFER_SIZE to not " +
- "exceed MIN_MULTIPART_THRESHOLD");
- initialBufferSize = this.bufferLimit;
- }
- this.buffer = new ByteArrayOutputStream(initialBufferSize);
- this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
- this.multiPartUpload = null;
- this.progressListener = new ProgressableListener(progress);
- LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
- bucket, key);
- }
-
- /**
- * Writes a byte to the memory buffer. If this causes the buffer to reach
- * its limit, the actual upload is submitted to the threadpool.
- * @param b the int of which the lowest byte is written
- * @throws IOException on any problem
- */
- @Override
- public synchronized void write(int b) throws IOException {
- buffer.write(b);
- if (buffer.size() == bufferLimit) {
- uploadBuffer();
- }
- }
-
- /**
- * Writes a range of bytes to the memory buffer. If this causes the
- * buffer to reach its limit, the actual upload is submitted to the
- * threadpool and the remainder of the array is written to memory
- * (recursively).
- * @param b byte array containing the data to write
- * @param off offset in array where to start
- * @param len number of bytes to be written
- * @throws IOException on any problem
- */
- @Override
- public synchronized void write(byte[] b, int off, int len)
- throws IOException {
- if (b == null) {
- throw new NullPointerException();
- } else if ((off < 0) || (off > b.length) || (len < 0) ||
- ((off + len) > b.length) || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return;
- }
- if (buffer.size() + len < bufferLimit) {
- buffer.write(b, off, len);
- } else {
- int firstPart = bufferLimit - buffer.size();
- buffer.write(b, off, firstPart);
- uploadBuffer();
- this.write(b, off + firstPart, len - firstPart);
- }
- }
-
- private synchronized void uploadBuffer() throws IOException {
- if (multiPartUpload == null) {
- multiPartUpload = initiateMultiPartUpload();
- /* Upload the existing buffer if it exceeds partSize. This possibly
- requires multiple parts! */
- final byte[] allBytes = buffer.toByteArray();
- buffer = null; //earlier gc?
- LOG.debug("Total length of initial buffer: {}", allBytes.length);
- int processedPos = 0;
- while ((multiPartThreshold - processedPos) >= partSize) {
- LOG.debug("Initial buffer: processing from byte {} to byte {}",
- processedPos, (processedPos + partSize - 1));
- multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes,
- processedPos, partSize), partSize);
- processedPos += partSize;
- }
- //resize and reset stream
- bufferLimit = partSize;
- buffer = new ByteArrayOutputStream(bufferLimit);
- buffer.write(allBytes, processedPos, multiPartThreshold - processedPos);
- } else {
- //upload next part
- multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
- .toByteArray()), partSize);
- buffer.reset();
- }
- }
-
- /**
- * Close the stream. This will not return until the upload is complete
- * or the attempt to perform the upload has failed.
- * Exceptions raised in this method are indicative that the write has
- * failed and data is at risk of being lost.
- * @throws IOException on any failure.
- */
- @Override
- public synchronized void close() throws IOException {
- if (closed) {
- return;
- }
- closed = true;
- try {
- if (multiPartUpload == null) {
- putObject();
- } else {
- int size = buffer.size();
- if (size > 0) {
- fs.incrementPutStartStatistics(size);
- //send last part
- multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
- .toByteArray()), size);
- }
- final List<PartETag> partETags = multiPartUpload
- .waitForAllPartUploads();
- multiPartUpload.complete(partETags);
- }
- // This will delete unnecessary fake parent directories
- fs.finishedWrite(key);
- LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
- } finally {
- buffer = null;
- super.close();
- }
- }
-
- /**
- * Create the default metadata for a multipart upload operation.
- * @return the metadata to use/extend.
- */
- private ObjectMetadata createDefaultMetadata() {
- return fs.newObjectMetadata();
- }
-
- private MultiPartUpload initiateMultiPartUpload() throws IOException {
- final InitiateMultipartUploadRequest initiateMPURequest =
- new InitiateMultipartUploadRequest(bucket,
- key,
- createDefaultMetadata());
- initiateMPURequest.setCannedACL(cannedACL);
- try {
- return new MultiPartUpload(
- client.initiateMultipartUpload(initiateMPURequest).getUploadId());
- } catch (AmazonClientException ace) {
- throw translateException("initiate MultiPartUpload", key, ace);
- }
- }
-
- private void putObject() throws IOException {
- LOG.debug("Executing regular upload for bucket '{}' key '{}'",
- bucket, key);
- final ObjectMetadata om = createDefaultMetadata();
- final int size = buffer.size();
- om.setContentLength(size);
- final PutObjectRequest putObjectRequest =
- fs.newPutObjectRequest(key,
- om,
- new ByteArrayInputStream(buffer.toByteArray()));
- putObjectRequest.setGeneralProgressListener(progressListener);
- ListenableFuture<PutObjectResult> putObjectResult =
- executorService.submit(new Callable<PutObjectResult>() {
- @Override
- public PutObjectResult call() throws Exception {
- fs.incrementPutStartStatistics(size);
- return client.putObject(putObjectRequest);
- }
- });
- //wait for completion
- try {
- putObjectResult.get();
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted object upload: {}", ie, ie);
- Thread.currentThread().interrupt();
- } catch (ExecutionException ee) {
- throw extractException("regular upload", key, ee);
- }
- }
-
-
- private class MultiPartUpload {
- private final String uploadId;
- private final List<ListenableFuture<PartETag>> partETagsFutures;
-
- public MultiPartUpload(String uploadId) {
- this.uploadId = uploadId;
- this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>();
- LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
- "id '{}'", bucket, key, uploadId);
- }
-
- private void uploadPartAsync(ByteArrayInputStream inputStream,
- int partSize) {
- final int currentPartNumber = partETagsFutures.size() + 1;
- final UploadPartRequest request =
- new UploadPartRequest().withBucketName(bucket).withKey(key)
- .withUploadId(uploadId).withInputStream(inputStream)
- .withPartNumber(currentPartNumber).withPartSize(partSize);
- request.setGeneralProgressListener(progressListener);
- ListenableFuture<PartETag> partETagFuture =
- executorService.submit(new Callable<PartETag>() {
- @Override
- public PartETag call() throws Exception {
- LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
- uploadId);
- return fs.uploadPart(request).getPartETag();
- }
- });
- partETagsFutures.add(partETagFuture);
- }
-
- private List<PartETag> waitForAllPartUploads() throws IOException {
- try {
- return Futures.allAsList(partETagsFutures).get();
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted partUpload: {}", ie, ie);
- Thread.currentThread().interrupt();
- return null;
- } catch (ExecutionException ee) {
- //there is no way of recovering so abort
- //cancel all partUploads
- for (ListenableFuture<PartETag> future : partETagsFutures) {
- future.cancel(true);
- }
- //abort multipartupload
- this.abort();
- throw extractException("Multi-part upload with id '" + uploadId + "'",
- key, ee);
- }
- }
-
- private void complete(List<PartETag> partETags) throws IOException {
- try {
- LOG.debug("Completing multi-part upload for key '{}', id '{}'",
- key, uploadId);
- client.completeMultipartUpload(
- new CompleteMultipartUploadRequest(bucket,
- key,
- uploadId,
- partETags));
- } catch (AmazonClientException e) {
- throw translateException("Completing multi-part upload", key, e);
- }
- }
-
- public void abort() {
- LOG.warn("Aborting multi-part upload with id '{}'", uploadId);
- try {
- fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
- client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket,
- key, uploadId));
- } catch (Exception e2) {
- LOG.warn("Unable to abort multipart upload, you may need to purge " +
- "uploaded parts: {}", e2, e2);
- }
- }
- }
-
- private static class ProgressableListener implements ProgressListener {
- private final Progressable progress;
-
- public ProgressableListener(Progressable progress) {
- this.progress = progress;
- }
-
- public void progressChanged(ProgressEvent progressEvent) {
- if (progress != null) {
- progress.progress();
- }
- }
- }
-}
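
For orientation, the essential behaviour of the removed stream, accumulate bytes in a memory buffer and hand off a part upload as soon as the buffer reaches the part size, with a final short part at close, can be sketched in isolation. This is an illustrative reconstruction only; the class name and the uploadPart callback below are hypothetical and are not part of the Hadoop or AWS SDK APIs.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Consumer;

/** Illustrative sketch: buffer in memory, emit a "part" whenever the limit is hit. */
class MemoryPartBufferingStream extends OutputStream {
  private final int partSize;                 // size at which a part is emitted
  private final Consumer<byte[]> uploadPart;  // hypothetical part-upload hook
  private ByteArrayOutputStream buffer = new ByteArrayOutputStream();

  MemoryPartBufferingStream(int partSize, Consumer<byte[]> uploadPart) {
    this.partSize = partSize;
    this.uploadPart = uploadPart;
  }

  @Override
  public synchronized void write(int b) throws IOException {
    buffer.write(b);
    if (buffer.size() >= partSize) {
      flushPart();
    }
  }

  @Override
  public synchronized void close() {
    if (buffer.size() > 0) {
      flushPart();                            // final, possibly short, part
    }
  }

  private void flushPart() {
    uploadPart.accept(buffer.toByteArray());  // the real stream submitted this to a thread pool
    buffer.reset();
  }
}

The deleted class layers onto this the split of an oversized initial buffer into multiple parts, asynchronous submission of each part to a shared executor, and the final complete/abort of the multipart upload; the replacement S3ABlockOutputStream takes these over with pluggable block buffering via the S3ADataBlocks factory wired up in S3AFileSystem below.
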
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 85d1fc7..2354819 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -37,14 +37,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.CopyObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
@@ -55,6 +61,8 @@ import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.event.ProgressEvent;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -68,6 +76,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@@ -118,9 +127,12 @@ public class S3AFileSystem extends FileSystem {
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
- private ExecutorService threadPoolExecutor;
+ private ListeningExecutorService threadPoolExecutor;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
+ private static final Logger PROGRESS =
+ LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
+ private LocalDirAllocator directoryAllocator;
private CannedAccessControlList cannedACL;
private String serverSideEncryptionAlgorithm;
private S3AInstrumentation instrumentation;
@@ -131,6 +143,10 @@ public class S3AFileSystem extends FileSystem {
// The maximum number of entries that can be deleted in any call to s3
private static final int MAX_ENTRIES_TO_DELETE = 1000;
+ private boolean blockUploadEnabled;
+ private String blockOutputBuffer;
+ private S3ADataBlocks.BlockFactory blockFactory;
+ private int blockOutputActiveBlocks;
/** Called after a new FileSystem instance is constructed.
* @param name a uri whose authority section names the host, port, etc.
@@ -157,18 +173,11 @@ public class S3AFileSystem extends FileSystem {
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
listing = new Listing(this);
- partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
- if (partSize < 5 * 1024 * 1024) {
- LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
- partSize = 5 * 1024 * 1024;
- }
+ partSize = getMultipartSizeProperty(conf,
+ MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
+ multiPartThreshold = getMultipartSizeProperty(conf,
+ MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);
- multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
- DEFAULT_MIN_MULTIPART_THRESHOLD);
- if (multiPartThreshold < 5 * 1024 * 1024) {
- LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
- multiPartThreshold = 5 * 1024 * 1024;
- }
//check but do not store the block size
longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
@@ -189,14 +198,14 @@ public class S3AFileSystem extends FileSystem {
LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
maxThreads = 2;
}
- int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
- if (totalTasks < 1) {
- LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
- totalTasks = 1;
- }
- long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
- threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
- maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
+ int totalTasks = intOption(conf,
+ MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
+ long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
+ DEFAULT_KEEPALIVE_TIME, 0);
+ threadPoolExecutor = BlockingThreadPoolExecutorService.newInstance(
+ maxThreads,
+ maxThreads + totalTasks,
+ keepAliveTime, TimeUnit.SECONDS,
"s3a-transfer-shared");
initTransferManager();
@@ -209,8 +218,25 @@ public class S3AFileSystem extends FileSystem {
serverSideEncryptionAlgorithm =
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+ LOG.debug("Using encryption {}", serverSideEncryptionAlgorithm);
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
+
+ blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD);
+
+ if (blockUploadEnabled) {
+ blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
+ DEFAULT_FAST_UPLOAD_BUFFER);
+ partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
+ blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
+ blockOutputActiveBlocks = intOption(conf,
+ FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
+ LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
+ " queue limit={}",
+ blockOutputBuffer, partSize, blockOutputActiveBlocks);
+ } else {
+ LOG.debug("Using S3AOutputStream");
+ }
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}
@@ -337,6 +363,33 @@ public class S3AFileSystem extends FileSystem {
}
/**
+ * Demand create the directory allocator, then create a temporary file.
+ * {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
+ * @param pathStr prefix for the temporary file
+ * @param size the size of the file that is going to be written
+ * @param conf the Configuration object
+ * @return a unique temporary file
+ * @throws IOException IO problems
+ */
+ synchronized File createTmpFileForWrite(String pathStr, long size,
+ Configuration conf) throws IOException {
+ if (directoryAllocator == null) {
+ String bufferDir = conf.get(BUFFER_DIR) != null
+ ? BUFFER_DIR : "hadoop.tmp.dir";
+ directoryAllocator = new LocalDirAllocator(bufferDir);
+ }
+ return directoryAllocator.createTmpFileForWrite(pathStr, size, conf);
+ }
+
+ /**
+ * Get the bucket of this filesystem.
+ * @return the bucket
+ */
+ public String getBucket() {
+ return bucket;
+ }
+
+ /**
* Change the input policy for this FS.
* @param inputPolicy new policy
*/
@@ -460,6 +513,7 @@ public class S3AFileSystem extends FileSystem {
* @see #setPermission(Path, FsPermission)
*/
@Override
+ @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
@@ -484,28 +538,33 @@ public class S3AFileSystem extends FileSystem {
}
instrumentation.fileCreated();
- if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
- return new FSDataOutputStream(
- new S3AFastOutputStream(s3,
- this,
- bucket,
+ FSDataOutputStream output;
+ if (blockUploadEnabled) {
+ output = new FSDataOutputStream(
+ new S3ABlockOutputStream(this,
key,
+ new SemaphoredDelegatingExecutor(threadPoolExecutor,
+ blockOutputActiveBlocks, true),
progress,
- cannedACL,
partSize,
- multiPartThreshold,
- threadPoolExecutor),
- statistics);
+ blockFactory,
+ instrumentation.newOutputStreamStatistics(),
+ new WriteOperationHelper(key)
+ ),
+ null);
+ } else {
+
+ // We pass null to FSDataOutputStream so it won't count writes that
+ // are being buffered to a file
+ output = new FSDataOutputStream(
+ new S3AOutputStream(getConf(),
+ this,
+ key,
+ progress
+ ),
+ null);
}
- // We pass null to FSDataOutputStream so it won't count writes that
- // are being buffered to a file
- return new FSDataOutputStream(
- new S3AOutputStream(getConf(),
- this,
- key,
- progress
- ),
- null);
+ return output;
}
/**
@@ -750,6 +809,33 @@ public class S3AFileSystem extends FileSystem {
}
/**
+ * Decrement a gauge by a specific value.
+ * @param statistic The operation to decrement
+ * @param count the count to decrement
+ */
+ protected void decrementGauge(Statistic statistic, long count) {
+ instrumentation.decrementGauge(statistic, count);
+ }
+
+ /**
+ * Increment a gauge by a specific value.
+ * @param statistic The operation to increment
+ * @param count the count to increment
+ */
+ protected void incrementGauge(Statistic statistic, long count) {
+ instrumentation.incrementGauge(statistic, count);
+ }
+
+ /**
+ * Get the storage statistics of this filesystem.
+ * @return the storage statistics
+ */
+ @Override
+ public S3AStorageStatistics getStorageStatistics() {
+ return storageStatistics;
+ }
+
+ /**
* Request object metadata; increments counters in the process.
* @param key key
* @return the metadata
@@ -896,7 +982,9 @@ public class S3AFileSystem extends FileSystem {
*/
public ObjectMetadata newObjectMetadata(long length) {
final ObjectMetadata om = newObjectMetadata();
- om.setContentLength(length);
+ if (length >= 0) {
+ om.setContentLength(length);
+ }
return om;
}
@@ -918,7 +1006,41 @@ public class S3AFileSystem extends FileSystem {
len = putObjectRequest.getMetadata().getContentLength();
}
incrementPutStartStatistics(len);
- return transfers.upload(putObjectRequest);
+ try {
+ Upload upload = transfers.upload(putObjectRequest);
+ incrementPutCompletedStatistics(true, len);
+ return upload;
+ } catch (AmazonClientException e) {
+ incrementPutCompletedStatistics(false, len);
+ throw e;
+ }
+ }
+
+ /**
+ * PUT an object directly (i.e. not via the transfer manager).
+ * Byte length is calculated from the file length, or, if there is no
+ * file, from the content length of the header.
+ * @param putObjectRequest the request
+ * @return the upload initiated
+ * @throws AmazonClientException on problems
+ */
+ public PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
+ throws AmazonClientException {
+ long len;
+ if (putObjectRequest.getFile() != null) {
+ len = putObjectRequest.getFile().length();
+ } else {
+ len = putObjectRequest.getMetadata().getContentLength();
+ }
+ incrementPutStartStatistics(len);
+ try {
+ PutObjectResult result = s3.putObject(putObjectRequest);
+ incrementPutCompletedStatistics(true, len);
+ return result;
+ } catch (AmazonClientException e) {
+ incrementPutCompletedStatistics(false, len);
+ throw e;
+ }
}
/**
@@ -926,10 +1048,20 @@ public class S3AFileSystem extends FileSystem {
* Increments the write and put counters
* @param request request
* @return the result of the operation.
+ * @throws AmazonClientException on problems
*/
- public UploadPartResult uploadPart(UploadPartRequest request) {
- incrementPutStartStatistics(request.getPartSize());
- return s3.uploadPart(request);
+ public UploadPartResult uploadPart(UploadPartRequest request)
+ throws AmazonClientException {
+ long len = request.getPartSize();
+ incrementPutStartStatistics(len);
+ try {
+ UploadPartResult uploadPartResult = s3.uploadPart(request);
+ incrementPutCompletedStatistics(true, len);
+ return uploadPartResult;
+ } catch (AmazonClientException e) {
+ incrementPutCompletedStatistics(false, len);
+ throw e;
+ }
}
/**
@@ -942,9 +1074,28 @@ public class S3AFileSystem extends FileSystem {
LOG.debug("PUT start {} bytes", bytes);
incrementWriteOperations();
incrementStatistic(OBJECT_PUT_REQUESTS);
+ incrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
+ if (bytes > 0) {
+ incrementGauge(OBJECT_PUT_BYTES_PENDING, bytes);
+ }
+ }
+
+ /**
+ * At the end of a put/multipart upload operation, update the
+ * relevant counters and gauges.
+ *
+ * @param success did the operation succeed?
+ * @param bytes bytes in the request.
+ */
+ public void incrementPutCompletedStatistics(boolean success, long bytes) {
+ LOG.debug("PUT completed success={}; {} bytes", success, bytes);
+ incrementWriteOperations();
if (bytes > 0) {
incrementStatistic(OBJECT_PUT_BYTES, bytes);
+ decrementGauge(OBJECT_PUT_BYTES_PENDING, bytes);
}
+ incrementStatistic(OBJECT_PUT_REQUESTS_COMPLETED);
+ decrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
}
/**
@@ -955,7 +1106,7 @@ public class S3AFileSystem extends FileSystem {
* @param bytes bytes successfully uploaded.
*/
public void incrementPutProgressStatistics(String key, long bytes) {
- LOG.debug("PUT {}: {} bytes", key, bytes);
+ PROGRESS.debug("PUT {}: {} bytes", key, bytes);
incrementWriteOperations();
if (bytes > 0) {
statistics.incrementBytesWritten(bytes);
@@ -1475,7 +1626,7 @@ public class S3AFileSystem extends FileSystem {
LocalFileSystem local = getLocal(getConf());
File srcfile = local.pathToFile(src);
- final ObjectMetadata om = newObjectMetadata();
+ final ObjectMetadata om = newObjectMetadata(srcfile.length());
PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
Upload up = putObject(putObjectRequest);
ProgressableProgressListener listener = new ProgressableProgressListener(
@@ -1743,6 +1894,10 @@ public class S3AFileSystem extends FileSystem {
.append(serverSideEncryptionAlgorithm)
.append('\'');
}
+ if (blockFactory != null) {
+ sb.append(", blockFactory=").append(blockFactory);
+ }
+ sb.append(", executor=").append(threadPoolExecutor);
sb.append(", statistics {")
.append(statistics)
.append("}");
@@ -1950,4 +2105,163 @@ public class S3AFileSystem extends FileSystem {
getFileBlockLocations(status, 0, status.getLen())
: null);
}
+
+ /**
+ * Helper for an ongoing write operation.
+ * <p>
+ * It hides direct access to the S3 API from the output stream,
+ * and is a location where the object upload process can be evolved/enhanced.
+ * <p>
+ * Features
+ * <ul>
+ * <li>Methods to create and submit requests to S3, so avoiding
+ * all direct interaction with the AWS APIs.</li>
+ * <li>Some extra preflight checks of arguments, so failing fast on
+ * errors.</li>
+ * <li>Callbacks to let the FS know of events in the output stream
+ * upload process.</li>
+ * </ul>
+ *
+ * Each instance of this state is unique to a single output stream.
+ */
+ final class WriteOperationHelper {
+ private final String key;
+
+ private WriteOperationHelper(String key) {
+ this.key = key;
+ }
+
+ /**
+ * Create a {@link PutObjectRequest} request.
+ * The metadata is assumed to have been configured with the size of the
+ * operation.
+ * @param inputStream source data.
+ * @param length size, if known. Use -1 for not known
+ * @return the request
+ */
+ PutObjectRequest newPutRequest(InputStream inputStream, long length) {
+ return newPutObjectRequest(key, newObjectMetadata(length), inputStream);
+ }
+
+ /**
+ * Callback on a successful write.
+ */
+ void writeSuccessful() {
+ finishedWrite(key);
+ }
+
+ /**
+ * Callback on a write failure.
+ * @param e Any exception raised which triggered the failure.
+ */
+ void writeFailed(Exception e) {
+ LOG.debug("Write to {} failed", this, e);
+ }
+
+ /**
+ * Create a new object metadata instance.
+ * Any standard metadata headers are added here, for example:
+ * encryption.
+ * @param length size, if known. Use -1 for not known
+ * @return a new metadata instance
+ */
+ public ObjectMetadata newObjectMetadata(long length) {
+ return S3AFileSystem.this.newObjectMetadata(length);
+ }
+
+ /**
+ * Start the multipart upload process.
+ * @return the upload result containing the ID
+ * @throws IOException IO problem
+ */
+ String initiateMultiPartUpload() throws IOException {
+ LOG.debug("Initiating Multipart upload");
+ final InitiateMultipartUploadRequest initiateMPURequest =
+ new InitiateMultipartUploadRequest(bucket,
+ key,
+ newObjectMetadata(-1));
+ initiateMPURequest.setCannedACL(cannedACL);
+ try {
+ return s3.initiateMultipartUpload(initiateMPURequest)
+ .getUploadId();
+ } catch (AmazonClientException ace) {
+ throw translateException("initiate MultiPartUpload", key, ace);
+ }
+ }
+
+ /**
+ * Complete a multipart upload operation.
+ * @param uploadId multipart operation Id
+ * @param partETags list of partial uploads
+ * @return the result
+ * @throws AmazonClientException on problems.
+ */
+ CompleteMultipartUploadResult completeMultipartUpload(String uploadId,
+ List<PartETag> partETags) throws AmazonClientException {
+ Preconditions.checkNotNull(uploadId);
+ Preconditions.checkNotNull(partETags);
+ Preconditions.checkArgument(!partETags.isEmpty(),
+ "No partitions have been uploaded");
+ return s3.completeMultipartUpload(
+ new CompleteMultipartUploadRequest(bucket,
+ key,
+ uploadId,
+ partETags));
+ }
+
+ /**
+ * Abort a multipart upload operation.
+ * @param uploadId multipart operation Id
+ * @return the result
+ * @throws AmazonClientException on problems.
+ */
+ void abortMultipartUpload(String uploadId) throws AmazonClientException {
+ s3.abortMultipartUpload(
+ new AbortMultipartUploadRequest(bucket, key, uploadId));
+ }
+
+ /**
+ * Create and initialize a part request of a multipart upload.
+ * @param uploadId ID of ongoing upload
+ * @param uploadStream source of data to upload
+ * @param partNumber current part number of the upload
+ * @param size amount of data
+ * @return the request.
+ */
+ UploadPartRequest newUploadPartRequest(String uploadId,
+ InputStream uploadStream,
+ int partNumber,
+ int size) {
+ Preconditions.checkNotNull(uploadId);
+ Preconditions.checkNotNull(uploadStream);
+ Preconditions.checkArgument(size > 0, "Invalid partition size %s", size);
+ Preconditions.checkArgument(partNumber> 0 && partNumber <=10000,
+ "partNumber must be between 1 and 10000 inclusive, but is %s",
+ partNumber);
+
+ LOG.debug("Creating part upload request for {} #{} size {}",
+ uploadId, partNumber, size);
+ return new UploadPartRequest()
+ .withBucketName(bucket)
+ .withKey(key)
+ .withUploadId(uploadId)
+ .withInputStream(uploadStream)
+ .withPartNumber(partNumber)
+ .withPartSize(size);
+ }
+
+ /**
+ * The toString method is intended to be used in logging/toString calls.
+ * @return a string description.
+ */
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "{bucket=").append(bucket);
+ sb.append(", key='").append(key).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+
}
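
Taken together, the WriteOperationHelper methods above define the multipart lifecycle the new block output stream drives: initiate, upload parts, then complete on success or abort on failure. A hedged sketch of that call sequence follows; it is a method fragment assumed to live in a class inside org.apache.hadoop.fs.s3a (the helper and several of its methods are package-private), it uploads parts inline rather than through an executor, and the "(key)" label passed to translateException is only a placeholder.

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: push a list of non-empty blocks through one multipart upload. */
static void uploadBlocks(S3AFileSystem fs,
    S3AFileSystem.WriteOperationHelper helper,
    List<byte[]> blocks) throws IOException {
  String uploadId = helper.initiateMultiPartUpload();
  List<PartETag> etags = new ArrayList<>();
  try {
    int partNumber = 1;
    for (byte[] block : blocks) {
      UploadPartRequest request = helper.newUploadPartRequest(uploadId,
          new ByteArrayInputStream(block), partNumber++, block.length);
      // The real stream hands these off to an executor; done inline here.
      etags.add(fs.uploadPart(request).getPartETag());
    }
    helper.completeMultipartUpload(uploadId, etags);
    helper.writeSuccessful();
  } catch (AmazonClientException e) {
    helper.writeFailed(e);
    helper.abortMultipartUpload(uploadId);
    throw S3AUtils.translateException("multipart upload", "(key)", e);
  }
}
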
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 26b5b51..963c53f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.fs.s3a;
-import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricStringBuilder;
@@ -29,10 +31,12 @@ import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableMetric;
+import java.io.Closeable;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.fs.s3a.Statistic.*;
@@ -50,6 +54,9 @@ import static org.apache.hadoop.fs.s3a.Statistic.*;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AInstrumentation {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ S3AInstrumentation.class);
+
public static final String CONTEXT = "S3AFileSystem";
private final MetricsRegistry registry =
new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
@@ -100,7 +107,23 @@ public class S3AInstrumentation {
OBJECT_METADATA_REQUESTS,
OBJECT_MULTIPART_UPLOAD_ABORTED,
OBJECT_PUT_BYTES,
- OBJECT_PUT_REQUESTS
+ OBJECT_PUT_REQUESTS,
+ OBJECT_PUT_REQUESTS_COMPLETED,
+ STREAM_WRITE_FAILURES,
+ STREAM_WRITE_BLOCK_UPLOADS,
+ STREAM_WRITE_BLOCK_UPLOADS_COMMITTED,
+ STREAM_WRITE_BLOCK_UPLOADS_ABORTED,
+ STREAM_WRITE_TOTAL_TIME,
+ STREAM_WRITE_TOTAL_DATA,
+ };
+
+
+ private static final Statistic[] GAUGES_TO_CREATE = {
+ OBJECT_PUT_REQUESTS_ACTIVE,
+ OBJECT_PUT_BYTES_PENDING,
+ STREAM_WRITE_BLOCK_UPLOADS_ACTIVE,
+ STREAM_WRITE_BLOCK_UPLOADS_PENDING,
+ STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING,
};
public S3AInstrumentation(URI name) {
@@ -143,6 +166,9 @@ public class S3AInstrumentation {
for (Statistic statistic : COUNTERS_TO_CREATE) {
counter(statistic);
}
+ for (Statistic statistic : GAUGES_TO_CREATE) {
+ gauge(statistic.getSymbol(), statistic.getDescription());
+ }
}
/**
@@ -254,13 +280,13 @@ public class S3AInstrumentation {
* Lookup a counter by name. Return null if it is not known.
* @param name counter name
* @return the counter
+ * @throws IllegalStateException if the metric is not a counter
*/
private MutableCounterLong lookupCounter(String name) {
MutableMetric metric = lookupMetric(name);
if (metric == null) {
return null;
}
- Preconditions.checkNotNull(metric, "not found: " + name);
if (!(metric instanceof MutableCounterLong)) {
throw new IllegalStateException("Metric " + name
+ " is not a MutableCounterLong: " + metric);
@@ -269,6 +295,20 @@ public class S3AInstrumentation {
}
/**
+ * Look up a gauge.
+ * @param name gauge name
+ * @return the gauge or null
+ * @throws ClassCastException if the metric is not a Gauge.
+ */
+ public MutableGaugeLong lookupGauge(String name) {
+ MutableMetric metric = lookupMetric(name);
+ if (metric == null) {
+ LOG.debug("No gauge {}", name);
+ }
+ return (MutableGaugeLong) metric;
+ }
+
+ /**
* Look up a metric from both the registered set and the lighter weight
* stream entries.
* @param name metric name
@@ -349,6 +389,47 @@ public class S3AInstrumentation {
counter.incr(count);
}
}
+ /**
+ * Increment a specific counter.
+ * No-op if not defined.
+ * @param op operation
+ * @param count atomic long containing value
+ */
+ public void incrementCounter(Statistic op, AtomicLong count) {
+ incrementCounter(op, count.get());
+ }
+
+ /**
+ * Increment a specific gauge.
+ * No-op if not defined.
+ * @param op operation
+ * @param count increment value
+ * @throws ClassCastException if the metric is of the wrong type
+ */
+ public void incrementGauge(Statistic op, long count) {
+ MutableGaugeLong gauge = lookupGauge(op.getSymbol());
+ if (gauge != null) {
+ gauge.incr(count);
+ } else {
+ LOG.debug("No Gauge: "+ op);
+ }
+ }
+
+ /**
+ * Decrement a specific gauge.
+ * No-op if not defined.
+ * @param op operation
+ * @param count increment value
+ * @throws ClassCastException if the metric is of the wrong type
+ */
+ public void decrementGauge(Statistic op, long count) {
+ MutableGaugeLong gauge = lookupGauge(op.getSymbol());
+ if (gauge != null) {
+ gauge.decr(count);
+ } else {
+ LOG.debug("No Gauge: " + op);
+ }
+ }
/**
* Create a stream input statistics instance.
@@ -553,4 +634,165 @@ public class S3AInstrumentation {
return sb.toString();
}
}
+
+ /**
+ * Create a stream output statistics instance.
+ * @return the new instance
+ */
+
+ OutputStreamStatistics newOutputStreamStatistics() {
+ return new OutputStreamStatistics();
+ }
+
+ /**
+ * Merge in the statistics of a single output stream into
+ * the filesystem-wide statistics.
+ * @param statistics stream statistics
+ */
+ private void mergeOutputStreamStatistics(OutputStreamStatistics statistics) {
+ incrementCounter(STREAM_WRITE_TOTAL_TIME, statistics.totalUploadDuration());
+ incrementCounter(STREAM_WRITE_QUEUE_DURATION, statistics.queueDuration);
+ incrementCounter(STREAM_WRITE_TOTAL_DATA, statistics.bytesUploaded);
+ incrementCounter(STREAM_WRITE_BLOCK_UPLOADS,
+ statistics.blockUploadsCompleted);
+ }
+
+ /**
+ * Statistics updated by an output stream during its actual operation.
+ * Some of these stats may be relayed. However, as block upload is
+ * spans multiple
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public final class OutputStreamStatistics implements Closeable {
+ private final AtomicLong blocksSubmitted = new AtomicLong(0);
+ private final AtomicLong blocksInQueue = new AtomicLong(0);
+ private final AtomicLong blocksActive = new AtomicLong(0);
+ private final AtomicLong blockUploadsCompleted = new AtomicLong(0);
+ private final AtomicLong blockUploadsFailed = new AtomicLong(0);
+ private final AtomicLong bytesPendingUpload = new AtomicLong(0);
+
+ private final AtomicLong bytesUploaded = new AtomicLong(0);
+ private final AtomicLong transferDuration = new AtomicLong(0);
+ private final AtomicLong queueDuration = new AtomicLong(0);
+ private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0);
+
+ /**
+ * Block is queued for upload.
+ */
+ void blockUploadQueued(int blockSize) {
+ blocksSubmitted.incrementAndGet();
+ blocksInQueue.incrementAndGet();
+ bytesPendingUpload.addAndGet(blockSize);
+ incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_PENDING, 1);
+ incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, blockSize);
+ }
+
+ /** Queued block has been scheduled for upload. */
+ void blockUploadStarted(long duration, int blockSize) {
+ queueDuration.addAndGet(duration);
+ blocksInQueue.decrementAndGet();
+ blocksActive.incrementAndGet();
+ incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_PENDING, -1);
+ incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, 1);
+ }
+
+ /** A block upload has completed. */
+ void blockUploadCompleted(long duration, int blockSize) {
+ this.transferDuration.addAndGet(duration);
+ incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_ACTIVE, -1);
+ blocksActive.decrementAndGet();
+ blockUploadsCompleted.incrementAndGet();
+ }
+
+ /**
+ * A block upload has failed.
+ * A final transfer completed event is still expected, so this
+ * does not decrement the active block counter.
+ */
+ void blockUploadFailed(long duration, int blockSize) {
+ blockUploadsFailed.incrementAndGet();
+ }
+
+ /** Intermediate report of bytes uploaded. */
+ void bytesTransferred(long byteCount) {
+ bytesUploaded.addAndGet(byteCount);
+ bytesPendingUpload.addAndGet(-byteCount);
+ incrementGauge(STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING, -byteCount);
+ }
+
+ /**
+ * Note an exception in a multipart complete.
+ */
+ void exceptionInMultipartComplete() {
+ exceptionsInMultipartFinalize.incrementAndGet();
+ }
+
+ /**
+ * Note an exception in a multipart abort.
+ */
+ void exceptionInMultipartAbort() {
+ exceptionsInMultipartFinalize.incrementAndGet();
+ }
+
+ /**
+ * Get the number of bytes pending upload.
+ * @return the number of bytes in the pending upload state.
+ */
+ public long getBytesPendingUpload() {
+ return bytesPendingUpload.get();
+ }
+
+ /**
+ * Output stream has closed.
+ * Trigger merge in of all statistics not updated during operation.
+ */
+ @Override
+ public void close() {
+ if (bytesPendingUpload.get() > 0) {
+ LOG.warn("Closing output stream statistics while data is still marked" +
+ " as pending upload in {}", this);
+ }
+ mergeOutputStreamStatistics(this);
+ }
+
+ long averageQueueTime() {
+ return blocksSubmitted.get() > 0 ?
+ (queueDuration.get() / blocksSubmitted.get()) : 0;
+ }
+
+ double effectiveBandwidth() {
+ double duration = totalUploadDuration() / 1000.0;
+ return duration > 0 ?
+ (bytesUploaded.get() / duration) : 0;
+ }
+
+ long totalUploadDuration() {
+ return queueDuration.get() + transferDuration.get();
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "OutputStreamStatistics{");
+ sb.append("blocksSubmitted=").append(blocksSubmitted);
+ sb.append(", blocksInQueue=").append(blocksInQueue);
+ sb.append(", blocksActive=").append(blocksActive);
+ sb.append(", blockUploadsCompleted=").append(blockUploadsCompleted);
+ sb.append(", blockUploadsFailed=").append(blockUploadsFailed);
+ sb.append(", bytesPendingUpload=").append(bytesPendingUpload);
+ sb.append(", bytesUploaded=").append(bytesUploaded);
+ sb.append(", exceptionsInMultipartFinalize=").append(
+ exceptionsInMultipartFinalize);
+ sb.append(", transferDuration=").append(transferDuration).append(" ms");
+ sb.append(", queueDuration=").append(queueDuration).append(" ms");
+ sb.append(", averageQueueTime=").append(averageQueueTime()).append(" ms");
+ sb.append(", totalUploadDuration=").append(totalUploadDuration())
+ .append(" ms");
+ sb.append(", effectiveBandwidth=").append(effectiveBandwidth())
+ .append(" bytes/s");
+ sb.append('}');
+ return sb.toString();
+ }
+ }
}
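
To make the intended use of the new per-stream statistics concrete, the lifecycle of a single uploaded block looks roughly like the following. This is a sketch only: the block size and durations are invented, and the lifecycle methods are package-private, so the fragment assumes it runs from code in org.apache.hadoop.fs.s3a.

import java.net.URI;

S3AInstrumentation instrumentation =
    new S3AInstrumentation(URI.create("s3a://example-bucket/"));
S3AInstrumentation.OutputStreamStatistics stats =
    instrumentation.newOutputStreamStatistics();

int blockSize = 8 * 1024 * 1024;             // one 8 MB block (illustrative)
stats.blockUploadQueued(blockSize);           // handed to the executor queue
stats.blockUploadStarted(120, blockSize);     // spent 120 ms waiting in the queue
stats.bytesTransferred(blockSize);            // progress listener reports bytes sent
stats.blockUploadCompleted(950, blockSize);   // the transfer itself took 950 ms

stats.close();   // merges time, data and upload counts into the FS-wide metrics

With those numbers, totalUploadDuration() is 1070 ms and effectiveBandwidth() comes out at roughly 7.8 MB/s (8 MiB over 1.07 s).
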
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
index 23ba682..6ebc9e4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -35,8 +35,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
/**
@@ -45,37 +45,27 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.*;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AOutputStream extends OutputStream {
- private OutputStream backupStream;
- private File backupFile;
- private boolean closed;
- private String key;
- private Progressable progress;
- private long partSize;
- private long partSizeThreshold;
- private S3AFileSystem fs;
- private LocalDirAllocator lDirAlloc;
+ private final OutputStream backupStream;
+ private final File backupFile;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final String key;
+ private final Progressable progress;
+ private final S3AFileSystem fs;
public static final Logger LOG = S3AFileSystem.LOG;
public S3AOutputStream(Configuration conf,
- S3AFileSystem fs, String key, Progressable progress)
+ S3AFileSystem fs,
+ String key,
+ Progressable progress)
throws IOException {
this.key = key;
this.progress = progress;
this.fs = fs;
- partSize = fs.getPartitionSize();
- partSizeThreshold = fs.getMultiPartThreshold();
-
- if (conf.get(BUFFER_DIR, null) != null) {
- lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
- } else {
- lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a");
- }
- backupFile = lDirAlloc.createTmpFileForWrite("output-",
+ backupFile = fs.createTmpFileForWrite("output-",
LocalDirAllocator.SIZE_UNKNOWN, conf);
- closed = false;
LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
key, backupFile);
@@ -84,25 +74,33 @@ public class S3AOutputStream extends OutputStream {
new FileOutputStream(backupFile));
}
+ /**
+ * Check for the filesystem being open.
+ * @throws IOException if the filesystem is closed.
+ */
+ void checkOpen() throws IOException {
+ if (closed.get()) {
+ throw new IOException("Output Stream closed");
+ }
+ }
+
@Override
public void flush() throws IOException {
+ checkOpen();
backupStream.flush();
}
@Override
- public synchronized void close() throws IOException {
- if (closed) {
+ public void close() throws IOException {
+ if (closed.getAndSet(true)) {
return;
}
backupStream.close();
LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);
- LOG.debug("Minimum upload part size: {} threshold {}" , partSize,
- partSizeThreshold);
-
try {
- final ObjectMetadata om = fs.newObjectMetadata();
+ final ObjectMetadata om = fs.newObjectMetadata(backupFile.length());
Upload upload = fs.putObject(
fs.newPutObjectRequest(
key,
@@ -126,18 +124,19 @@ public class S3AOutputStream extends OutputStream {
LOG.warn("Could not delete temporary s3a file: {}", backupFile);
}
super.close();
- closed = true;
}
LOG.debug("OutputStream for key '{}' upload complete", key);
}
@Override
public void write(int b) throws IOException {
+ checkOpen();
backupStream.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
+ checkOpen();
backupStream.write(b, off, len);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 93d819b..c89f690 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ExecutionException;
import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
import static org.apache.hadoop.fs.s3a.Constants.AWS_CREDENTIALS_PROVIDER;
import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
/**
@@ -460,4 +461,42 @@ public final class S3AUtils {
key, v, min));
return v;
}
+
+ /**
+ * Get a size property from the configuration: this property must
+ * be at least equal to {@link Constants#MULTIPART_MIN_SIZE}.
+ * If it is too small, it is rounded up to that minimum, and a warning
+ * printed.
+ * @param conf configuration
+ * @param property property name
+ * @param defVal default value
+ * @return the value, guaranteed to be above the minimum size
+ */
+ public static long getMultipartSizeProperty(Configuration conf,
+ String property, long defVal) {
+ long partSize = conf.getLong(property, defVal);
+ if (partSize < MULTIPART_MIN_SIZE) {
+ LOG.warn("{} must be at least 5 MB; configured value is {}",
+ property, partSize);
+ partSize = MULTIPART_MIN_SIZE;
+ }
+ return partSize;
+ }
+
+ /**
+ * Ensure that the long value is in the range of an integer.
+ * @param name property name for error messages
+ * @param size original size
+ * @return the size, guaranteed to be less than or equal to the max
+ * value of an integer.
+ */
+ public static int ensureOutputParameterInRange(String name, long size) {
+ if (size > Integer.MAX_VALUE) {
+ LOG.warn("s3a: {} capped to ~2.14GB" +
+ " (maximum allowed size with current output mechanism)", name);
+ return Integer.MAX_VALUE;
+ } else {
+ return (int)size;
+ }
+ }
}
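
As a quick illustration of what the two new helpers do with out-of-range values (the 3 MB and 3 GB figures below are invented for the example, and the fragment assumes it runs in the org.apache.hadoop.fs.s3a package):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
conf.setLong(Constants.MULTIPART_SIZE, 3 * 1024 * 1024);   // 3 MB: below the S3 minimum

// Rounded up to MULTIPART_MIN_SIZE (5 MB), with a warning logged:
long partSize = S3AUtils.getMultipartSizeProperty(conf,
    Constants.MULTIPART_SIZE, Constants.DEFAULT_MULTIPART_SIZE);
// partSize == 5 * 1024 * 1024

// A value above 2 GB survives the property lookup but is capped when the
// block output stream needs an int-sized buffer:
int blockSize = S3AUtils.ensureOutputParameterInRange(
    Constants.MULTIPART_SIZE, 3L * 1024 * 1024 * 1024);
// blockSize == Integer.MAX_VALUE (~2.14 GB), again with a warning
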
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
new file mode 100644
index 0000000..6b21912
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This ExecutorService blocks the submission of new tasks when its queue is
+ * already full by using a semaphore. Task submissions require permits, task
+ * completions release permits.
+ * <p>
+ * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code
+ * contains the thread pool logic, whereas this isolates the semaphore
+ * and submit logic for use with other thread pools and delegation models.
+ * In particular, it <i>permits multiple per stream executors to share a
+ * single per-FS-instance executor; the latter to throttle overall
+ * load from the the FS, the others to limit the amount of load which
+ * a single output stream can generate.</i>
+ * <p>
+ * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
+ * this s4 threadpool</a>
+ */
+@SuppressWarnings("NullableProblems")
+@InterfaceAudience.Private
+class SemaphoredDelegatingExecutor extends
+ ForwardingListeningExecutorService {
+
+ private final Semaphore queueingPermits;
+ private final ListeningExecutorService executorDelegatee;
+ private final int permitCount;
+
+ /**
+ * Instantiate.
+ * @param executorDelegatee Executor to delegate to
+ * @param permitCount number of permits into the queue permitted
+ * @param fair should the semaphore be "fair"
+ */
+ SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee,
+ int permitCount,
+ boolean fair) {
+ this.permitCount = permitCount;
+ queueingPermits = new Semaphore(permitCount, fair);
+ this.executorDelegatee = executorDelegatee;
+ }
+
+ @Override
+ protected ListeningExecutorService delegate() {
+ return executorDelegatee;
+ }
+
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+ long timeout, TimeUnit unit) throws InterruptedException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
+ TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Callable<T> task) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
+ }
+ return super.submit(new CallableWithPermitRelease<>(task));
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Runnable task, T result) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
+ }
+ return super.submit(new RunnableWithPermitRelease(task), result);
+ }
+
+ @Override
+ public ListenableFuture<?> submit(Runnable task) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
+ }
+ return super.submit(new RunnableWithPermitRelease(task));
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ super.execute(new RunnableWithPermitRelease(command));
+ }
+
+ /**
+ * Get the number of permits available; guaranteed to be
+ * {@code 0 <= availablePermits <= size}.
+ * @return the number of permits available at the time of invocation.
+ */
+ public int getAvailablePermits() {
+ return queueingPermits.availablePermits();
+ }
+
+ /**
+ * Get the number of threads waiting to acquire a permit.
+ * @return snapshot of the length of the queue of blocked threads.
+ */
+ public int getWaitingCount() {
+ return queueingPermits.getQueueLength();
+ }
+
+ /**
+ * Total number of permits.
+ * @return the number of permits as set in the constructor
+ */
+ public int getPermitCount() {
+ return permitCount;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "SemaphoredDelegatingExecutor{");
+ sb.append("permitCount=").append(getPermitCount());
+ sb.append(", available=").append(getAvailablePermits());
+ sb.append(", waiting=").append(getWaitingCount());
+ sb.append('}');
+ return sb.toString();
+ }
+
+ /**
+ * Releases a permit after the task is executed.
+ */
+ class RunnableWithPermitRelease implements Runnable {
+
+ private Runnable delegatee;
+
+ public RunnableWithPermitRelease(Runnable delegatee) {
+ this.delegatee = delegatee;
+ }
+
+ @Override
+ public void run() {
+ try {
+ delegatee.run();
+ } finally {
+ queueingPermits.release();
+ }
+
+ }
+ }
+
+ /**
+ * Releases a permit after the task is completed.
+ */
+ class CallableWithPermitRelease<T> implements Callable<T> {
+
+ private Callable<T> delegatee;
+
+ public CallableWithPermitRelease(Callable<T> delegatee) {
+ this.delegatee = delegatee;
+ }
+
+ @Override
+ public T call() throws Exception {
+ try {
+ return delegatee.call();
+ } finally {
+ queueingPermits.release();
+ }
+ }
+
+ }
+
+}
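
The composition this class enables is one bounded, FS-wide thread pool shared by every stream, with a small semaphored wrapper per output stream so that no single stream can queue more than its share of blocks. A sketch of that wiring, assuming code in the org.apache.hadoop.fs.s3a package (both classes are package-private) and invented pool sizes:

import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.concurrent.TimeUnit;

// Shared, FS-wide pool: 10 worker threads with a bounded submission queue.
ListeningExecutorService shared =
    BlockingThreadPoolExecutorService.newInstance(
        10, 15, 60, TimeUnit.SECONDS, "s3a-transfer-shared");

// Per-stream wrapper: this stream may have at most 4 blocks queued or
// executing at once; further submissions block until a permit is freed.
SemaphoredDelegatingExecutor perStream =
    new SemaphoredDelegatingExecutor(shared, 4, true);

perStream.submit(() -> {
  // upload one buffered block here (illustrative placeholder)
});

This mirrors how S3AFileSystem.create() wraps the shared threadPoolExecutor in a SemaphoredDelegatingExecutor limited to blockOutputActiveBlocks permits before handing it to the new S3ABlockOutputStream.
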
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index d84a355..36ec50b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -81,10 +81,16 @@ public enum Statistic {
"Object multipart upload aborted"),
OBJECT_PUT_REQUESTS("object_put_requests",
"Object put/multipart upload count"),
+ OBJECT_PUT_REQUESTS_COMPLETED("object_put_requests_completed",
+ "Object put/multipart upload completed count"),
+ OBJECT_PUT_REQUESTS_ACTIVE("object_put_requests_active",
+ "Current number of active put requests"),
OBJECT_PUT_BYTES("object_put_bytes", "number of bytes uploaded"),
+ OBJECT_PUT_BYTES_PENDING("object_put_bytes_pending",
+ "number of bytes queued for upload/being actively uploaded"),
STREAM_ABORTED("stream_aborted",
"Count of times the TCP stream was aborted"),
- STREAM_BACKWARD_SEEK_OPERATIONS("stream_backward_seek_pperations",
+ STREAM_BACKWARD_SEEK_OPERATIONS("stream_backward_seek_operations",
"Number of executed seek operations which went backwards in a stream"),
STREAM_CLOSED("streamClosed", "Count of times the TCP stream was closed"),
STREAM_CLOSE_OPERATIONS("stream_close_operations",
@@ -112,7 +118,29 @@ public enum Statistic {
STREAM_CLOSE_BYTES_READ("stream_bytes_read_in_close",
"Count of bytes read when closing streams during seek operations."),
STREAM_ABORT_BYTES_DISCARDED("stream_bytes_discarded_in_abort",
- "Count of bytes discarded by aborting the stream");
+ "Count of bytes discarded by aborting the stream"),
+ STREAM_WRITE_FAILURES("stream_write_failures",
+ "Count of stream write failures reported"),
+ STREAM_WRITE_BLOCK_UPLOADS("stream_write_block_uploads",
+ "Count of block/partition uploads completed"),
+ STREAM_WRITE_BLOCK_UPLOADS_ACTIVE("stream_write_block_uploads_active",
+ "Gauge of block/partition uploads in progress"),
+ STREAM_WRITE_BLOCK_UPLOADS_COMMITTED("stream_write_block_uploads_committed",
+ "Count of number of block uploads committed"),
+ STREAM_WRITE_BLOCK_UPLOADS_ABORTED("stream_write_block_uploads_aborted",
+ "Count of number of block uploads aborted"),
+
+ STREAM_WRITE_BLOCK_UPLOADS_PENDING("stream_write_block_uploads_pending",
+ "Gauge of block/partitions uploads queued to be written"),
+ STREAM_WRITE_BLOCK_UPLOADS_DATA_PENDING(
+ "stream_write_block_uploads_data_pending",
+ "Gauge of block/partitions data uploads queued to be written"),
+ STREAM_WRITE_TOTAL_TIME("stream_write_total_time",
+ "Count of total time taken for uploads to complete"),
+ STREAM_WRITE_TOTAL_DATA("stream_write_total_data",
+ "Count of total data uploaded in block output"),
+ STREAM_WRITE_QUEUE_DURATION("stream_write_queue_duration",
+ "Total queue duration of all block uploads");
private static final Map<String, Statistic> SYMBOL_MAP =
new HashMap<>(Statistic.values().length);