Posted to common-commits@hadoop.apache.org by st...@apache.org on 2016/10/18 20:16:48 UTC
[02/12] hadoop git commit: HADOOP-13560. S3ABlockOutputStream to
support huge (many GB) file writes. Contributed by Steve Loughran
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 67972ca..166fd73 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1,3 +1,4 @@
+
<!---
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -881,40 +882,361 @@ Seoul
If the wrong endpoint is used, the request may fail. This may be reported as a 301/redirect error,
or as a 400 Bad Request.
-### S3AFastOutputStream
- **Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk**
- <property>
- <name>fs.s3a.fast.upload</name>
- <value>false</value>
- <description>Upload directly from memory instead of buffering to
- disk first. Memory usage and parallelism can be controlled as up to
- fs.s3a.multipart.size memory is consumed for each (part)upload actively
- uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
- </property>
- <property>
- <name>fs.s3a.fast.buffer.size</name>
- <value>1048576</value>
- <description>Size (in bytes) of initial memory buffer allocated for an
- upload. No effect if fs.s3a.fast.upload is false.</description>
- </property>
+### <a name="s3a_fast_upload"></a>Stabilizing: S3A Fast Upload
+
+
+**New in Hadoop 2.7; significantly enhanced in Hadoop 2.9**
+
+
+Because of the nature of the S3 object store, data written to an S3A `OutputStream`
+is not written incrementally; instead, by default, it is buffered to disk
+until the stream's `close()` method is called.
+
+This can make output slow:
+
+* The execution time for `OutputStream.close()` is proportional to the amount of data
+buffered and inversely proportional to the bandwidth. That is `O(data/bandwidth)`.
+* The bandwidth is that available from the host to S3: other work in the same
+process, server or network at the time of upload may increase the upload time,
+hence the duration of the `close()` call.
+* If a process uploading data fails before `OutputStream.close()` is called,
+all data is lost.
+* The disks hosting temporary directories defined in `fs.s3a.buffer.dir` must
+have the capacity to store the entire buffered file.
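+
+For example, with 1GB of buffered data and an effective upload bandwidth to S3
+of 10MB/s, the `close()` call can be expected to block for roughly 100 seconds.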
+
+Put succinctly: the further the process is from the S3 endpoint, or the smaller
+the EC2-hosted VM is, the longer the work will take to complete.
+
+This can create problems in application code:
+
+* Code often assumes that the `close()` call is fast;
+ the delays can create bottlenecks in operations.
+* Very slow uploads sometimes cause applications to time out (generally,
+threads blocked during the upload stop reporting progress, triggering timeouts).
+* Streaming very large amounts of data may consume all disk space before the upload begins.
+
+
+Work to address this began in Hadoop 2.7 with the `S3AFastOutputStream`
+[HADOOP-11183](https://issues.apache.org/jira/browse/HADOOP-11183), and
+has continued with `S3ABlockOutputStream`
+[HADOOP-13560](https://issues.apache.org/jira/browse/HADOOP-13560).
+
+
+This adds an alternative output stream, "S3A Fast Upload", which:
+
+1. Always uploads large files as blocks with the size set by
+ `fs.s3a.multipart.size`. That is: the threshold at which multipart uploads
+ begin and the size of each upload are identical.
+1. Buffers blocks to disk (default) or in on-heap or off-heap memory.
+1. Uploads blocks in parallel in background threads.
+1. Begins uploading blocks as soon as the buffered data exceeds this partition
+ size.
+1. When buffering data to disk, uses the directory/directories listed in
+ `fs.s3a.buffer.dir`. The size of data which can be buffered is limited
+ to the available disk space.
+1. Generates output statistics as metrics on the filesystem, including
+ statistics of active and pending block uploads.
+1. Has the time to `close()` set by the amount of remaining data to upload, rather
+ than the total size of the file.
+
+With incremental writes of blocks, "S3A fast upload" offers an upload
+time at least as fast as the "classic" mechanism, with significant benefits
+on long-lived output streams, and when very large amounts of data are generated.
+The in-memory buffering mechanisms may also offer a speedup when running adjacent to
+S3 endpoints, as disks are not used for intermediate data storage.
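+
+The change is transparent to code using the Hadoop filesystem API. As a
+minimal sketch of a client (the bucket name, path and sizes here are
+illustrative):
+
+```java
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FastUploadExample {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean("fs.s3a.fast.upload", true);
+    conf.set("fs.s3a.fast.upload.buffer", "disk");
+
+    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
+    try (FSDataOutputStream out =
+             fs.create(new Path("s3a://example-bucket/data/example"))) {
+      byte[] buffer = new byte[8192];
+      for (int i = 0; i < 20000; i++) {
+        // once a full block (fs.s3a.multipart.size) has been buffered,
+        // its upload begins in a background thread
+        out.write(buffer);
+      }
+    } // close() blocks only until the remaining queued blocks are uploaded
+  }
+}
+```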
+
+
+```xml
+<property>
+ <name>fs.s3a.fast.upload</name>
+ <value>true</value>
+ <description>
+ Use the incremental block upload mechanism with
+ the buffering mechanism set in fs.s3a.fast.upload.buffer.
+ The number of threads performing uploads in the filesystem is defined
+ by fs.s3a.threads.max; the queue of waiting uploads limited by
+ fs.s3a.max.total.tasks.
+ The size of each buffer is set by fs.s3a.multipart.size.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.fast.upload.buffer</name>
+ <value>disk</value>
+ <description>
+ The buffering mechanism to use when using S3A fast upload
+ (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
+ This configuration option has no effect if fs.s3a.fast.upload is false.
+
+ "disk" will use the directories listed in fs.s3a.buffer.dir as
+ the location(s) to save data prior to being uploaded.
+
+ "array" uses arrays in the JVM heap
+
+ "bytebuffer" uses off-heap memory within the JVM.
+
+ Both "array" and "bytebuffer" will consume memory in a single stream up to the number
+ of blocks set by:
+
+ fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
+
+ If using either of these mechanisms, keep this value low
+
+ The total number of threads performing work across all threads is set by
+ fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued
+ work items.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.multipart.size</name>
+ <value>104857600</value>
+ <description>
+ How big (in bytes) to split upload or copy operations up into.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.fast.upload.active.blocks</name>
+ <value>8</value>
+ <description>
+ Maximum number of blocks a single output stream can have
+ active (uploading, or queued to the central FileSystem
+ instance's pool of queued operations).
+
+ This stops a single stream overloading the shared thread pool.
+ </description>
+</property>
+```
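+
+With the defaults above, a single stream buffering in memory can hold up to
+8 blocks of 100MB each, roughly 800MB, before further `write()` calls block.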
+
+**Notes**
+
+* If the amount of data written to a stream is below that set in `fs.s3a.multipart.size`,
+the upload is performed in the `OutputStream.close()` operation, as with
+the original output stream.
+
+* The published Hadoop metrics include live queue length and
+upload operation counts, making it possible to identify a backlog of work, or
+a mismatch between data generation rates and network bandwidth. Per-stream
+statistics can also be logged by calling `toString()` on the current stream
+(see the sketch after these notes).
+
+* Incremental writes are not visible; the object can only be listed
+or read when the multipart operation completes in the `close()` call, which
+blocks until the upload is complete.
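+
+A sketch of that per-stream logging; the helper class here is illustrative,
+and assumes the stream was created by an S3A filesystem:
+
+```java
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class UploadProgressLogger {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(UploadProgressLogger.class);
+
+  /** Log block/upload statistics of an in-progress S3A write. */
+  static void logProgress(FSDataOutputStream out) {
+    // getWrappedStream() returns the underlying S3A stream; its
+    // toString() reports the per-stream statistics.
+    LOG.info("Upload progress: {}", out.getWrappedStream());
+  }
+}
+```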
+
+
+#### <a name="s3a_fast_upload_disk"></a>Fast Upload with Disk Buffers `fs.s3a.fast.upload.buffer=disk`
+
+When `fs.s3a.fast.upload.buffer` is set to `disk`, all data is buffered
+to local hard disks prior to upload. This minimizes the amount of memory
+consumed, and so eliminates heap size as the limiting factor in queued uploads,
+exactly as the original "direct to disk" buffering used when
+`fs.s3a.fast.upload=false`.
+
+
+```xml
+<property>
+ <name>fs.s3a.fast.upload</name>
+ <value>true</value>
+</property>
+
+<property>
+ <name>fs.s3a.fast.upload.buffer</name>
+ <value>disk</value>
+</property>
+
+```
+
+
+#### <a name="s3a_fast_upload_bytebuffer"></a>Fast Upload with ByteBuffers: `fs.s3a.fast.upload.buffer=bytebuffer`
+
+When `fs.s3a.fast.upload.buffer` is set to `bytebuffer`, all data is buffered
+in "Direct" ByteBuffers prior to upload. This *may* be faster than buffering to disk,
+and avoids consuming disk space, which matters on hosts where that space is
+scarce (for example, tiny EC2 VMs).
+
+The ByteBuffers are created in the memory of the JVM, but not in the Java Heap itself.
+The amount of data which can be buffered is
+limited by the Java runtime, the operating system, and, for YARN applications,
+the amount of memory requested for each container.
+
+The slower the write bandwidth to S3, the greater the risk of running out
+of memory, and so the more care is needed in
+[tuning the upload settings](#s3a_fast_upload_thread_tuning).
+
+
+```xml
+<property>
+ <name>fs.s3a.fast.upload</name>
+ <value>true</value>
+</property>
+
+<property>
+ <name>fs.s3a.fast.upload.buffer</name>
+ <value>bytebuffer</value>
+</property>
+```
+
+#### <a name="s3a_fast_upload_array"></a>Fast Upload with Arrays: `fs.s3a.fast.upload.buffer=array`
+
+When `fs.s3a.fast.upload.buffer` is set to `array`, all data is buffered
+in byte arrays in the JVM's heap prior to upload.
+This *may* be faster than buffering to disk.
+
+This `array` option is similar to the in-memory-only stream offered in
+Hadoop 2.7 with `fs.s3a.fast.upload=true`.
+
+The amount of data which can be buffered is limited by the available
+size of the JVM heap. The slower the write bandwidth to S3, the greater
+the risk of heap overflows. This risk can be mitigated by
+[tuning the upload settings](#s3a_fast_upload_thread_tuning).
+
+```xml
+<property>
+ <name>fs.s3a.fast.upload</name>
+ <value>true</value>
+</property>
+
+<property>
+ <name>fs.s3a.fast.upload.buffer</name>
+ <value>array</value>
+</property>
+
+```
+#### <a name="s3a_fast_upload_thread_tuning"></a>S3A Fast Upload Thread Tuning
+
+Both the [Array](#s3a_fast_upload_array) and [Byte buffer](#s3a_fast_upload_bytebuffer)
+buffer mechanisms can consume very large amounts of memory, on-heap or
+off-heap respectively. The [disk buffer](#s3a_fast_upload_disk) mechanism
+does not use much memory, but will consume hard disk capacity.
+
+If there are many output streams being written to in a single process, the
+amount of memory or disk used is the sum of each stream's active memory/disk use.
+
+Careful tuning may be needed to reduce the risk of running out of memory, especially
+if the data is buffered in memory.
+
+There are a number of parameters which can be tuned:
+
+1. The total number of threads available in the filesystem for data
+uploads *or any other queued filesystem operation*. This is set in
+`fs.s3a.threads.max`
+
+1. The number of operations which can be queued for execution, *awaiting
+a thread*: `fs.s3a.max.total.tasks`
+
+1. The number of blocks which a single output stream can have active,
+that is: being uploaded by a thread, or queued in the filesystem thread queue:
+`fs.s3a.fast.upload.active.blocks`
+
+1. How long an idle thread can stay in the thread pool before it is retired: `fs.s3a.threads.keepalivetime`
+
+
+When the maximum allowed number of active blocks of a single stream is reached,
+no more blocks can be uploaded from that stream until one or more of those active
+blocks' uploads completes. That is: a `write()` call which would trigger an upload
+of a now-full datablock will instead block until there is capacity in the queue.
+
+How does that come together?
+
+* As the pool of threads set in `fs.s3a.threads.max` is shared (and intended
+to be used across all threads), a larger number here can allow for more
+parallel operations. However, as uploads require network bandwidth, adding more
+threads does not guarantee speedup.
+
+* The extra queue of tasks for the thread pool (`fs.s3a.max.total.tasks`)
+covers all ongoing background S3A operations (future plans include: parallelized
+rename operations, asynchronous directory operations).
+
+* When using memory buffering, a small value of `fs.s3a.fast.upload.active.blocks`
+limits the amount of memory which can be consumed per stream.
+
+* When using disk buffering, a larger value of `fs.s3a.fast.upload.active.blocks`
+does not consume much memory, but it may result in a large number of blocks
+competing with other filesystem operations.
+
+
+We recommend a low value of `fs.s3a.fast.upload.active.blocks`: enough
+to start background upload without overloading other parts of the system,
+then experiment to see if higher values deliver more throughput, especially
+from VMs running on EC2.
+
+```xml
+
+<property>
+ <name>fs.s3a.fast.upload.active.blocks</name>
+ <value>4</value>
+ <description>
+ Maximum number of blocks a single output stream can have
+ active (uploading, or queued to the central FileSystem
+ instance's pool of queued operations).
+
+ This stops a single stream overloading the shared thread pool.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.threads.max</name>
+ <value>10</value>
+ <description>The total number of threads available in the filesystem for data
+ uploads *or any other queued filesystem operation*.</description>
+</property>
+
+<property>
+ <name>fs.s3a.max.total.tasks</name>
+ <value>5</value>
+ <description>The number of operations which can be queued for execution</description>
+</property>
+
+<property>
+ <name>fs.s3a.threads.keepalivetime</name>
+ <value>60</value>
+ <description>Number of seconds a thread can be idle before being
+ terminated.</description>
+</property>
+
+```
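+
+With the values above, the filesystem instance can have at most 10 operations
+in flight and 5 more queued, while a single stream is limited to 4 active or
+queued block uploads.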
+
+
+#### <a name="s3a_multipart_purge"></a>Cleaning up After Incremental Upload Failures: `fs.s3a.multipart.purge`
+
+
+If an incremental streaming operation is interrupted, there may be
+intermediate partitions uploaded to S3; this data will be billed for.
+
+These charges can be reduced by enabling `fs.s3a.multipart.purge`,
+and setting a purge time in seconds, such as 86400 seconds (24 hours).
+When an S3A FileSystem instance is instantiated with the purge time greater
+than zero, it will, on startup, delete all outstanding partition requests
+older than this time.
+
+```xml
+<property>
+ <name>fs.s3a.multipart.purge</name>
+ <value>true</value>
+ <description>True if you want to purge existing multipart uploads that may not have been
+ completed/aborted correctly</description>
+</property>
+
+<property>
+ <name>fs.s3a.multipart.purge.age</name>
+ <value>86400</value>
+ <description>Minimum age in seconds of multipart uploads to purge</description>
+</property>
+```
+
+If an S3A client is instantiated with `fs.s3a.multipart.purge=true`,
+it will delete all out-of-date uploads *in the entire bucket*. That is: it will affect all
+multipart uploads to that bucket, from all applications.
-Writes are buffered in memory instead of to a file on local disk. This
-removes the throughput bottleneck of the local disk write and read cycle
-before starting the actual upload. Furthermore, it allows handling files that
-are larger than the remaining local disk space.
-
-However, non-trivial memory tuning is needed for optimal results and careless
-settings could cause memory overflow. Up to `fs.s3a.threads.max` parallel
-(part)uploads are active. Furthermore, up to `fs.s3a.max.total.tasks`
-additional part(uploads) can be waiting (and thus memory buffers are created).
-The memory buffer is uploaded as a single upload if it is not larger than
-`fs.s3a.multipart.threshold`. Else, a multi-part upload is initiated and
-parts of size `fs.s3a.multipart.size` are used to protect against overflowing
-the available memory. These settings should be tuned to the envisioned
-workflow (some large files, many small ones, ...) and the physical
-limitations of the machine and cluster (memory, network bandwidth).
+Leaving `fs.s3a.multipart.purge` at its default, `false`,
+means that the client will make no attempt to clean up outstanding
+multipart uploads.
+
+The best practice for using this option is to disable multipart purges in
+normal use of S3A, enabling it only in manual/scheduled housekeeping operations.
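+
+Such housekeeping can also be performed directly against the store with the
+AWS SDK for Java, keeping purges out of normal S3A operation. A sketch, which
+assumes SDK-default credentials, ignores pagination of the listing, and uses
+an illustrative class name:
+
+```java
+import java.util.Date;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
+import com.amazonaws.services.s3.model.MultipartUpload;
+
+public class AbortOldUploads {
+  public static void main(String[] args) {
+    String bucket = args[0];
+    // abort anything initiated more than 24 hours ago
+    Date cutoff = new Date(System.currentTimeMillis() - 86400 * 1000L);
+
+    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
+    for (MultipartUpload upload : s3.listMultipartUploads(
+        new ListMultipartUploadsRequest(bucket)).getMultipartUploads()) {
+      if (upload.getInitiated().before(cutoff)) {
+        s3.abortMultipartUpload(new AbortMultipartUploadRequest(
+            bucket, upload.getKey(), upload.getUploadId()));
+      }
+    }
+  }
+}
+```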
### S3A Experimental "fadvise" input policy support
@@ -1250,7 +1572,143 @@ can be used:
Using the explicit endpoint for the region is recommended for speed and the
ability to use the V4 signing API.
-## Visible S3 Inconsistency
+
+### "Timeout waiting for connection from pool" when writing to S3A
+
+This happens when using the block output stream (`fs.s3a.fast.upload=true`) and
+the thread pool runs out of capacity.
+
+```
+[s3a-transfer-shared-pool1-t20] INFO http.AmazonHttpClient (AmazonHttpClient.java:executeHelper(496)) - Unable to execute HTTP request: Timeout waiting for connection from pool
+org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
+ at org.apache.http.impl.conn.PoolingClientConnectionManager.leaseConnection(PoolingClientConnectionManager.java:230)
+ at org.apache.http.impl.conn.PoolingClientConnectionManager$1.getConnection(PoolingClientConnectionManager.java:199)
+ at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
+ at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
+ at java.lang.reflect.Method.invoke(Method.java:498)
+ at com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
+ at com.amazonaws.http.conn.$Proxy10.getConnection(Unknown Source)
+ at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:424)
+ at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884)
+ at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
+ at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
+ at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:728)
+ at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
+ at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
+ at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
+ at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:2921)
+ at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:2906)
+ at org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1025)
+ at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload$1.call(S3ABlockOutputStream.java:360)
+ at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload$1.call(S3ABlockOutputStream.java:355)
+ at org.apache.hadoop.fs.s3a.BlockingThreadPoolExecutorService$CallableWithPermitRelease.call(BlockingThreadPoolExecutorService.java:239)
+ at java.util.concurrent.FutureTask.run(FutureTask.java:266)
+ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
+ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
+ at java.lang.Thread.run(Thread.java:745)
+```
+
+Make sure that `fs.s3a.connection.maximum` is set to a value larger
+than `fs.s3a.threads.max`.
+
+```xml
+<property>
+ <name>fs.s3a.threads.max</name>
+ <value>20</value>
+</property>
+
+<property>
+ <name>fs.s3a.connection.maximum</name>
+ <value>30</value>
+</property>
+```
+
+### "Timeout waiting for connection from pool" when reading from S3A
+
+This happens when more threads are trying to read from an S3A system than
+the maximum number of allocated HTTP connections.
+
+Set `fs.s3a.connection.maximum` to a larger value (and at least as large as
+`fs.s3a.threads.max`).
+
+### Out of heap memory when writing to S3A via Fast Upload
+
+This can happen when using the fast upload mechanism (`fs.s3a.fast.upload=true`)
+and in-memory buffering (either `fs.s3a.fast.upload.buffer=array` or
+`fs.s3a.fast.upload.buffer=bytebuffer`).
+
+More data is being generated in the JVM than it can upload to S3, and
+so much data has been buffered that the JVM has run out of memory.
+
+Consult [S3A Fast Upload Thread Tuning](#s3a_fast_upload_thread_tuning) for
+detail on this issue and options to address it. Consider also buffering to
+disk, rather than memory.
+
+
+### When writing to S3A: "java.io.FileNotFoundException: Completing multi-part upload"
+
+
+```
+java.io.FileNotFoundException: Completing multi-part upload on fork-5/test/multipart/1c397ca6-9dfb-4ac1-9cf7-db666673246b: com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; Request ID: 84FF8057174D9369), S3 Extended Request ID: Ij5Yn6Eq/qIERH4Z6Io3YL2t9/qNZ7z9gjPb1FrTtTovZ8k1MXqh+zCYYjqmfJ/fCY6E1+JR9jA=
+ at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1182)
+ at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:770)
+ at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
+ at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
+ at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
+ at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:2705)
+ at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.complete(S3ABlockOutputStream.java:473)
+ at org.apache.hadoop.fs.s3a.S3ABlockOutputStream$MultiPartUpload.access$200(S3ABlockOutputStream.java:382)
+ at org.apache.hadoop.fs.s3a.S3ABlockOutputStream.close(S3ABlockOutputStream.java:272)
+ at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
+ at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
+```
+
+This surfaces if, while a multipart upload was taking place, all outstanding multipart
+uploads were garbage collected. The upload operation cannot complete because
+the data uploaded has been deleted.
+
+Consult [Cleaning up After Incremental Upload Failures](#s3a_multipart_purge) for
+details on how the multipart purge timeout can be set. If multipart uploads
+are failing with the message above, it may be a sign that this value is too low.
+
+### When writing to S3A, HTTP Exceptions logged at info from `AmazonHttpClient`
+
+```
+[s3a-transfer-shared-pool4-t6] INFO http.AmazonHttpClient (AmazonHttpClient.java:executeHelper(496)) - Unable to execute HTTP request: hwdev-steve-ireland-new.s3.amazonaws.com:443 failed to respond
+org.apache.http.NoHttpResponseException: bucket.s3.amazonaws.com:443 failed to respond
+ at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:143)
+ at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
+ at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:261)
+ at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
+ at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:259)
+ at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:209)
+ at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:272)
+ at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66)
+ at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:124)
+ at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:686)
+ at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:488)
+ at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:884)
+ at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
+ at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:55)
+ at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:728)
+ at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
+ at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
+ at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
+ at com.amazonaws.services.s3.AmazonS3Client.copyPart(AmazonS3Client.java:1731)
+ at com.amazonaws.services.s3.transfer.internal.CopyPartCallable.call(CopyPartCallable.java:41)
+ at com.amazonaws.services.s3.transfer.internal.CopyPartCallable.call(CopyPartCallable.java:28)
+ at org.apache.hadoop.fs.s3a.BlockingThreadPoolExecutorService$CallableWithPermitRelease.call(BlockingThreadPoolExecutorService.java:239)
+ at java.util.concurrent.FutureTask.run(FutureTask.java:266)
+ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
+ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
+ at java.lang.Thread.run(Thread.java:745)
+```
+
+These are HTTP I/O exceptions caught and logged inside the AWS SDK. The client
+will attempt to retry the operation; it may just be a transient event. If there
+are many such exceptions in logs, it may be a symptom of connectivity or network
+problems.
+
+### Visible S3 Inconsistency
Amazon S3 is *an eventually consistent object store*. That is: not a filesystem.
@@ -1627,7 +2085,7 @@ tests or the `it.test` property for integration tests.
mvn clean test -Dtest=TestS3AInputPolicies
- mvn clean verify -Dit.test=ITestS3AFileContextStatistics
+ mvn clean verify -Dit.test=ITestS3AFileContextStatistics -Dtest=none
mvn clean verify -Dtest=TestS3A* -Dit.test=ITestS3A*
@@ -1677,7 +2135,7 @@ An alternate endpoint may be defined in `test.fs.s3a.sts.endpoint`.
The default is ""; meaning "use the amazon default value".
-#### CSV Data source Tests
+### CSV Data source Tests
The `TestS3AInputStreamPerformance` tests require read access to a multi-MB
text file. The default file for these tests is one published by amazon,
@@ -1724,18 +2182,89 @@ endpoint:
<value>s3.amazonaws.com</value>
</property>
```
+### Viewing Integration Test Reports
-#### Scale test operation count
+
+Integration test results and logs are stored in `target/failsafe-reports/`.
+An HTML report can be generated during site generation, or with the `surefire-report`
+plugin:
+
+```bash
+mvn surefire-report:failsafe-report-only
+```
+
+### Scale Tests
+
+There is a set of tests designed to measure the scalability and performance
+of S3A at scale: the *Scale Tests*. Tests include: creating
+and traversing directory trees, uploading large files, renaming them,
+deleting them, seeking through the files, performing random IO, and others.
+This makes them a foundational part of the benchmarking.
+
+By their very nature they are slow. And, as their execution time is often
+limited by bandwidth between the computer running the tests and the S3 endpoint,
+parallel execution does not speed these tests up.
+
+#### Enabling the Scale Tests
+
+The tests are enabled if the `scale` property is set in the maven build;
+this can be done regardless of whether or not the parallel test profile
+is used.
+
+```bash
+mvn verify -Dscale
+
+mvn verify -Dparallel-tests -Dscale -DtestsThreadCount=8
+```
+
+The most bandwidth-intensive tests (those which upload data) always run
+sequentially; those which are slow due to HTTPS setup costs or server-side
+actions are included in the set of parallelized tests.
+
+
+#### Maven build tuning options
+
+
+Some of the tests can be tuned from the maven build or from the
+configuration file used to run the tests.
+
+```bash
+mvn verify -Dscale -Dfs.s3a.scale.test.huge.filesize=128M
+```
+
+The algorithm is:
+
+1. The value is queried from the configuration file, using a default value if
+it is not set.
+1. The value is queried from the JVM System Properties, where it is passed
+down by maven.
+1. If the system property is null, empty, or it has the value `unset`, then
+the configuration value is used. The `unset` option is used to
+[work round a quirk in maven property propagation](http://stackoverflow.com/questions/7773134/null-versus-empty-arguments-in-maven).
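+
+A sketch of that resolution order, assuming the Hadoop `Configuration` API;
+the helper name here is illustrative rather than the project's actual method:
+
+```java
+import org.apache.hadoop.conf.Configuration;
+
+public final class TestPropertyResolver {
+  private TestPropertyResolver() {
+  }
+
+  /** Resolve a test property: the system property wins unless absent/"unset". */
+  public static String resolveTestProperty(Configuration conf,
+      String key, String defVal) {
+    String confVal = conf.getTrimmed(key, defVal); // step 1: config file
+    String sysVal = System.getProperty(key);       // step 2: passed by maven
+    // step 3: fall back to the configuration value when the system
+    // property is null, empty, or the maven "unset" placeholder
+    return (sysVal == null || sysVal.isEmpty() || "unset".equals(sysVal))
+        ? confVal : sysVal;
+  }
+}
+```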
+
+Only a few properties can be set this way; more will be added.
+
+| Property | Meaning |
+|-----------|-------------|
+| `fs.s3a.scale.test.timeout`| Timeout in seconds for scale tests |
+| `fs.s3a.scale.test.huge.filesize`| Size for huge file uploads |
+| `fs.s3a.scale.test.huge.partitionsize`| Size for partitions in huge file uploads |
+
+The file and partition sizes are numeric values with a k/m/g/t/p suffix depending
+on the desired size. For example: 128M, 128m, 2G, 4T or even 1P.
+
+#### Scale test configuration options
Some scale tests perform multiple operations (such as creating many directories).
The exact number of operations to perform is configurable in the option
`scale.test.operation.count`
- <property>
- <name>scale.test.operation.count</name>
- <value>10</value>
- </property>
+```xml
+<property>
+ <name>scale.test.operation.count</name>
+ <value>10</value>
+</property>
+```
Larger values generate more load, and are recommended when testing locally,
or in batch runs.
@@ -1748,19 +2277,64 @@ the width and depth of tests creating recursive directories. Larger
values create exponentially more directories, with consequent performance
impact.
- <property>
- <name>scale.test.directory.count</name>
- <value>2</value>
- </property>
+```xml
+<property>
+ <name>scale.test.directory.count</name>
+ <value>2</value>
+</property>
+```
DistCp tests targeting S3A support a configurable file size. The default is
10 MB, but the configuration value is expressed in KB so that it can be tuned
smaller to achieve faster test runs.
- <property>
- <name>scale.test.distcp.file.size.kb</name>
- <value>10240</value>
- </property>
+```xml
+<property>
+ <name>scale.test.distcp.file.size.kb</name>
+ <value>10240</value>
+</property>
+```
+
+S3A specific scale test properties are
+
+##### `fs.s3a.scale.test.huge.filesize`: size for "Huge file tests".
+
+The Huge File tests validate S3A's ability to handle large files \u2014the property
+`fs.s3a.scale.test.huge.filesize` declares the file size to use.
+
+```xml
+<property>
+ <name>fs.s3a.scale.test.huge.filesize</name>
+ <value>200M</value>
+</property>
+```
+
+Amazon S3 handles files larger than 5GB differently than smaller ones.
+Setting the huge filesize to a number greater than that validates support
+for huge files.
+
+```xml
+<property>
+ <name>fs.s3a.scale.test.huge.filesize</name>
+ <value>6G</value>
+</property>
+```
+
+Tests at this scale are slow: they are best executed from hosts running in
+the cloud infrastructure where the S3 endpoint is based.
+Otherwise, set a large timeout in `fs.s3a.scale.test.timeout`:
+
+```xml
+<property>
+ <name>fs.s3a.scale.test.timeout</name>
+ <value>432000</value>
+</property>
+```
+
+
+The tests are executed in an order such that created files are only cleaned
+up after all the tests have completed. If the tests are interrupted, the
+test data will remain.
+
### Testing against non AWS S3 endpoints.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
index 28278fe..9e14ed2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
@@ -18,24 +18,26 @@
package org.apache.hadoop.fs.contract.s3a;
-import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
-import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
/**
* Contract test suite covering S3A integration with DistCp.
+ * Uses the block output stream, buffered to disk. This is the
+ * recommended output mechanism for DistCP due to its scalability.
*/
public class ITestS3AContractDistCp extends AbstractContractDistCpTest {
- private static final long MULTIPART_SETTING = 8 * 1024 * 1024; // 8 MB
+ private static final long MULTIPART_SETTING = MULTIPART_MIN_SIZE;
@Override
protected Configuration createConfiguration() {
Configuration newConf = super.createConfiguration();
- newConf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_SETTING);
newConf.setLong(MULTIPART_SIZE, MULTIPART_SETTING);
+ newConf.setBoolean(FAST_UPLOAD, true);
+ newConf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_DISK);
return newConf;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
index b7973b3..e049fd1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
@@ -48,6 +48,7 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
@Override
public void teardown() throws Exception {
super.teardown();
+ describe("closing file system");
IOUtils.closeStream(getFileSystem());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
index b0b8a65..b1b8240 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,18 +20,23 @@ package org.apache.hadoop.fs.s3a;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.util.StopWatch;
-import org.junit.*;
+
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
/**
- * Basic unit test for S3A's blocking executor service.
+ * Basic test for S3A's blocking executor service.
*/
public class ITestBlockingThreadPoolExecutorService {
@@ -47,7 +52,10 @@ public class ITestBlockingThreadPoolExecutorService {
private static final Integer SOME_VALUE = 1337;
- private static BlockingThreadPoolExecutorService tpe = null;
+ private static BlockingThreadPoolExecutorService tpe;
+
+ @Rule
+ public Timeout testTimeout = new Timeout(60 * 1000);
@AfterClass
public static void afterClass() throws Exception {
@@ -71,13 +79,23 @@ public class ITestBlockingThreadPoolExecutorService {
@Test
public void testSubmitRunnable() throws Exception {
ensureCreated();
- int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
+ verifyQueueSize(tpe, NUM_ACTIVE_TASKS + NUM_WAITING_TASKS);
+ }
+
+ /**
+ * Verify the size of the executor's queue, by verifying that the first
+ * submission to block is {@code expectedQueueSize + 1}.
+ * @param executorService executor service to test
+ * @param expectedQueueSize size of queue
+ */
+ protected void verifyQueueSize(ExecutorService executorService,
+ int expectedQueueSize) {
StopWatch stopWatch = new StopWatch().start();
- for (int i = 0; i < totalTasks; i++) {
- tpe.submit(sleeper);
+ for (int i = 0; i < expectedQueueSize; i++) {
+ executorService.submit(sleeper);
assertDidntBlock(stopWatch);
}
- tpe.submit(sleeper);
+ executorService.submit(sleeper);
assertDidBlock(stopWatch);
}
@@ -93,6 +111,15 @@ public class ITestBlockingThreadPoolExecutorService {
ensureDestroyed();
}
+ @Test
+ public void testChainedQueue() throws Throwable {
+ ensureCreated();
+ int size = 2;
+ ExecutorService wrapper = new SemaphoredDelegatingExecutor(tpe,
+ size, true);
+ verifyQueueSize(wrapper, size);
+ }
+
// Helper functions, etc.
private void assertDidntBlock(StopWatch sw) {
@@ -141,8 +168,9 @@ public class ITestBlockingThreadPoolExecutorService {
private static void ensureCreated() throws Exception {
if (tpe == null) {
LOG.debug("Creating thread pool");
- tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS,
- NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest");
+ tpe = BlockingThreadPoolExecutorService.newInstance(
+ NUM_ACTIVE_TASKS, NUM_WAITING_TASKS,
+ 1, TimeUnit.SECONDS, "btpetest");
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
new file mode 100644
index 0000000..74cad00
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Tests small file upload functionality for
+ * {@link S3ABlockOutputStream} with the blocks buffered in byte arrays.
+ *
+ * File sizes are kept small to reduce test duration on slow connections;
+ * multipart tests are kept in scale tests.
+ */
+public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
+ conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
+ conf.setBoolean(Constants.FAST_UPLOAD, true);
+ conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
+ return conf;
+ }
+
+ protected String getBlockOutputBufferName() {
+ return FAST_UPLOAD_BUFFER_ARRAY;
+ }
+
+ @Test
+ public void testZeroByteUpload() throws IOException {
+ verifyUpload("0", 0);
+ }
+
+ @Test
+ public void testRegularUpload() throws IOException {
+ verifyUpload("regular", 1024);
+ }
+
+ @Test(expected = IOException.class)
+ public void testDoubleStreamClose() throws Throwable {
+ Path dest = path("testDoubleStreamClose");
+ describe(" testDoubleStreamClose");
+ FSDataOutputStream stream = getFileSystem().create(dest, true);
+ byte[] data = ContractTestUtils.dataset(16, 'a', 26);
+ try {
+ stream.write(data);
+ stream.close();
+ stream.write(data);
+ } finally {
+ IOUtils.closeStream(stream);
+ }
+ }
+
+ public void verifyUpload(String name, int fileSize) throws IOException {
+ Path dest = path(name);
+ describe(name + " upload to " + dest);
+ ContractTestUtils.createAndVerifyFile(
+ getFileSystem(),
+ dest,
+ fileSize);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java
new file mode 100644
index 0000000..504426b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * Use {@link Constants#FAST_UPLOAD_BYTEBUFFER} for buffering.
+ */
+public class ITestS3ABlockOutputByteBuffer extends ITestS3ABlockOutputArray {
+
+ @Override
+ protected String getBlockOutputBufferName() {
+ return Constants.FAST_UPLOAD_BYTEBUFFER;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java
new file mode 100644
index 0000000..550706d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * Use {@link Constants#FAST_UPLOAD_BUFFER_DISK} for buffering.
+ */
+public class ITestS3ABlockOutputDisk extends ITestS3ABlockOutputArray {
+
+ @Override
+ protected String getBlockOutputBufferName() {
+ return Constants.FAST_UPLOAD_BUFFER_DISK;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java
index 4444d0c..991135e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java
@@ -72,6 +72,8 @@ public class ITestS3ABlockingThreadPool {
@Test
public void testFastMultiPartUpload() throws Exception {
conf.setBoolean(Constants.FAST_UPLOAD, true);
+ conf.set(Constants.FAST_UPLOAD_BUFFER,
+ Constants.FAST_UPLOAD_BYTEBUFFER);
fs = S3ATestUtils.createTestFileSystem(conf);
ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
1024);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
index b08bfe9..30d4bf6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@@ -35,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
@@ -417,6 +419,33 @@ public class ITestS3AConfiguration {
fs.close();
}
+ @Test
+ public void testDirectoryAllocatorDefval() throws Throwable {
+ conf = new Configuration();
+ conf.unset(Constants.BUFFER_DIR);
+ fs = S3ATestUtils.createTestFileSystem(conf);
+ File tmp = fs.createTmpFileForWrite("out-", 1024, conf);
+ assertTrue("not found: " + tmp, tmp.exists());
+ tmp.delete();
+ }
+
+ @Test
+ public void testDirectoryAllocatorRR() throws Throwable {
+ File dir1 = GenericTestUtils.getRandomizedTestDir();
+ File dir2 = GenericTestUtils.getRandomizedTestDir();
+ dir1.mkdirs();
+ dir2.mkdirs();
+ conf = new Configuration();
+ conf.set(Constants.BUFFER_DIR, dir1 + ", " + dir2);
+ fs = S3ATestUtils.createTestFileSystem(conf);
+ File tmp1 = fs.createTmpFileForWrite("out-", 1024, conf);
+ tmp1.delete();
+ File tmp2 = fs.createTmpFileForWrite("out-", 1024, conf);
+ tmp2.delete();
+ assertNotEquals("round robin not working",
+ tmp1.getParent(), tmp2.getParent());
+ }
+
/**
* Reads and returns a field from an object using reflection. If the field
* cannot be found, is null, or is not the expected type, then this method
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java
new file mode 100644
index 0000000..5239f30
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run the encryption tests against the block output stream.
+ */
+public class ITestS3AEncryptionBlockOutputStream extends ITestS3AEncryption {
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ conf.setBoolean(Constants.FAST_UPLOAD, true);
+ conf.set(Constants.FAST_UPLOAD_BUFFER,
+ Constants.FAST_UPLOAD_BYTEBUFFER);
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java
deleted file mode 100644
index c06fed1..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Run the encryption tests against the Fast output stream.
- * This verifies that both file writing paths can encrypt their data.
- */
-public class ITestS3AEncryptionFastOutputStream extends ITestS3AEncryption {
-
- @Override
- protected Configuration createConfiguration() {
- Configuration conf = super.createConfiguration();
- conf.setBoolean(Constants.FAST_UPLOAD, true);
- return conf;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java
deleted file mode 100644
index b5fa1c3..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-
-/**
- * Tests regular and multi-part upload functionality for S3AFastOutputStream.
- * File sizes are kept small to reduce test duration on slow connections
- */
-public class ITestS3AFastOutputStream {
- private FileSystem fs;
-
-
- @Rule
- public Timeout testTimeout = new Timeout(30 * 60 * 1000);
-
- @Before
- public void setUp() throws Exception {
- Configuration conf = new Configuration();
- conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
- conf.setInt(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
- conf.setBoolean(Constants.FAST_UPLOAD, true);
- fs = S3ATestUtils.createTestFileSystem(conf);
- }
-
- @After
- public void tearDown() throws Exception {
- if (fs != null) {
- fs.delete(getTestPath(), true);
- }
- }
-
- protected Path getTestPath() {
- return new Path("/tests3a");
- }
-
- @Test
- public void testRegularUpload() throws IOException {
- ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
- }
-
- @Test
- public void testMultiPartUpload() throws IOException {
- ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 *
- 1024);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATestUtils.java
new file mode 100644
index 0000000..88204b2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATestUtils.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+
+/**
+ * Test the test utils. Why an integration test? It's needed to
+ * verify property pushdown.
+ */
+public class ITestS3ATestUtils extends Assert {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestS3ATestUtils.class);
+ public static final String KEY = "undefined.property";
+
+ @Before
+ public void clear() {
+ System.clearProperty(KEY);
+ }
+
+ @Test
+ public void testGetTestProperty() throws Throwable {
+ Configuration conf = new Configuration(false);
+ assertEquals("a", getTestProperty(conf, KEY, "a"));
+ conf.set(KEY, "\t b \n");
+ assertEquals("b", getTestProperty(conf, KEY, "a"));
+ System.setProperty(KEY, "c");
+ assertEquals("c", getTestProperty(conf, KEY, "a"));
+ unsetSysprop();
+ assertEquals("b", getTestProperty(conf, KEY, "a"));
+ }
+
+ @Test
+ public void testGetTestPropertyLong() throws Throwable {
+ Configuration conf = new Configuration(false);
+ assertEquals(1, getTestPropertyLong(conf, KEY, 1));
+ conf.setInt(KEY, 2);
+ assertEquals(2, getTestPropertyLong(conf, KEY, 1));
+ System.setProperty(KEY, "3");
+ assertEquals(3, getTestPropertyLong(conf, KEY, 1));
+ }
+
+ @Test
+ public void testGetTestPropertyInt() throws Throwable {
+ Configuration conf = new Configuration(false);
+ assertEquals(1, getTestPropertyInt(conf, KEY, 1));
+ conf.setInt(KEY, 2);
+ assertEquals(2, getTestPropertyInt(conf, KEY, 1));
+ System.setProperty(KEY, "3");
+ assertEquals(3, getTestPropertyInt(conf, KEY, 1));
+ conf.unset(KEY);
+ assertEquals(3, getTestPropertyInt(conf, KEY, 1));
+ unsetSysprop();
+ assertEquals(5, getTestPropertyInt(conf, KEY, 5));
+ }
+
+ @Test
+ public void testGetTestPropertyBool() throws Throwable {
+ Configuration conf = new Configuration(false);
+ assertTrue(getTestPropertyBool(conf, KEY, true));
+ conf.set(KEY, "\tfalse \n");
+ assertFalse(getTestPropertyBool(conf, KEY, true));
+ System.setProperty(KEY, "true");
+ assertTrue(getTestPropertyBool(conf, KEY, true));
+ unsetSysprop();
+ assertEquals("false", getTestProperty(conf, KEY, "true"));
+ conf.unset(KEY);
+ assertTrue(getTestPropertyBool(conf, KEY, true));
+ }
+
+ protected void unsetSysprop() {
+ System.setProperty(KEY, UNSET_PROPERTY);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 6a4e68c..6894bb0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -44,14 +44,35 @@ public interface S3ATestConstants {
String TEST_FS_S3A_NAME = TEST_FS_S3A + "name";
/**
+ * Run the encryption tests?
+ */
+ String KEY_ENCRYPTION_TESTS = TEST_FS_S3A + "encryption.enabled";
+
+ /**
+ * Tell tests that they are being executed in parallel: {@value}.
+ */
+ String KEY_PARALLEL_TEST_EXECUTION = "test.parallel.execution";
+
+ /**
+ * A property set to true in maven if scale tests are enabled: {@value}.
+ */
+ String KEY_SCALE_TESTS_ENABLED = S3A_SCALE_TEST + "enabled";
+
+ /**
* The number of operations to perform: {@value}.
*/
String KEY_OPERATION_COUNT = SCALE_TEST + "operation.count";
/**
+ * The number of directory operations to perform: {@value}.
+ */
+ String KEY_DIRECTORY_COUNT = SCALE_TEST + "directory.count";
+
+ /**
* The readahead buffer: {@value}.
*/
String KEY_READ_BUFFER_SIZE = S3A_SCALE_TEST + "read.buffer.size";
+
int DEFAULT_READ_BUFFER_SIZE = 16384;
/**
@@ -65,12 +86,62 @@ public interface S3ATestConstants {
String DEFAULT_CSVTEST_FILE = "s3a://landsat-pds/scene_list.gz";
/**
+ * Endpoint for the S3 CSV/scale tests. This defaults to
+ * the us-east endpoint.
+ */
+ String KEY_CSVTEST_ENDPOINT = S3A_SCALE_TEST + "csvfile.endpoint";
+
+ /**
+ * Default endpoint for the S3 CSV/scale tests:
+ * the us-east endpoint, {@value}.
+ */
+ String DEFAULT_CSVTEST_ENDPOINT = "s3.amazonaws.com";
+
+ /**
+ * Name of the property to define the timeout for scale tests: {@value}.
+ * Measured in seconds.
+ */
+ String KEY_TEST_TIMEOUT = S3A_SCALE_TEST + "timeout";
+
+ /**
+ * Name of the property to define the file size for the huge file
+ * tests: {@value}.
+ * Measured in KB; a suffix like "M", or "G" will change the unit.
+ */
+ String KEY_HUGE_FILESIZE = S3A_SCALE_TEST + "huge.filesize";
+
+ /**
+ * Name of the property to define the partition size for the huge file
+ * tests: {@value}.
+ * Measured in KB; a suffix like "M", or "G" will change the unit.
+ */
+ String KEY_HUGE_PARTITION_SIZE = S3A_SCALE_TEST + "huge.partitionsize";
+
+ /**
+ * The default huge size is small; full 5GB+ scale tests are something
+ * to run in long test runs on EC2 VMs. {@value}.
+ */
+ String DEFAULT_HUGE_FILESIZE = "10M";
+
+ /**
* The default number of operations to perform: {@value}.
*/
long DEFAULT_OPERATION_COUNT = 2005;
/**
- * Run the encryption tests?
+ * Default number of directories to create when performing
+ * directory performance/scale tests.
*/
- String KEY_ENCRYPTION_TESTS = TEST_FS_S3A + "encryption.enabled";
+ int DEFAULT_DIRECTORY_COUNT = 2;
+
+ /**
+ * Default scale test timeout in seconds: {@value}.
+ */
+ int DEFAULT_TEST_TIMEOUT = 30 * 60;
+
+ /**
+ * Default policy on scale tests: {@value}.
+ */
+ boolean DEFAULT_SCALE_TESTS_ENABLED = false;
+
}
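
[Editor's note] The new tunables above all resolve through the `getTestProperty()`
helpers added to `S3ATestUtils` later in this patch, so each can be overridden
on the maven command line, e.g. `-Dfs.s3a.scale.test.timeout=3600` (assuming
the `S3A_SCALE_TEST` prefix expands to `fs.s3a.scale.test.`). A minimal sketch
of a test resolving them; this example class is hypothetical, not part of the
patch:

    import org.apache.hadoop.conf.Configuration;

    import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
    import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;

    /** Hypothetical sketch: resolving the new scale-test tunables. */
    public class ScaleTestTunablesExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // precedence: system property > configuration > default
        boolean enabled = getTestPropertyBool(conf,
            KEY_SCALE_TESTS_ENABLED, DEFAULT_SCALE_TESTS_ENABLED);
        long filesize = getTestPropertyBytes(conf,
            KEY_HUGE_FILESIZE, DEFAULT_HUGE_FILESIZE);   // "10M" -> bytes
        int timeout = getTestPropertyInt(conf,
            KEY_TEST_TIMEOUT, DEFAULT_TEST_TIMEOUT);     // seconds
        System.out.printf("scale=%b, filesize=%d, timeout=%ds%n",
            enabled, filesize, timeout);
      }
    }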
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 95f6d4b..c67e118 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -40,6 +40,12 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
public class S3ATestUtils {
/**
+ * Value to set a system property to (in maven) to declare that
+ * a property has been unset.
+ */
+ public static final String UNSET_PROPERTY = "unset";
+
+ /**
* Create the test filesystem.
*
* If the test.fs.s3a.name property is not set, this will
@@ -53,8 +59,25 @@ public class S3ATestUtils {
*/
public static S3AFileSystem createTestFileSystem(Configuration conf)
throws IOException {
- String fsname = conf.getTrimmed(TEST_FS_S3A_NAME, "");
+ return createTestFileSystem(conf, true);
+ }
+
+ /**
+ * Create the test filesystem with or without multipart purging.
+ *
+ * If the test.fs.s3a.name property is not set, this will
+ * trigger a JUnit failure.
+ * @param conf configuration
+ * @param purge flag to enable purging of pending multipart uploads
+ * @return the FS
+ * @throws IOException IO Problems
+ * @throws AssumptionViolatedException if the FS is not named
+ */
+ public static S3AFileSystem createTestFileSystem(Configuration conf,
+ boolean purge)
+ throws IOException {
+ String fsname = conf.getTrimmed(TEST_FS_S3A_NAME, "");
boolean liveTest = !StringUtils.isEmpty(fsname);
URI testURI = null;
@@ -70,8 +93,12 @@ public class S3ATestUtils {
}
S3AFileSystem fs1 = new S3AFileSystem();
//enable purging in tests
- conf.setBoolean(PURGE_EXISTING_MULTIPART, true);
- conf.setInt(PURGE_EXISTING_MULTIPART_AGE, 0);
+ if (purge) {
+ conf.setBoolean(PURGE_EXISTING_MULTIPART, true);
+ // but a long delay so that parallel multipart tests don't
+ // suddenly start timing out
+ conf.setInt(PURGE_EXISTING_MULTIPART_AGE, 30 * 60);
+ }
fs1.initialize(testURI, conf);
return fs1;
}
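
[Editor's note] As a usage sketch (hypothetical caller, not in this patch):
suites whose own in-flight multipart uploads must survive a parallel run, for
example the huge-file scale tests this patch enables, can now opt out of
purging entirely:

    // Hypothetical: a test filesystem that leaves pending multipart
    // uploads alone, so a concurrent huge-file upload in a parallel
    // test run is not aborted underneath it.
    Configuration conf = new Configuration();
    S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf, false);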
@@ -149,6 +176,121 @@ public class S3ATestUtils {
}
/**
+ * Get a long test property.
+ * <ol>
+ * <li>Look up configuration value (which can pick up core-default.xml),
+ * using {@code defVal} as the default value (if conf != null).
+ * </li>
+ * <li>Fetch the system property.</li>
+ * <li>If the system property is not empty or "(unset)":
+ * it overrides the conf value.
+ * </li>
+ * </ol>
+ * This puts the build properties in charge of everything. It's not a
+ * perfect design; having maven set properties based on a file, as ant let
+ * you do, is better for customization.
+ *
+ * As to why there's a special "unset" value, see
+ * <a href="http://stackoverflow.com/questions/7773134/null-versus-empty-arguments-in-maven">
+ * Stack Overflow</a>.
+ * @param conf config: may be null
+ * @param key key to look up
+ * @param defVal default value
+ * @return the evaluated test property.
+ */
+ public static long getTestPropertyLong(Configuration conf,
+ String key, long defVal) {
+ return Long.valueOf(
+ getTestProperty(conf, key, Long.toString(defVal)));
+ }
+
+ /**
+ * Get a test property value in bytes, using k, m, g, t, p, e suffixes,
+ * as parsed by
+ * {@link org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix#string2long(String)}.
+ * <ol>
+ * <li>Look up configuration value (which can pick up core-default.xml),
+ * using {@code defVal} as the default value (if conf != null).
+ * </li>
+ * <li>Fetch the system property.</li>
+ * <li>If the system property is not empty or "(unset)":
+ * it overrides the conf value.
+ * </li>
+ * </ol>
+ * This puts the build properties in charge of everything. It's not a
+ * perfect design; having maven set properties based on a file, as ant let
+ * you do, is better for customization.
+ *
+ * As to why there's a special "unset" value, see
+ * <a href="http://stackoverflow.com/questions/7773134/null-versus-empty-arguments-in-maven">
+ * Stack Overflow</a>.
+ * @param conf config: may be null
+ * @param key key to look up
+ * @param defVal default value
+ * @return the evaluated test property.
+ */
+ public static long getTestPropertyBytes(Configuration conf,
+ String key, String defVal) {
+ return org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix
+ .string2long(getTestProperty(conf, key, defVal));
+ }
+
+ /**
+ * Get an integer test property; algorithm described in
+ * {@link #getTestPropertyLong(Configuration, String, long)}.
+ * @param conf config: may be null
+ * @param key key to look up
+ * @param defVal default value
+ * @return the evaluated test property.
+ */
+ public static int getTestPropertyInt(Configuration conf,
+ String key, int defVal) {
+ return (int) getTestPropertyLong(conf, key, defVal);
+ }
+
+ /**
+ * Get a boolean test property; algorithm described in
+ * {@link #getTestPropertyLong(Configuration, String, long)}.
+ * @param conf config: may be null
+ * @param key key to look up
+ * @param defVal default value
+ * @return the evaluated test property.
+ */
+ public static boolean getTestPropertyBool(Configuration conf,
+ String key,
+ boolean defVal) {
+ return Boolean.valueOf(
+ getTestProperty(conf, key, Boolean.toString(defVal)));
+ }
+
+ /**
+ * Get a string test property.
+ * <ol>
+ * <li>Look up configuration value (which can pick up core-default.xml),
+ * using {@code defVal} as the default value (if conf != null).
+ * </li>
+ * <li>Fetch the system property.</li>
+ * <li>If the system property is not empty or "(unset)":
+ * it overrides the conf value.
+ * </li>
+ * </ol>
+ * This puts the build properties in charge of everything. It's not a
+ * perfect design; having maven set properties based on a file, as ant let
+ * you do, is better for customization.
+ *
+ * As to why there's a special "unset" value, see
+ * <a href="http://stackoverflow.com/questions/7773134/null-versus-empty-arguments-in-maven">
+ * Stack Overflow</a>.
+ * @param conf config: may be null
+ * @param key key to look up
+ * @param defVal default value
+ * @return the evaluated test property.
+ */
+ public static String getTestProperty(Configuration conf,
+ String key,
+ String defVal) {
+ String confVal = conf != null ? conf.getTrimmed(key, defVal) : defVal;
+ String propval = System.getProperty(key);
+ return StringUtils.isNotEmpty(propval) && !UNSET_PROPERTY.equals(propval)
+ ? propval : confVal;
+ }
+
+ /**
* The exception to raise so as to exit fast from
* {@link #eventually(int, Callable)}.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
new file mode 100644
index 0000000..9fa95fd
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Unit tests for {@link S3ADataBlocks}.
+ */
+public class TestDataBlocks extends Assert {
+
+ @Rule
+ public Timeout testTimeout = new Timeout(30 * 1000);
+
+ @Before
+ public void nameThread() {
+ Thread.currentThread().setName("JUnit");
+ }
+
+ /**
+ * Test the {@link S3ADataBlocks.ByteBufferBlockFactory}.
+ * That code implements an input stream over a ByteBuffer, and has to
+ * return the buffer to the pool after the read completes.
+ *
+ * This test verifies the basic contract of the process.
+ */
+ @Test
+ public void testByteBufferIO() throws Throwable {
+ try (S3ADataBlocks.ByteBufferBlockFactory factory =
+ new S3ADataBlocks.ByteBufferBlockFactory(null)) {
+ int limit = 128;
+ S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
+ = factory.create(limit);
+ assertEquals("outstanding buffers in " + factory,
+ 1, factory.getOutstandingBufferCount());
+
+ byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
+ int bufferLen = buffer.length;
+ block.write(buffer, 0, bufferLen);
+ assertEquals(bufferLen, block.dataSize());
+ assertEquals("capacity in " + block,
+ limit - bufferLen, block.remainingCapacity());
+ assertTrue("hasCapacity(64) in " + block, block.hasCapacity(64));
+ assertTrue("No capacity in " + block,
+ block.hasCapacity(limit - bufferLen));
+
+ // now start the write
+ S3ADataBlocks.ByteBufferBlockFactory.ByteBufferInputStream
+ stream = block.startUpload();
+ assertTrue("!hasRemaining() in " + stream, stream.hasRemaining());
+ int expected = bufferLen;
+ assertEquals("wrong available() in " + stream,
+ expected, stream.available());
+
+ assertEquals('t', stream.read());
+ expected--;
+ assertEquals("wrong available() in " + stream,
+ expected, stream.available());
+
+ // close the block. The buffer must remain outstanding here;
+ // the stream manages the lifecycle of it now
+ block.close();
+ assertEquals("outstanding buffers in " + factory,
+ 1, factory.getOutstandingBufferCount());
+ block.close();
+
+ // read into a byte array with an offset
+ int offset = 5;
+ byte[] in = new byte[limit];
+ assertEquals(2, stream.read(in, offset, 2));
+ assertEquals('e', in[offset]);
+ assertEquals('s', in[offset + 1]);
+ expected -= 2;
+ assertEquals("wrong available() in " + stream,
+ expected, stream.available());
+
+ // read to end
+ byte[] remainder = new byte[limit];
+ int c;
+ int index = 0;
+ while ((c = stream.read()) >= 0) {
+ remainder[index++] = (byte) c;
+ }
+ assertEquals(expected, index);
+ assertEquals('a', remainder[--index]);
+
+ assertEquals("wrong available() in " + stream,
+ 0, stream.available());
+ assertTrue("hasRemaining() in " + stream, !stream.hasRemaining());
+
+ // when the stream is closed, the buffer must be returned to the pool
+ stream.close();
+ assertEquals("outstanding buffers in " + factory,
+ 0, factory.getOutstandingBufferCount());
+ stream.close();
+ assertEquals("outstanding buffers in " + factory,
+ 0, factory.getOutstandingBufferCount());
+
+ }
+
+ }
+
+}
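
[Editor's note] In summary, the buffer lifecycle the test verifies (a sketch of
the observed contract, not of the implementation):

    // factory.create(limit)  -> buffer allocated; outstanding count == 1
    // block.startUpload()    -> buffer ownership passes to the stream
    // block.close()          -> idempotent; buffer remains outstanding
    // stream.close()         -> idempotent; buffer returned, count == 0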
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bc176961/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index 5e88aba..e1aef75 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -34,6 +34,7 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
fc = S3ATestUtils.createTestFileContext(conf);
fc.mkdir(fileContextTestHelper.getTestRootPath(fc, "test"),
FileContext.DEFAULT_PERM, true);
+ FileContext.clearStatistics();
}
@After