Posted to common-commits@hadoop.apache.org by st...@apache.org on 2016/10/18 20:16:50 UTC
[04/12] hadoop git commit: HADOOP-13560. S3ABlockOutputStream to support huge (many GB) file writes. Contributed by Steve Loughran
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bc176961
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bc176961
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bc176961
Branch: refs/heads/branch-2
Commit: bc176961e674c8a770d96164602fb565fdeb1fb0
Parents: 3972bb3
Author: Steve Loughran <st...@apache.org>
Authored: Tue Oct 18 19:33:38 2016 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Oct 18 19:33:38 2016 +0100
----------------------------------------------------------------------
.../src/main/resources/core-default.xml | 74 +-
.../hadoop/fs/contract/ContractTestUtils.java | 16 +-
hadoop-tools/hadoop-aws/pom.xml | 58 +-
.../s3a/BlockingThreadPoolExecutorService.java | 184 +----
.../org/apache/hadoop/fs/s3a/Constants.java | 71 +-
.../hadoop/fs/s3a/S3ABlockOutputStream.java | 703 ++++++++++++++++
.../org/apache/hadoop/fs/s3a/S3ADataBlocks.java | 821 +++++++++++++++++++
.../hadoop/fs/s3a/S3AFastOutputStream.java | 410 ---------
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 418 ++++++++--
.../hadoop/fs/s3a/S3AInstrumentation.java | 248 +++++-
.../apache/hadoop/fs/s3a/S3AOutputStream.java | 57 +-
.../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 39 +
.../fs/s3a/SemaphoredDelegatingExecutor.java | 230 ++++++
.../org/apache/hadoop/fs/s3a/Statistic.java | 32 +-
.../src/site/markdown/tools/hadoop-aws/index.md | 668 +++++++++++++--
.../fs/contract/s3a/ITestS3AContractDistCp.java | 10 +-
.../hadoop/fs/s3a/AbstractS3ATestBase.java | 1 +
.../ITestBlockingThreadPoolExecutorService.java | 48 +-
.../hadoop/fs/s3a/ITestS3ABlockOutputArray.java | 90 ++
.../fs/s3a/ITestS3ABlockOutputByteBuffer.java | 30 +
.../hadoop/fs/s3a/ITestS3ABlockOutputDisk.java | 30 +
.../fs/s3a/ITestS3ABlockingThreadPool.java | 2 +
.../hadoop/fs/s3a/ITestS3AConfiguration.java | 29 +
.../ITestS3AEncryptionBlockOutputStream.java | 36 +
.../s3a/ITestS3AEncryptionFastOutputStream.java | 35 -
.../hadoop/fs/s3a/ITestS3AFastOutputStream.java | 74 --
.../apache/hadoop/fs/s3a/ITestS3ATestUtils.java | 98 +++
.../apache/hadoop/fs/s3a/S3ATestConstants.java | 75 +-
.../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 148 +++-
.../apache/hadoop/fs/s3a/TestDataBlocks.java | 124 +++
.../ITestS3AFileContextStatistics.java | 1 +
.../fs/s3a/scale/AbstractSTestS3AHugeFiles.java | 412 ++++++++++
.../fs/s3a/scale/ITestS3ADeleteManyFiles.java | 19 +-
.../s3a/scale/ITestS3AHugeFilesArrayBlocks.java | 31 +
.../ITestS3AHugeFilesByteBufferBlocks.java | 34 +
.../scale/ITestS3AHugeFilesClassicOutput.java | 41 +
.../s3a/scale/ITestS3AHugeFilesDiskBlocks.java | 31 +
.../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 151 ++--
38 files changed, 4655 insertions(+), 924 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index e8db5d7..b4d019b 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1003,8 +1003,8 @@
<property>
<name>fs.s3a.threads.max</name>
<value>10</value>
- <description> Maximum number of concurrent active (part)uploads,
- which each use a thread from the threadpool.</description>
+ <description>The total number of threads available in the filesystem for data
+ uploads *or any other queued filesystem operation*.</description>
</property>
<property>
@@ -1017,8 +1017,7 @@
<property>
<name>fs.s3a.max.total.tasks</name>
<value>5</value>
- <description>Number of (part)uploads allowed to the queue before
- blocking additional uploads.</description>
+  <description>The number of operations which can be queued for execution.</description>
</property>
<property>
@@ -1056,13 +1055,21 @@
<name>fs.s3a.multipart.purge</name>
<value>false</value>
<description>True if you want to purge existing multipart uploads that may not have been
- completed/aborted correctly</description>
+ completed/aborted correctly. The corresponding purge age is defined in
+ fs.s3a.multipart.purge.age.
+    If set, when the filesystem is instantiated, all outstanding uploads
+    older than the purge age will be terminated, across the entire bucket.
+    This will impact multipart uploads by other applications and users, so it
+    should be used sparingly, with an age value chosen to stop failed uploads
+    without breaking ongoing operations.
+ </description>
</property>
<property>
<name>fs.s3a.multipart.purge.age</name>
<value>86400</value>
- <description>Minimum age in seconds of multipart uploads to purge</description>
+ <description>Minimum age in seconds of multipart uploads to purge.
+ </description>
</property>
<property>
@@ -1095,10 +1102,50 @@
<property>
<name>fs.s3a.fast.upload</name>
<value>false</value>
- <description>Upload directly from memory instead of buffering to
- disk first. Memory usage and parallelism can be controlled as up to
- fs.s3a.multipart.size memory is consumed for each (part)upload actively
- uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
+ <description>
+ Use the incremental block-based fast upload mechanism with
+ the buffering mechanism set in fs.s3a.fast.upload.buffer.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.fast.upload.buffer</name>
+ <value>disk</value>
+ <description>
+ The buffering mechanism to use when using S3A fast upload
+ (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
+ This configuration option has no effect if fs.s3a.fast.upload is false.
+
+ "disk" will use the directories listed in fs.s3a.buffer.dir as
+ the location(s) to save data prior to being uploaded.
+
+ "array" uses arrays in the JVM heap
+
+ "bytebuffer" uses off-heap memory within the JVM.
+
+ Both "array" and "bytebuffer" will consume memory in a single stream up to the number
+ of blocks set by:
+
+ fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
+
+  If using either of these mechanisms, keep this value low.
+
+  The total number of threads performing work across all streams is set by
+  fs.s3a.threads.max, with fs.s3a.max.total.tasks setting the number of queued
+  work items.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.fast.upload.active.blocks</name>
+ <value>4</value>
+ <description>
+    Maximum number of blocks a single output stream can have
+    active (uploading, or queued to the central FileSystem
+    instance's pool of queued operations).
+
+ This stops a single stream overloading the shared thread pool.
+ </description>
</property>
<property>
@@ -1110,13 +1157,6 @@
</property>
<property>
- <name>fs.s3a.fast.buffer.size</name>
- <value>1048576</value>
- <description>Size of initial memory buffer in bytes allocated for an
- upload. No effect if fs.s3a.fast.upload is false.</description>
-</property>
-
-<property>
<name>fs.s3a.user.agent.prefix</name>
<value></value>
<description>
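Putting the new options together: a minimal, hypothetical Java sketch of a client enabling the block-based upload (the property names are those defined above; the chosen values are illustrative, not recommendations):

    import org.apache.hadoop.conf.Configuration;

    public class FastUploadConfig {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Switch to the incremental block-based upload mechanism.
        conf.setBoolean("fs.s3a.fast.upload", true);
        // Buffer blocks off-heap; alternatives: "disk" (default) and "array".
        conf.set("fs.s3a.fast.upload.buffer", "bytebuffer");
        // Cap the blocks a single stream may have uploading or queued, so one
        // stream cannot monopolize memory or the shared thread pool.
        conf.setInt("fs.s3a.fast.upload.active.blocks", 4);
      }
    }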
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 03f47c1..16bfb9a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -965,7 +965,7 @@ public class ContractTestUtils extends Assert {
* @return the number of megabytes/second of the recorded operation
*/
public static double bandwidthMBs(long bytes, long durationNS) {
- return (bytes * 1000.0) / durationNS;
+ return bytes / (1024.0 * 1024) * 1.0e9 / durationNS;
}
/**
@@ -1415,6 +1415,14 @@ public class ContractTestUtils extends Assert {
return endTime - startTime;
}
+ /**
+ * Intermediate duration of the operation.
+ * @return how much time has passed since the start (in nanos).
+ */
+ public long elapsedTime() {
+ return now() - startTime;
+ }
+
public double bandwidth(long bytes) {
return bandwidthMBs(bytes, duration());
}
@@ -1422,10 +1430,12 @@ public class ContractTestUtils extends Assert {
/**
* Bandwidth as bytes per second.
* @param bytes bytes in
- * @return the number of bytes per second this operation timed.
+   * @return the number of bytes per second of this operation;
+   * 0 if duration == 0.
*/
public double bandwidthBytes(long bytes) {
- return (bytes * 1.0) / duration();
+ double duration = duration();
+ return duration > 0 ? bytes / duration : 0;
}
/**
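A quick sanity check of the corrected bandwidth formula (a standalone, hypothetical snippet, not part of the patch): 100 MiB moved in exactly one second should report 100 MB/s, whereas the old expression (bytes * 1000.0) / durationNS worked out to decimal megabytes (bytes per microsecond) rather than binary megabytes.

    public class BandwidthCheck {
      static double bandwidthMBs(long bytes, long durationNS) {
        // formula as patched in ContractTestUtils
        return bytes / (1024.0 * 1024) * 1.0e9 / durationNS;
      }
      public static void main(String[] args) {
        long bytes = 100L * 1024 * 1024;      // 100 MiB
        long durationNS = 1_000_000_000L;     // one second in nanoseconds
        System.out.println(bandwidthMBs(bytes, durationNS)); // prints 100.0
      }
    }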
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 7d3e4eb..28825d9 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -35,6 +35,15 @@
<file.encoding>UTF-8</file.encoding>
<downloadSources>true</downloadSources>
<hadoop.tmp.dir>${project.build.directory}/test</hadoop.tmp.dir>
+
+    <!-- Are scale tests enabled? -->
+ <fs.s3a.scale.test.enabled>unset</fs.s3a.scale.test.enabled>
+ <!-- Size in MB of huge files. -->
+ <fs.s3a.scale.test.huge.filesize>unset</fs.s3a.scale.test.huge.filesize>
+    <!-- Partition size in MB for huge file uploads. -->
+ <fs.s3a.scale.test.huge.partitionsize>unset</fs.s3a.scale.test.huge.partitionsize>
+ <!-- Timeout in seconds for scale tests.-->
+ <fs.s3a.scale.test.timeout>3600</fs.s3a.scale.test.timeout>
</properties>
<profiles>
@@ -115,6 +124,11 @@
<!-- substitution. Putting a prefix in front of it like -->
<!-- "fork-" makes it work. -->
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <!-- Propagate scale parameters -->
+ <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
+ <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
+ <fs.s3a.scale.test.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.partitionsize>
+ <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
</systemPropertyVariables>
</configuration>
</plugin>
@@ -132,7 +146,10 @@
<forkCount>${testsThreadCount}</forkCount>
<reuseForks>false</reuseForks>
<argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+ <forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
<systemPropertyVariables>
+ <!-- Tell tests that they are being executed in parallel -->
+ <test.parallel.execution>true</test.parallel.execution>
<test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
<test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
<hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
@@ -142,6 +159,11 @@
<!-- substitution. Putting a prefix in front of it like -->
<!-- "fork-" makes it work. -->
<test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <!-- Propagate scale parameters -->
+ <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
+ <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
+ <fs.s3a.scale.test.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.partitionsize>
+ <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
</systemPropertyVariables>
<!-- Some tests cannot run in parallel. Tests that cover -->
<!-- access to the root directory must run in isolation -->
@@ -160,10 +182,11 @@
<excludes>
<exclude>**/ITestJets3tNativeS3FileSystemContract.java</exclude>
<exclude>**/ITestS3ABlockingThreadPool.java</exclude>
- <exclude>**/ITestS3AFastOutputStream.java</exclude>
<exclude>**/ITestS3AFileSystemContract.java</exclude>
<exclude>**/ITestS3AMiniYarnCluster.java</exclude>
<exclude>**/ITest*Root*.java</exclude>
+ <exclude>**/ITestS3AFileContextStatistics.java</exclude>
+ <include>**/ITestS3AHuge*.java</include>
</excludes>
</configuration>
</execution>
@@ -174,6 +197,16 @@
<goal>verify</goal>
</goals>
<configuration>
+ <forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+ <systemPropertyVariables>
+ <!-- Tell tests that they are being executed sequentially -->
+ <test.parallel.execution>false</test.parallel.execution>
+ <!-- Propagate scale parameters -->
+ <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
+ <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
+ <fs.s3a.scale.test.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.partitionsize>
+ <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
+ </systemPropertyVariables>
<!-- Do a sequential run for tests that cannot handle -->
<!-- parallel execution. -->
<includes>
@@ -183,6 +216,8 @@
<include>**/ITestS3AFileSystemContract.java</include>
<include>**/ITestS3AMiniYarnCluster.java</include>
<include>**/ITest*Root*.java</include>
+ <include>**/ITestS3AFileContextStatistics.java</include>
+ <include>**/ITestS3AHuge*.java</include>
</includes>
</configuration>
</execution>
@@ -210,7 +245,13 @@
<goal>verify</goal>
</goals>
<configuration>
- <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
+ <systemPropertyVariables>
+ <!-- Propagate scale parameters -->
+ <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
+ <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
+ <fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
+ </systemPropertyVariables>
+ <forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
</configuration>
</execution>
</executions>
@@ -218,6 +259,19 @@
</plugins>
</build>
</profile>
+
+ <!-- Turn on scale tests-->
+ <profile>
+ <id>scale</id>
+ <activation>
+ <property>
+ <name>scale</name>
+ </property>
+ </activation>
+    <properties>
+ <fs.s3a.scale.test.enabled>true</fs.s3a.scale.test.enabled>
+ </properties>
+ </profile>
</profiles>
<build>
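Usage note: with the "scale" profile above, the scale tests can be switched on from the command line. A typical invocation might look like the following (the size and timeout values are illustrative):

    # run the integration tests with scale tests enabled
    mvn verify -Dscale

    # as above, overriding huge-file size and the per-fork timeout
    mvn verify -Dscale -Dfs.s3a.scale.test.huge.filesize=256M \
        -Dfs.s3a.scale.test.timeout=7200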
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
index fc8ae87..eb40c3a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
@@ -18,30 +18,21 @@
package org.apache.hadoop.fs.s3a;
-import java.util.Collection;
-import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.util.concurrent.ForwardingListeningExecutorService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.classification.InterfaceAudience;
+
/**
* This ExecutorService blocks the submission of new tasks when its queue is
* already full by using a semaphore. Task submissions require permits, task
@@ -50,17 +41,17 @@ import com.google.common.util.concurrent.MoreExecutors;
* This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
* this s4 threadpool</a>
*/
-public class BlockingThreadPoolExecutorService
- extends ForwardingListeningExecutorService {
+@InterfaceAudience.Private
+final class BlockingThreadPoolExecutorService
+ extends SemaphoredDelegatingExecutor {
private static final Logger LOG = LoggerFactory
.getLogger(BlockingThreadPoolExecutorService.class);
- private Semaphore queueingPermits;
- private ListeningExecutorService executorDelegatee;
-
private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
+ private final ThreadPoolExecutor eventProcessingExecutor;
+
/**
* Returns a {@link java.util.concurrent.ThreadFactory} that names each
* created thread uniquely,
@@ -69,7 +60,7 @@ public class BlockingThreadPoolExecutorService
* @param prefix The prefix of every created Thread's name
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
*/
- public static ThreadFactory getNamedThreadFactory(final String prefix) {
+ static ThreadFactory getNamedThreadFactory(final String prefix) {
SecurityManager s = System.getSecurityManager();
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
@@ -113,6 +104,12 @@ public class BlockingThreadPoolExecutorService
};
}
+ private BlockingThreadPoolExecutorService(int permitCount,
+ ThreadPoolExecutor eventProcessingExecutor) {
+ super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
+ permitCount, false);
+ this.eventProcessingExecutor = eventProcessingExecutor;
+ }
/**
* A thread pool that that blocks clients submitting additional tasks if
@@ -125,10 +122,12 @@ public class BlockingThreadPoolExecutorService
* @param unit time unit
* @param prefixName prefix of name for threads
*/
- public BlockingThreadPoolExecutorService(int activeTasks, int waitingTasks,
- long keepAliveTime, TimeUnit unit, String prefixName) {
- super();
- queueingPermits = new Semaphore(waitingTasks + activeTasks, false);
+ public static BlockingThreadPoolExecutorService newInstance(
+ int activeTasks,
+ int waitingTasks,
+ long keepAliveTime, TimeUnit unit,
+ String prefixName) {
+
/* Although we generally only expect up to waitingTasks tasks in the
queue, we need to be able to buffer all tasks in case dequeueing is
slower than enqueueing. */
@@ -138,135 +137,34 @@ public class BlockingThreadPoolExecutorService
new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
workQueue, newDaemonThreadFactory(prefixName),
new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(Runnable r,
- ThreadPoolExecutor executor) {
- // This is not expected to happen.
- LOG.error("Could not submit task to executor {}",
- executor.toString());
- }
- });
+ @Override
+ public void rejectedExecution(Runnable r,
+ ThreadPoolExecutor executor) {
+ // This is not expected to happen.
+ LOG.error("Could not submit task to executor {}",
+ executor.toString());
+ }
+ });
eventProcessingExecutor.allowCoreThreadTimeOut(true);
- executorDelegatee =
- MoreExecutors.listeningDecorator(eventProcessingExecutor);
-
- }
-
- @Override
- protected ListeningExecutorService delegate() {
- return executorDelegatee;
- }
-
- @Override
- public <T> ListenableFuture<T> submit(Callable<T> task) {
- try {
- queueingPermits.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return Futures.immediateFailedCheckedFuture(e);
- }
- return super.submit(new CallableWithPermitRelease<T>(task));
- }
-
- @Override
- public <T> ListenableFuture<T> submit(Runnable task, T result) {
- try {
- queueingPermits.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return Futures.immediateFailedCheckedFuture(e);
- }
- return super.submit(new RunnableWithPermitRelease(task), result);
- }
-
- @Override
- public ListenableFuture<?> submit(Runnable task) {
- try {
- queueingPermits.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return Futures.immediateFailedCheckedFuture(e);
- }
- return super.submit(new RunnableWithPermitRelease(task));
- }
-
- @Override
- public void execute(Runnable command) {
- try {
- queueingPermits.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- super.execute(new RunnableWithPermitRelease(command));
+ return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
+ eventProcessingExecutor);
}
/**
- * Releases a permit after the task is executed.
+ * Get the actual number of active threads.
+ * @return the active thread count
*/
- class RunnableWithPermitRelease implements Runnable {
-
- private Runnable delegatee;
-
- public RunnableWithPermitRelease(Runnable delegatee) {
- this.delegatee = delegatee;
- }
-
- @Override
- public void run() {
- try {
- delegatee.run();
- } finally {
- queueingPermits.release();
- }
-
- }
- }
-
- /**
- * Releases a permit after the task is completed.
- */
- class CallableWithPermitRelease<T> implements Callable<T> {
-
- private Callable<T> delegatee;
-
- public CallableWithPermitRelease(Callable<T> delegatee) {
- this.delegatee = delegatee;
- }
-
- @Override
- public T call() throws Exception {
- try {
- return delegatee.call();
- } finally {
- queueingPermits.release();
- }
- }
-
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException {
- throw new RuntimeException("Not implemented");
+ int getActiveCount() {
+ return eventProcessingExecutor.getActiveCount();
}
@Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit) throws InterruptedException {
- throw new RuntimeException("Not implemented");
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "BlockingThreadPoolExecutorService{");
+ sb.append(super.toString());
+ sb.append(", activeCount=").append(getActiveCount());
+ sb.append('}');
+ return sb.toString();
}
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
- TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- throw new RuntimeException("Not implemented");
- }
-
}
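For orientation, a hedged sketch of how the refactored service is now constructed and used (the newInstance signature is from the patch; the task body and pool name are illustrative, and since the class is now package-private this only compiles inside org.apache.hadoop.fs.s3a):

    package org.apache.hadoop.fs.s3a;

    import java.util.concurrent.TimeUnit;

    class ExecutorSketch {
      void demo() {
        // 10 active + 5 waiting tasks: submit()/execute() will block the
        // caller once all 15 semaphore permits are held, rather than reject.
        BlockingThreadPoolExecutorService pool =
            BlockingThreadPoolExecutorService.newInstance(
                10, 5, 60L, TimeUnit.SECONDS, "s3a-transfer-shared");
        pool.submit(new Runnable() {
          @Override
          public void run() {
            // queued upload work runs here
          }
        });
        pool.shutdown();
      }
    }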
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index cf97c35..d2f0b90 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -35,6 +35,9 @@ public final class Constants {
private Constants() {
}
+ /** The minimum multipart size which S3 supports. */
+ public static final int MULTIPART_MIN_SIZE = 5 * 1024 * 1024;
+
// s3 access key
public static final String ACCESS_KEY = "fs.s3a.access.key";
@@ -129,14 +132,72 @@ public final class Constants {
// comma separated list of directories
public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
- // should we upload directly from memory rather than using a file buffer
+ // switch to the fast block-by-block upload mechanism
public static final String FAST_UPLOAD = "fs.s3a.fast.upload";
public static final boolean DEFAULT_FAST_UPLOAD = false;
//initial size of memory buffer for a fast upload
+ @Deprecated
public static final String FAST_BUFFER_SIZE = "fs.s3a.fast.buffer.size";
public static final int DEFAULT_FAST_BUFFER_SIZE = 1048576; //1MB
+ /**
+ * What buffer to use.
+ * Default is {@link #FAST_UPLOAD_BUFFER_DISK}
+ * Value: {@value}
+ */
+ @InterfaceStability.Unstable
+ public static final String FAST_UPLOAD_BUFFER =
+ "fs.s3a.fast.upload.buffer";
+
+ /**
+ * Buffer blocks to disk: {@value}.
+ * Capacity is limited to available disk space.
+ */
+
+ @InterfaceStability.Unstable
+ public static final String FAST_UPLOAD_BUFFER_DISK = "disk";
+
+ /**
+   * Use an in-memory array. Fast but will run out of heap rapidly: {@value}.
+ */
+ @InterfaceStability.Unstable
+ public static final String FAST_UPLOAD_BUFFER_ARRAY = "array";
+
+ /**
+ * Use a byte buffer. May be more memory efficient than the
+ * {@link #FAST_UPLOAD_BUFFER_ARRAY}: {@value}.
+ */
+ @InterfaceStability.Unstable
+ public static final String FAST_UPLOAD_BYTEBUFFER = "bytebuffer";
+
+ /**
+ * Default buffer option: {@value}.
+ */
+ @InterfaceStability.Unstable
+ public static final String DEFAULT_FAST_UPLOAD_BUFFER =
+ FAST_UPLOAD_BUFFER_DISK;
+
+ /**
+   * Maximum number of blocks a single output stream can have
+   * active (uploading, or queued to the central FileSystem
+   * instance's pool of queued operations).
+ * This stops a single stream overloading the shared thread pool.
+ * {@value}
+ * <p>
+ * Default is {@link #DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS}
+ */
+ @InterfaceStability.Unstable
+ public static final String FAST_UPLOAD_ACTIVE_BLOCKS =
+ "fs.s3a.fast.upload.active.blocks";
+
+ /**
+ * Limit of queued block upload operations before writes
+ * block. Value: {@value}
+ */
+ @InterfaceStability.Unstable
+ public static final int DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS = 4;
+
// Private | PublicRead | PublicReadWrite | AuthenticatedRead |
// LogDeliveryWrite | BucketOwnerRead | BucketOwnerFullControl
public static final String CANNED_ACL = "fs.s3a.acl.default";
@@ -150,7 +211,7 @@ public final class Constants {
// purge any multipart uploads older than this number of seconds
public static final String PURGE_EXISTING_MULTIPART_AGE =
"fs.s3a.multipart.purge.age";
- public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
+ public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;
// s3 server-side encryption
public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
@@ -220,4 +281,10 @@ public final class Constants {
public static final Class<? extends S3ClientFactory>
DEFAULT_S3_CLIENT_FACTORY_IMPL =
S3ClientFactory.DefaultS3ClientFactory.class;
+
+ /**
+ * Maximum number of partitions in a multipart upload: {@value}.
+ */
+ @InterfaceAudience.Private
+ public static final int MAX_MULTIPART_COUNT = 10000;
}
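The two new constants bound what a single multipart upload can hold; a hypothetical back-of-the-envelope check (not part of the patch):

    public class MultipartLimits {
      public static void main(String[] args) {
        long partSize = 64L * 1024 * 1024;   // an example fs.s3a.multipart.size
        long maxParts = 10000L;              // MAX_MULTIPART_COUNT
        long maxFile = partSize * maxParts;
        System.out.println(maxFile / (1024L * 1024 * 1024)); // prints 625 (GB)
        // At the 5 MB MULTIPART_MIN_SIZE floor the ceiling is ~48.8 GB,
        // hence the warning logged when a stream passes the partition limit.
      }
    }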
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
new file mode 100644
index 0000000..b66a23f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -0,0 +1,703 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressEventType;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.util.Progressable;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+
+/**
+ * Upload files/parts directly via different buffering mechanisms:
+ * including memory and disk.
+ *
+ * If the stream is closed and no upload has started, then the upload
+ * is instead done as a single PUT operation.
+ *
+ * Unstable: statistics and error handling might evolve.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class S3ABlockOutputStream extends OutputStream {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3ABlockOutputStream.class);
+
+ /** Owner FileSystem. */
+ private final S3AFileSystem fs;
+
+ /** Object being uploaded. */
+ private final String key;
+
+ /** Size of all blocks. */
+ private final int blockSize;
+
+ /** Callback for progress. */
+ private final ProgressListener progressListener;
+ private final ListeningExecutorService executorService;
+
+ /**
+ * Retry policy for multipart commits; not all AWS SDK versions retry that.
+ */
+ private final RetryPolicy retryPolicy =
+ RetryPolicies.retryUpToMaximumCountWithProportionalSleep(
+ 5,
+ 2000,
+ TimeUnit.MILLISECONDS);
+ /**
+ * Factory for blocks.
+ */
+ private final S3ADataBlocks.BlockFactory blockFactory;
+
+ /** Preallocated byte buffer for writing single characters. */
+ private final byte[] singleCharWrite = new byte[1];
+
+ /** Multipart upload details; null means none started. */
+ private MultiPartUpload multiPartUpload;
+
+ /** Closed flag. */
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ /** Current data block. Null means none currently active */
+ private S3ADataBlocks.DataBlock activeBlock;
+
+ /** Count of blocks uploaded. */
+ private long blockCount = 0;
+
+ /** Statistics to build up. */
+ private final S3AInstrumentation.OutputStreamStatistics statistics;
+
+ /**
+ * Write operation helper; encapsulation of the filesystem operations.
+ */
+ private final S3AFileSystem.WriteOperationHelper writeOperationHelper;
+
+ /**
+ * An S3A output stream which uploads partitions in a separate pool of
+ * threads; different {@link S3ADataBlocks.BlockFactory}
+ * instances can control where data is buffered.
+ *
+ * @param fs S3AFilesystem
+ * @param key S3 object to work on.
+ * @param executorService the executor service to use to schedule work
+ * @param progress report progress in order to prevent timeouts. If
+ * this object implements {@code ProgressListener} then it will be
+ * directly wired up to the AWS client, and will receive detailed progress
+ * information.
+ * @param blockSize size of a single block.
+ * @param blockFactory factory for creating stream destinations
+ * @param statistics stats for this stream
+ * @param writeOperationHelper state of the write operation.
+ * @throws IOException on any problem
+ */
+ S3ABlockOutputStream(S3AFileSystem fs,
+ String key,
+ ExecutorService executorService,
+ Progressable progress,
+ long blockSize,
+ S3ADataBlocks.BlockFactory blockFactory,
+ S3AInstrumentation.OutputStreamStatistics statistics,
+ S3AFileSystem.WriteOperationHelper writeOperationHelper)
+ throws IOException {
+ this.fs = fs;
+ this.key = key;
+ this.blockFactory = blockFactory;
+ this.blockSize = (int) blockSize;
+ this.statistics = statistics;
+ this.writeOperationHelper = writeOperationHelper;
+ Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
+ "Block size is too small: %d", blockSize);
+ this.executorService = MoreExecutors.listeningDecorator(executorService);
+ this.multiPartUpload = null;
+ this.progressListener = (progress instanceof ProgressListener) ?
+ (ProgressListener) progress
+ : new ProgressableListener(progress);
+ // create that first block. This guarantees that an open + close sequence
+ // writes a 0-byte entry.
+ createBlockIfNeeded();
+ LOG.debug("Initialized S3ABlockOutputStream for {}" +
+ " output to {}", writeOperationHelper, activeBlock);
+ }
+
+ /**
+ * Demand create a destination block.
+ * @return the active block; null if there isn't one.
+ * @throws IOException on any failure to create
+ */
+ private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded()
+ throws IOException {
+ if (activeBlock == null) {
+ blockCount++;
+ if (blockCount >= Constants.MAX_MULTIPART_COUNT) {
+ LOG.error("Number of partitions in stream exceeds limit for S3: " +
+ Constants.MAX_MULTIPART_COUNT + "; write may fail.");
+ }
+ activeBlock = blockFactory.create(this.blockSize);
+ }
+ return activeBlock;
+ }
+
+ /**
+ * Synchronized accessor to the active block.
+ * @return the active block; null if there isn't one.
+ */
+ private synchronized S3ADataBlocks.DataBlock getActiveBlock() {
+ return activeBlock;
+ }
+
+ /**
+ * Predicate to query whether or not there is an active block.
+ * @return true if there is an active block.
+ */
+ private synchronized boolean hasActiveBlock() {
+ return activeBlock != null;
+ }
+
+ /**
+ * Clear the active block.
+ */
+ private void clearActiveBlock() {
+ LOG.debug("Clearing active block");
+ synchronized (this) {
+ activeBlock = null;
+ }
+ }
+
+ /**
+ * Check for the filesystem being open.
+ * @throws IOException if the filesystem is closed.
+ */
+ void checkOpen() throws IOException {
+ if (closed.get()) {
+ throw new IOException("Filesystem " + writeOperationHelper + " closed");
+ }
+ }
+
+ /**
+ * The flush operation does not trigger an upload; that awaits
+ * the next block being full. What it does do is call {@code flush() }
+ * on the current block, leaving it to choose how to react.
+ * @throws IOException Any IO problem.
+ */
+ @Override
+ public synchronized void flush() throws IOException {
+ checkOpen();
+ S3ADataBlocks.DataBlock dataBlock = getActiveBlock();
+ if (dataBlock != null) {
+ dataBlock.flush();
+ }
+ }
+
+ /**
+ * Writes a byte to the destination. If this causes the buffer to reach
+ * its limit, the actual upload is submitted to the threadpool.
+ * @param b the int of which the lowest byte is written
+ * @throws IOException on any problem
+ */
+ @Override
+ public synchronized void write(int b) throws IOException {
+ singleCharWrite[0] = (byte)b;
+ write(singleCharWrite, 0, 1);
+ }
+
+ /**
+ * Writes a range of bytes from to the memory buffer. If this causes the
+ * buffer to reach its limit, the actual upload is submitted to the
+ * threadpool and the remainder of the array is written to memory
+ * (recursively).
+ * @param source byte array containing
+ * @param offset offset in array where to start
+ * @param len number of bytes to be written
+ * @throws IOException on any problem
+ */
+ @Override
+ public synchronized void write(byte[] source, int offset, int len)
+ throws IOException {
+
+ S3ADataBlocks.validateWriteArgs(source, offset, len);
+ checkOpen();
+ if (len == 0) {
+ return;
+ }
+ S3ADataBlocks.DataBlock block = createBlockIfNeeded();
+ int written = block.write(source, offset, len);
+ int remainingCapacity = block.remainingCapacity();
+ if (written < len) {
+ // not everything was written; the block has run out
+ // of capacity
+ // Trigger an upload then process the remainder.
+ LOG.debug("writing more data than block has capacity; triggering upload");
+ uploadCurrentBlock();
+ // tail recursion is mildly expensive, but given that buffer sizes must
+ // be in the MB range, it's unlikely to recurse very deeply.
+ this.write(source, offset + written, len - written);
+ } else {
+ if (remainingCapacity == 0) {
+ // the whole buffer is done, trigger an upload
+ uploadCurrentBlock();
+ }
+ }
+ }
+
+ /**
+ * Start an asynchronous upload of the current block.
+ * @throws IOException Problems opening the destination for upload
+ * or initializing the upload.
+ */
+ private synchronized void uploadCurrentBlock() throws IOException {
+ Preconditions.checkState(hasActiveBlock(), "No active block");
+ LOG.debug("Writing block # {}", blockCount);
+ if (multiPartUpload == null) {
+ LOG.debug("Initiating Multipart upload");
+ multiPartUpload = new MultiPartUpload();
+ }
+ try {
+ multiPartUpload.uploadBlockAsync(getActiveBlock());
+ } finally {
+ // set the block to null, so the next write will create a new block.
+ clearActiveBlock();
+ }
+ }
+
+ /**
+ * Close the stream.
+ *
+ * This will not return until the upload is complete
+ * or the attempt to perform the upload has failed.
+ * Exceptions raised in this method are indicative that the write has
+ * failed and data is at risk of being lost.
+ * @throws IOException on any failure.
+ */
+ @Override
+ public void close() throws IOException {
+ if (closed.getAndSet(true)) {
+ // already closed
+ LOG.debug("Ignoring close() as stream is already closed");
+ return;
+ }
+ S3ADataBlocks.DataBlock block = getActiveBlock();
+ boolean hasBlock = hasActiveBlock();
+ LOG.debug("{}: Closing block #{}: current block= {}",
+ this,
+ blockCount,
+ hasBlock ? block : "(none)");
+ try {
+ if (multiPartUpload == null) {
+ if (hasBlock) {
+ // no uploads of data have taken place, put the single block up.
+ // This must happen even if there is no data, so that 0 byte files
+ // are created.
+ putObject();
+ }
+ } else {
+ // there has already been at least one block scheduled for upload;
+ // put up the current then wait
+ if (hasBlock && block.hasData()) {
+ //send last part
+ uploadCurrentBlock();
+ }
+ // wait for the partial uploads to finish
+ final List<PartETag> partETags =
+ multiPartUpload.waitForAllPartUploads();
+ // then complete the operation
+ multiPartUpload.complete(partETags);
+ }
+ LOG.debug("Upload complete for {}", writeOperationHelper);
+ } catch (IOException ioe) {
+ writeOperationHelper.writeFailed(ioe);
+ throw ioe;
+ } finally {
+ LOG.debug("Closing block and factory");
+ IOUtils.closeStream(block);
+ IOUtils.closeStream(blockFactory);
+ LOG.debug("Statistics: {}", statistics);
+ IOUtils.closeStream(statistics);
+ clearActiveBlock();
+ }
+ // All end of write operations, including deleting fake parent directories
+ writeOperationHelper.writeSuccessful();
+ }
+
+ /**
+ * Upload the current block as a single PUT request; if the buffer
+ * is empty a 0-byte PUT will be invoked, as it is needed to create an
+ * entry at the far end.
+ * @throws IOException any problem.
+ */
+ private void putObject() throws IOException {
+ LOG.debug("Executing regular upload for {}", writeOperationHelper);
+
+ final S3ADataBlocks.DataBlock block = getActiveBlock();
+ int size = block.dataSize();
+ final PutObjectRequest putObjectRequest =
+ writeOperationHelper.newPutRequest(
+ block.startUpload(),
+ size);
+ long transferQueueTime = now();
+ BlockUploadProgress callback =
+ new BlockUploadProgress(
+ block, progressListener, transferQueueTime);
+ putObjectRequest.setGeneralProgressListener(callback);
+ statistics.blockUploadQueued(size);
+ ListenableFuture<PutObjectResult> putObjectResult =
+ executorService.submit(new Callable<PutObjectResult>() {
+ @Override
+ public PutObjectResult call() throws Exception {
+ PutObjectResult result = fs.putObjectDirect(putObjectRequest);
+ block.close();
+ return result;
+ }
+ });
+ clearActiveBlock();
+ //wait for completion
+ try {
+ putObjectResult.get();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted object upload", ie);
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException ee) {
+ throw extractException("regular upload", key, ee);
+ }
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "S3ABlockOutputStream{");
+ sb.append(writeOperationHelper.toString());
+ sb.append(", blockSize=").append(blockSize);
+ // unsynced access; risks consistency in exchange for no risk of deadlock.
+ S3ADataBlocks.DataBlock block = activeBlock;
+ if (block != null) {
+ sb.append(", activeBlock=").append(block);
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+ private void incrementWriteOperations() {
+ fs.incrementWriteOperations();
+ }
+
+ /**
+ * Current time in milliseconds.
+ * @return time
+ */
+ private long now() {
+ return System.currentTimeMillis();
+ }
+
+ /**
+ * Multiple partition upload.
+ */
+ private class MultiPartUpload {
+ private final String uploadId;
+ private final List<ListenableFuture<PartETag>> partETagsFutures;
+
+ public MultiPartUpload() throws IOException {
+ this.uploadId = writeOperationHelper.initiateMultiPartUpload();
+ this.partETagsFutures = new ArrayList<>(2);
+ LOG.debug("Initiated multi-part upload for {} with " +
+ "id '{}'", writeOperationHelper, uploadId);
+ }
+
+ /**
+ * Upload a block of data.
+ * This will take the block and queue it for asynchronous upload.
+ * @param block block to upload
+ * @throws IOException upload failure
+ */
+ private void uploadBlockAsync(final S3ADataBlocks.DataBlock block)
+ throws IOException {
+ LOG.debug("Queueing upload of {}", block);
+ final int size = block.dataSize();
+ final InputStream uploadStream = block.startUpload();
+ final int currentPartNumber = partETagsFutures.size() + 1;
+ final UploadPartRequest request =
+ writeOperationHelper.newUploadPartRequest(
+ uploadId,
+ uploadStream,
+ currentPartNumber,
+ size);
+ long transferQueueTime = now();
+ BlockUploadProgress callback =
+ new BlockUploadProgress(
+ block, progressListener, transferQueueTime);
+ request.setGeneralProgressListener(callback);
+ statistics.blockUploadQueued(block.dataSize());
+ ListenableFuture<PartETag> partETagFuture =
+ executorService.submit(new Callable<PartETag>() {
+ @Override
+ public PartETag call() throws Exception {
+ // this is the queued upload operation
+ LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
+ uploadId);
+ // do the upload
+ PartETag partETag = fs.uploadPart(request).getPartETag();
+ LOG.debug("Completed upload of {}", block);
+ LOG.debug("Stream statistics of {}", statistics);
+
+ // close the block
+ block.close();
+ return partETag;
+ }
+ });
+ partETagsFutures.add(partETagFuture);
+ }
+
+ /**
+ * Block awaiting all outstanding uploads to complete.
+ * @return list of results
+ * @throws IOException IO Problems
+ */
+ private List<PartETag> waitForAllPartUploads() throws IOException {
+ LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size());
+ try {
+ return Futures.allAsList(partETagsFutures).get();
+ } catch (InterruptedException ie) {
+ LOG.warn("Interrupted partUpload", ie);
+ Thread.currentThread().interrupt();
+ return null;
+ } catch (ExecutionException ee) {
+ //there is no way of recovering so abort
+ //cancel all partUploads
+ LOG.debug("While waiting for upload completion", ee);
+ LOG.debug("Cancelling futures");
+ for (ListenableFuture<PartETag> future : partETagsFutures) {
+ future.cancel(true);
+ }
+ //abort multipartupload
+ this.abort();
+ throw extractException("Multi-part upload with id '" + uploadId
+ + "' to " + key, key, ee);
+ }
+ }
+
+ /**
+ * This completes a multipart upload.
+ * Sometimes it fails; here retries are handled to avoid losing all data
+ * on a transient failure.
+ * @param partETags list of partial uploads
+ * @throws IOException on any problem
+ */
+ private CompleteMultipartUploadResult complete(List<PartETag> partETags)
+ throws IOException {
+ int retryCount = 0;
+ AmazonClientException lastException;
+ String operation =
+ String.format("Completing multi-part upload for key '%s'," +
+ " id '%s' with %s partitions ",
+ key, uploadId, partETags.size());
+ do {
+ try {
+ LOG.debug(operation);
+ return writeOperationHelper.completeMultipartUpload(
+ uploadId,
+ partETags);
+ } catch (AmazonClientException e) {
+ lastException = e;
+ statistics.exceptionInMultipartComplete();
+ }
+ } while (shouldRetry(operation, lastException, retryCount++));
+ // this point is only reached if the operation failed more than
+ // the allowed retry count
+ throw translateException(operation, key, lastException);
+ }
+
+ /**
+ * Abort a multi-part upload. Retries are attempted on failures.
+ * IOExceptions are caught; this is expected to be run as a cleanup process.
+ */
+ public void abort() {
+ int retryCount = 0;
+ AmazonClientException lastException;
+ fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
+ String operation =
+ String.format("Aborting multi-part upload for '%s', id '%s",
+ writeOperationHelper, uploadId);
+ do {
+ try {
+ LOG.debug(operation);
+ writeOperationHelper.abortMultipartUpload(uploadId);
+ return;
+ } catch (AmazonClientException e) {
+ lastException = e;
+ statistics.exceptionInMultipartAbort();
+ }
+ } while (shouldRetry(operation, lastException, retryCount++));
+ // this point is only reached if the operation failed more than
+ // the allowed retry count
+ LOG.warn("Unable to abort multipart upload, you may need to purge " +
+ "uploaded parts", lastException);
+ }
+
+ /**
+ * Predicate to determine whether a failed operation should
+ * be attempted again.
+ * If a retry is advised, the exception is automatically logged and
+ * the filesystem statistic {@link Statistic#IGNORED_ERRORS} incremented.
+ * The method then sleeps for the sleep time suggested by the sleep policy;
+ * if the sleep is interrupted then {@code Thread.interrupted()} is set
+ * to indicate the thread was interrupted; then false is returned.
+ *
+ * @param operation operation for log message
+ * @param e exception raised.
+ * @param retryCount number of retries already attempted
+ * @return true if another attempt should be made
+ */
+ private boolean shouldRetry(String operation,
+ AmazonClientException e,
+ int retryCount) {
+ try {
+ RetryPolicy.RetryAction retryAction =
+ retryPolicy.shouldRetry(e, retryCount, 0, true);
+ boolean retry = retryAction == RetryPolicy.RetryAction.RETRY;
+ if (retry) {
+ fs.incrementStatistic(IGNORED_ERRORS);
+ LOG.info("Retrying {} after exception ", operation, e);
+ Thread.sleep(retryAction.delayMillis);
+ }
+ return retry;
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (Exception ignored) {
+ return false;
+ }
+ }
+
+ }
+
+ /**
+ * The upload progress listener registered for events returned
+ * during the upload of a single block.
+ * It updates statistics and handles the end of the upload.
+ * Transfer failures are logged at WARN.
+ */
+ private final class BlockUploadProgress implements ProgressListener {
+ private final S3ADataBlocks.DataBlock block;
+ private final ProgressListener nextListener;
+ private final long transferQueueTime;
+ private long transferStartTime;
+
+ /**
+ * Track the progress of a single block upload.
+ * @param block block to monitor
+ * @param nextListener optional next progress listener
+ * @param transferQueueTime time the block was transferred
+ * into the queue
+ */
+ private BlockUploadProgress(S3ADataBlocks.DataBlock block,
+ ProgressListener nextListener,
+ long transferQueueTime) {
+ this.block = block;
+ this.transferQueueTime = transferQueueTime;
+ this.nextListener = nextListener;
+ }
+
+ @Override
+ public void progressChanged(ProgressEvent progressEvent) {
+ ProgressEventType eventType = progressEvent.getEventType();
+ long bytesTransferred = progressEvent.getBytesTransferred();
+
+ int size = block.dataSize();
+ switch (eventType) {
+
+ case REQUEST_BYTE_TRANSFER_EVENT:
+ // bytes uploaded
+ statistics.bytesTransferred(bytesTransferred);
+ break;
+
+ case TRANSFER_PART_STARTED_EVENT:
+ transferStartTime = now();
+ statistics.blockUploadStarted(transferStartTime - transferQueueTime,
+ size);
+ incrementWriteOperations();
+ break;
+
+ case TRANSFER_PART_COMPLETED_EVENT:
+ statistics.blockUploadCompleted(now() - transferStartTime, size);
+ break;
+
+ case TRANSFER_PART_FAILED_EVENT:
+ statistics.blockUploadFailed(now() - transferStartTime, size);
+ LOG.warn("Transfer failure of block {}", block);
+ break;
+
+ default:
+ // nothing
+ }
+
+ if (nextListener != null) {
+ nextListener.progressChanged(progressEvent);
+ }
+ }
+ }
+
+ /**
+ * Bridge from AWS {@code ProgressListener} to Hadoop {@link Progressable}.
+ */
+ private static class ProgressableListener implements ProgressListener {
+ private final Progressable progress;
+
+ public ProgressableListener(Progressable progress) {
+ this.progress = progress;
+ }
+
+ public void progressChanged(ProgressEvent progressEvent) {
+ if (progress != null) {
+ progress.progress();
+ }
+ }
+ }
+
+}
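From the caller's side nothing in the FileSystem API changes; a hedged sketch of driving the new stream (the bucket, path, and sizes are illustrative):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HugeFileWrite {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean("fs.s3a.fast.upload", true);
        conf.set("fs.s3a.fast.upload.buffer", "disk");
        FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
        byte[] chunk = new byte[8 * 1024 * 1024];
        FSDataOutputStream out = fs.create(new Path("/scale/huge-file.bin"));
        try {
          // each filled block is handed off as an asynchronous multipart part;
          // close() blocks until every queued part has completed (or failed).
          for (int i = 0; i < 1024; i++) {   // ~8 GB in total
            out.write(chunk);
          }
        } finally {
          out.close();
        }
      }
    }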
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
new file mode 100644
index 0000000..0fe2af7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
@@ -0,0 +1,821 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
+
+/**
+ * Set of classes to support output streaming into blocks which are then
+ * uploaded as partitions.
+ */
+final class S3ADataBlocks {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3ADataBlocks.class);
+
+ private S3ADataBlocks() {
+ }
+
+ /**
+ * Validate args to a write command. These are the same validation checks
+ * expected for any implementation of {@code OutputStream.write()}.
+ * @param b byte array containing data
+ * @param off offset in array where to start
+ * @param len number of bytes to be written
+ * @throws NullPointerException for a null buffer
+ * @throws IndexOutOfBoundsException if indices are out of range
+ */
+ static void validateWriteArgs(byte[] b, int off, int len)
+ throws IOException {
+ Preconditions.checkNotNull(b);
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException(
+ "write (b[" + b.length + "], " + off + ", " + len + ')');
+ }
+ }
+
+ /**
+ * Create a factory.
+ * @param owner factory owner
+ * @param name factory name -the option from {@link Constants}.
+ * @return the factory, ready to be initialized.
+ * @throws IllegalArgumentException if the name is unknown.
+ */
+ static BlockFactory createFactory(S3AFileSystem owner,
+ String name) {
+ switch (name) {
+ case Constants.FAST_UPLOAD_BUFFER_ARRAY:
+ return new ArrayBlockFactory(owner);
+ case Constants.FAST_UPLOAD_BUFFER_DISK:
+ return new DiskBlockFactory(owner);
+ case Constants.FAST_UPLOAD_BYTEBUFFER:
+ return new ByteBufferBlockFactory(owner);
+ default:
+ throw new IllegalArgumentException("Unsupported block buffer" +
+ " \"" + name + '"');
+ }
+ }
+
+ /**
+ * Base class for block factories.
+ */
+ static abstract class BlockFactory implements Closeable {
+
+ private final S3AFileSystem owner;
+
+ protected BlockFactory(S3AFileSystem owner) {
+ this.owner = owner;
+ }
+
+
+ /**
+ * Create a block.
+ * @param limit limit of the block.
+ * @return a new block.
+ */
+ abstract DataBlock create(int limit) throws IOException;
+
+ /**
+ * Implement any close/cleanup operation.
+ * Base class is a no-op
+ * @throws IOException -ideally, it shouldn't.
+ */
+ @Override
+ public void close() throws IOException {
+ }
+
+ /**
+ * Owner.
+ */
+ protected S3AFileSystem getOwner() {
+ return owner;
+ }
+ }
+
+ /**
+ * This represents a block being uploaded.
+ */
+ static abstract class DataBlock implements Closeable {
+
+ enum DestState {Writing, Upload, Closed}
+
+ private volatile DestState state = Writing;
+
+ /**
+ * Atomically enter a state, verifying current state.
+ * @param current current state. null means "no check"
+ * @param next next state
+ * @throws IllegalStateException if the current state is not as expected
+ */
+ protected synchronized final void enterState(DestState current,
+ DestState next)
+ throws IllegalStateException {
+ verifyState(current);
+ LOG.debug("{}: entering state {}", this, next);
+ state = next;
+ }
+
+ /**
+ * Verify that the block is in the declared state.
+ * @param expected expected state.
+ * @throws IllegalStateException if the DataBlock is in the wrong state
+ */
+ protected final void verifyState(DestState expected)
+ throws IllegalStateException {
+ if (expected != null && state != expected) {
+ throw new IllegalStateException("Expected stream state " + expected
+ + " -but actual state is " + state + " in " + this);
+ }
+ }
+
+ /**
+ * Current state.
+ * @return the current state.
+ */
+ final DestState getState() {
+ return state;
+ }
+
+ /**
+ * Return the current data size.
+ * @return the size of the data
+ */
+ abstract int dataSize();
+
+ /**
+ * Predicate to verify that the block has the capacity to write
+ * the given set of bytes.
+ * @param bytes number of bytes desired to be written.
+ * @return true if there is enough space.
+ */
+ abstract boolean hasCapacity(long bytes);
+
+ /**
+ * Predicate to check if there is data in the block.
+ * @return true if there is data in the block.
+ */
+ boolean hasData() {
+ return dataSize() > 0;
+ }
+
+ /**
+ * The remaining capacity in the block before it is full.
+ * @return the number of bytes remaining.
+ */
+ abstract int remainingCapacity();
+
+ /**
+ * Write a series of bytes from the buffer, from the offset.
+ * Returns the number of bytes written.
+ * Only valid in the state {@code Writing}.
+ * Base class verifies the state but does no writing.
+ * @param buffer buffer
+ * @param offset offset
+ * @param length length of write
+ * @return number of bytes written
+ * @throws IOException trouble
+ */
+ int write(byte[] buffer, int offset, int length) throws IOException {
+ verifyState(Writing);
+ Preconditions.checkArgument(buffer != null, "Null buffer");
+ Preconditions.checkArgument(length >= 0, "length is negative");
+ Preconditions.checkArgument(offset >= 0, "offset is negative");
+ Preconditions.checkArgument(
+ !(buffer.length - offset < length),
+ "buffer shorter than amount of data to write");
+ return 0;
+ }
+
+ /**
+ * Flush the output.
+ * Only valid in the state {@code Writing}.
+ * In the base class, this is a no-op
+ * @throws IOException any IO problem.
+ */
+ void flush() throws IOException {
+ verifyState(Writing);
+ }
+
+ /**
+ * Switch to the upload state and return a stream for uploading.
+ * Base class calls {@link #enterState(DestState, DestState)} to
+ * manage the state machine.
+ * @return the stream
+ * @throws IOException trouble
+ */
+ InputStream startUpload() throws IOException {
+ LOG.debug("Start datablock upload");
+ enterState(Writing, Upload);
+ return null;
+ }
+
+ /**
+ * Enter the closed state.
+ * @return true if the block was in any other state, implying that
+ * the subclass should do its close operations
+ */
+ protected synchronized boolean enterClosedState() {
+ if (state != Closed) {
+ enterState(null, Closed);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (enterClosedState()) {
+ LOG.debug("Closed {}", this);
+ innerClose();
+ }
+ }
+
+ /**
+ * Inner close logic for subclasses to implement.
+ */
+ protected void innerClose() throws IOException {
+
+ }
+
+ }
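
The write() contract above is deliberately partial: a block accepts at most remainingCapacity() bytes and reports how many it took, leaving the caller to roll over to a fresh block. A standalone sketch of that caller-side loop (hypothetical demo classes, not from this patch):

    import java.util.ArrayList;
    import java.util.List;

    public class BlockRollDemo {
      static class FixedBlock {
        private final byte[] store;
        private int used;
        FixedBlock(int limit) { store = new byte[limit]; }
        int remainingCapacity() { return store.length - used; }
        // Accept at most the remaining capacity; report bytes taken.
        int write(byte[] b, int off, int len) {
          int written = Math.min(remainingCapacity(), len);
          System.arraycopy(b, off, store, used, written);
          used += written;
          return written;
        }
      }

      public static void main(String[] args) {
        byte[] data = new byte[10];
        List<FixedBlock> blocks = new ArrayList<>();
        FixedBlock block = new FixedBlock(4);
        blocks.add(block);
        int offset = 0;
        while (offset < data.length) {
          offset += block.write(data, offset, data.length - offset);
          if (block.remainingCapacity() == 0 && offset < data.length) {
            block = new FixedBlock(4);   // roll over to a fresh block
            blocks.add(block);
          }
        }
        System.out.println("blocks used: " + blocks.size()); // prints 3
      }
    }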
+
+ // ====================================================================
+
+ /**
+ * Use byte arrays on the heap for storage.
+ */
+ static class ArrayBlockFactory extends BlockFactory {
+
+ ArrayBlockFactory(S3AFileSystem owner) {
+ super(owner);
+ }
+
+ @Override
+ DataBlock create(int limit) throws IOException {
+ return new ByteArrayBlock(limit);
+ }
+
+ }
+
+ /**
+ * Stream to memory via a {@code ByteArrayOutputStream}.
+ *
+ * This was taken from {@code S3AFastOutputStream} and has the
+ * same problem which surfaced there: it can consume a lot of heap space
+ * proportional to the mismatch between writes to the stream and
+ * the JVM-wide upload bandwidth to the S3 endpoint.
+ * The memory consumption can be limited by tuning the filesystem settings
+ * to restrict the number of queued/active uploads.
+ */
+ static class ByteArrayBlock extends DataBlock {
+ private ByteArrayOutputStream buffer;
+ private final int limit;
+ // cache data size so that it is consistent after the buffer is reset.
+ private Integer dataSize;
+
+ ByteArrayBlock(int limit) {
+ this.limit = limit;
+ buffer = new ByteArrayOutputStream();
+ }
+
+ /**
+ * Get the amount of data; if there is no buffer then the size is 0.
+ * @return the amount of data available to upload.
+ */
+ @Override
+ int dataSize() {
+ return dataSize != null ? dataSize : buffer.size();
+ }
+
+ @Override
+ InputStream startUpload() throws IOException {
+ super.startUpload();
+ dataSize = buffer.size();
+ ByteArrayInputStream bufferData = new ByteArrayInputStream(
+ buffer.toByteArray());
+ buffer = null;
+ return bufferData;
+ }
+
+ @Override
+ boolean hasCapacity(long bytes) {
+ return dataSize() + bytes <= limit;
+ }
+
+ @Override
+ int remainingCapacity() {
+ return limit - dataSize();
+ }
+
+ @Override
+ int write(byte[] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+ int written = Math.min(remainingCapacity(), len);
+ buffer.write(b, offset, written);
+ return written;
+ }
+
+ @Override
+ protected void innerClose() {
+ buffer = null;
+ }
+
+ @Override
+ public String toString() {
+ return "ByteArrayBlock{" +
+ "state=" + getState() +
+ ", limit=" + limit +
+ ", dataSize=" + dataSize +
+ '}';
+ }
+ }
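
The heap-growth risk described above is contained by bounding how many block uploads may be queued or active at once; the SemaphoredDelegatingExecutor added by this patch does that for the real stream. A simplified sketch of the idea (hypothetical demo class; pool and permit sizes are illustrative only):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    public class BoundedSubmitDemo {
      private final ExecutorService pool = Executors.newFixedThreadPool(4);
      private final Semaphore permits = new Semaphore(8); // active + queued

      void submitUpload(Runnable upload) throws InterruptedException {
        permits.acquire();            // writer blocks once 8 are pending
        pool.execute(() -> {
          try {
            upload.run();
          } finally {
            permits.release();
          }
        });
      }

      public static void main(String[] args) throws InterruptedException {
        BoundedSubmitDemo demo = new BoundedSubmitDemo();
        for (int i = 0; i < 16; i++) {
          final int id = i;
          demo.submitUpload(() -> System.out.println("upload " + id));
        }
        demo.pool.shutdown();
      }
    }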
+
+ // ====================================================================
+
+ /**
+ * Stream via Direct ByteBuffers; these are allocated off heap
+ * via {@link DirectBufferPool}.
+ * This is actually the most complex of all the block factories,
+ * due to the need to explicitly recycle buffers; in comparison, the
+ * {@link DiskBlock} buffer delegates the work of deleting files to
+ * the {@link DiskBlock.FileDeletingInputStream}. Here the
+ * input stream {@link ByteBufferInputStream} has a similar task, along
+ * with the foundational work of streaming data from a byte array.
+ */
+ static class ByteBufferBlockFactory extends BlockFactory {
+
+ private final DirectBufferPool bufferPool = new DirectBufferPool();
+ private final AtomicInteger buffersOutstanding = new AtomicInteger(0);
+
+ ByteBufferBlockFactory(S3AFileSystem owner) {
+ super(owner);
+ }
+
+ @Override
+ ByteBufferBlock create(int limit) throws IOException {
+ return new ByteBufferBlock(limit);
+ }
+
+ private ByteBuffer requestBuffer(int limit) {
+ LOG.debug("Requesting buffer of size {}", limit);
+ buffersOutstanding.incrementAndGet();
+ return bufferPool.getBuffer(limit);
+ }
+
+ private void releaseBuffer(ByteBuffer buffer) {
+ LOG.debug("Releasing buffer");
+ bufferPool.returnBuffer(buffer);
+ buffersOutstanding.decrementAndGet();
+ }
+
+ /**
+ * Get count of outstanding buffers.
+ * @return the current buffer count
+ */
+ public int getOutstandingBufferCount() {
+ return buffersOutstanding.get();
+ }
+
+ @Override
+ public String toString() {
+ return "ByteBufferBlockFactory{"
+ + "buffersOutstanding=" + buffersOutstanding +
+ '}';
+ }
+
+ /**
+ * A DataBlock which requests a buffer from the pool on creation; returns
+ * it when the output stream is closed.
+ */
+ class ByteBufferBlock extends DataBlock {
+ private ByteBuffer buffer;
+ private final int bufferSize;
+ // cache data size so that it is consistent after the buffer is reset.
+ private Integer dataSize;
+
+ /**
+ * Instantiate. This will request a ByteBuffer of the desired size.
+ * @param bufferSize buffer size
+ */
+ ByteBufferBlock(int bufferSize) {
+ this.bufferSize = bufferSize;
+ buffer = requestBuffer(bufferSize);
+ }
+
+ /**
+ * Get the amount of data; if there is no buffer then the size is 0.
+ * @return the amount of data available to upload.
+ */
+ @Override
+ int dataSize() {
+ return dataSize != null ? dataSize : bufferCapacityUsed();
+ }
+
+ @Override
+ ByteBufferInputStream startUpload() throws IOException {
+ super.startUpload();
+ dataSize = bufferCapacityUsed();
+ // set the buffer up for reading from the beginning
+ buffer.limit(buffer.position());
+ buffer.position(0);
+ return new ByteBufferInputStream(dataSize, buffer);
+ }
+
+ @Override
+ public boolean hasCapacity(long bytes) {
+ return bytes <= remainingCapacity();
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return buffer != null ? buffer.remaining() : 0;
+ }
+
+ private int bufferCapacityUsed() {
+ return buffer.capacity() - buffer.remaining();
+ }
+
+ @Override
+ int write(byte[] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+ int written = Math.min(remainingCapacity(), len);
+ buffer.put(b, offset, written);
+ return written;
+ }
+
+ @Override
+ protected void innerClose() {
+ buffer = null;
+ }
+
+ @Override
+ public String toString() {
+ return "ByteBufferBlock{"
+ + "state=" + getState() +
+ ", dataSize=" + dataSize() +
+ ", limit=" + bufferSize +
+ ", remainingCapacity=" + remainingCapacity() +
+ '}';
+ }
+
+ }
+
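
requestBuffer() and releaseBuffer() must stay strictly paired, with the outstanding count flagging any leak. A standalone sketch of that accounting (hypothetical pool class; the production code delegates to org.apache.hadoop.util.DirectBufferPool):

    import java.nio.ByteBuffer;
    import java.util.concurrent.ConcurrentLinkedDeque;
    import java.util.concurrent.atomic.AtomicInteger;

    public class BufferPoolDemo {
      private final ConcurrentLinkedDeque<ByteBuffer> pool =
          new ConcurrentLinkedDeque<>();
      private final AtomicInteger outstanding = new AtomicInteger();

      ByteBuffer request(int size) {
        outstanding.incrementAndGet();
        ByteBuffer recycled = pool.pollFirst();
        if (recycled != null && recycled.capacity() >= size) {
          recycled.clear();          // reuse a pooled direct buffer
          return recycled;
        }
        return ByteBuffer.allocateDirect(size);
      }

      void release(ByteBuffer buffer) {
        pool.addFirst(buffer);
        outstanding.decrementAndGet();
      }

      int getOutstandingBufferCount() {
        return outstanding.get();
      }
    }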
+ /**
+ * Provide an input stream from a byte buffer, supporting
+ * {@link #mark(int)}, which is required to enable replay of failed
+ * PUT attempts.
+ * This input stream returns the buffer to the pool afterwards.
+ */
+ class ByteBufferInputStream extends InputStream {
+
+ private final int size;
+ private ByteBuffer byteBuffer;
+
+ ByteBufferInputStream(int size, ByteBuffer byteBuffer) {
+ LOG.debug("Creating ByteBufferInputStream of size {}", size);
+ this.size = size;
+ this.byteBuffer = byteBuffer;
+ }
+
+ /**
+ * Return the buffer to the pool after the stream is closed.
+ */
+ @Override
+ public synchronized void close() {
+ if (byteBuffer != null) {
+ LOG.debug("releasing buffer");
+ releaseBuffer(byteBuffer);
+ byteBuffer = null;
+ }
+ }
+
+ /**
+ * Verify that the stream is open.
+ * @throws IOException if the stream is closed
+ */
+ private void verifyOpen() throws IOException {
+ if (byteBuffer == null) {
+ throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ }
+
+ @Override
+ public synchronized int read() throws IOException {
+ if (available() > 0) {
+ return byteBuffer.get() & 0xFF;
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public synchronized long skip(long offset) throws IOException {
+ verifyOpen();
+ long newPos = position() + offset;
+ if (newPos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+ }
+ if (newPos > size) {
+ throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+ }
+ byteBuffer.position((int) newPos);
+ return newPos;
+ }
+
+ @Override
+ public synchronized int available() {
+ Preconditions.checkState(byteBuffer != null,
+ FSExceptionMessages.STREAM_IS_CLOSED);
+ return byteBuffer.remaining();
+ }
+
+ /**
+ * Get the current buffer position.
+ * @return the buffer position
+ */
+ public synchronized int position() {
+ return byteBuffer.position();
+ }
+
+ /**
+ * Check if there is data left.
+ * @return true if there is data remaining in the buffer.
+ */
+ public synchronized boolean hasRemaining() {
+ return byteBuffer.hasRemaining();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ LOG.debug("mark at {}", position());
+ byteBuffer.mark();
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ LOG.debug("reset");
+ byteBuffer.reset();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
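
mark()/reset() support is what lets the caller rewind and resend a block after a failed PUT. A standalone sketch of that replay pattern against any mark-capable stream (hypothetical demo class, not from this patch):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;

    public class MarkResetRetryDemo {
      public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(
            "block-payload".getBytes(StandardCharsets.UTF_8));
        in.mark(Integer.MAX_VALUE);   // remember the start of the block
        for (int attempt = 1; attempt <= 2; attempt++) {
          in.reset();                 // rewind before each (re)try
          int sent = 0;
          while (in.read() != -1) {   // stands in for the actual PUT
            sent++;
          }
          System.out.println("attempt " + attempt + " sent " + sent);
        }
      }
    }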
+
+ /**
+ * Read in data.
+ * @param buffer destination buffer
+ * @param offset offset within the buffer
+ * @param length length of bytes to read
+ * @return number of bytes read, or -1 at the end of the stream
+ * @throws EOFException if the position is negative
+ * @throws IndexOutOfBoundsException if there isn't space for the
+ * amount of data requested.
+ * @throws IllegalArgumentException other arguments are invalid.
+ */
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public synchronized int read(byte[] buffer, int offset, int length)
+ throws IOException {
+ Preconditions.checkArgument(length >= 0, "length is negative");
+ Preconditions.checkArgument(buffer != null, "Null buffer");
+ if (buffer.length - offset < length) {
+ throw new IndexOutOfBoundsException(
+ FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+ + ": request length =" + length
+ + ", with offset =" + offset
+ + "; buffer capacity =" + (buffer.length - offset));
+ }
+ verifyOpen();
+ if (!hasRemaining()) {
+ return -1;
+ }
+
+ int toRead = Math.min(length, available());
+ byteBuffer.get(buffer, offset, toRead);
+ return toRead;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "ByteBufferInputStream{");
+ sb.append("size=").append(size);
+ ByteBuffer buffer = this.byteBuffer;
+ if (buffer != null) {
+ sb.append(", available=").append(buffer.remaining());
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+ }
+ }
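
The limit(position())/position(0) sequence in ByteBufferBlock.startUpload() is the standard NIO switch from writing to reading, equivalent to Buffer.flip(). A minimal standalone sketch (hypothetical demo class):

    import java.nio.ByteBuffer;

    public class BufferFlipDemo {
      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocateDirect(16);
        buf.put(new byte[]{1, 2, 3});  // Writing: position=3, limit=16
        buf.limit(buf.position());     // cap reads at the bytes written
        buf.position(0);               // rewind to the start
        System.out.println("readable: " + buf.remaining()); // prints 3
      }
    }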
+
+ // ====================================================================
+
+ /**
+ * Buffer blocks to disk.
+ */
+ static class DiskBlockFactory extends BlockFactory {
+
+ DiskBlockFactory(S3AFileSystem owner) {
+ super(owner);
+ }
+
+ /**
+ * Create a temp file and a block which writes to it.
+ * @param limit limit of the block.
+ * @return the new block
+ * @throws IOException IO problems
+ */
+ @Override
+ DataBlock create(int limit) throws IOException {
+ File destFile = getOwner()
+ .createTmpFileForWrite("s3ablock", limit, getOwner().getConf());
+ return new DiskBlock(destFile, limit);
+ }
+ }
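
A rough standalone equivalent of this allocation (hypothetical demo class; the real createTmpFileForWrite() is assumed to pick its directory from the fs.s3a.buffer.dir setting rather than java.io.tmpdir):

    import java.io.File;
    import java.io.IOException;

    public class TmpFileDemo {
      public static void main(String[] args) throws IOException {
        File dest = File.createTempFile("s3ablock", ".tmp");
        dest.deleteOnExit();
        System.out.println("buffering to " + dest.getAbsolutePath());
      }
    }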
+
+ /**
+ * Stream to a file.
+ * This will stop at the limit; the caller is expected to create a new block.
+ */
+ static class DiskBlock extends DataBlock {
+
+ private int bytesWritten;
+ private final File bufferFile;
+ private final int limit;
+ private BufferedOutputStream out;
+ private InputStream uploadStream;
+
+ DiskBlock(File bufferFile, int limit)
+ throws FileNotFoundException {
+ this.limit = limit;
+ this.bufferFile = bufferFile;
+ out = new BufferedOutputStream(new FileOutputStream(bufferFile));
+ }
+
+ @Override
+ int dataSize() {
+ return bytesWritten;
+ }
+
+ @Override
+ boolean hasCapacity(long bytes) {
+ return dataSize() + bytes <= limit;
+ }
+
+ @Override
+ int remainingCapacity() {
+ return limit - bytesWritten;
+ }
+
+ @Override
+ int write(byte[] b, int offset, int len) throws IOException {
+ super.write(b, offset, len);
+ int written = Math.min(remainingCapacity(), len);
+ out.write(b, offset, written);
+ bytesWritten += written;
+ return written;
+ }
+
+ @Override
+ InputStream startUpload() throws IOException {
+ super.startUpload();
+ try {
+ out.flush();
+ } finally {
+ out.close();
+ out = null;
+ }
+ uploadStream = new FileInputStream(bufferFile);
+ return new FileDeletingInputStream(uploadStream);
+ }
+
+ /**
+ * The close operation will delete the destination file if it still
+ * exists.
+ * @throws IOException IO problems
+ */
+ @Override
+ protected void innerClose() throws IOException {
+ final DestState state = getState();
+ LOG.debug("Closing {}", this);
+ switch (state) {
+ case Writing:
+ if (bufferFile.exists()) {
+ // file was not uploaded
+ LOG.debug("Deleting buffer file as upload did not start");
+ boolean deleted = bufferFile.delete();
+ if (!deleted && bufferFile.exists()) {
+ LOG.warn("Failed to delete buffer file {}", bufferFile);
+ }
+ }
+ break;
+
+ case Upload:
+ LOG.debug("Buffer file {} exists \u2014close upload stream", bufferFile);
+ break;
+
+ case Closed:
+ // no-op
+ break;
+
+ default:
+ // this state can never be reached, but checkstyle complains, so
+ // it is here.
+ }
+ }
+
+ /**
+ * Flush operation will flush to disk.
+ * @throws IOException IOE raised on FileOutputStream
+ */
+ @Override
+ void flush() throws IOException {
+ super.flush();
+ out.flush();
+ }
+
+ @Override
+ public String toString() {
+ String sb = "FileBlock{"
+ + "destFile=" + bufferFile +
+ ", state=" + getState() +
+ ", dataSize=" + dataSize() +
+ ", limit=" + limit +
+ '}';
+ return sb;
+ }
+
+ /**
+ * An input stream which deletes the buffer file when closed.
+ */
+ private final class FileDeletingInputStream extends FilterInputStream {
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ FileDeletingInputStream(InputStream source) {
+ super(source);
+ }
+
+ /**
+ * Delete the input file when closed.
+ * @throws IOException IO problem
+ */
+ @Override
+ public void close() throws IOException {
+ try {
+ super.close();
+ } finally {
+ if (!closed.getAndSet(true)) {
+ if (!bufferFile.delete()) {
+ LOG.warn("delete({}) returned false",
+ bufferFile.getAbsoluteFile());
+ }
+ }
+ }
+ }
+ }
+ }
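
A self-contained sketch of the same delete-on-close pattern (hypothetical demo class, not part of this patch):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;

    public class DeleteOnCloseDemo {
      public static void main(String[] args) throws IOException {
        File file = File.createTempFile("s3ablock", ".tmp");
        Files.write(file.toPath(), new byte[]{1, 2, 3});
        try (InputStream in = new FilterInputStream(
            new FileInputStream(file)) {
          @Override
          public void close() throws IOException {
            try {
              super.close();
            } finally {
              file.delete();          // remove the buffer file
            }
          }
        }) {
          while (in.read() != -1) {
            // drain; stands in for the actual PUT upload
          }
        }
        System.out.println("file still exists? " + file.exists());
      }
    }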
+
+}