You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/06/07 20:26:07 UTC

[15/50] [abbrv] hadoop git commit: HADOOP-13171. Add StorageStatistics to S3A; instrument some more operations. Contributed by Steve Loughran.

HADOOP-13171. Add StorageStatistics to S3A; instrument some more operations. Contributed by Steve Loughran.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c58a59f7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c58a59f7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c58a59f7

Branch: refs/heads/YARN-4757
Commit: c58a59f7081d55dd2108545ebf9ee48cf43ca944
Parents: 97e2449
Author: Chris Nauroth <cn...@apache.org>
Authored: Fri Jun 3 08:55:33 2016 -0700
Committer: Chris Nauroth <cn...@apache.org>
Committed: Fri Jun 3 08:55:33 2016 -0700

----------------------------------------------------------------------
 .../hadoop/fs/contract/ContractTestUtils.java   | 420 +++++++++++++++
 .../fs/s3a/ProgressableProgressListener.java    |  94 ++++
 .../hadoop/fs/s3a/S3AFastOutputStream.java      |  65 +--
 .../org/apache/hadoop/fs/s3a/S3AFileStatus.java |   7 +
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 507 ++++++++++++++-----
 .../hadoop/fs/s3a/S3AInstrumentation.java       | 218 +++++---
 .../apache/hadoop/fs/s3a/S3AOutputStream.java   |  98 +---
 .../hadoop/fs/s3a/S3AStorageStatistics.java     | 104 ++++
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java |  48 ++
 .../org/apache/hadoop/fs/s3a/Statistic.java     | 143 ++++++
 .../src/site/markdown/tools/hadoop-aws/index.md |  12 +-
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java  | 153 ++++++
 .../hadoop/fs/s3a/TestS3AFileOperationCost.java | 191 +++++++
 .../hadoop/fs/s3a/scale/S3AScaleTestBase.java   | 154 ++----
 .../fs/s3a/scale/TestS3ADeleteManyFiles.java    |  10 +-
 .../s3a/scale/TestS3ADirectoryPerformance.java  | 189 +++++++
 .../scale/TestS3AInputStreamPerformance.java    |   6 +-
 .../src/test/resources/log4j.properties         |   4 +-
 18 files changed, 1984 insertions(+), 439 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58a59f7/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 6343d40..20ba075 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -22,7 +22,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
 import org.junit.internal.AssumptionViolatedException;
