Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/02/25 16:11:32 UTC

[2/2] hadoop git commit: HADOOP-14028. S3A BlockOutputStreams don't delete temporary files in multipart uploads or handle part upload failures. Contributed by Steve Loughran.

HADOOP-14028. S3A BlockOutputStreams don't delete temporary files in multipart uploads or handle part upload failures.
Contributed by Steve Loughran.

(cherry picked from commit 29fe5af017b945d8750c074ca39031b5b777eddd)
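
The core of the fix: every block upload now runs inside a try/finally that closes both the upload data and the block itself, so a disk-buffered block has its temporary file deleted even when the PUT or part upload fails. A minimal sketch of the pattern (the names follow the patch below; this is an illustration, not the exact Hadoop code):

    // Sketch: cleanup runs whether or not the upload succeeds.
    final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
    try {
      // may throw on a network or service failure
      partETag = fs.uploadPart(request).getPartETag();
    } finally {
      // close the stream and the block: releases buffers and
      // deletes any backing temporary file
      closeAll(LOG, uploadData, block);
    }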


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dab00da1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dab00da1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dab00da1

Branch: refs/heads/trunk
Commit: dab00da19f25619ccc71c7f803a235b21766bf1e
Parents: 120bef7
Author: Steve Loughran <st...@apache.org>
Authored: Sat Feb 25 15:35:19 2017 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Sat Feb 25 15:35:19 2017 +0000

----------------------------------------------------------------------
 .../hadoop/fs/s3a/S3ABlockOutputStream.java     |  68 ++-
 .../org/apache/hadoop/fs/s3a/S3ADataBlocks.java | 514 +++++++++++--------
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  78 ++-
 .../hadoop/fs/s3a/S3AInstrumentation.java       |  40 +-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java |  26 +
 .../hadoop/fs/s3a/ITestS3ABlockOutputArray.java |  76 ++-
 .../fs/s3a/ITestS3ABlockOutputByteBuffer.java   |   5 +-
 .../hadoop/fs/s3a/ITestS3ABlockOutputDisk.java  |  12 +
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java  |  13 +
 .../apache/hadoop/fs/s3a/TestDataBlocks.java    |  45 +-
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java |  28 +-
 11 files changed, 639 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 89b9b29..1b0929b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.fs.s3a;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.util.Progressable;
@@ -178,10 +176,10 @@ class S3ABlockOutputStream extends OutputStream {
     if (activeBlock == null) {
       blockCount++;
       if (blockCount>= Constants.MAX_MULTIPART_COUNT) {
-        LOG.error("Number of partitions in stream exceeds limit for S3: " +
+        LOG.error("Number of partitions in stream exceeds limit for S3: "
              + Constants.MAX_MULTIPART_COUNT +  " write may fail.");
       }
-      activeBlock = blockFactory.create(this.blockSize);
+      activeBlock = blockFactory.create(blockCount, this.blockSize, statistics);
     }
     return activeBlock;
   }
@@ -206,7 +204,9 @@ class S3ABlockOutputStream extends OutputStream {
    * Clear the active block.
    */
   private void clearActiveBlock() {
-    LOG.debug("Clearing active block");
+    if (activeBlock != null) {
+      LOG.debug("Clearing active block");
+    }
     synchronized (this) {
       activeBlock = null;
     }
@@ -356,11 +356,9 @@ class S3ABlockOutputStream extends OutputStream {
       writeOperationHelper.writeFailed(ioe);
       throw ioe;
     } finally {
-      LOG.debug("Closing block and factory");
-      IOUtils.closeStream(block);
-      IOUtils.closeStream(blockFactory);
+      closeAll(LOG, block, blockFactory);
       LOG.debug("Statistics: {}", statistics);
-      IOUtils.closeStream(statistics);
+      closeAll(LOG, statistics);
       clearActiveBlock();
     }
     // All end of write operations, including deleting fake parent directories
@@ -378,10 +376,10 @@ class S3ABlockOutputStream extends OutputStream {
 
     final S3ADataBlocks.DataBlock block = getActiveBlock();
     int size = block.dataSize();
-    final PutObjectRequest putObjectRequest =
-        writeOperationHelper.newPutRequest(
-            block.startUpload(),
-            size);
+    final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
+    final PutObjectRequest putObjectRequest = uploadData.hasFile() ?
+        writeOperationHelper.newPutRequest(uploadData.getFile())
+        : writeOperationHelper.newPutRequest(uploadData.getUploadStream(), size);
     fs.setOptionalPutRequestParameters(putObjectRequest);
     long transferQueueTime = now();
     BlockUploadProgress callback =
@@ -393,8 +391,14 @@ class S3ABlockOutputStream extends OutputStream {
         executorService.submit(new Callable<PutObjectResult>() {
           @Override
           public PutObjectResult call() throws Exception {
-            PutObjectResult result = fs.putObjectDirect(putObjectRequest);
-            block.close();
+            PutObjectResult result;
+            try {
+              // the putObject call automatically closes the input
+              // stream afterwards.
+              result = writeOperationHelper.putObject(putObjectRequest);
+            } finally {
+              closeAll(LOG, uploadData, block);
+            }
             return result;
           }
         });
@@ -438,13 +442,21 @@ class S3ABlockOutputStream extends OutputStream {
   }
 
   /**
+   * Get the statistics for this stream.
+   * @return stream statistics
+   */
+  S3AInstrumentation.OutputStreamStatistics getStatistics() {
+    return statistics;
+  }
+
+  /**
    * Multiple partition upload.
    */
   private class MultiPartUpload {
     private final String uploadId;
     private final List<ListenableFuture<PartETag>> partETagsFutures;
 
-    public MultiPartUpload() throws IOException {
+    MultiPartUpload() throws IOException {
       this.uploadId = writeOperationHelper.initiateMultiPartUpload();
       this.partETagsFutures = new ArrayList<>(2);
       LOG.debug("Initiated multi-part upload for {} with " +
@@ -461,14 +473,16 @@ class S3ABlockOutputStream extends OutputStream {
         throws IOException {
       LOG.debug("Queueing upload of {}", block);
       final int size = block.dataSize();
-      final InputStream uploadStream = block.startUpload();
+      final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
       final int currentPartNumber = partETagsFutures.size() + 1;
       final UploadPartRequest request =
           writeOperationHelper.newUploadPartRequest(
               uploadId,
-              uploadStream,
               currentPartNumber,
-              size);
+              size,
+              uploadData.getUploadStream(),
+              uploadData.getFile());
+
       long transferQueueTime = now();
       BlockUploadProgress callback =
           new BlockUploadProgress(
@@ -483,12 +497,16 @@ class S3ABlockOutputStream extends OutputStream {
               LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
                   uploadId);
               // do the upload
-              PartETag partETag = fs.uploadPart(request).getPartETag();
-              LOG.debug("Completed upload of {}", block);
-              LOG.debug("Stream statistics of {}", statistics);
-
-              // close the block
-              block.close();
+              PartETag partETag;
+              try {
+                partETag = fs.uploadPart(request).getPartETag();
+                LOG.debug("Completed upload of {} to part {}", block,
+                    partETag.getETag());
+                LOG.debug("Stream statistics of {}", statistics);
+              } finally {
+                // close the stream and block
+                closeAll(LOG, uploadData, block);
+              }
               return partETag;
             }
           });
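
With BlockUploadData in place, the single-PUT path asks the block whether its data ended up in a file or a stream and builds the request accordingly; a file-backed request also lets the AWS SDK retry a failed PUT by re-reading the file. A condensed sketch of the dispatch in the hunk above (illustrative, not a drop-in snippet):

    // Sketch: choose the PUT form from where the block buffered its data.
    S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
    PutObjectRequest put = uploadData.hasFile()
        ? writeOperationHelper.newPutRequest(uploadData.getFile())
        : writeOperationHelper.newPutRequest(uploadData.getUploadStream(), size);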

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
index 05f8efe..9bc8dcd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ADataBlocks.java
@@ -24,10 +24,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
-import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -42,10 +40,11 @@ import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.util.DirectBufferPool;
 
 import static org.apache.hadoop.fs.s3a.S3ADataBlocks.DataBlock.DestState.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.closeAll;
 
 /**
  * Set of classes to support output streaming into blocks which are then
- * uploaded as partitions.
+ * uploaded to S3 as a single PUT, or as part of a multipart request.
  */
 final class S3ADataBlocks {
 
@@ -97,6 +96,70 @@ final class S3ADataBlocks {
   }
 
   /**
+   * The output information for an upload.
+   * It can be one of a file or an input stream.
+   * When closed, any stream is closed. Any source file is untouched.
+   */
+  static final class BlockUploadData implements Closeable {
+    private final File file;
+    private final InputStream uploadStream;
+
+    /**
+     * File constructor; input stream will be null.
+     * @param file file to upload
+     */
+    BlockUploadData(File file) {
+      Preconditions.checkArgument(file.exists(), "No file: " + file);
+      this.file = file;
+      this.uploadStream = null;
+    }
+
+    /**
+     * Stream constructor, file field will be null.
+     * @param uploadStream stream to upload
+     */
+    BlockUploadData(InputStream uploadStream) {
+      Preconditions.checkNotNull(uploadStream, "rawUploadStream");
+      this.uploadStream = uploadStream;
+      this.file = null;
+    }
+
+    /**
+     * Predicate: does this instance contain a file reference.
+     * @return true if there is a file.
+     */
+    boolean hasFile() {
+      return file != null;
+    }
+
+    /**
+     * Get the file, if there is one.
+     * @return the file for uploading, or null.
+     */
+    File getFile() {
+      return file;
+    }
+
+    /**
+     * Get the raw upload stream, if the object was
+     * created with one.
+     * @return the upload stream or null.
+     */
+    InputStream getUploadStream() {
+      return uploadStream;
+    }
+
+    /**
+     * Close: closes any upload stream provided in the constructor.
+     * @throws IOException inherited exception
+     */
+    @Override
+    public void close() throws IOException {
+      closeAll(LOG, uploadStream);
+    }
+  }
+
+  /**
    * Base class for block factories.
    */
   static abstract class BlockFactory implements Closeable {
@@ -110,15 +173,21 @@ final class S3ADataBlocks {
 
     /**
      * Create a block.
+     *
+     * @param index index of block
      * @param limit limit of the block.
+     * @param statistics stats to work with
      * @return a new block.
      */
-    abstract DataBlock create(int limit) throws IOException;
+    abstract DataBlock create(long index, int limit,
+        S3AInstrumentation.OutputStreamStatistics statistics)
+        throws IOException;
 
     /**
      * Implement any close/cleanup operation.
      * Base class is a no-op
-     * @throws IOException -ideally, it shouldn't.
+     * @throws IOException Inherited exception; implementations should
+     * avoid raising it.
      */
     @Override
     public void close() throws IOException {
@@ -140,6 +209,14 @@ final class S3ADataBlocks {
     enum DestState {Writing, Upload, Closed}
 
     private volatile DestState state = Writing;
+    protected final long index;
+    protected final S3AInstrumentation.OutputStreamStatistics statistics;
+
+    protected DataBlock(long index,
+        S3AInstrumentation.OutputStreamStatistics statistics) {
+      this.index = index;
+      this.statistics = statistics;
+    }
 
     /**
      * Atomically enter a state, verifying current state.
@@ -243,8 +320,8 @@ final class S3ADataBlocks {
      * @return the stream
      * @throws IOException trouble
      */
-    InputStream startUpload() throws IOException {
-      LOG.debug("Start datablock upload");
+    BlockUploadData startUpload() throws IOException {
+      LOG.debug("Start datablock[{}] upload", index);
       enterState(Writing, Upload);
       return null;
     }
@@ -278,6 +355,23 @@ final class S3ADataBlocks {
 
     }
 
+    /**
+     * A block has been allocated.
+     */
+    protected void blockAllocated() {
+      if (statistics != null) {
+        statistics.blockAllocated();
+      }
+    }
+
+    /**
+     * A block has been released.
+     */
+    protected void blockReleased() {
+      if (statistics != null) {
+        statistics.blockReleased();
+      }
+    }
   }
 
   // ====================================================================
@@ -292,8 +386,10 @@ final class S3ADataBlocks {
     }
 
     @Override
-    DataBlock create(int limit) throws IOException {
-      return new ByteArrayBlock(limit);
+    DataBlock create(long index, int limit,
+        S3AInstrumentation.OutputStreamStatistics statistics)
+        throws IOException {
+      return new ByteArrayBlock(index, limit, statistics);
     }
 
   }
@@ -334,9 +430,13 @@ final class S3ADataBlocks {
     // cache data size so that it is consistent after the buffer is reset.
     private Integer dataSize;
 
-    ByteArrayBlock(int limit) {
+    ByteArrayBlock(long index,
+        int limit,
+        S3AInstrumentation.OutputStreamStatistics statistics) {
+      super(index, statistics);
       this.limit = limit;
       buffer = new S3AByteArrayOutputStream(limit);
+      blockAllocated();
     }
 
     /**
@@ -349,12 +449,12 @@ final class S3ADataBlocks {
     }
 
     @Override
-    InputStream startUpload() throws IOException {
+    BlockUploadData startUpload() throws IOException {
       super.startUpload();
       dataSize = buffer.size();
       ByteArrayInputStream bufferData = buffer.getInputStream();
       buffer = null;
-      return bufferData;
+      return new BlockUploadData(bufferData);
     }
 
     @Override
@@ -378,12 +478,14 @@ final class S3ADataBlocks {
     @Override
     protected void innerClose() {
       buffer = null;
+      blockReleased();
     }
 
     @Override
     public String toString() {
-      return "ByteArrayBlock{" +
-          "state=" + getState() +
+      return "ByteArrayBlock{"
+          +"index=" + index +
+          ", state=" + getState() +
           ", limit=" + limit +
           ", dataSize=" + dataSize +
           '}';
@@ -395,12 +497,6 @@ final class S3ADataBlocks {
   /**
    * Stream via Direct ByteBuffers; these are allocated off heap
    * via {@link DirectBufferPool}.
-   * This is actually the most complex of all the block factories,
-   * due to the need to explicitly recycle buffers; in comparison, the
-   * {@link DiskBlock} buffer delegates the work of deleting files to
-   * the {@link DiskBlock.FileDeletingInputStream}. Here the
-   * input stream {@link ByteBufferInputStream} has a similar task, along
-   * with the foundational work of streaming data from a byte array.
    */
 
   static class ByteBufferBlockFactory extends BlockFactory {
@@ -413,8 +509,10 @@ final class S3ADataBlocks {
     }
 
     @Override
-    ByteBufferBlock create(int limit) throws IOException {
-      return new ByteBufferBlock(limit);
+    ByteBufferBlock create(long index, int limit,
+        S3AInstrumentation.OutputStreamStatistics statistics)
+        throws IOException {
+      return new ByteBufferBlock(index, limit, statistics);
     }
 
     private ByteBuffer requestBuffer(int limit) {
@@ -446,21 +544,27 @@ final class S3ADataBlocks {
 
     /**
      * A DataBlock which requests a buffer from pool on creation; returns
-     * it when the output stream is closed.
+     * it when it is closed.
      */
     class ByteBufferBlock extends DataBlock {
-      private ByteBuffer buffer;
+      private ByteBuffer blockBuffer;
       private final int bufferSize;
       // cache data size so that it is consistent after the buffer is reset.
       private Integer dataSize;
 
       /**
        * Instantiate. This will request a ByteBuffer of the desired size.
+       * @param index block index
        * @param bufferSize buffer size
+       * @param statistics statistics to update
        */
-      ByteBufferBlock(int bufferSize) {
+      ByteBufferBlock(long index,
+          int bufferSize,
+          S3AInstrumentation.OutputStreamStatistics statistics) {
+        super(index, statistics);
         this.bufferSize = bufferSize;
-        buffer = requestBuffer(bufferSize);
+        blockBuffer = requestBuffer(bufferSize);
+        blockAllocated();
       }
 
       /**
@@ -473,13 +577,14 @@ final class S3ADataBlocks {
       }
 
       @Override
-      ByteBufferInputStream startUpload() throws IOException {
+      BlockUploadData startUpload() throws IOException {
         super.startUpload();
         dataSize = bufferCapacityUsed();
         // set the buffer up from reading from the beginning
-        buffer.limit(buffer.position());
-        buffer.position(0);
-        return new ByteBufferInputStream(dataSize, buffer);
+        blockBuffer.limit(blockBuffer.position());
+        blockBuffer.position(0);
+        return new BlockUploadData(
+            new ByteBufferInputStream(dataSize, blockBuffer));
       }
 
       @Override
@@ -489,182 +594,190 @@ final class S3ADataBlocks {
 
       @Override
       public int remainingCapacity() {
-        return buffer != null ? buffer.remaining() : 0;
+        return blockBuffer != null ? blockBuffer.remaining() : 0;
       }
 
       private int bufferCapacityUsed() {
-        return buffer.capacity() - buffer.remaining();
+        return blockBuffer.capacity() - blockBuffer.remaining();
       }
 
       @Override
       int write(byte[] b, int offset, int len) throws IOException {
         super.write(b, offset, len);
         int written = Math.min(remainingCapacity(), len);
-        buffer.put(b, offset, written);
+        blockBuffer.put(b, offset, written);
         return written;
       }
 
+      /**
+       * Closing the block will release the buffer.
+       */
       @Override
       protected void innerClose() {
-        buffer = null;
+        if (blockBuffer != null) {
+          blockReleased();
+          releaseBuffer(blockBuffer);
+          blockBuffer = null;
+        }
       }
 
       @Override
       public String toString() {
         return "ByteBufferBlock{"
-            + "state=" + getState() +
+            + "index=" + index +
+            ", state=" + getState() +
             ", dataSize=" + dataSize() +
             ", limit=" + bufferSize +
             ", remainingCapacity=" + remainingCapacity() +
             '}';
       }
 
-    }
-
-    /**
-     * Provide an input stream from a byte buffer; supporting
-     * {@link #mark(int)}, which is required to enable replay of failed
-     * PUT attempts.
-     * This input stream returns the buffer to the pool afterwards.
-     */
-    class ByteBufferInputStream extends InputStream {
+      /**
+       * Provide an input stream from a byte buffer; supporting
+       * {@link #mark(int)}, which is required to enable replay of failed
+       * PUT attempts.
+       */
+      class ByteBufferInputStream extends InputStream {
 
-      private final int size;
-      private ByteBuffer byteBuffer;
+        private final int size;
+        private ByteBuffer byteBuffer;
 
-      ByteBufferInputStream(int size, ByteBuffer byteBuffer) {
-        LOG.debug("Creating ByteBufferInputStream of size {}", size);
-        this.size = size;
-        this.byteBuffer = byteBuffer;
-      }
+        ByteBufferInputStream(int size,
+            ByteBuffer byteBuffer) {
+          LOG.debug("Creating ByteBufferInputStream of size {}", size);
+          this.size = size;
+          this.byteBuffer = byteBuffer;
+        }
 
-      /**
-       * Return the buffer to the pool after the stream is closed.
-       */
-      @Override
-      public synchronized void close() {
-        if (byteBuffer != null) {
-          LOG.debug("releasing buffer");
-          releaseBuffer(byteBuffer);
+        /**
+         * After the stream is closed, set the local reference to the byte
+         * buffer to null; this guarantees that future attempts to use
+         * stream methods will fail.
+         */
+        @Override
+        public synchronized void close() {
+          LOG.debug("ByteBufferInputStream.close() for {}",
+              ByteBufferBlock.super.toString());
           byteBuffer = null;
         }
-      }
 
-      /**
-       * Verify that the stream is open.
-       * @throws IOException if the stream is closed
-       */
-      private void verifyOpen() throws IOException {
-        if (byteBuffer == null) {
-          throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+        /**
+         * Verify that the stream is open.
+         * @throws IOException if the stream is closed
+         */
+        private void verifyOpen() throws IOException {
+          if (byteBuffer == null) {
+            throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+          }
         }
-      }
 
-      public synchronized int read() throws IOException {
-        if (available() > 0) {
-          return byteBuffer.get() & 0xFF;
-        } else {
-          return -1;
+        public synchronized int read() throws IOException {
+          if (available() > 0) {
+            return byteBuffer.get() & 0xFF;
+          } else {
+            return -1;
+          }
         }
-      }
 
-      @Override
-      public synchronized long skip(long offset) throws IOException {
-        verifyOpen();
-        long newPos = position() + offset;
-        if (newPos < 0) {
-          throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+        @Override
+        public synchronized long skip(long offset) throws IOException {
+          verifyOpen();
+          long newPos = position() + offset;
+          if (newPos < 0) {
+            throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+          }
+          if (newPos > size) {
+            throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+          }
+          byteBuffer.position((int) newPos);
+          return newPos;
         }
-        if (newPos > size) {
-          throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+
+        @Override
+        public synchronized int available() {
+          Preconditions.checkState(byteBuffer != null,
+              FSExceptionMessages.STREAM_IS_CLOSED);
+          return byteBuffer.remaining();
         }
-        byteBuffer.position((int) newPos);
-        return newPos;
-      }
 
-      @Override
-      public synchronized int available() {
-        Preconditions.checkState(byteBuffer != null,
-            FSExceptionMessages.STREAM_IS_CLOSED);
-        return byteBuffer.remaining();
-      }
+        /**
+         * Get the current buffer position.
+         * @return the buffer position
+         */
+        public synchronized int position() {
+          return byteBuffer.position();
+        }
 
-      /**
-       * Get the current buffer position.
-       * @return the buffer position
-       */
-      public synchronized int position() {
-        return byteBuffer.position();
-      }
+        /**
+         * Check if there is data left.
+         * @return true if there is data remaining in the buffer.
+         */
+        public synchronized boolean hasRemaining() {
+          return byteBuffer.hasRemaining();
+        }
 
-      /**
-       * Check if there is data left.
-       * @return true if there is data remaining in the buffer.
-       */
-      public synchronized boolean hasRemaining() {
-        return byteBuffer.hasRemaining();
-      }
+        @Override
+        public synchronized void mark(int readlimit) {
+          LOG.debug("mark at {}", position());
+          byteBuffer.mark();
+        }
 
-      @Override
-      public synchronized void mark(int readlimit) {
-        LOG.debug("mark at {}", position());
-        byteBuffer.mark();
-      }
+        @Override
+        public synchronized void reset() throws IOException {
+          LOG.debug("reset");
+          byteBuffer.reset();
+        }
 
-      @Override
-      public synchronized void reset() throws IOException {
-        LOG.debug("reset");
-        byteBuffer.reset();
-      }
+        @Override
+        public boolean markSupported() {
+          return true;
+        }
 
-      @Override
-      public boolean markSupported() {
-        return true;
-      }
+        /**
+         * Read in data.
+         * @param b destination buffer
+         * @param offset offset within the buffer
+         * @param length length of bytes to read
+         * @throws EOFException if the position is negative
+         * @throws IndexOutOfBoundsException if there isn't space for the
+         * amount of data requested.
+         * @throws IllegalArgumentException other arguments are invalid.
+         */
+        @SuppressWarnings("NullableProblems")
+        public synchronized int read(byte[] b, int offset, int length)
+            throws IOException {
+          Preconditions.checkArgument(length >= 0, "length is negative");
+          Preconditions.checkArgument(b != null, "Null buffer");
+          if (b.length - offset < length) {
+            throw new IndexOutOfBoundsException(
+                FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+                    + ": request length =" + length
+                    + ", with offset =" + offset
+                    + "; buffer capacity =" + (b.length - offset));
+          }
+          verifyOpen();
+          if (!hasRemaining()) {
+            return -1;
+          }
 
-      /**
-       * Read in data.
-       * @param buffer destination buffer
-       * @param offset offset within the buffer
-       * @param length length of bytes to read
-       * @throws EOFException if the position is negative
-       * @throws IndexOutOfBoundsException if there isn't space for the
-       * amount of data requested.
-       * @throws IllegalArgumentException other arguments are invalid.
-       */
-      @SuppressWarnings("NullableProblems")
-      public synchronized int read(byte[] buffer, int offset, int length)
-          throws IOException {
-        Preconditions.checkArgument(length >= 0, "length is negative");
-        Preconditions.checkArgument(buffer != null, "Null buffer");
-        if (buffer.length - offset < length) {
-          throw new IndexOutOfBoundsException(
-              FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
-                  + ": request length =" + length
-                  + ", with offset =" + offset
-                  + "; buffer capacity =" + (buffer.length - offset));
+          int toRead = Math.min(length, available());
+          byteBuffer.get(b, offset, toRead);
+          return toRead;
         }
-        verifyOpen();
-        if (!hasRemaining()) {
-          return -1;
-        }
-
-        int toRead = Math.min(length, available());
-        byteBuffer.get(buffer, offset, toRead);
-        return toRead;
-      }
 
-      @Override
-      public String toString() {
-        final StringBuilder sb = new StringBuilder(
-            "ByteBufferInputStream{");
-        sb.append("size=").append(size);
-        ByteBuffer buffer = this.byteBuffer;
-        if (buffer != null) {
-          sb.append(", available=").append(buffer.remaining());
+        @Override
+        public String toString() {
+          final StringBuilder sb = new StringBuilder(
+              "ByteBufferInputStream{");
+          sb.append("size=").append(size);
+          ByteBuffer buf = this.byteBuffer;
+          if (buf != null) {
+            sb.append(", available=").append(buf.remaining());
+          }
+          sb.append(", ").append(ByteBufferBlock.super.toString());
+          sb.append('}');
+          return sb.toString();
         }
-        sb.append('}');
-        return sb.toString();
       }
     }
   }
@@ -681,22 +794,29 @@ final class S3ADataBlocks {
     }
 
     /**
-     * Create a temp file and a block which writes to it.
+     * Create a temp file and a {@link DiskBlock} instance to manage it.
+     *
+     * @param index block index
      * @param limit limit of the block.
+     * @param statistics statistics to update
      * @return the new block
      * @throws IOException IO problems
      */
     @Override
-    DataBlock create(int limit) throws IOException {
+    DataBlock create(long index,
+        int limit,
+        S3AInstrumentation.OutputStreamStatistics statistics)
+        throws IOException {
       File destFile = getOwner()
-          .createTmpFileForWrite("s3ablock", limit, getOwner().getConf());
-      return new DiskBlock(destFile, limit);
+          .createTmpFileForWrite(String.format("s3ablock-%04d-", index),
+              limit, getOwner().getConf());
+      return new DiskBlock(destFile, limit, index, statistics);
     }
   }
 
   /**
    * Stream to a file.
-   * This will stop at the limit; the caller is expected to create a new block
+   * This will stop at the limit; the caller is expected to create a new block.
    */
   static class DiskBlock extends DataBlock {
 
@@ -704,12 +824,17 @@ final class S3ADataBlocks {
     private final File bufferFile;
     private final int limit;
     private BufferedOutputStream out;
-    private InputStream uploadStream;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
-    DiskBlock(File bufferFile, int limit)
+    DiskBlock(File bufferFile,
+        int limit,
+        long index,
+        S3AInstrumentation.OutputStreamStatistics statistics)
         throws FileNotFoundException {
+      super(index, statistics);
       this.limit = limit;
       this.bufferFile = bufferFile;
+      blockAllocated();
       out = new BufferedOutputStream(new FileOutputStream(bufferFile));
     }
 
@@ -738,7 +863,7 @@ final class S3ADataBlocks {
     }
 
     @Override
-    InputStream startUpload() throws IOException {
+    BlockUploadData startUpload() throws IOException {
       super.startUpload();
       try {
         out.flush();
@@ -746,8 +871,7 @@ final class S3ADataBlocks {
         out.close();
         out = null;
       }
-      uploadStream = new FileInputStream(bufferFile);
-      return new FileDeletingInputStream(uploadStream);
+      return new BlockUploadData(bufferFile);
     }
 
     /**
@@ -755,6 +879,7 @@ final class S3ADataBlocks {
      * exists.
      * @throws IOException IO problems
      */
+    @SuppressWarnings("UnnecessaryDefault")
     @Override
     protected void innerClose() throws IOException {
       final DestState state = getState();
@@ -763,20 +888,19 @@ final class S3ADataBlocks {
       case Writing:
         if (bufferFile.exists()) {
           // file was not uploaded
-          LOG.debug("Deleting buffer file as upload did not start");
-          boolean deleted = bufferFile.delete();
-          if (!deleted && bufferFile.exists()) {
-            LOG.warn("Failed to delete buffer file {}", bufferFile);
-          }
+          LOG.debug("Block[{}]: Deleting buffer file as upload did not start",
+              index);
+          closeBlock();
         }
         break;
 
       case Upload:
-        LOG.debug("Buffer file {} exists \u2014close upload stream", bufferFile);
+        LOG.debug("Block[{}]: Buffer file {} exists \u2014close upload stream",
+            index, bufferFile);
         break;
 
       case Closed:
-        // no-op
+        closeBlock();
         break;
 
       default:
@@ -798,7 +922,8 @@ final class S3ADataBlocks {
     @Override
     public String toString() {
       String sb = "FileBlock{"
-          + "destFile=" + bufferFile +
+          + "index=" + index
+          + ", destFile=" + bufferFile +
           ", state=" + getState() +
           ", dataSize=" + dataSize() +
           ", limit=" + limit +
@@ -807,31 +932,20 @@ final class S3ADataBlocks {
     }
 
     /**
-     * An input stream which deletes the buffer file when closed.
+     * Close the block.
+     * This will delete the block's buffer file if the block has
+     * not previously been closed.
      */
-    private final class FileDeletingInputStream extends FilterInputStream {
-      private final AtomicBoolean closed = new AtomicBoolean(false);
-
-      FileDeletingInputStream(InputStream source) {
-        super(source);
-      }
-
-      /**
-       * Delete the input file when closed.
-       * @throws IOException IO problem
-       */
-      @Override
-      public void close() throws IOException {
-        try {
-          super.close();
-        } finally {
-          if (!closed.getAndSet(true)) {
-            if (!bufferFile.delete()) {
-              LOG.warn("delete({}) returned false",
-                  bufferFile.getAbsoluteFile());
-            }
-          }
+    void closeBlock() {
+      LOG.debug("block[{}]: closeBlock()", index);
+      if (!closed.getAndSet(true)) {
+        blockReleased();
+        if (!bufferFile.delete() && bufferFile.exists()) {
+          LOG.warn("delete({}) returned false",
+              bufferFile.getAbsoluteFile());
         }
+      } else {
+        LOG.debug("block[{}]: skipping re-entrant closeBlock()", index);
       }
     }
   }
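
BlockUploadData separates "what to upload" from "when to delete": close() closes any wrapped stream but never touches a source file, while deleting a DiskBlock's buffer file is now the block's own job, done exactly once through the AtomicBoolean-guarded closeBlock(). A brief sketch of the two constructors (assumed caller-side code, not from the patch):

    // File-backed: close() leaves the file alone; the DiskBlock deletes it.
    S3ADataBlocks.BlockUploadData fromFile =
        new S3ADataBlocks.BlockUploadData(bufferFile);   // file must exist
    // Stream-backed: close() closes the stream.
    S3ADataBlocks.BlockUploadData fromStream =
        new S3ADataBlocks.BlockUploadData(new ByteArrayInputStream(bytes));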

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index bc47918..1786e68 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1022,6 +1022,7 @@ public class S3AFileSystem extends FileSystem {
    */
   public PutObjectRequest newPutObjectRequest(String key,
       ObjectMetadata metadata, File srcfile) {
+    Preconditions.checkNotNull(srcfile);
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         srcfile);
     setOptionalPutRequestParameters(putObjectRequest);
@@ -1039,8 +1040,9 @@ public class S3AFileSystem extends FileSystem {
    * @param inputStream source data.
    * @return the request
    */
-  PutObjectRequest newPutObjectRequest(String key,
+  private PutObjectRequest newPutObjectRequest(String key,
       ObjectMetadata metadata, InputStream inputStream) {
+    Preconditions.checkNotNull(inputStream);
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         inputStream, metadata);
     setOptionalPutRequestParameters(putObjectRequest);
@@ -1077,12 +1079,16 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
-   * PUT an object, incrementing the put requests and put bytes
+   * Start a transfer-manager managed async PUT of an object,
+   * incrementing the put requests and put bytes
    * counters.
    * It does not update the other counters,
    * as existing code does that as progress callbacks come in.
    * Byte length is calculated from the file length, or, if there is no
    * file, from the content length of the header.
+   * Because the operation is async, any stream supplied in the request
+   * must reference data (files, buffers) which stay valid until the upload
+   * completes.
    * @param putObjectRequest the request
    * @return the upload initiated
    */
@@ -1108,6 +1114,7 @@ public class S3AFileSystem extends FileSystem {
    * PUT an object directly (i.e. not via the transfer manager).
    * Byte length is calculated from the file length, or, if there is no
    * file, from the content length of the header.
+   * <i>Important: this call will close any input stream in the request.</i>
    * @param putObjectRequest the request
    * @return the upload initiated
    * @throws AmazonClientException on problems
@@ -1133,7 +1140,8 @@ public class S3AFileSystem extends FileSystem {
 
   /**
    * Upload part of a multi-partition file.
-   * Increments the write and put counters
+   * Increments the write and put counters.
+   * <i>Important: this call does not close any input stream in the request.</i>
    * @param request request
    * @return the result of the operation.
    * @throws AmazonClientException on problems
@@ -2309,14 +2317,28 @@ public class S3AFileSystem extends FileSystem {
 
     /**
      * Create a {@link PutObjectRequest} request.
-     * The metadata is assumed to have been configured with the size of the
-     * operation.
+     * If {@code length} is set, the metadata is configured with the size of
+     * the upload.
      * @param inputStream source data.
      * @param length size, if known. Use -1 for not known
      * @return the request
      */
     PutObjectRequest newPutRequest(InputStream inputStream, long length) {
-      return newPutObjectRequest(key, newObjectMetadata(length), inputStream);
+      PutObjectRequest request = newPutObjectRequest(key,
+          newObjectMetadata(length), inputStream);
+      return request;
+    }
+
+    /**
+     * Create a {@link PutObjectRequest} request to upload a file.
+     * @param sourceFile source file
+     * @return the request
+     */
+    PutObjectRequest newPutRequest(File sourceFile) {
+      int length = (int) sourceFile.length();
+      PutObjectRequest request = newPutObjectRequest(key,
+          newObjectMetadata(length), sourceFile);
+      return request;
     }
 
     /**
@@ -2379,6 +2401,8 @@ public class S3AFileSystem extends FileSystem {
       Preconditions.checkNotNull(partETags);
       Preconditions.checkArgument(!partETags.isEmpty(),
           "No partitions have been uploaded");
+      LOG.debug("Completing multipart upload {} with {} parts",
+          uploadId, partETags.size());
       return s3.completeMultipartUpload(
           new CompleteMultipartUploadRequest(bucket,
               key,
@@ -2389,42 +2413,51 @@ public class S3AFileSystem extends FileSystem {
     /**
      * Abort a multipart upload operation.
      * @param uploadId multipart operation Id
-     * @return the result
      * @throws AmazonClientException on problems.
      */
     void abortMultipartUpload(String uploadId) throws AmazonClientException {
+      LOG.debug("Aborting multipart upload {}", uploadId);
       s3.abortMultipartUpload(
           new AbortMultipartUploadRequest(bucket, key, uploadId));
     }
 
     /**
      * Create and initialize a part request of a multipart upload.
+     * Exactly one of: {@code uploadStream} or {@code sourceFile}
+     * must be specified.
      * @param uploadId ID of ongoing upload
-     * @param uploadStream source of data to upload
      * @param partNumber current part number of the upload
      * @param size amount of data
+     * @param uploadStream source of data to upload
+     * @param sourceFile optional source file.
      * @return the request.
      */
     UploadPartRequest newUploadPartRequest(String uploadId,
-        InputStream uploadStream,
-        int partNumber,
-        int size) {
+        int partNumber, int size, InputStream uploadStream, File sourceFile) {
       Preconditions.checkNotNull(uploadId);
-      Preconditions.checkNotNull(uploadStream);
+      // exactly one source must be set; xor verifies this
+      Preconditions.checkArgument((uploadStream != null) ^ (sourceFile != null),
+          "Data source");
       Preconditions.checkArgument(size > 0, "Invalid partition size %s", size);
-      Preconditions.checkArgument(partNumber> 0 && partNumber <=10000,
+      Preconditions.checkArgument(partNumber > 0 && partNumber <= 10000,
           "partNumber must be between 1 and 10000 inclusive, but is %s",
           partNumber);
 
       LOG.debug("Creating part upload request for {} #{} size {}",
           uploadId, partNumber, size);
-      return new UploadPartRequest()
+      UploadPartRequest request = new UploadPartRequest()
           .withBucketName(bucket)
           .withKey(key)
           .withUploadId(uploadId)
-          .withInputStream(uploadStream)
           .withPartNumber(partNumber)
           .withPartSize(size);
+      if (uploadStream != null) {
+        // there's an upload stream. Bind to it.
+        request.setInputStream(uploadStream);
+      } else {
+        request.setFile(sourceFile);
+      }
+      return request;
     }
 
     /**
@@ -2439,6 +2472,21 @@ public class S3AFileSystem extends FileSystem {
       sb.append('}');
       return sb.toString();
     }
+
+    /**
+     * PUT an object directly (i.e. not via the transfer manager).
+     * @param putObjectRequest the request
+     * @return the upload initiated
+     * @throws IOException on problems
+     */
+    PutObjectResult putObject(PutObjectRequest putObjectRequest)
+        throws IOException {
+      try {
+        return putObjectDirect(putObjectRequest);
+      } catch (AmazonClientException e) {
+        throw translateException("put", putObjectRequest.getKey(), e);
+      }
+    }
   }
 
 }
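
newUploadPartRequest() now accepts both a stream and a file and uses an xor precondition to insist that exactly one is supplied, binding the request to whichever source is non-null. Two hedged call sketches (argument values invented for illustration):

    // Stream-backed part: the file argument is null.
    UploadPartRequest r1 = writeOperationHelper.newUploadPartRequest(
        uploadId, 1, size, uploadData.getUploadStream(), null);
    // File-backed part: the stream argument is null.
    UploadPartRequest r2 = writeOperationHelper.newUploadPartRequest(
        uploadId, 2, size, null, uploadData.getFile());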

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index fb8c852..d2e7a88 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -36,6 +36,7 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 
@@ -428,7 +429,7 @@ public class S3AInstrumentation {
     if (gauge != null) {
       gauge.decr(count);
     } else {
-      LOG.debug("No Gauge: " + op);
+      LOG.debug("No Gauge: {}", op);
     }
   }
 
@@ -676,6 +677,8 @@ public class S3AInstrumentation {
     private final AtomicLong transferDuration = new AtomicLong(0);
     private final AtomicLong queueDuration = new AtomicLong(0);
     private final AtomicLong exceptionsInMultipartFinalize = new AtomicLong(0);
+    private final AtomicInteger blocksAllocated = new AtomicInteger(0);
+    private final AtomicInteger blocksReleased = new AtomicInteger(0);
 
     private Statistics statistics;
 
@@ -684,6 +687,20 @@ public class S3AInstrumentation {
     }
 
     /**
+     * A block has been allocated.
+     */
+    void blockAllocated() {
+      blocksAllocated.incrementAndGet();
+    }
+
+    /**
+     * A block has been released.
+     */
+    void blockReleased() {
+      blocksReleased.incrementAndGet();
+    }
+
+    /**
      * Block is queued for upload.
      */
     void blockUploadQueued(int blockSize) {
@@ -778,6 +795,24 @@ public class S3AInstrumentation {
       return queueDuration.get() + transferDuration.get();
     }
 
+    public int blocksAllocated() {
+      return blocksAllocated.get();
+    }
+
+    public int blocksReleased() {
+      return blocksReleased.get();
+    }
+
+    /**
+     * Get the count of blocks actively allocated; may be inaccurate
+     * if the numbers change during the (non-synchronized) calculation.
+     * @return the number of actively allocated blocks.
+     */
+    public int blocksActivelyAllocated() {
+      return blocksAllocated.get() - blocksReleased.get();
+    }
+
+
     @Override
     public String toString() {
       final StringBuilder sb = new StringBuilder(
@@ -789,6 +824,9 @@ public class S3AInstrumentation {
       sb.append(", blockUploadsFailed=").append(blockUploadsFailed);
       sb.append(", bytesPendingUpload=").append(bytesPendingUpload);
       sb.append(", bytesUploaded=").append(bytesUploaded);
+      sb.append(", blocksAllocated=").append(blocksAllocated);
+      sb.append(", blocksReleased=").append(blocksReleased);
+      sb.append(", blocksActivelyAllocated=").append(blocksActivelyAllocated());
       sb.append(", exceptionsInMultipartFinalize=").append(
           exceptionsInMultipartFinalize);
       sb.append(", transferDuration=").append(transferDuration).append(" ms");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 5311211..84f3c99 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -733,4 +733,30 @@ public final class S3AUtils {
     }
     return null;
   }
+
+  /**
+   * Close the Closeable objects and <b>ignore</b> any Exception or
+   * null pointers.
+   * (This is the SLF4J equivalent of that in {@code IOUtils}).
+   * @param log the log to log at debug level. Can be null.
+   * @param closeables the objects to close
+   */
+  public static void closeAll(Logger log,
+      java.io.Closeable... closeables) {
+    for (java.io.Closeable c : closeables) {
+      if (c != null) {
+        try {
+          if (log != null) {
+            log.debug("Closing {}", c);
+          }
+          c.close();
+        } catch (Exception e) {
+          if (log != null && log.isDebugEnabled()) {
+            log.debug("Exception in closing {}", c, e);
+          }
+        }
+      }
+    }
+  }
+
 }
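
closeAll() is a varargs, SLF4J-friendly counterpart to IOUtils.closeStream(): null entries are skipped and any exception from close() is swallowed after a debug-level log, which makes it safe to call from finally blocks. A short usage sketch (LOG is assumed to be the caller's SLF4J logger):

    // Close several resources quietly; a failure on one doesn't stop the rest.
    S3AUtils.closeAll(LOG, uploadData, block, statistics);
    // Null-safe: this is a no-op.
    S3AUtils.closeAll(LOG, (java.io.Closeable) null);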

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
index 74cad00..87f676c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputArray.java
@@ -24,9 +24,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.io.IOUtils;
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 
@@ -38,6 +41,14 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
  * multipart tests are kept in scale tests.
  */
 public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
+  private static final int BLOCK_SIZE = 256 * 1024;
+
+  private static byte[] dataset;
+
+  @BeforeClass
+  public static void setupDataset() {
+    dataset = ContractTestUtils.dataset(BLOCK_SIZE, 0, 256);
+  }
 
   @Override
   protected Configuration createConfiguration() {
@@ -65,9 +76,9 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
   }
 
   @Test(expected = IOException.class)
-  public void testDoubleStreamClose() throws Throwable {
-    Path dest = path("testDoubleStreamClose");
-    describe(" testDoubleStreamClose");
+  public void testWriteAfterStreamClose() throws Throwable {
+    Path dest = path("testWriteAfterStreamClose");
+    describe(" testWriteAfterStreamClose");
     FSDataOutputStream stream = getFileSystem().create(dest, true);
     byte[] data = ContractTestUtils.dataset(16, 'a', 26);
     try {
@@ -79,7 +90,25 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
     }
   }
 
-  public void verifyUpload(String name, int fileSize) throws IOException {
+  @Test
+  public void testBlocksClosed() throws Throwable {
+    Path dest = path("testBlocksClosed");
+    describe(" testBlocksClosed");
+    FSDataOutputStream stream = getFileSystem().create(dest, true);
+    S3AInstrumentation.OutputStreamStatistics statistics
+        = S3ATestUtils.getOutputStreamStatistics(stream);
+    byte[] data = ContractTestUtils.dataset(16, 'a', 26);
+    stream.write(data);
+    LOG.info("closing output stream");
+    stream.close();
+    assertEquals("total allocated blocks in " + statistics,
+        1, statistics.blocksAllocated());
+    assertEquals("actively allocated blocks in " + statistics,
+        0, statistics.blocksActivelyAllocated());
+    LOG.info("end of test case");
+  }
+
+  private void verifyUpload(String name, int fileSize) throws IOException {
     Path dest = path(name);
     describe(name + " upload to " + dest);
     ContractTestUtils.createAndVerifyFile(
@@ -87,4 +116,43 @@ public class ITestS3ABlockOutputArray extends AbstractS3ATestBase {
         dest,
         fileSize);
   }
+
+  /**
+   * Create a factory for use in mark/reset tests.
+   * @param fileSystem source FS
+   * @return the factory
+   */
+  protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
+    return new S3ADataBlocks.ArrayBlockFactory(fileSystem);
+  }
+
+  private void markAndResetDatablock(S3ADataBlocks.BlockFactory factory)
+      throws Exception {
+    S3AInstrumentation instrumentation =
+        new S3AInstrumentation(new URI("s3a://example"));
+    S3AInstrumentation.OutputStreamStatistics outstats
+        = instrumentation.newOutputStreamStatistics(null);
+    S3ADataBlocks.DataBlock block = factory.create(1, BLOCK_SIZE, outstats);
+    block.write(dataset, 0, dataset.length);
+    S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
+    InputStream stream = uploadData.getUploadStream();
+    assertNotNull(stream);
+    assertTrue("Mark not supported in " + stream, stream.markSupported());
+    assertEquals(0, stream.read());
+    stream.mark(BLOCK_SIZE);
+    // read a lot
+    long l = 0;
+    while (stream.read() != -1) {
+      // do nothing
+      l++;
+    }
+    stream.reset();
+    assertEquals(1, stream.read());
+  }
+
+  @Test
+  public void testMarkReset() throws Throwable {
+    markAndResetDatablock(createFactory(getFileSystem()));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java
index 504426b..02f3de0 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputByteBuffer.java
@@ -17,7 +17,6 @@
  */
 
 package org.apache.hadoop.fs.s3a;
-
 /**
  * Use {@link Constants#FAST_UPLOAD_BYTEBUFFER} for buffering.
  */
@@ -27,4 +26,8 @@ public class ITestS3ABlockOutputByteBuffer extends ITestS3ABlockOutputArray {
     return Constants.FAST_UPLOAD_BYTEBUFFER;
   }
 
+  protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
+    return new S3ADataBlocks.ByteBufferBlockFactory(fileSystem);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java
index 550706d..abe8656 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockOutputDisk.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import org.junit.Assume;
+
 /**
  * Use {@link Constants#FAST_UPLOAD_BUFFER_DISK} for buffering.
  */
@@ -27,4 +29,14 @@ public class ITestS3ABlockOutputDisk extends ITestS3ABlockOutputArray {
     return Constants.FAST_UPLOAD_BUFFER_DISK;
   }
 
+  /**
+   * The disk stream doesn't support mark/reset; calls
+   * {@code Assume} to skip the test.
+   * @param fileSystem source FS
+   * @return null
+   */
+  protected S3ADataBlocks.BlockFactory createFactory(S3AFileSystem fileSystem) {
+    Assume.assumeTrue("mark/reset nopt supoprted", false);
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 567bacb..9528967 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
@@ -544,4 +545,16 @@ public final class S3ATestUtils {
     }
     Assume.assumeTrue(message, condition);
   }
+
+  /**
+   * Get the statistics from a wrapped block output stream.
+   * @param out output stream
+   * @return the (active) stats of the write
+   */
+  public static S3AInstrumentation.OutputStreamStatistics
+      getOutputStreamStatistics(FSDataOutputStream out) {
+    S3ABlockOutputStream blockOutputStream
+        = (S3ABlockOutputStream) out.getWrappedStream();
+    return blockOutputStream.getStatistics();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
index 9fa95fd..700ef5c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestDataBlocks.java
@@ -51,9 +51,8 @@ public class TestDataBlocks extends Assert {
              new S3ADataBlocks.ByteBufferBlockFactory(null)) {
       int limit = 128;
       S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
-          = factory.create(limit);
-      assertEquals("outstanding buffers in " + factory,
-          1, factory.getOutstandingBufferCount());
+          = factory.create(1, limit, null);
+      assertOutstandingBuffers(factory, 1);
 
       byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
       int bufferLen = buffer.length;
@@ -66,24 +65,23 @@ public class TestDataBlocks extends Assert {
           block.hasCapacity(limit - bufferLen));
 
       // now start the write
-      S3ADataBlocks.ByteBufferBlockFactory.ByteBufferInputStream
-          stream = block.startUpload();
+      S3ADataBlocks.BlockUploadData blockUploadData = block.startUpload();
+      S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream
+          stream =
+          (S3ADataBlocks.ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream)
+              blockUploadData.getUploadStream();
+      assertTrue("Mark not supported in " + stream, stream.markSupported());
       assertTrue("!hasRemaining() in " + stream, stream.hasRemaining());
       int expected = bufferLen;
       assertEquals("wrong available() in " + stream,
           expected, stream.available());
 
       assertEquals('t', stream.read());
+      stream.mark(limit);
       expected--;
       assertEquals("wrong available() in " + stream,
           expected, stream.available());
 
-      // close the block. The buffer must remain outstanding here;
-      // the stream manages the lifecycle of it now
-      block.close();
-      assertEquals("outstanding buffers in " + factory,
-          1, factory.getOutstandingBufferCount());
-      block.close();
 
       // read into a byte array with an offset
       int offset = 5;
@@ -109,16 +107,31 @@ public class TestDataBlocks extends Assert {
           0, stream.available());
       assertTrue("hasRemaining() in " + stream, !stream.hasRemaining());
 
+      // go back to the mark point
+      stream.reset();
+      assertEquals('e', stream.read());
+
       // when the stream is closed, the data should be returned
       stream.close();
-      assertEquals("outstanding buffers in " + factory,
-          0, factory.getOutstandingBufferCount());
+      assertOutstandingBuffers(factory, 1);
+      block.close();
+      assertOutstandingBuffers(factory, 0);
       stream.close();
-      assertEquals("outstanding buffers in " + factory,
-          0, factory.getOutstandingBufferCount());
-
+      assertOutstandingBuffers(factory, 0);
     }
 
   }
 
+  /**
+   * Assert the number of buffers active for a block factory.
+   * @param factory factory
+   * @param expectedCount expected count.
+   */
+  private static void assertOutstandingBuffers(
+      S3ADataBlocks.ByteBufferBlockFactory factory,
+      int expectedCount) {
+    assertEquals("outstanding buffers in " + factory,
+        expectedCount, factory.getOutstandingBufferCount());
+  }
+
 }
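
For readers tracing the reworked assertions: mark()/reset() here follow the
standard java.io.InputStream contract, and that support matters because a
retried part upload has to rewind and resend its data. A standalone sketch,
with ByteArrayInputStream standing in for the ByteBuffer-backed upload stream
(plain JDK code, not S3A's implementation):

  import java.io.ByteArrayInputStream;
  import java.io.IOException;
  import java.io.InputStream;
  import java.nio.charset.StandardCharsets;

  public class MarkResetSketch {
    public static void main(String[] args) throws IOException {
      InputStream in = new ByteArrayInputStream(
          "test data".getBytes(StandardCharsets.US_ASCII));
      int t = in.read();   // consume 't'
      in.mark(128);        // remember the position after 't'
      int e1 = in.read();  // consume 'e'
      in.reset();          // rewind to the mark
      int e2 = in.read();  // reads 'e' again, mirroring the test above
      System.out.println(e1 == 'e' && e2 == 'e'); // prints "true"
    }
  }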

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dab00da1/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
index fcb6444..89fae82 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -34,11 +34,13 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.util.Progressable;
 
@@ -159,13 +161,20 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING;
 
     ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
-
+    S3AInstrumentation.OutputStreamStatistics streamStatistics;
     long blocksPer10MB = blocksPerMB * 10;
     ProgressCallback progress = new ProgressCallback(timer);
     try (FSDataOutputStream out = fs.create(hugefile,
         true,
         uploadBlockSize,
         progress)) {
+      try {
+        streamStatistics = getOutputStreamStatistics(out);
+      } catch (ClassCastException e) {
+        LOG.info("Wrapped output stream is not a block stream: {}",
+            out.getWrappedStream());
+        streamStatistics = null;
+      }
 
       for (long block = 1; block <= blocks; block++) {
         out.write(data);
@@ -190,7 +199,8 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
         }
       }
       // now close the file
-      LOG.info("Closing file and completing write operation");
+      LOG.info("Closing stream {}", out);
+      LOG.info("Statistics : {}", streamStatistics);
       ContractTestUtils.NanoTimer closeTimer
           = new ContractTestUtils.NanoTimer();
       out.close();
@@ -201,6 +211,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
         filesizeMB, uploadBlockSize);
     logFSState();
     bandwidth(timer, filesize);
+    LOG.info("Statistics after stream closed: {}", streamStatistics);
     long putRequestCount = storageStatistics.getLong(putRequests);
     Long putByteCount = storageStatistics.getLong(putBytes);
     LOG.info("PUT {} bytes in {} operations; {} MB/operation",
@@ -214,7 +225,14 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     S3AFileStatus status = fs.getFileStatus(hugefile);
     ContractTestUtils.assertIsFile(hugefile, status);
     assertEquals("File size in " + status, filesize, status.getLen());
-    progress.verifyNoFailures("Put file " + hugefile + " of size " + filesize);
+    if (progress != null) {
+      progress.verifyNoFailures("Put file " + hugefile
+          + " of size " + filesize);
+    }
+    if (streamStatistics != null) {
+      assertEquals("actively allocated blocks in " + streamStatistics,
+          0, streamStatistics.blocksActivelyAllocated());
+    }
   }
 
   /**
@@ -285,7 +303,9 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
   void assumeHugeFileExists() throws IOException {
     S3AFileSystem fs = getFileSystem();
     ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
-    ContractTestUtils.assertIsFile(fs, hugefile);
+    FileStatus status = fs.getFileStatus(hugefile);
+    ContractTestUtils.assertIsFile(hugefile, status);
+    assertTrue("File " + hugefile + " is empty", status.getLen() > 0);
   }
 
   private void logFSState() {