@@ -34,8 +36,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 
 /**
@@ -892,4 +900,416 @@ public class ContractTestUtils extends Assert {
       fs.delete(objectPath, false);
     }
   }
+
+  /**
+   * Make times more readable, by adding a "," every three digits.
+   * @param nanos nanos or other large number
+   * @return a string for logging
+   */
+  public static String toHuman(long nanos) {
+    return String.format(Locale.ENGLISH, "%,d", nanos);
+  }
+
+  /**
+   * Log the bandwidth of a timer as inferred from the number of
+   * bytes processed.
+   * @param timer timer
+   * @param bytes bytes processed in the time period
+   */
+  public static void bandwidth(NanoTimer timer, long bytes) {
+    LOG.info("Bandwidth = {}  MB/S",
+        timer.bandwidthDescription(bytes));
+  }
+
+  /**
+   * Work out the bandwidth in MB/s.
+   * @param bytes bytes
+   * @param durationNS duration in nanos
+   * @return the number of megabytes/second of the recorded operation
+   */
+  public static double bandwidthMBs(long bytes, long durationNS) {
+    return (bytes * 1000.0) / durationNS;
+  }
+
+  /**
+   * Recursively create a directory tree.
+   * Return the details about the created tree. The files and directories
+   * are those created under the path, not the base directory created. That
+   * is retrievable via {@link TreeScanResults#getBasePath()}.
+   * @param fs filesystem
+   * @param current parent dir
+   * @param depth depth of directory tree
+   * @param width width: subdirs per entry
+   * @param files number of files per entry
+   * @param filesize size of files to create in bytes.
+   * @return the details about the created tree.
+   * @throws IOException IO Problems
+   */
+  public static TreeScanResults createSubdirs(FileSystem fs,
+      Path current,
+      int depth,
+      int width,
+      int files,
+      int filesize) throws IOException {
+    return createSubdirs(fs, current, depth, width, files,
+        filesize, "dir-", "file-", "0");
+  }
+
+  /**
+   * Recursively create a directory tree.
+   * @param fs filesystem
+   * @param current the current dir in the walk
+   * @param depth depth of directory tree
+   * @param width width: subdirs per entry
+   * @param files number of files per entry
+   * @param filesize size of files to create in bytes.
+   * @param dirPrefix prefix for directory entries
+   * @param filePrefix prefix for file entries
+   * @param marker string which is slowly built up to uniquely name things
+   * @return the details about the created tree.
+   * @throws IOException IO Problems
+   */
+  public static TreeScanResults createSubdirs(FileSystem fs,
+      Path current,
+      int depth,
+      int width,
+      int files,
+      int filesize,
+      String dirPrefix,
+      String filePrefix,
+      String marker) throws IOException {
+    fs.mkdirs(current);
+    TreeScanResults results = new TreeScanResults(current);
+    if (depth > 0) {
+      byte[] data = dataset(filesize, 'a', 'z');
+      for (int i = 0; i < files; i++) {
+        String name = String.format("%s-%s-%04d.txt", filePrefix, marker, i);
+        Path path = new Path(current, name);
+        createFile(fs, path, true, data);
+        results.add(fs, path);
+      }
+      for (int w = 0; w < width; w++) {
+        String marker2 = String.format("%s-%04d", marker, w);
+        Path child = new Path(current, dirPrefix + marker2);
+        results.add(createSubdirs(fs, child, depth - 1, width, files,
+            filesize, dirPrefix, filePrefix, marker2));
+        results.add(fs, child);
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Predicate to determine if two lists are equivalent, that is, they
+   * contain the same entries.
+   * @param left first collection of paths
+   * @param right second collection of paths
+   * @return true if all entries are in each collection of path.
+   */
+  public static boolean collectionsEquivalent(Collection<Path> left,
+      Collection<Path> right) {
+    Set<Path> leftSet = new HashSet<>(left);
+    Set<Path> rightSet = new HashSet<>(right);
+    return leftSet.containsAll(right) && rightSet.containsAll(left);
+  }
+
+  /**
+   * Predicate to determine if two lists are equivalent, that is, they
+   * contain the same entries.
+   * @param left first collection of paths
+   * @param right second collection of paths
+   * @return true if all entries are in each collection of path.
+   */
+  public static boolean collectionsEquivalentNoDuplicates(Collection<Path> left,
+      Collection<Path> right) {
+    return collectionsEquivalent(left, right) &&
+        !containsDuplicates(left) && !containsDuplicates(right);
+  }
+
+
+  /**
+   * Predicate to test for a collection of paths containing duplicate entries.
+   * @param paths collection of paths
+   * @return true if there are duplicates.
+   */
+  public static boolean containsDuplicates(Collection<Path> paths) {
+    return new HashSet<>(paths).size() != paths.size();
+  }
+
+  /**
+   * Recursively list all entries, with a depth first traversal of the
+   * directory tree.
+   * @param path path
+   * @return the number of entries listed
+   * @throws IOException IO problems
+   */
+  public static TreeScanResults treeWalk(FileSystem fs, Path path)
+      throws IOException {
+    TreeScanResults dirsAndFiles = new TreeScanResults();
+
+    FileStatus[] statuses = fs.listStatus(path);
+    for (FileStatus status : statuses) {
+      LOG.info("{}{}", status.getPath(), status.isDirectory() ? "*" : "");
+    }
+    for (FileStatus status : statuses) {
+      dirsAndFiles.add(status);
+      if (status.isDirectory()) {
+        dirsAndFiles.add(treeWalk(fs, status.getPath()));
+      }
+    }
+    return dirsAndFiles;
+  }
+
+  /**
+   * Results of recursive directory creation/scan operations.
+   */
+  public static final class TreeScanResults {
+
+    private Path basePath;
+    private final List<Path> files = new ArrayList<>();
+    private final List<Path> directories = new ArrayList<>();
+    private final List<Path> other = new ArrayList<>();
+
+
+    public TreeScanResults() {
+    }
+
+    public TreeScanResults(Path basePath) {
+      this.basePath = basePath;
+    }
+
+    /**
+     * Build from a located file status iterator.
+     * @param results results of the listFiles/listStatus call.
+     * @throws IOException IO problems during the iteration.
+     */
+    public TreeScanResults(RemoteIterator<LocatedFileStatus> results)
+        throws IOException {
+      while (results.hasNext()) {
+        add(results.next());
+      }
+    }
+
+    /**
+     * Construct results from an array of statistics.
+     * @param stats statistics array. Must not be null.
+     */
+    public TreeScanResults(FileStatus[] stats) {
+      assertNotNull("Null file status array", stats);
+      for (FileStatus stat : stats) {
+        add(stat);
+      }
+    }
+
+    /**
+     * Add all paths in the other set of results to this instance.
+     * @param that the other instance
+     * @return this instance
+     */
+    public TreeScanResults add(TreeScanResults that) {
+      files.addAll(that.files);
+      directories.addAll(that.directories);
+      other.addAll(that.other);
+      return this;
+    }
+
+    /**
+     * Increment the counters based on the file status.
+     * @param status path status to count.
+     */
+    public void add(FileStatus status) {
+      if (status.isFile()) {
+        files.add(status.getPath());
+      } else if (status.isDirectory()) {
+        directories.add(status.getPath());
+      } else {
+        other.add(status.getPath());
+      }
+    }
+
+    public void add(FileSystem fs, Path path) throws IOException {
+      add(fs.getFileStatus(path));
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%d director%s and %d file%s",
+          getDirCount(),
+          getDirCount() == 1 ? "y" : "ies",
+          getFileCount(),
+          getFileCount() == 1 ? "" : "s");
+    }
+
+    /**
+     * Assert that the state of a listing has the specific number of files,
+     * directories and other entries. The error text will include
+     * the {@code text} param, the field in question, and the entire object's
+     * string value.
+     * @param text text prefix for assertions.
+     * @param f file count
+     * @param d expected directory count
+     * @param o expected other entries.
+     */
+    public void assertSizeEquals(String text, long f, long d, long o) {
+      String self = toString();
+      Assert.assertEquals(text + ": file count in " + self,
+          f, getFileCount());
+      Assert.assertEquals(text + ": directory count in " + self,
+          d, getDirCount());
+      Assert.assertEquals(text + ": 'other' count in " + self,
+          o, getOtherCount());
+    }
+
+    /**
+     * Assert that the trees are equivalent: that every list matches (and
+     * that neither has any duplicates).
+     * @param that the other entry
+     */
+    public void assertEquivalent(TreeScanResults that) {
+      String details = "this= " + this + "; that=" + that;
+      assertFieldsEquivalent("files", that, files, that.files);
+      assertFieldsEquivalent("directories", that,
+          directories, that.directories);
+      assertFieldsEquivalent("other", that, other, that.other);
+    }
+
+    /**
+     * Assert that a field in two instances are equivalent.
+     * @param fieldname field name for error messages
+     * @param that the other instance to scan
+     * @param ours our field's contents
+     * @param theirs the other instance's field constants
+     */
+    public void assertFieldsEquivalent(String fieldname,
+        TreeScanResults that,
+        List<Path> ours, List<Path> theirs) {
+      assertFalse("Duplicate  " + files + " in " + this,
+          containsDuplicates(ours));
+      assertFalse("Duplicate  " + files + " in other " + that,
+          containsDuplicates(theirs));
+      assertTrue(fieldname + " mismatch: between {" + this + "}" +
+              " and {" + that + "}",
+          collectionsEquivalent(files, that.files));
+    }
+
+    public List<Path> getFiles() {
+      return files;
+    }
+
+    public List<Path> getDirectories() {
+      return directories;
+    }
+
+    public List<Path> getOther() {
+      return other;
+    }
+
+    public Path getBasePath() {
+      return basePath;
+    }
+
+    public long getFileCount() {
+      return files.size();
+    }
+
+    public long getDirCount() {
+      return directories.size();
+    }
+
+    public long getOtherCount() {
+      return other.size();
+    }
+
+    /**
+     * Total count of entries.
+     * @return the total number of entries
+     */
+    public long totalCount() {
+      return getFileCount() + getDirCount() + getOtherCount();
+    }
+
+  }
+
+  /**
+   * A simple class for timing operations in nanoseconds, and for
+   * printing some useful results in the process.
+   */
+  public static final class NanoTimer {
+    private final long startTime;
+    private long endTime;
+
+    public NanoTimer() {
+      startTime = now();
+    }
+
+    /**
+     * End the operation.
+     * @return the duration of the operation
+     */
+    public long end() {
+      endTime = now();
+      return duration();
+    }
+
+    /**
+     * End the operation; log the duration.
+     * @param format message
+     * @param args any arguments
+     * @return the duration of the operation
+     */
+    public long end(String format, Object... args) {
+      long d = end();
+      LOG.info("Duration of {}: {} nS",
+          String.format(format, args), toHuman(d));
+      return d;
+    }
+
+    public long now() {
+      return System.nanoTime();
+    }
+
+    public long duration() {
+      return endTime - startTime;
+    }
+
+    public double bandwidth(long bytes) {
+      return bandwidthMBs(bytes, duration());
+    }
+
+    /**
+     * Bandwidth as bytes per second.
+     * @param bytes bytes in
+     * @return the number of bytes per second this operation timed.
+     */
+    public double bandwidthBytes(long bytes) {
+      return (bytes * 1.0) / duration();
+    }
+
+    /**
+     * How many nanoseconds per IOP, byte, etc.
+     * @param operations operations processed in this time period
+     * @return the nanoseconds it took each byte to be processed
+     */
+    public long nanosPerOperation(long operations) {
+      return duration() / operations;
+    }
+
+    /**
+     * Get a description of the bandwidth, even down to fractions of
+     * a MB.
+     * @param bytes bytes processed
+     * @return bandwidth
+     */
+    public String bandwidthDescription(long bytes) {
+      return String.format("%,.6f", bandwidth(bytes));
+    }
+
+    public long getStartTime() {
+      return startTime;
+    }
+
+    public long getEndTime() {
+      return endTime;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58a59f7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java
new file mode 100644
index 0000000..0ce022a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressEventType;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.transfer.Upload;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+
+import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
+import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
+
+/**
+ * Listener to progress from AWS regarding transfers.
+ */
+public class ProgressableProgressListener implements ProgressListener {
+  private static final Logger LOG = S3AFileSystem.LOG;
+  private final S3AFileSystem fs;
+  private final String key;
+  private final Progressable progress;
+  private long lastBytesTransferred;
+  private final Upload upload;
+
+  /**
+   * Instantiate.
+   * @param fs filesystem: will be invoked with statistics updates
+   * @param key key for the upload
+   * @param upload source of events
+   * @param progress optional callback for progress.
+   */
+  public ProgressableProgressListener(S3AFileSystem fs,
+      String key,
+      Upload upload,
+      Progressable progress) {
+    this.fs = fs;
+    this.key = key;
+    this.upload = upload;
+    this.progress = progress;
+    this.lastBytesTransferred = 0;
+  }
+
+  @Override
+  public void progressChanged(ProgressEvent progressEvent) {
+    if (progress != null) {
+      progress.progress();
+    }
+
+    // There are 3 http ops here, but this should be close enough for now
+    ProgressEventType pet = progressEvent.getEventType();
+    if (pet == TRANSFER_PART_STARTED_EVENT ||
+        pet == TRANSFER_COMPLETED_EVENT) {
+      fs.incrementWriteOperations();
+    }
+
+    long transferred = upload.getProgress().getBytesTransferred();
+    long delta = transferred - lastBytesTransferred;
+    fs.incrementPutProgressStatistics(key, delta);
+    lastBytesTransferred = transferred;
+  }
+
+  /**
+   * Method to invoke after upload has completed.
+   * This can handle race conditions in setup/teardown.
+   * @return the number of bytes which were transferred after the notification
+   */
+  public long uploadCompleted() {
+    long delta = upload.getProgress().getBytesTransferred() -
+        lastBytesTransferred;
+    if (delta > 0) {
+      LOG.debug("S3A write delta changed after finished: {} bytes", delta);
+      fs.incrementPutProgressStatistics(key, delta);
+    }
+    return delta;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58a59f7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
index 6a59f3f..5509d36 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -35,10 +35,8 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 
@@ -54,6 +52,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
 
 /**
  * Upload files/parts asap directly from a memory buffer (instead of buffering
@@ -77,8 +76,6 @@ public class S3AFastOutputStream extends OutputStream {
   private final int multiPartThreshold;
   private final S3AFileSystem fs;
   private final CannedAccessControlList cannedACL;
-  private final FileSystem.Statistics statistics;
-  private final String serverSideEncryptionAlgorithm;
   private final ProgressListener progressListener;
   private final ListeningExecutorService executorService;
   private MultiPartUpload multiPartUpload;
@@ -98,28 +95,28 @@ public class S3AFastOutputStream extends OutputStream {
    * @param bucket S3 bucket name
    * @param key S3 key name
    * @param progress report progress in order to prevent timeouts
-   * @param statistics track FileSystem.Statistics on the performed operations
    * @param cannedACL used CannedAccessControlList
-   * @param serverSideEncryptionAlgorithm algorithm for server side encryption
    * @param partSize size of a single part in a multi-part upload (except
    * last part)
    * @param multiPartThreshold files at least this size use multi-part upload
    * @param threadPoolExecutor thread factory
    * @throws IOException on any problem
    */
-  public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs,
-      String bucket, String key, Progressable progress,
-      FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
-      String serverSideEncryptionAlgorithm, long partSize,
-      long multiPartThreshold, ExecutorService threadPoolExecutor)
+  public S3AFastOutputStream(AmazonS3Client client,
+      S3AFileSystem fs,
+      String bucket,
+      String key,
+      Progressable progress,
+      CannedAccessControlList cannedACL,
+      long partSize,
+      long multiPartThreshold,
+      ExecutorService threadPoolExecutor)
       throws IOException {
     this.bucket = bucket;
     this.key = key;
     this.client = client;
     this.fs = fs;
     this.cannedACL = cannedACL;
-    this.statistics = statistics;
-    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
     //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE
     if (partSize > Integer.MAX_VALUE) {
       this.partSize = Integer.MAX_VALUE;
@@ -246,16 +243,17 @@ public class S3AFastOutputStream extends OutputStream {
       if (multiPartUpload == null) {
         putObject();
       } else {
-        if (buffer.size() > 0) {
+        int size = buffer.size();
+        if (size > 0) {
+          fs.incrementPutStartStatistics(size);
           //send last part
           multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
-              .toByteArray()), buffer.size());
+              .toByteArray()), size);
         }
         final List<PartETag> partETags = multiPartUpload
             .waitForAllPartUploads();
         multiPartUpload.complete(partETags);
       }
-      statistics.incrementWriteOps(1);
       // This will delete unnecessary fake parent directories
       fs.finishedWrite(key);
       LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
@@ -265,18 +263,19 @@ public class S3AFastOutputStream extends OutputStream {
     }
   }
 
+  /**
+   * Create the default metadata for a multipart upload operation.
+   * @return the metadata to use/extend.
+   */
   private ObjectMetadata createDefaultMetadata() {
-    ObjectMetadata om = new ObjectMetadata();
-    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-    }
-    return om;
+    return fs.newObjectMetadata();
   }
 
   private MultiPartUpload initiateMultiPartUpload() throws IOException {
-    final ObjectMetadata om = createDefaultMetadata();
     final InitiateMultipartUploadRequest initiateMPURequest =
-        new InitiateMultipartUploadRequest(bucket, key, om);
+        new InitiateMultipartUploadRequest(bucket,
+            key,
+            createDefaultMetadata());
     initiateMPURequest.setCannedACL(cannedACL);
     try {
       return new MultiPartUpload(
@@ -290,15 +289,18 @@ public class S3AFastOutputStream extends OutputStream {
     LOG.debug("Executing regular upload for bucket '{}' key '{}'",
         bucket, key);
     final ObjectMetadata om = createDefaultMetadata();
-    om.setContentLength(buffer.size());
-    final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
-        new ByteArrayInputStream(buffer.toByteArray()), om);
-    putObjectRequest.setCannedAcl(cannedACL);
+    final int size = buffer.size();
+    om.setContentLength(size);
+    final PutObjectRequest putObjectRequest =
+        fs.newPutObjectRequest(key,
+            om,
+            new ByteArrayInputStream(buffer.toByteArray()));
     putObjectRequest.setGeneralProgressListener(progressListener);
     ListenableFuture<PutObjectResult> putObjectResult =
         executorService.submit(new Callable<PutObjectResult>() {
           @Override
           public PutObjectResult call() throws Exception {
+            fs.incrementPutStartStatistics(size);
             return client.putObject(putObjectRequest);
           }
         });
@@ -306,7 +308,7 @@ public class S3AFastOutputStream extends OutputStream {
     try {
       putObjectResult.get();
     } catch (InterruptedException ie) {
-      LOG.warn("Interrupted object upload:" + ie, ie);
+      LOG.warn("Interrupted object upload: {}", ie, ie);
       Thread.currentThread().interrupt();
     } catch (ExecutionException ee) {
       throw extractException("regular upload", key, ee);
@@ -339,7 +341,7 @@ public class S3AFastOutputStream extends OutputStream {
             public PartETag call() throws Exception {
               LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
                   uploadId);
-              return client.uploadPart(request).getPartETag();
+              return fs.uploadPart(request).getPartETag();
             }
           });
       partETagsFutures.add(partETagFuture);
@@ -349,7 +351,7 @@ public class S3AFastOutputStream extends OutputStream {
       try {
         return Futures.allAsList(partETagsFutures).get();
       } catch (InterruptedException ie) {
-        LOG.warn("Interrupted partUpload:" + ie, ie);
+        LOG.warn("Interrupted partUpload: {}", ie, ie);
         Thread.currentThread().interrupt();
         return null;
       } catch (ExecutionException ee) {
@@ -382,11 +384,12 @@ public class S3AFastOutputStream extends OutputStream {
     public void abort() {
       LOG.warn("Aborting multi-part upload with id '{}'", uploadId);
       try {
+        fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
         client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket,
             key, uploadId));
       } catch (Exception e2) {
         LOG.warn("Unable to abort multipart upload, you may need to purge  " +
-            "uploaded parts: " + e2, e2);
+            "uploaded parts: {}", e2, e2);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58a59f7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
index 9ecca33..75a6500 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
@@ -93,4 +93,11 @@ public class S3AFileStatus extends FileStatus {
       return super.getModificationTime();
     }
   }
+
+  @Override
+  public String toString() {
+    return super.toString() +
+        String.format(" isEmptyDirectory=%s", isEmptyDirectory());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58a59f7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 28d9843..c028544 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -42,7 +42,6 @@ import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
-import com.amazonaws.services.s3.model.DeleteObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
@@ -50,6 +49,8 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
 import com.amazonaws.services.s3.transfer.Copy;
 import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
@@ -68,8 +69,13 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.util.Progressable;
@@ -77,6 +83,7 @@ import org.apache.hadoop.util.VersionInfo;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,6 +122,7 @@ public class S3AFileSystem extends FileSystem {
   private CannedAccessControlList cannedACL;
   private String serverSideEncryptionAlgorithm;
   private S3AInstrumentation instrumentation;
+  private S3AStorageStatistics storageStatistics;
   private long readAhead;
 
   // The maximum number of entries that can be deleted in any call to s3
@@ -182,6 +190,15 @@ public class S3AFileSystem extends FileSystem {
       enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
 
       readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
+      storageStatistics = (S3AStorageStatistics)
+          GlobalStorageStatistics.INSTANCE
+              .put(S3AStorageStatistics.NAME,
+                  new GlobalStorageStatistics.StorageStatisticsProvider() {
+                    @Override
+                    public StorageStatistics provide() {
+                      return new S3AStorageStatistics();
+                    }
+                  });
 
       int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
       if (maxThreads < 2) {
@@ -284,6 +301,14 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Get S3A Instrumentation. For test purposes.
+   * @return this instance's instrumentation.
+   */
+  public S3AInstrumentation getInstrumentation() {
+    return instrumentation;
+  }
+
+  /**
    * Initializes the User-Agent header to send in HTTP requests to the S3
    * back-end.  We always include the Hadoop version number.  The user also may
    * set an optional custom prefix to put in front of the Hadoop version number.
@@ -559,23 +584,26 @@ public class S3AFileSystem extends FileSystem {
     }
     instrumentation.fileCreated();
     if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
-      return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
-          key, progress, statistics, cannedACL,
-          serverSideEncryptionAlgorithm, partSize, multiPartThreshold,
-          threadPoolExecutor), statistics);
+      return new FSDataOutputStream(
+          new S3AFastOutputStream(s3,
+              this,
+              bucket,
+              key,
+              progress,
+              cannedACL,
+              partSize,
+              multiPartThreshold,
+              threadPoolExecutor),
+          statistics);
     }
     // We pass null to FSDataOutputStream so it won't count writes that
     // are being buffered to a file
     return new FSDataOutputStream(
         new S3AOutputStream(getConf(),
-            transfers,
             this,
-            bucket,
             key,
-            progress,
-            cannedACL,
-            statistics,
-            serverSideEncryptionAlgorithm),
+            progress
+        ),
         null);
   }
 
@@ -631,6 +659,7 @@ public class S3AFileSystem extends FileSystem {
   private boolean innerRename(Path src, Path dst) throws IOException,
       AmazonClientException {
     LOG.debug("Rename path {} to {}", src, dst);
+    incrementStatistic(INVOCATION_RENAME);
 
     String srcKey = pathToKey(src);
     String dstKey = pathToKey(dst);
@@ -731,8 +760,7 @@ public class S3AFileSystem extends FileSystem {
       request.setPrefix(srcKey);
       request.setMaxKeys(maxKeys);
 
-      ObjectListing objects = s3.listObjects(request);
-      statistics.incrementReadOps(1);
+      ObjectListing objects = listObjects(request);
 
       while (true) {
         for (S3ObjectSummary summary : objects.getObjectSummaries()) {
@@ -746,8 +774,7 @@ public class S3AFileSystem extends FileSystem {
         }
 
         if (objects.isTruncated()) {
-          objects = s3.listNextBatchOfObjects(objects);
-          statistics.incrementReadOps(1);
+          objects = continueListObjects(objects);
         } else {
           if (!keysToDelete.isEmpty()) {
             removeKeys(keysToDelete, false);
@@ -776,17 +803,223 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Increment a statistic by 1.
+   * @param statistic The operation to increment
+   */
+  protected void incrementStatistic(Statistic statistic) {
+    incrementStatistic(statistic, 1);
+  }
+
+  /**
+   * Increment a statistic by a specific value.
+   * @param statistic The operation to increment
+   * @param count the count to increment
+   */
+  protected void incrementStatistic(Statistic statistic, long count) {
+    instrumentation.incrementCounter(statistic, count);
+    storageStatistics.incrementCounter(statistic, count);
+  }
+
+  /**
    * Request object metadata; increments counters in the process.
    * @param key key
    * @return the metadata
    */
-  private ObjectMetadata getObjectMetadata(String key) {
+  protected ObjectMetadata getObjectMetadata(String key) {
+    incrementStatistic(OBJECT_METADATA_REQUESTS);
     ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
-    statistics.incrementReadOps(1);
+    incrementReadOperations();
     return meta;
   }
 
   /**
+   * Initiate a {@code listObjects} operation, incrementing metrics
+   * in the process.
+   * @param request request to initiate
+   * @return the results
+   */
+  protected ObjectListing listObjects(ListObjectsRequest request) {
+    incrementStatistic(OBJECT_LIST_REQUESTS);
+    incrementReadOperations();
+    return s3.listObjects(request);
+  }
+
+  /**
+   * List the next set of objects.
+   * @param objects paged result
+   * @return the next result object
+   */
+  protected ObjectListing continueListObjects(ObjectListing objects) {
+    incrementStatistic(OBJECT_LIST_REQUESTS);
+    incrementReadOperations();
+    return s3.listNextBatchOfObjects(objects);
+  }
+
+  /**
+   * Increment read operations.
+   */
+  public void incrementReadOperations() {
+    statistics.incrementReadOps(1);
+  }
+
+  /**
+   * Increment the write operation counter.
+   * This is somewhat inaccurate, as it appears to be invoked more
+   * often than needed in progress callbacks.
+   */
+  public void incrementWriteOperations() {
+    statistics.incrementWriteOps(1);
+  }
+
+  /**
+   * Delete an object.
+   * Increments the {@code OBJECT_DELETE_REQUESTS} and write
+   * operation statistics.
+   * @param key key to blob to delete.
+   */
+  private void deleteObject(String key) {
+    incrementWriteOperations();
+    incrementStatistic(OBJECT_DELETE_REQUESTS);
+    s3.deleteObject(bucket, key);
+  }
+
+  /**
+   * Perform a bulk object delete operation.
+   * Increments the {@code OBJECT_DELETE_REQUESTS} and write
+   * operation statistics.
+   * @param deleteRequest keys to delete on the s3-backend
+   */
+  private void deleteObjects(DeleteObjectsRequest deleteRequest) {
+    incrementWriteOperations();
+    incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
+    s3.deleteObjects(deleteRequest);
+  }
+
+  /**
+   * Create a putObject request.
+   * Adds the ACL and metadata
+   * @param key key of object
+   * @param metadata metadata header
+   * @param srcfile source file
+   * @return the request
+   */
+  public PutObjectRequest newPutObjectRequest(String key,
+      ObjectMetadata metadata, File srcfile) {
+    PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
+        srcfile);
+    putObjectRequest.setCannedAcl(cannedACL);
+    putObjectRequest.setMetadata(metadata);
+    return putObjectRequest;
+  }
+
+  /**
+   * Create a {@link PutObjectRequest} request.
+   * The metadata is assumed to have been configured with the size of the
+   * operation.
+   * @param key key of object
+   * @param metadata metadata header
+   * @param inputStream source data.
+   * @return the request
+   */
+  PutObjectRequest newPutObjectRequest(String key,
+      ObjectMetadata metadata, InputStream inputStream) {
+    PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
+        inputStream, metadata);
+    putObjectRequest.setCannedAcl(cannedACL);
+    return putObjectRequest;
+  }
+
+  /**
+   * Create a new object metadata instance.
+   * Any standard metadata headers are added here, for example:
+   * encryption.
+   * @return a new metadata instance
+   */
+  public ObjectMetadata newObjectMetadata() {
+    final ObjectMetadata om = new ObjectMetadata();
+    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
+    }
+    return om;
+  }
+
+  /**
+   * Create a new object metadata instance.
+   * Any standard metadata headers are added here, for example:
+   * encryption.
+   *
+   * @param length length of data to set in header.
+   * @return a new metadata instance
+   */
+  public ObjectMetadata newObjectMetadata(long length) {
+    final ObjectMetadata om = newObjectMetadata();
+    om.setContentLength(length);
+    return om;
+  }
+
+  /**
+   * PUT an object, incrementing the put requests and put bytes
+   * counters.
+   * It does not update the other counters,
+   * as existing code does that as progress callbacks come in.
+   * Byte length is calculated from the file length, or, if there is no
+   * file, from the content length of the header.
+   * @param putObjectRequest the request
+   * @return the upload initiated
+   */
+  public Upload putObject(PutObjectRequest putObjectRequest) {
+    long len;
+    if (putObjectRequest.getFile() != null) {
+      len = putObjectRequest.getFile().length();
+    } else {
+      len = putObjectRequest.getMetadata().getContentLength();
+    }
+    incrementPutStartStatistics(len);
+    return transfers.upload(putObjectRequest);
+  }
+
+  /**
+   * Upload part of a multi-partition file.
+   * Increments the write and put counters
+   * @param request request
+   * @return the result of the operation.
+   */
+  public UploadPartResult uploadPart(UploadPartRequest request) {
+    incrementPutStartStatistics(request.getPartSize());
+    return s3.uploadPart(request);
+  }
+
+  /**
+   * At the start of a put/multipart upload operation, update the
+   * relevant counters.
+   *
+   * @param bytes bytes in the request.
+   */
+  public void incrementPutStartStatistics(long bytes) {
+    LOG.debug("PUT start {} bytes", bytes);
+    incrementWriteOperations();
+    incrementStatistic(OBJECT_PUT_REQUESTS);
+    if (bytes > 0) {
+      incrementStatistic(OBJECT_PUT_BYTES, bytes);
+    }
+  }
+
+  /**
+   * Callback for use in progress callbacks from put/multipart upload events.
+   * Increments those statistics which are expected to be updated during
+   * the ongoing upload operation.
+   * @param key key to file that is being written (for logging)
+   * @param bytes bytes successfully uploaded.
+   */
+  public void incrementPutProgressStatistics(String key, long bytes) {
+    LOG.debug("PUT {}: {} bytes", key, bytes);
+    incrementWriteOperations();
+    if (bytes > 0) {
+      statistics.incrementBytesWritten(bytes);
+    }
+  }
+
+  /**
    * A helper method to delete a list of keys on a s3-backend.
    *
    * @param keysToDelete collection of keys to delete on the s3-backend
@@ -796,21 +1029,13 @@ public class S3AFileSystem extends FileSystem {
   private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
           boolean clearKeys) throws AmazonClientException {
     if (enableMultiObjectsDelete) {
-      DeleteObjectsRequest deleteRequest
-          = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
-      s3.deleteObjects(deleteRequest);
+      deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keysToDelete));
       instrumentation.fileDeleted(keysToDelete.size());
-      statistics.incrementWriteOps(1);
     } else {
-      int writeops = 0;
-
       for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
-        s3.deleteObject(
-            new DeleteObjectRequest(bucket, keyVersion.getKey()));
-        writeops++;
+        deleteObject(keyVersion.getKey());
       }
       instrumentation.fileDeleted(keysToDelete.size());
-      statistics.incrementWriteOps(writeops);
     }
     if (clearKeys) {
       keysToDelete.clear();
@@ -880,9 +1105,8 @@ public class S3AFileSystem extends FileSystem {
 
       if (status.isEmptyDirectory()) {
         LOG.debug("Deleting fake empty directory {}", key);
-        s3.deleteObject(bucket, key);
+        deleteObject(key);
         instrumentation.directoryDeleted();
-        statistics.incrementWriteOps(1);
       } else {
         LOG.debug("Getting objects for directory prefix {} to delete", key);
 
@@ -893,9 +1117,9 @@ public class S3AFileSystem extends FileSystem {
         //request.setDelimiter("/");
         request.setMaxKeys(maxKeys);
 
-        List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>();
-        ObjectListing objects = s3.listObjects(request);
-        statistics.incrementReadOps(1);
+        ObjectListing objects = listObjects(request);
+        List<DeleteObjectsRequest.KeyVersion> keys =
+            new ArrayList<>(objects.getObjectSummaries().size());
         while (true) {
           for (S3ObjectSummary summary : objects.getObjectSummaries()) {
             keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
@@ -907,8 +1131,7 @@ public class S3AFileSystem extends FileSystem {
           }
 
           if (objects.isTruncated()) {
-            objects = s3.listNextBatchOfObjects(objects);
-            statistics.incrementReadOps(1);
+            objects = continueListObjects(objects);
           } else {
             if (!keys.isEmpty()) {
               removeKeys(keys, false);
@@ -919,13 +1142,11 @@ public class S3AFileSystem extends FileSystem {
       }
     } else {
       LOG.debug("delete: Path is a file");
-      s3.deleteObject(bucket, key);
       instrumentation.fileDeleted(1);
-      statistics.incrementWriteOps(1);
+      deleteObject(key);
     }
 
     createFakeDirectoryIfNecessary(f.getParent());
-
     return true;
   }
 
@@ -934,7 +1155,7 @@ public class S3AFileSystem extends FileSystem {
     String key = pathToKey(f);
     if (!key.isEmpty() && !exists(f)) {
       LOG.debug("Creating new fake directory at {}", f);
-      createFakeDirectory(bucket, key);
+      createFakeDirectory(key);
     }
   }
 
@@ -970,6 +1191,7 @@ public class S3AFileSystem extends FileSystem {
       IOException, AmazonClientException {
     String key = pathToKey(f);
     LOG.debug("List status for path: {}", f);
+    incrementStatistic(INVOCATION_LIST_STATUS);
 
     final List<FileStatus> result = new ArrayList<FileStatus>();
     final FileStatus fileStatus =  getFileStatus(f);
@@ -987,8 +1209,7 @@ public class S3AFileSystem extends FileSystem {
 
       LOG.debug("listStatus: doing listObjects for directory {}", key);
 
-      ObjectListing objects = s3.listObjects(request);
-      statistics.incrementReadOps(1);
+      ObjectListing objects = listObjects(request);
 
       Path fQualified = f.makeQualified(uri, workingDir);
 
@@ -999,33 +1220,25 @@ public class S3AFileSystem extends FileSystem {
           if (keyPath.equals(fQualified) ||
               summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
             LOG.debug("Ignoring: {}", keyPath);
-            continue;
-          }
-
-          if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
-            result.add(new S3AFileStatus(true, true, keyPath));
-            LOG.debug("Adding: fd: {}", keyPath);
           } else {
-            result.add(new S3AFileStatus(summary.getSize(),
-                dateToLong(summary.getLastModified()), keyPath,
-                getDefaultBlockSize(fQualified)));
-            LOG.debug("Adding: fi: {}", keyPath);
+            S3AFileStatus status = createFileStatus(keyPath, summary,
+                getDefaultBlockSize(keyPath));
+            result.add(status);
+            LOG.debug("Adding: {}", status);
           }
         }
 
         for (String prefix : objects.getCommonPrefixes()) {
           Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
-          if (keyPath.equals(f)) {
-            continue;
+          if (!keyPath.equals(f)) {
+            result.add(new S3AFileStatus(true, false, keyPath));
+            LOG.debug("Adding: rd: {}", keyPath);
           }
-          result.add(new S3AFileStatus(true, false, keyPath));
-          LOG.debug("Adding: rd: {}", keyPath);
         }
 
         if (objects.isTruncated()) {
           LOG.debug("listStatus: list truncated - getting next batch");
-          objects = s3.listNextBatchOfObjects(objects);
-          statistics.incrementReadOps(1);
+          objects = continueListObjects(objects);
         } else {
           break;
         }
@@ -1038,8 +1251,6 @@ public class S3AFileSystem extends FileSystem {
     return result.toArray(new FileStatus[result.size()]);
   }
 
-
-
   /**
    * Set the current working directory for the given file system. All relative
    * paths will be resolved relative to it.
@@ -1061,7 +1272,7 @@ public class S3AFileSystem extends FileSystem {
   /**
    *
    * Make the given path and all non-existent parents into
-   * directories. Has the semantics of Unix @{code 'mkdir -p'}.
+   * directories. Has the semantics of Unix {@code 'mkdir -p'}.
    * Existence of the directory hierarchy is not an error.
    * @param path path to create
    * @param permission to apply to f
@@ -1096,7 +1307,7 @@ public class S3AFileSystem extends FileSystem {
   private boolean innerMkdirs(Path f, FsPermission permission)
       throws IOException, FileAlreadyExistsException, AmazonClientException {
     LOG.debug("Making directory: {}", f);
-
+    incrementStatistic(INVOCATION_MKDIRS);
     try {
       FileStatus fileStatus = getFileStatus(f);
 
@@ -1125,7 +1336,7 @@ public class S3AFileSystem extends FileSystem {
       } while (fPart != null);
 
       String key = pathToKey(f);
-      createFakeDirectory(bucket, key);
+      createFakeDirectory(key);
       return true;
     }
   }
@@ -1139,12 +1350,12 @@ public class S3AFileSystem extends FileSystem {
    */
   public S3AFileStatus getFileStatus(Path f) throws IOException {
     String key = pathToKey(f);
+    incrementStatistic(INVOCATION_GET_FILE_STATUS);
     LOG.debug("Getting path status for {}  ({})", f , key);
 
     if (!key.isEmpty()) {
       try {
-        ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
-        statistics.incrementReadOps(1);
+        ObjectMetadata meta = getObjectMetadata(key);
 
         if (objectRepresentsDirectory(key, meta.getContentLength())) {
           LOG.debug("Found exact file: fake directory");
@@ -1169,8 +1380,7 @@ public class S3AFileSystem extends FileSystem {
       if (!key.endsWith("/")) {
         String newKey = key + "/";
         try {
-          ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey);
-          statistics.incrementReadOps(1);
+          ObjectMetadata meta = getObjectMetadata(newKey);
 
           if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
             LOG.debug("Found file (with /): fake directory");
@@ -1203,8 +1413,7 @@ public class S3AFileSystem extends FileSystem {
       request.setDelimiter("/");
       request.setMaxKeys(1);
 
-      ObjectListing objects = s3.listObjects(request);
-      statistics.incrementReadOps(1);
+      ObjectListing objects = listObjects(request);
 
       if (!objects.getCommonPrefixes().isEmpty()
           || !objects.getObjectSummaries().isEmpty()) {
@@ -1287,7 +1496,8 @@ public class S3AFileSystem extends FileSystem {
   private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
       Path src, Path dst)
       throws IOException, FileAlreadyExistsException, AmazonClientException {
-    String key = pathToKey(dst);
+    incrementStatistic(INVOCATION_COPY_FROM_LOCAL_FILE);
+    final String key = pathToKey(dst);
 
     if (!overwrite && exists(dst)) {
       throw new FileAlreadyExistsException(dst + " already exists");
@@ -1298,35 +1508,19 @@ public class S3AFileSystem extends FileSystem {
     LocalFileSystem local = getLocal(getConf());
     File srcfile = local.pathToFile(src);
 
-    final ObjectMetadata om = new ObjectMetadata();
-    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-    }
-    PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile);
-    putObjectRequest.setCannedAcl(cannedACL);
-    putObjectRequest.setMetadata(om);
-
-    ProgressListener progressListener = new ProgressListener() {
-      public void progressChanged(ProgressEvent progressEvent) {
-        switch (progressEvent.getEventType()) {
-          case TRANSFER_PART_COMPLETED_EVENT:
-            statistics.incrementWriteOps(1);
-            break;
-          default:
-            break;
-        }
-      }
-    };
-
-    statistics.incrementWriteOps(1);
-    Upload up = transfers.upload(putObjectRequest);
-    up.addProgressListener(progressListener);
+    final ObjectMetadata om = newObjectMetadata();
+    PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
+    Upload up = putObject(putObjectRequest);
+    ProgressableProgressListener listener = new ProgressableProgressListener(
+        this, key, up, null);
+    up.addProgressListener(listener);
     try {
       up.waitForUploadResult();
     } catch (InterruptedException e) {
       throw new InterruptedIOException("Interrupted copying " + src
           + " to "  + dst + ", cancelling");
     }
+    listener.uploadCompleted();
 
     // This will delete unnecessary fake parent directories
     finishedWrite(key);
@@ -1375,7 +1569,7 @@ public class S3AFileSystem extends FileSystem {
     LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
 
     try {
-      ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
+      ObjectMetadata srcom = getObjectMetadata(srcKey);
       ObjectMetadata dstom = cloneObjectMetadata(srcom);
       if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
         dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
@@ -1389,7 +1583,7 @@ public class S3AFileSystem extends FileSystem {
         public void progressChanged(ProgressEvent progressEvent) {
           switch (progressEvent.getEventType()) {
             case TRANSFER_PART_COMPLETED_EVENT:
-              statistics.incrementWriteOps(1);
+              incrementWriteOperations();
               break;
             default:
               break;
@@ -1401,7 +1595,7 @@ public class S3AFileSystem extends FileSystem {
       copy.addProgressListener(progressListener);
       try {
         copy.waitForCopyResult();
-        statistics.incrementWriteOps(1);
+        incrementWriteOperations();
         instrumentation.filesCopied(1, size);
       } catch (InterruptedException e) {
         throw new InterruptedIOException("Interrupted copying " + srcKey
@@ -1413,26 +1607,12 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
-  private boolean objectRepresentsDirectory(final String name, final long size) {
-    return !name.isEmpty()
-        && name.charAt(name.length() - 1) == '/'
-        && size == 0L;
-  }
-
-  // Handles null Dates that can be returned by AWS
-  private static long dateToLong(final Date date) {
-    if (date == null) {
-      return 0L;
-    }
-
-    return date.getTime();
-  }
-
   /**
    * Perform post-write actions.
    * @param key key written to
    */
   public void finishedWrite(String key) {
+    LOG.debug("Finished write to {}", key);
     deleteUnnecessaryFakeDirectories(keyToPath(key).getParent());
   }
 
@@ -1454,8 +1634,7 @@ public class S3AFileSystem extends FileSystem {
 
         if (status.isDirectory() && status.isEmptyDirectory()) {
           LOG.debug("Deleting fake directory {}/", key);
-          s3.deleteObject(bucket, key + "/");
-          statistics.incrementWriteOps(1);
+          deleteObject(key + "/");
         }
       } catch (IOException | AmazonClientException e) {
         LOG.debug("While deleting key {} ", key, e);
@@ -1471,18 +1650,20 @@ public class S3AFileSystem extends FileSystem {
   }
 
 
-  private void createFakeDirectory(final String bucketName, final String objectName)
-      throws AmazonClientException, AmazonServiceException {
+  private void createFakeDirectory(final String objectName)
+      throws AmazonClientException, AmazonServiceException,
+      InterruptedIOException {
     if (!objectName.endsWith("/")) {
-      createEmptyObject(bucketName, objectName + "/");
+      createEmptyObject(objectName + "/");
     } else {
-      createEmptyObject(bucketName, objectName);
+      createEmptyObject(objectName);
     }
   }
 
   // Used to create an empty file that represents an empty directory
-  private void createEmptyObject(final String bucketName, final String objectName)
-      throws AmazonClientException, AmazonServiceException {
+  private void createEmptyObject(final String objectName)
+      throws AmazonClientException, AmazonServiceException,
+      InterruptedIOException {
     final InputStream im = new InputStream() {
       @Override
       public int read() throws IOException {
@@ -1490,16 +1671,16 @@ public class S3AFileSystem extends FileSystem {
       }
     };
 
-    final ObjectMetadata om = new ObjectMetadata();
-    om.setContentLength(0L);
-    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
+    PutObjectRequest putObjectRequest = newPutObjectRequest(objectName,
+        newObjectMetadata(0L),
+        im);
+    Upload upload = putObject(putObjectRequest);
+    try {
+      upload.waitForUploadResult();
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException("Interrupted creating " + objectName);
     }
-    PutObjectRequest putObjectRequest =
-        new PutObjectRequest(bucketName, objectName, im, om);
-    putObjectRequest.setCannedAcl(cannedACL);
-    s3.putObject(putObjectRequest);
-    statistics.incrementWriteOps(1);
+    incrementPutProgressStatistics(objectName, 0);
     instrumentation.directoryCreated();
   }
 
@@ -1514,10 +1695,7 @@ public class S3AFileSystem extends FileSystem {
     // This approach may be too brittle, especially if
     // in future there are new attributes added to ObjectMetadata
     // that we do not explicitly call to set here
-    ObjectMetadata ret = new ObjectMetadata();
-
-    // Non null attributes
-    ret.setContentLength(source.getContentLength());
+    ObjectMetadata ret = newObjectMetadata(source.getContentLength());
 
     // Possibly null attributes
     // Allowing nulls to pass breaks it during later use
@@ -1627,6 +1805,75 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
+   * Override superclass so as to add statistic collection.
+   * {@inheritDoc}
+   */
+  @Override
+  public FileStatus[] globStatus(Path pathPattern) throws IOException {
+    incrementStatistic(INVOCATION_GLOB_STATUS);
+    return super.globStatus(pathPattern);
+  }
+
+  /**
+   * Override superclass so as to add statistic collection.
+   * {@inheritDoc}
+   */
+  @Override
+  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
+      throws IOException {
+    incrementStatistic(INVOCATION_GLOB_STATUS);
+    return super.globStatus(pathPattern, filter);
+  }
+
+  /**
+   * Override superclass so as to add statistic collection.
+   * {@inheritDoc}
+   */
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
+      throws FileNotFoundException, IOException {
+    incrementStatistic(INVOCATION_LIST_LOCATED_STATUS);
+    return super.listLocatedStatus(f);
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listFiles(Path f,
+      boolean recursive) throws FileNotFoundException, IOException {
+    incrementStatistic(INVOCATION_LIST_FILES);
+    return super.listFiles(f, recursive);
+  }
+
+  /**
+   * Override superclass so as to add statistic collection.
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean exists(Path f) throws IOException {
+    incrementStatistic(INVOCATION_EXISTS);
+    return super.exists(f);
+  }
+
+  /**
+   * Override superclass so as to add statistic collection.
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isDirectory(Path f) throws IOException {
+    incrementStatistic(INVOCATION_IS_DIRECTORY);
+    return super.isDirectory(f);
+  }
+
+  /**
+   * Override superclass so as to add statistic collection.
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isFile(Path f) throws IOException {
+    incrementStatistic(INVOCATION_IS_FILE);
+    return super.isFile(f);
+  }
+
+  /**
    * Get a integer option >= the minimum allowed value.
    * @param conf configuration
    * @param key key to look up

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58a59f7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 285f228..8892f0e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.MetricStringBuilder;
@@ -26,49 +27,30 @@ import org.apache.hadoop.metrics2.lib.Interns;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableMetric;
 
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+
 /**
  * Instrumentation of S3a.
- * Derived from the {@code AzureFileSystemInstrumentation}
+ * Derived from the {@code AzureFileSystemInstrumentation}.
+ *
+ * Counters and metrics are generally addressed in code by their name or
+ * {@link Statistic} key. There <i>may</i> be some Statistics which do
+ * not have an entry here. To avoid attempts to access such counters failing,
+ * the operations to increment/query metric values are designed to handle
+ * lookup failures.
  */
 @Metrics(about = "Metrics for S3a", context = "S3AFileSystem")
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class S3AInstrumentation {
   public static final String CONTEXT = "S3AFileSystem";
-
-  public static final String STREAM_OPENED = "streamOpened";
-  public static final String STREAM_CLOSE_OPERATIONS = "streamCloseOperations";
-  public static final String STREAM_CLOSED = "streamClosed";
-  public static final String STREAM_ABORTED = "streamAborted";
-  public static final String STREAM_READ_EXCEPTIONS = "streamReadExceptions";
-  public static final String STREAM_SEEK_OPERATIONS = "streamSeekOperations";
-  public static final String STREAM_FORWARD_SEEK_OPERATIONS
-      = "streamForwardSeekOperations";
-  public static final String STREAM_BACKWARD_SEEK_OPERATIONS
-      = "streamBackwardSeekOperations";
-  public static final String STREAM_SEEK_BYTES_SKIPPED =
-      "streamBytesSkippedOnSeek";
-  public static final String STREAM_SEEK_BYTES_BACKWARDS =
-      "streamBytesBackwardsOnSeek";
-  public static final String STREAM_SEEK_BYTES_READ = "streamBytesRead";
-  public static final String STREAM_READ_OPERATIONS = "streamReadOperations";
-  public static final String STREAM_READ_FULLY_OPERATIONS
-      = "streamReadFullyOperations";
-  public static final String STREAM_READ_OPERATIONS_INCOMPLETE
-      = "streamReadOperationsIncomplete";
-  public static final String FILES_CREATED = "files_created";
-  public static final String FILES_COPIED = "files_copied";
-  public static final String FILES_COPIED_BYTES = "files_copied_bytes";
-  public static final String FILES_DELETED = "files_deleted";
-  public static final String DIRECTORIES_CREATED = "directories_created";
-  public static final String DIRECTORIES_DELETED = "directories_deleted";
-  public static final String IGNORED_ERRORS = "ignored_errors";
   private final MetricsRegistry registry =
       new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
   private final MutableCounterLong streamOpenOperations;
@@ -95,6 +77,27 @@ public class S3AInstrumentation {
   private final MutableCounterLong numberOfDirectoriesDeleted;
   private final Map<String, MutableCounterLong> streamMetrics = new HashMap<>();
 
+  private static final Statistic[] COUNTERS_TO_CREATE = {
+      INVOCATION_COPY_FROM_LOCAL_FILE,
+      INVOCATION_EXISTS,
+      INVOCATION_GET_FILE_STATUS,
+      INVOCATION_GLOB_STATUS,
+      INVOCATION_IS_DIRECTORY,
+      INVOCATION_IS_FILE,
+      INVOCATION_LIST_FILES,
+      INVOCATION_LIST_LOCATED_STATUS,
+      INVOCATION_LIST_STATUS,
+      INVOCATION_MKDIRS,
+      INVOCATION_RENAME,
+      OBJECT_COPY_REQUESTS,
+      OBJECT_DELETE_REQUESTS,
+      OBJECT_LIST_REQUESTS,
+      OBJECT_METADATA_REQUESTS,
+      OBJECT_MULTIPART_UPLOAD_ABORTED,
+      OBJECT_PUT_BYTES,
+      OBJECT_PUT_REQUESTS
+  };
+
   public S3AInstrumentation(URI name) {
     UUID fileSystemInstanceId = UUID.randomUUID();
     registry.tag("FileSystemId",
@@ -103,50 +106,35 @@ public class S3AInstrumentation {
     registry.tag("fsURI",
         "URI of this filesystem",
         name.toString());
-    streamOpenOperations = streamCounter(STREAM_OPENED,
-        "Total count of times an input stream to object store was opened");
-    streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS,
-        "Total count of times an attempt to close a data stream was made");
-    streamClosed = streamCounter(STREAM_CLOSED,
-        "Count of times the TCP stream was closed");
-    streamAborted = streamCounter(STREAM_ABORTED,
-        "Count of times the TCP stream was aborted");
-    streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS,
-        "Number of seek operations invoked on input streams");
-    streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS,
-        "Number of read exceptions caught and attempted to recovered from");
-    streamForwardSeekOperations = streamCounter(STREAM_FORWARD_SEEK_OPERATIONS,
-        "Number of executed seek operations which went forward in a stream");
-    streamBackwardSeekOperations = streamCounter(
-        STREAM_BACKWARD_SEEK_OPERATIONS,
-        "Number of executed seek operations which went backwards in a stream");
-    streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED,
-        "Count of bytes skipped during forward seek operations");
-    streamBytesBackwardsOnSeek = streamCounter(STREAM_SEEK_BYTES_BACKWARDS,
-        "Count of bytes moved backwards during seek operations");
-    streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ,
-        "Count of bytes read during seek() in stream operations");
-    streamReadOperations = streamCounter(STREAM_READ_OPERATIONS,
-        "Count of read() operations in streams");
-    streamReadFullyOperations = streamCounter(STREAM_READ_FULLY_OPERATIONS,
-        "Count of readFully() operations in streams");
-    streamReadsIncomplete = streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE,
-        "Count of incomplete read() operations in streams");
-
-    numberOfFilesCreated = counter(FILES_CREATED,
-            "Total number of files created through the object store.");
-    numberOfFilesCopied = counter(FILES_COPIED,
-            "Total number of files copied within the object store.");
-    bytesOfFilesCopied = counter(FILES_COPIED_BYTES,
-            "Total number of bytes copied within the object store.");
-    numberOfFilesDeleted = counter(FILES_DELETED,
-            "Total number of files deleted through from the object store.");
-    numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED,
-        "Total number of directories created through the object store.");
-    numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED,
-        "Total number of directories deleted through the object store.");
-    ignoredErrors = counter(IGNORED_ERRORS,
-        "Total number of errors caught and ingored.");
+    streamOpenOperations = streamCounter(STREAM_OPENED);
+    streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS);
+    streamClosed = streamCounter(STREAM_CLOSED);
+    streamAborted = streamCounter(STREAM_ABORTED);
+    streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS);
+    streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS);
+    streamForwardSeekOperations =
+        streamCounter(STREAM_FORWARD_SEEK_OPERATIONS);
+    streamBackwardSeekOperations =
+        streamCounter(STREAM_BACKWARD_SEEK_OPERATIONS);
+    streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED);
+    streamBytesBackwardsOnSeek =
+        streamCounter(STREAM_SEEK_BYTES_BACKWARDS);
+    streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ);
+    streamReadOperations = streamCounter(STREAM_READ_OPERATIONS);
+    streamReadFullyOperations =
+        streamCounter(STREAM_READ_FULLY_OPERATIONS);
+    streamReadsIncomplete =
+        streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE);
+    numberOfFilesCreated = counter(FILES_CREATED);
+    numberOfFilesCopied = counter(FILES_COPIED);
+    bytesOfFilesCopied = counter(FILES_COPIED_BYTES);
+    numberOfFilesDeleted = counter(FILES_DELETED);
+    numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED);
+    numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED);
+    ignoredErrors = counter(IGNORED_ERRORS);
+    for (Statistic statistic : COUNTERS_TO_CREATE) {
+      counter(statistic);
+    }
   }
 
   /**
@@ -174,6 +162,25 @@ public class S3AInstrumentation {
   }
 
   /**
+   * Create a counter in the registry.
+   * @param op statistic to count
+   * @return a new counter
+   */
+  protected final MutableCounterLong counter(Statistic op) {
+    return counter(op.getSymbol(), op.getDescription());
+  }
+
+  /**
+   * Create a counter in the stream map: these are unregistered in the public
+   * metrics.
+   * @param op statistic to count
+   * @return a new counter
+   */
+  protected final MutableCounterLong streamCounter(Statistic op) {
+    return streamCounter(op.getSymbol(), op.getDescription());
+  }
+
+  /**
    * Create a gauge in the registry.
    * @param name name gauge name
    * @param desc description
@@ -216,6 +223,58 @@ public class S3AInstrumentation {
   }
 
   /**
+   * Get the value of a counter.
+   * @param statistic the operation
+   * @return its value, or 0 if not found.
+   */
+  public long getCounterValue(Statistic statistic) {
+    return getCounterValue(statistic.getSymbol());
+  }
+
+  /**
+   * Get the value of a counter.
+   * If the counter is null, return 0.
+   * @param name the name of the counter
+   * @return its value.
+   */
+  public long getCounterValue(String name) {
+    MutableCounterLong counter = lookupCounter(name);
+    return counter == null ? 0 : counter.value();
+  }
+
+  /**
+   * Lookup a counter by name. Return null if it is not known.
+   * @param name counter name
+   * @return the counter
+   */
+  private MutableCounterLong lookupCounter(String name) {
+    MutableMetric metric = lookupMetric(name);
+    if (metric == null) {
+      return null;
+    }
+    Preconditions.checkNotNull(metric, "not found: " + name);
+    if (!(metric instanceof MutableCounterLong)) {
+      throw new IllegalStateException("Metric " + name
+          + " is not a MutableCounterLong: " + metric);
+    }
+    return (MutableCounterLong) metric;
+  }
+
+  /**
+   * Look up a metric from both the registered set and the lighter weight
+   * stream entries.
+   * @param name metric name
+   * @return the metric or null
+   */
+  public MutableMetric lookupMetric(String name) {
+    MutableMetric metric = getRegistry().get(name);
+    if (metric == null) {
+      metric = streamMetrics.get(name);
+    }
+    return metric;
+  }
+
+  /**
    * Indicate that S3A created a file.
    */
   public void fileCreated() {
@@ -263,6 +322,19 @@ public class S3AInstrumentation {
   }
 
   /**
+   * Increment a specific counter.
+   * No-op if not defined.
+   * @param op operation
+   * @param count increment value
+   */
+  public void incrementCounter(Statistic op, long count) {
+    MutableCounterLong counter = lookupCounter(op.getSymbol());
+    if (counter != null) {
+      counter.incr(count);
+    }
+  }
+
+  /**
    * Create a stream input statistics instance.
    * @return the new instance
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58a59f7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
index 593e9e8..23ba682 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -19,19 +19,11 @@
 package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.AmazonClientException;
-import com.amazonaws.event.ProgressEvent;
-import com.amazonaws.event.ProgressEventType;
-import com.amazonaws.event.ProgressListener;
-import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.Upload;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.util.Progressable;
 
@@ -44,8 +36,6 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 
-import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
-import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 
@@ -59,32 +49,20 @@ public class S3AOutputStream extends OutputStream {
   private File backupFile;
   private boolean closed;
   private String key;
-  private String bucket;
-  private TransferManager transfers;
   private Progressable progress;
   private long partSize;
   private long partSizeThreshold;
   private S3AFileSystem fs;
-  private CannedAccessControlList cannedACL;
-  private FileSystem.Statistics statistics;
   private LocalDirAllocator lDirAlloc;
-  private String serverSideEncryptionAlgorithm;
 
   public static final Logger LOG = S3AFileSystem.LOG;
 
-  public S3AOutputStream(Configuration conf, TransferManager transfers,
-      S3AFileSystem fs, String bucket, String key, Progressable progress,
-      CannedAccessControlList cannedACL, FileSystem.Statistics statistics,
-      String serverSideEncryptionAlgorithm)
+  public S3AOutputStream(Configuration conf,
+      S3AFileSystem fs, String key, Progressable progress)
       throws IOException {
-    this.bucket = bucket;
     this.key = key;
-    this.transfers = transfers;
     this.progress = progress;
     this.fs = fs;
-    this.cannedACL = cannedACL;
-    this.statistics = statistics;
-    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
 
     partSize = fs.getPartitionSize();
     partSizeThreshold = fs.getMultiPartThreshold();
@@ -124,30 +102,18 @@ public class S3AOutputStream extends OutputStream {
 
 
     try {
-      final ObjectMetadata om = new ObjectMetadata();
-      if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-        om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-      }
-      PutObjectRequest putObjectRequest =
-          new PutObjectRequest(bucket, key, backupFile);
-      putObjectRequest.setCannedAcl(cannedACL);
-      putObjectRequest.setMetadata(om);
-
-      Upload upload = transfers.upload(putObjectRequest);
-
-      ProgressableProgressListener listener = 
-          new ProgressableProgressListener(upload, progress, statistics);
+      final ObjectMetadata om = fs.newObjectMetadata();
+      Upload upload = fs.putObject(
+          fs.newPutObjectRequest(
+              key,
+              om,
+              backupFile));
+      ProgressableProgressListener listener =
+          new ProgressableProgressListener(fs, key, upload, progress);
       upload.addProgressListener(listener);
 
       upload.waitForUploadResult();
-
-      long delta = upload.getProgress().getBytesTransferred() -
-          listener.getLastBytesTransferred();
-      if (statistics != null && delta != 0) {
-        LOG.debug("S3A write delta changed after finished: {} bytes", delta);
-        statistics.incrementBytesWritten(delta);
-      }
-
+      listener.uploadCompleted();
       // This will delete unnecessary fake parent directories
       fs.finishedWrite(key);
     } catch (InterruptedException e) {
@@ -175,46 +141,4 @@ public class S3AOutputStream extends OutputStream {
     backupStream.write(b, off, len);
   }
 
-  /**
-   * Listener to progress from AWS regarding transfers.
-   */
-  public static class ProgressableProgressListener implements ProgressListener {
-    private Progressable progress;
-    private FileSystem.Statistics statistics;
-    private long lastBytesTransferred;
-    private Upload upload;
-
-    public ProgressableProgressListener(Upload upload, Progressable progress, 
-        FileSystem.Statistics statistics) {
-      this.upload = upload;
-      this.progress = progress;
-      this.statistics = statistics;
-      this.lastBytesTransferred = 0;
-    }
-
-    public void progressChanged(ProgressEvent progressEvent) {
-      if (progress != null) {
-        progress.progress();
-      }
-
-      // There are 3 http ops here, but this should be close enough for now
-      ProgressEventType pet = progressEvent.getEventType();
-      if (pet == TRANSFER_PART_STARTED_EVENT ||
-          pet == TRANSFER_COMPLETED_EVENT) {
-        statistics.incrementWriteOps(1);
-      }
-
-      long transferred = upload.getProgress().getBytesTransferred();
-      long delta = transferred - lastBytesTransferred;
-      if (statistics != null && delta != 0) {
-        statistics.incrementBytesWritten(delta);
-      }
-
-      lastBytesTransferred = transferred;
-    }
-
-    public long getLastBytesTransferred() {
-      return lastBytesTransferred;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58a59f7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
new file mode 100644
index 0000000..f69159a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageStatistics;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Storage statistics for S3A.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class S3AStorageStatistics extends StorageStatistics {
+  private static final Logger LOG = S3AFileSystem.LOG;
+
+  public static final String NAME = "S3AStorageStatistics";
+  private final Map<Statistic, AtomicLong> opsCount =
+      new EnumMap<>(Statistic.class);
+
+  public S3AStorageStatistics() {
+    super(NAME);
+    for (Statistic opType : Statistic.values()) {
+      opsCount.put(opType, new AtomicLong(0));
+    }
+  }
+
+  /**
+   * Increment a specific counter.
+   * @param op operation
+   * @param count increment value
+   * @return the new value
+   */
+  public long incrementCounter(Statistic op, long count) {
+    long updated = opsCount.get(op).addAndGet(count);
+    LOG.debug("{} += {}  ->  {}", op, count, updated);
+    return updated;
+  }
+
+  private class LongIterator implements Iterator<LongStatistic> {
+    private Iterator<Map.Entry<Statistic, AtomicLong>> iterator =
+        Collections.unmodifiableSet(opsCount.entrySet()).iterator();
+
+    @Override
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public LongStatistic next() {
+      if (!iterator.hasNext()) {
+        throw new NoSuchElementException();
+      }
+      final Map.Entry<Statistic, AtomicLong> entry = iterator.next();
+      return new LongStatistic(entry.getKey().name(), entry.getValue().get());
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public Iterator<LongStatistic> getLongStatistics() {
+    return new LongIterator();
+  }
+
+  @Override
+  public Long getLong(String key) {
+    final Statistic type = Statistic.fromSymbol(key);
+    return type == null ? null : opsCount.get(type).get();
+  }
+
+  @Override
+  public boolean isTracked(String key) {
+    return Statistic.fromSymbol(key) == null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c58a59f7/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 12d14e2..062fca4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
@@ -29,6 +30,7 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.AccessDeniedException;
+import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
@@ -186,4 +188,50 @@ public final class S3AUtils {
     }
     return builder.toString();
   }
+
+  /**
+   * Create a files status instance from a listing.
+   * @param keyPath path to entry
+   * @param summary summary from AWS
+   * @param blockSize block size to declare.
+   * @return a status entry
+   */
+  public static S3AFileStatus createFileStatus(Path keyPath,
+      S3ObjectSummary summary,
+      long blockSize) {
+    if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
+      return new S3AFileStatus(true, true, keyPath);
+    } else {
+      return new S3AFileStatus(summary.getSize(),
+          dateToLong(summary.getLastModified()), keyPath,
+          blockSize);
+    }
+  }
+
+  /**
+   * Predicate: does the object represent a directory?.
+   * @param name object name
+   * @param size object size
+   * @return true if it meets the criteria for being an object
+   */
+  public static boolean objectRepresentsDirectory(final String name,
+      final long size) {
+    return !name.isEmpty()
+        && name.charAt(name.length() - 1) == '/'
+        && size == 0L;
+  }
+
+  /**
+   * Date to long conversion.
+   * Handles null Dates that can be returned by AWS by returning 0
+   * @param date date from AWS query
+   * @return timestamp of the object
+   */
+  public static long dateToLong(final Date date) {
+    if (date == null) {
+      return 0L;
+    }
+
+    return date.getTime();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org