You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2018/07/20 10:33:16 UTC

[44/50] hadoop git commit: HADOOP-15547/ WASB: improve listStatus performance. Contributed by Thomas Marquardt.

HADOOP-15547/ WASB: improve listStatus performance.
Contributed by Thomas Marquardt.

(cherry picked from commit 749fff577ed9afb4ef8a54b8948f74be083cc620)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/45d9568a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/45d9568a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/45d9568a

Branch: refs/heads/HDFS-13572
Commit: 45d9568aaaf532a6da11bd7c1844ff81bf66bab1
Parents: 5836e0a
Author: Steve Loughran <st...@apache.org>
Authored: Thu Jul 19 12:31:19 2018 -0700
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Jul 19 12:31:19 2018 -0700

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |  10 +
 hadoop-tools/hadoop-azure/pom.xml               |  12 +
 .../fs/azure/AzureNativeFileSystemStore.java    | 182 ++++-----
 .../apache/hadoop/fs/azure/FileMetadata.java    |  77 ++--
 .../hadoop/fs/azure/NativeAzureFileSystem.java  | 376 ++++++++-----------
 .../hadoop/fs/azure/NativeFileSystemStore.java  |  15 +-
 .../apache/hadoop/fs/azure/PartialListing.java  |  61 ---
 .../hadoop/fs/azure/ITestListPerformance.java   | 196 ++++++++++
 8 files changed, 514 insertions(+), 415 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/45d9568a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
index cde1734..38de35e 100644
--- a/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-azure/dev-support/findbugs-exclude.xml
@@ -47,4 +47,14 @@
        <Bug pattern="WMI_WRONG_MAP_ITERATOR" />
        <Priority value="2" />
      </Match>
+
+    <!-- FileMetadata is used internally for storing metadata but also
+    subclasses FileStatus to reduce allocations when listing a large number
+    of files.  When it is returned to an external caller as a FileStatus, the
+    extra metadata is no longer useful and we want the equals and hashCode
+    methods of FileStatus to be used. -->
+    <Match>
+        <Class name="org.apache.hadoop.fs.azure.FileMetadata" />
+        <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS" />
+    </Match>
  </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45d9568a/hadoop-tools/hadoop-azure/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index 44b67a0..52b5b72 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -43,6 +43,8 @@
     <fs.azure.scale.test.huge.partitionsize>unset</fs.azure.scale.test.huge.partitionsize>
     <!-- Timeout in seconds for scale tests.-->
     <fs.azure.scale.test.timeout>7200</fs.azure.scale.test.timeout>
+    <fs.azure.scale.test.list.performance.threads>10</fs.azure.scale.test.list.performance.threads>
+    <fs.azure.scale.test.list.performance.files>1000</fs.azure.scale.test.list.performance.files>
   </properties>
 
   <build>
@@ -298,6 +300,8 @@
                     <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
                     <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
                     <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+                    <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+                    <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
                   </systemPropertyVariables>
                   <includes>
                     <include>**/Test*.java</include>
@@ -326,6 +330,8 @@
                     <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
                     <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
                     <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+                    <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+                    <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
                   </systemPropertyVariables>
                   <includes>
                     <include>**/TestRollingWindowAverage*.java</include>
@@ -367,6 +373,8 @@
                     <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
                     <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
                     <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+                    <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+                    <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
                   </systemPropertyVariables>
                   <!-- Some tests cannot run in parallel.  Tests that cover -->
                   <!-- access to the root directory must run in isolation -->
@@ -412,6 +420,8 @@
                     <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
                     <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
                     <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+                    <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+                    <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
                   </systemPropertyVariables>
                   <includes>
                     <include>**/ITestFileSystemOperationsExceptionHandlingMultiThreaded.java</include>
@@ -454,6 +464,8 @@
                     <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
                     <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
                     <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+                    <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+                    <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
                   </systemPropertyVariables>
                   <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
                   <trimStackTrace>false</trimStackTrace>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45d9568a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
index 197ab22..d2f9ca6 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java
@@ -30,7 +30,6 @@ import java.net.URISyntaxException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.security.InvalidKeyException;
-import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.EnumSet;
@@ -128,6 +127,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   // computed as min(2*cpu,8)
   private static final String KEY_CONCURRENT_CONNECTION_VALUE_OUT = "fs.azure.concurrentRequestCount.out";
 
+  private static final String HADOOP_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
   private static final String KEY_STREAM_MIN_READ_SIZE = "fs.azure.read.request.size";
   private static final String KEY_STORAGE_CONNECTION_TIMEOUT = "fs.azure.storage.timeout";
   private static final String KEY_WRITE_BLOCK_SIZE = "fs.azure.write.request.size";
@@ -252,6 +252,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   // Default block sizes
   public static final int DEFAULT_DOWNLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
   public static final int DEFAULT_UPLOAD_BLOCK_SIZE = 4 * 1024 * 1024;
+  public static final long DEFAULT_HADOOP_BLOCK_SIZE = 512 * 1024 * 1024L;
 
   private static final int DEFAULT_INPUT_STREAM_VERSION = 2;
 
@@ -313,6 +314,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
   private boolean tolerateOobAppends = DEFAULT_READ_TOLERATE_CONCURRENT_APPEND;
 
+  private long hadoopBlockSize = DEFAULT_HADOOP_BLOCK_SIZE;
   private int downloadBlockSizeBytes = DEFAULT_DOWNLOAD_BLOCK_SIZE;
   private int uploadBlockSizeBytes = DEFAULT_UPLOAD_BLOCK_SIZE;
   private int inputStreamVersion = DEFAULT_INPUT_STREAM_VERSION;
@@ -740,6 +742,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         KEY_STREAM_MIN_READ_SIZE, DEFAULT_DOWNLOAD_BLOCK_SIZE);
     this.uploadBlockSizeBytes = sessionConfiguration.getInt(
         KEY_WRITE_BLOCK_SIZE, DEFAULT_UPLOAD_BLOCK_SIZE);
+    this.hadoopBlockSize = sessionConfiguration.getLong(
+        HADOOP_BLOCK_SIZE_PROPERTY_NAME, DEFAULT_HADOOP_BLOCK_SIZE);
 
     this.inputStreamVersion = sessionConfiguration.getInt(
         KEY_INPUT_STREAM_VERSION, DEFAULT_INPUT_STREAM_VERSION);
@@ -1234,7 +1238,14 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     return false;
   }
 
-
+  /**
+   * Returns the file block size.  This is a fake value used for integration
+   * of the Azure store with Hadoop.
+   */
+  @Override
+  public long getHadoopBlockSize() {
+    return hadoopBlockSize;
+  }
 
   /**
    * This should be called from any method that does any modifications to the
@@ -2066,7 +2077,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         // The key refers to root directory of container.
         // Set the modification time for root to zero.
         return new FileMetadata(key, 0, defaultPermissionNoBlobMetadata(),
-            BlobMaterialization.Implicit);
+            BlobMaterialization.Implicit, hadoopBlockSize);
       }
 
       CloudBlobWrapper blob = getBlobReference(key);
@@ -2086,7 +2097,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
           if (retrieveFolderAttribute(blob)) {
             LOG.debug("{} is a folder blob.", key);
             return new FileMetadata(key, properties.getLastModified().getTime(),
-                getPermissionStatus(blob), BlobMaterialization.Explicit);
+                getPermissionStatus(blob), BlobMaterialization.Explicit, hadoopBlockSize);
           } else {
 
             LOG.debug("{} is a normal blob.", key);
@@ -2095,7 +2106,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
                 key, // Always return denormalized key with metadata.
                 getDataLength(blob, properties),
                 properties.getLastModified().getTime(),
-                getPermissionStatus(blob));
+                getPermissionStatus(blob), hadoopBlockSize);
           }
         } catch(StorageException e){
           if (!NativeAzureFileSystemHelper.isFileNotFoundException(e)) {
@@ -2129,7 +2140,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
           BlobProperties properties = blob.getProperties();
 
           return new FileMetadata(key, properties.getLastModified().getTime(),
-              getPermissionStatus(blob), BlobMaterialization.Implicit);
+              getPermissionStatus(blob), BlobMaterialization.Implicit, hadoopBlockSize);
         }
       }
 
@@ -2178,46 +2189,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
   }
 
   @Override
-  public PartialListing list(String prefix, final int maxListingCount,
+  public FileMetadata[] list(String prefix, final int maxListingCount,
       final int maxListingDepth) throws IOException {
-    return list(prefix, maxListingCount, maxListingDepth, null);
-  }
-
-  @Override
-  public PartialListing list(String prefix, final int maxListingCount,
-      final int maxListingDepth, String priorLastKey) throws IOException {
-    return list(prefix, PATH_DELIMITER, maxListingCount, maxListingDepth,
-        priorLastKey);
+    return listInternal(prefix, maxListingCount, maxListingDepth);
   }
 
-  @Override
-  public PartialListing listAll(String prefix, final int maxListingCount,
-      final int maxListingDepth, String priorLastKey) throws IOException {
-    return list(prefix, null, maxListingCount, maxListingDepth, priorLastKey);
-  }
-
-  /**
-   * Searches the given list of {@link FileMetadata} objects for a directory
-   * with the given key.
-   *
-   * @param list
-   *          The list to search.
-   * @param key
-   *          The key to search for.
-   * @return The wanted directory, or null if not found.
-   */
-  private static FileMetadata getFileMetadataInList(
-      final Iterable<FileMetadata> list, String key) {
-    for (FileMetadata current : list) {
-      if (current.getKey().equals(key)) {
-        return current;
-      }
-    }
-    return null;
-  }
-
-  private PartialListing list(String prefix, String delimiter,
-      final int maxListingCount, final int maxListingDepth, String priorLastKey)
+  private FileMetadata[] listInternal(String prefix, final int maxListingCount,
+      final int maxListingDepth)
       throws IOException {
     try {
       checkContainer(ContainerAccessType.PureRead);
@@ -2241,7 +2219,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
         objects = listRootBlobs(prefix, true, enableFlatListing);
       }
 
-      ArrayList<FileMetadata> fileMetadata = new ArrayList<FileMetadata>();
+      HashMap<String, FileMetadata> fileMetadata = new HashMap<>(256);
+
       for (ListBlobItem blobItem : objects) {
         // Check that the maximum listing count is not exhausted.
         //
@@ -2261,25 +2240,37 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
 
           FileMetadata metadata;
           if (retrieveFolderAttribute(blob)) {
-            metadata = new FileMetadata(blobKey,
-                properties.getLastModified().getTime(),
-                getPermissionStatus(blob),
-                BlobMaterialization.Explicit);
+              metadata = new FileMetadata(blobKey,
+                  properties.getLastModified().getTime(),
+                  getPermissionStatus(blob),
+                  BlobMaterialization.Explicit,
+                  hadoopBlockSize);
           } else {
-            metadata = new FileMetadata(
-                blobKey,
-                getDataLength(blob, properties),
-                properties.getLastModified().getTime(),
-                getPermissionStatus(blob));
+              metadata = new FileMetadata(
+                  blobKey,
+                  getDataLength(blob, properties),
+                  properties.getLastModified().getTime(),
+                  getPermissionStatus(blob),
+                  hadoopBlockSize);
           }
+          // Add the metadata but remove duplicates.  Note that the azure
+          // storage java SDK returns two types of entries: CloudBlobWrapper
+          // and CloudDirectoryWrapper.  In the case where WASB generated the
+          // data, there will be an empty blob for each "directory", and we will
+          // receive a CloudBlobWrapper.  If there are also files within this
+          // "directory", we will also receive a CloudDirectoryWrapper.  To
+          // complicate matters, the data may not be generated by WASB, in
+          // which case we may not have an empty blob for each "directory".
+          // So, sometimes we receive both a CloudBlobWrapper and a
+          // CloudDirectoryWrapper for each directory, and sometimes we receive
+          // one or the other but not both.  We remove duplicates, but
+          // prefer CloudBlobWrapper over CloudDirectoryWrapper.
+          // Furthermore, it is very unfortunate that the list results are not
+          // ordered, and it is a partial list which uses continuation.  So
+          // the HashMap is the best structure to remove the duplicates, despite
+          // its potential large size.
+          fileMetadata.put(blobKey, metadata);
 
-          // Add the metadata to the list, but remove any existing duplicate
-          // entries first that we may have added by finding nested files.
-          FileMetadata existing = getFileMetadataInList(fileMetadata, blobKey);
-          if (existing != null) {
-            fileMetadata.remove(existing);
-          }
-          fileMetadata.add(metadata);
         } else if (blobItem instanceof CloudBlobDirectoryWrapper) {
           CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) blobItem;
           // Determine format of directory name depending on whether an absolute
@@ -2298,12 +2289,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
           // inherit the permissions of the first non-directory blob.
           // Also, getting a proper value for last-modified is tricky.
           FileMetadata directoryMetadata = new FileMetadata(dirKey, 0,
-              defaultPermissionNoBlobMetadata(), BlobMaterialization.Implicit);
+              defaultPermissionNoBlobMetadata(), BlobMaterialization.Implicit,
+              hadoopBlockSize);
 
           // Add the directory metadata to the list only if it's not already
-          // there.
-          if (getFileMetadataInList(fileMetadata, dirKey) == null) {
-            fileMetadata.add(directoryMetadata);
+          // there.  See earlier note, we prefer CloudBlobWrapper over
+          // CloudDirectoryWrapper because it may have additional metadata (
+          // properties and ACLs).
+          if (!fileMetadata.containsKey(dirKey)) {
+            fileMetadata.put(dirKey, directoryMetadata);
           }
 
           if (!enableFlatListing) {
@@ -2314,13 +2308,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
           }
         }
       }
-      // Note: Original code indicated that this may be a hack.
-      priorLastKey = null;
-      PartialListing listing = new PartialListing(priorLastKey,
-          fileMetadata.toArray(new FileMetadata[] {}),
-          0 == fileMetadata.size() ? new String[] {}
-      : new String[] { prefix });
-      return listing;
+      return fileMetadata.values().toArray(new FileMetadata[fileMetadata.size()]);
     } catch (Exception e) {
       // Re-throw as an Azure storage exception.
       //
@@ -2334,13 +2322,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
    * the sorted order of the blob names.
    *
    * @param aCloudBlobDirectory Azure blob directory
-   * @param aFileMetadataList a list of file metadata objects for each
+   * @param metadataHashMap a map of file metadata objects for each
    *                          non-directory blob.
    * @param maxListingCount maximum length of the built up list.
    */
   private void buildUpList(CloudBlobDirectoryWrapper aCloudBlobDirectory,
-      ArrayList<FileMetadata> aFileMetadataList, final int maxListingCount,
-      final int maxListingDepth) throws Exception {
+                           HashMap<String, FileMetadata> metadataHashMap, final int maxListingCount,
+                           final int maxListingDepth) throws Exception {
 
     // Push the blob directory onto the stack.
     //
@@ -2371,12 +2359,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
     // (2) maxListingCount > 0 implies that the number of items in the
     // metadata list is less than the max listing count.
     while (null != blobItemIterator
-        && (maxListingCount <= 0 || aFileMetadataList.size() < maxListingCount)) {
+        && (maxListingCount <= 0 || metadataHashMap.size() < maxListingCount)) {
       while (blobItemIterator.hasNext()) {
         // Check if the count of items on the list exhausts the maximum
         // listing count.
         //
-        if (0 < maxListingCount && aFileMetadataList.size() >= maxListingCount) {
+        if (0 < maxListingCount && metadataHashMap.size() >= maxListingCount) {
           break;
         }
 
@@ -2399,22 +2387,34 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
             metadata = new FileMetadata(blobKey,
                 properties.getLastModified().getTime(),
                 getPermissionStatus(blob),
-                BlobMaterialization.Explicit);
+                BlobMaterialization.Explicit,
+                hadoopBlockSize);
           } else {
             metadata = new FileMetadata(
                 blobKey,
                 getDataLength(blob, properties),
                 properties.getLastModified().getTime(),
-                getPermissionStatus(blob));
+                getPermissionStatus(blob),
+                hadoopBlockSize);
           }
 
-          // Add the directory metadata to the list only if it's not already
-          // there.
-          FileMetadata existing = getFileMetadataInList(aFileMetadataList, blobKey);
-          if (existing != null) {
-            aFileMetadataList.remove(existing);
-          }
-          aFileMetadataList.add(metadata);
+          // Add the metadata but remove duplicates.  Note that the azure
+          // storage java SDK returns two types of entries: CloudBlobWrapper
+          // and CloudDirectoryWrapper.  In the case where WASB generated the
+          // data, there will be an empty blob for each "directory", and we will
+          // receive a CloudBlobWrapper.  If there are also files within this
+          // "directory", we will also receive a CloudDirectoryWrapper.  To
+          // complicate matters, the data may not be generated by WASB, in
+          // which case we may not have an empty blob for each "directory".
+          // So, sometimes we receive both a CloudBlobWrapper and a
+          // CloudDirectoryWrapper for each directory, and sometimes we receive
+          // one or the other but not both.  We remove duplicates, but
+          // prefer CloudBlobWrapper over CloudDirectoryWrapper.
+          // Furthermore, it is very unfortunate that the list results are not
+          // ordered, and it is a partial list which uses continuation.  So
+          // the HashMap is the best structure to remove the duplicates, despite
+          // its potential large size.
+          metadataHashMap.put(blobKey, metadata);
         } else if (blobItem instanceof CloudBlobDirectoryWrapper) {
           CloudBlobDirectoryWrapper directory = (CloudBlobDirectoryWrapper) blobItem;
 
@@ -2439,7 +2439,12 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
             // absolute path is being used or not.
             String dirKey = normalizeKey(directory);
 
-            if (getFileMetadataInList(aFileMetadataList, dirKey) == null) {
+            // Add the directory metadata to the list only if it's not already
+            // there.  See earlier note, we prefer CloudBlobWrapper over
+            // CloudDirectoryWrapper because it may have additional metadata (
+            // properties and ACLs).
+            if (!metadataHashMap.containsKey(dirKey)) {
+
               // Reached the targeted listing depth. Return metadata for the
               // directory using default permissions.
               //
@@ -2450,10 +2455,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
               FileMetadata directoryMetadata = new FileMetadata(dirKey,
                   0,
                   defaultPermissionNoBlobMetadata(),
-                  BlobMaterialization.Implicit);
+                  BlobMaterialization.Implicit,
+                  hadoopBlockSize);
 
               // Add the directory metadata to the list.
-              aFileMetadataList.add(directoryMetadata);
+              metadataHashMap.put(dirKey, directoryMetadata);
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45d9568a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
index 5085a0f..cbf3ab9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/FileMetadata.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.fs.azure;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 
 /**
@@ -27,12 +29,9 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
  * </p>
  */
 @InterfaceAudience.Private
-class FileMetadata {
-  private final String key;
-  private final long length;
-  private final long lastModified;
-  private final boolean isDir;
-  private final PermissionStatus permissionStatus;
+class FileMetadata extends FileStatus {
+  // this is not final so that it can be cleared to save memory when not needed.
+  private String key;
   private final BlobMaterialization blobMaterialization;
 
   /**
@@ -46,16 +45,19 @@ class FileMetadata {
    *          The last modified date (milliseconds since January 1, 1970 UTC.)
    * @param permissionStatus
    *          The permission for the file.
+   * @param blockSize
+   *          The Hadoop file block size.
    */
   public FileMetadata(String key, long length, long lastModified,
-      PermissionStatus permissionStatus) {
+      PermissionStatus permissionStatus, final long blockSize) {
+    super(length, false, 1, blockSize, lastModified, 0,
+        permissionStatus.getPermission(),
+        permissionStatus.getUserName(),
+        permissionStatus.getGroupName(),
+        null);
     this.key = key;
-    this.length = length;
-    this.lastModified = lastModified;
-    this.isDir = false;
-    this.permissionStatus = permissionStatus;
-    this.blobMaterialization = BlobMaterialization.Explicit; // File are never
-                                                             // implicit.
+    // Files are never implicit.
+    this.blobMaterialization = BlobMaterialization.Explicit;
   }
 
   /**
@@ -70,37 +72,42 @@ class FileMetadata {
    * @param blobMaterialization
    *          Whether this is an implicit (no real blob backing it) or explicit
    *          directory.
+   * @param blockSize
+   *          The Hadoop file block size.
    */
   public FileMetadata(String key, long lastModified,
-      PermissionStatus permissionStatus, BlobMaterialization blobMaterialization) {
+      PermissionStatus permissionStatus, BlobMaterialization blobMaterialization,
+      final long blockSize) {
+    super(0, true, 1, blockSize, lastModified, 0,
+        permissionStatus.getPermission(),
+        permissionStatus.getUserName(),
+        permissionStatus.getGroupName(),
+        null);
     this.key = key;
-    this.isDir = true;
-    this.length = 0;
-    this.lastModified = lastModified;
-    this.permissionStatus = permissionStatus;
     this.blobMaterialization = blobMaterialization;
   }
 
-  public boolean isDir() {
-    return isDir;
+  @Override
+  public Path getPath() {
+    Path p = super.getPath();
+    if (p == null) {
+      // Don't store this yet to reduce memory usage, as it will
+      // stay in the Eden Space and later we will update it
+      // with the full canonicalized path.
+      p = NativeAzureFileSystem.keyToPath(key);
+    }
+    return p;
   }
 
+  /**
+   * Returns the Azure storage key for the file.  Used internally by the framework.
+   *
+   * @return The key for the file.
+   */
   public String getKey() {
     return key;
   }
 
-  public long getLength() {
-    return length;
-  }
-
-  public long getLastModified() {
-    return lastModified;
-  }
-
-  public PermissionStatus getPermissionStatus() {
-    return permissionStatus;
-  }
-
   /**
    * Indicates whether this is an implicit directory (no real blob backing it)
    * or an explicit one.
@@ -112,9 +119,7 @@ class FileMetadata {
     return blobMaterialization;
   }
 
-  @Override
-  public String toString() {
-    return "FileMetadata[" + key + ", " + length + ", " + lastModified + ", "
-        + permissionStatus + "]";
+  void removeKey() {
+    key = null;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45d9568a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
index 5202762..f8962d9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java
@@ -31,9 +31,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.EnumSet;
-import java.util.Set;
 import java.util.TimeZone;
-import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Matcher;
@@ -129,20 +127,12 @@ public class NativeAzureFileSystem extends FileSystem {
       this.dstKey = dstKey;
       this.folderLease = lease;
       this.fs = fs;
-      ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
 
       // List all the files in the folder.
       long start = Time.monotonicNow();
-      String priorLastKey = null;
-      do {
-        PartialListing listing = fs.getStoreInterface().listAll(srcKey, AZURE_LIST_ALL,
-          AZURE_UNBOUNDED_DEPTH, priorLastKey);
-        for(FileMetadata file : listing.getFiles()) {
-          fileMetadataList.add(file);
-        }
-        priorLastKey = listing.getPriorLastKey();
-      } while (priorLastKey != null);
-      fileMetadata = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
+      fileMetadata = fs.getStoreInterface().list(srcKey, AZURE_LIST_ALL,
+          AZURE_UNBOUNDED_DEPTH);
+
       long end = Time.monotonicNow();
       LOG.debug("Time taken to list {} blobs for rename operation is: {} ms", fileMetadata.length, (end - start));
 
@@ -669,7 +659,6 @@ public class NativeAzureFileSystem extends FileSystem {
 
   public static final Logger LOG = LoggerFactory.getLogger(NativeAzureFileSystem.class);
 
-  static final String AZURE_BLOCK_SIZE_PROPERTY_NAME = "fs.azure.block.size";
   /**
    * The time span in seconds before which we consider a temp blob to be
    * dangling (not being actively uploaded to) and up for reclamation.
@@ -685,8 +674,6 @@ public class NativeAzureFileSystem extends FileSystem {
   private static final int AZURE_LIST_ALL = -1;
   private static final int AZURE_UNBOUNDED_DEPTH = -1;
 
-  private static final long MAX_AZURE_BLOCK_SIZE = 512 * 1024 * 1024L;
-
   /**
    * The configuration property that determines which group owns files created
    * in WASB.
@@ -1196,7 +1183,6 @@ public class NativeAzureFileSystem extends FileSystem {
   private NativeFileSystemStore store;
   private AzureNativeFileSystemStore actualStore;
   private Path workingDir;
-  private long blockSize = MAX_AZURE_BLOCK_SIZE;
   private AzureFileSystemInstrumentation instrumentation;
   private String metricsSourceName;
   private boolean isClosed = false;
@@ -1361,13 +1347,10 @@ public class NativeAzureFileSystem extends FileSystem {
     this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
     this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
         .getShortUserName()).makeQualified(getUri(), getWorkingDirectory());
-    this.blockSize = conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME,
-        MAX_AZURE_BLOCK_SIZE);
 
     this.appendSupportEnabled = conf.getBoolean(APPEND_SUPPORT_ENABLE_PROPERTY_NAME, false);
     LOG.debug("NativeAzureFileSystem. Initializing.");
-    LOG.debug("  blockSize  = {}",
-        conf.getLong(AZURE_BLOCK_SIZE_PROPERTY_NAME, MAX_AZURE_BLOCK_SIZE));
+    LOG.debug("  blockSize  = {}", store.getHadoopBlockSize());
 
     // Initialize thread counts from user configuration
     deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
@@ -1491,7 +1474,7 @@ public class NativeAzureFileSystem extends FileSystem {
     }
   }
 
-  private static Path keyToPath(String key) {
+  static Path keyToPath(String key) {
     if (key.equals("/")) {
       return new Path("/"); // container
     }
@@ -1599,7 +1582,7 @@ public class NativeAzureFileSystem extends FileSystem {
       throw new FileNotFoundException(f.toString());
     }
 
-    if (meta.isDir()) {
+    if (meta.isDirectory()) {
       throw new FileNotFoundException(f.toString()
           + " is a directory not a file.");
     }
@@ -1815,7 +1798,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
     FileMetadata existingMetadata = store.retrieveMetadata(key);
     if (existingMetadata != null) {
-      if (existingMetadata.isDir()) {
+      if (existingMetadata.isDirectory()) {
         throw new FileAlreadyExistsException("Cannot create file " + f
             + "; already exists as a directory.");
       }
@@ -1833,7 +1816,7 @@ public class NativeAzureFileSystem extends FileSystem {
       // already exists.
       String parentKey = pathToKey(parentFolder);
       FileMetadata parentMetadata = store.retrieveMetadata(parentKey);
-      if (parentMetadata != null && parentMetadata.isDir() &&
+      if (parentMetadata != null && parentMetadata.isDirectory() &&
         parentMetadata.getBlobMaterialization() == BlobMaterialization.Explicit) {
         if (parentFolderLease != null) {
           store.updateFolderLastModifiedTime(parentKey, parentFolderLease);
@@ -1850,7 +1833,7 @@ public class NativeAzureFileSystem extends FileSystem {
           firstExisting = firstExisting.getParent();
           metadata = store.retrieveMetadata(pathToKey(firstExisting));
         }
-        mkdirs(parentFolder, metadata.getPermissionStatus().getPermission(), true);
+        mkdirs(parentFolder, metadata.getPermission(), true);
       }
     }
 
@@ -1988,7 +1971,7 @@ public class NativeAzureFileSystem extends FileSystem {
               + parentPath + " whose metadata cannot be retrieved. Can't resolve");
       }
 
-      if (!parentMetadata.isDir()) {
+      if (!parentMetadata.isDirectory()) {
          // Invalid state: the parent path is actually a file. Throw.
          throw new AzureException("File " + f + " has a parent directory "
              + parentPath + " which is also a file. Can't resolve.");
@@ -1997,7 +1980,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
     // The path exists, determine if it is a folder containing objects,
     // an empty folder, or a simple file and take the appropriate actions.
-    if (!metaFile.isDir()) {
+    if (!metaFile.isDirectory()) {
       // The path specifies a file. We need to check the parent path
       // to make sure it's a proper materialized directory before we
       // delete the file. Otherwise we may get into a situation where
@@ -2114,9 +2097,9 @@ public class NativeAzureFileSystem extends FileSystem {
       AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
         @Override
         public boolean execute(FileMetadata file) throws IOException{
-          if (!deleteFile(file.getKey(), file.isDir())) {
+          if (!deleteFile(file.getKey(), file.isDirectory())) {
             LOG.warn("Attempt to delete non-existent {} {}",
-                file.isDir() ? "directory" : "file",
+                file.isDirectory() ? "directory" : "file",
                 file.getKey());
           }
           return true;
@@ -2138,7 +2121,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
       // Delete the current directory if all underlying contents are deleted
       if (isPartialDelete || (store.retrieveMetadata(metaFile.getKey()) != null
-          && !deleteFile(metaFile.getKey(), metaFile.isDir()))) {
+          && !deleteFile(metaFile.getKey(), metaFile.isDirectory()))) {
         LOG.error("Failed delete directory : {}", f);
         return false;
       }
@@ -2191,7 +2174,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
     // The path exists, determine if it is a folder containing objects,
     // an empty folder, or a simple file and take the appropriate actions.
-    if (!metaFile.isDir()) {
+    if (!metaFile.isDirectory()) {
       // The path specifies a file. We need to check the parent path
       // to make sure it's a proper materialized directory before we
       // delete the file. Otherwise we may get into a situation where
@@ -2234,7 +2217,7 @@ public class NativeAzureFileSystem extends FileSystem {
               + parentPath + " whose metadata cannot be retrieved. Can't resolve");
         }
 
-        if (!parentMetadata.isDir()) {
+        if (!parentMetadata.isDirectory()) {
           // Invalid state: the parent path is actually a file. Throw.
           throw new AzureException("File " + f + " has a parent directory "
               + parentPath + " which is also a file. Can't resolve.");
@@ -2319,38 +2302,27 @@ public class NativeAzureFileSystem extends FileSystem {
         }
       }
 
-      // List all the blobs in the current folder.
-      String priorLastKey = null;
-
       // Start time for list operation
       long start = Time.monotonicNow();
-      ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
+      final FileMetadata[] contents;
 
       // List all the files in the folder with AZURE_UNBOUNDED_DEPTH depth.
-      do {
-        try {
-          PartialListing listing = store.listAll(key, AZURE_LIST_ALL,
-            AZURE_UNBOUNDED_DEPTH, priorLastKey);
-          for(FileMetadata file : listing.getFiles()) {
-            fileMetadataList.add(file);
-          }
-          priorLastKey = listing.getPriorLastKey();
-        } catch (IOException e) {
-          Throwable innerException = checkForAzureStorageException(e);
-
-          if (innerException instanceof StorageException
-              && isFileNotFoundException((StorageException) innerException)) {
-            return false;
-          }
+      try {
+        contents = store.list(key, AZURE_LIST_ALL,
+            AZURE_UNBOUNDED_DEPTH);
+      } catch (IOException e) {
+        Throwable innerException = checkForAzureStorageException(e);
 
-          throw e;
+        if (innerException instanceof StorageException
+            && isFileNotFoundException((StorageException) innerException)) {
+          return false;
         }
-      } while (priorLastKey != null);
 
-      long end = Time.monotonicNow();
-      LOG.debug("Time taken to list {} blobs for delete operation: {} ms", fileMetadataList.size(), (end - start));
+        throw e;
+      }
 
-      final FileMetadata[] contents = fileMetadataList.toArray(new FileMetadata[fileMetadataList.size()]);
+      long end = Time.monotonicNow();
+      LOG.debug("Time taken to list {} blobs for delete operation: {} ms", contents.length, (end - start));
 
       if (contents.length > 0) {
         if (!recursive) {
@@ -2365,9 +2337,9 @@ public class NativeAzureFileSystem extends FileSystem {
       AzureFileSystemThreadTask task = new AzureFileSystemThreadTask() {
         @Override
         public boolean execute(FileMetadata file) throws IOException{
-          if (!deleteFile(file.getKey(), file.isDir())) {
+          if (!deleteFile(file.getKey(), file.isDirectory())) {
             LOG.warn("Attempt to delete non-existent {} {}",
-                file.isDir() ? "directory" : "file",
+                file.isDirectory() ? "directory" : "file",
                 file.getKey());
           }
           return true;
@@ -2384,7 +2356,7 @@ public class NativeAzureFileSystem extends FileSystem {
 
       // Delete the current directory
       if (store.retrieveMetadata(metaFile.getKey()) != null
-          && !deleteFile(metaFile.getKey(), metaFile.isDir())) {
+          && !deleteFile(metaFile.getKey(), metaFile.isDirectory())) {
         LOG.error("Failed delete directory : {}", f);
         return false;
       }
@@ -2456,13 +2428,13 @@ public class NativeAzureFileSystem extends FileSystem {
 
     boolean isPartialDelete = false;
 
-    Path pathToDelete = makeAbsolute(keyToPath(folderToDelete.getKey()));
+    Path pathToDelete = makeAbsolute(folderToDelete.getPath());
     foldersToProcess.push(folderToDelete);
 
     while (!foldersToProcess.empty()) {
 
       FileMetadata currentFolder = foldersToProcess.pop();
-      Path currentPath = makeAbsolute(keyToPath(currentFolder.getKey()));
+      Path currentPath = makeAbsolute(currentFolder.getPath());
       boolean canDeleteChildren = true;
 
       // If authorization is enabled, check for 'write' permission on current folder
@@ -2478,8 +2450,8 @@ public class NativeAzureFileSystem extends FileSystem {
       if (canDeleteChildren) {
 
         // get immediate children list
-        ArrayList<FileMetadata> fileMetadataList = getChildrenMetadata(currentFolder.getKey(),
-            maxListingDepth);
+        FileMetadata[] fileMetadataList = store.list(currentFolder.getKey(),
+            AZURE_LIST_ALL, maxListingDepth);
 
         // Process children of currentFolder and add them to list of contents
         // that can be deleted. We Perform stickybit check on every file and
@@ -2490,12 +2462,12 @@ public class NativeAzureFileSystem extends FileSystem {
             // This file/folder cannot be deleted and neither can the parent paths be deleted.
             // Remove parent paths from list of contents that can be deleted.
             canDeleteChildren = false;
-            Path filePath = makeAbsolute(keyToPath(childItem.getKey()));
+            Path filePath = makeAbsolute(childItem.getPath());
             LOG.error("User does not have permissions to delete {}. "
               + "Parent directory has sticky bit set.", filePath);
           } else {
             // push the child directories to the stack to process their contents
-            if (childItem.isDir()) {
+            if (childItem.isDirectory()) {
               foldersToProcess.push(childItem);
             }
             // Add items to list of contents that can be deleted.
@@ -2540,23 +2512,6 @@ public class NativeAzureFileSystem extends FileSystem {
     return isPartialDelete;
   }
 
-  private ArrayList<FileMetadata> getChildrenMetadata(String key, int maxListingDepth)
-    throws IOException {
-
-    String priorLastKey = null;
-    ArrayList<FileMetadata> fileMetadataList = new ArrayList<FileMetadata>();
-    do {
-       PartialListing listing = store.listAll(key, AZURE_LIST_ALL,
-         maxListingDepth, priorLastKey);
-       for (FileMetadata file : listing.getFiles()) {
-         fileMetadataList.add(file);
-       }
-       priorLastKey = listing.getPriorLastKey();
-    } while (priorLastKey != null);
-
-    return fileMetadataList;
-  }
-
   private boolean isStickyBitCheckViolated(FileMetadata metaData,
     FileMetadata parentMetadata, boolean throwOnException) throws IOException {
       try {
@@ -2602,13 +2557,13 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     // stickybit is not set on parent and hence cannot be violated
-    if (!parentMetadata.getPermissionStatus().getPermission().getStickyBit()) {
+    if (!parentMetadata.getPermission().getStickyBit()) {
       return false;
     }
 
     String currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-    String parentDirectoryOwner = parentMetadata.getPermissionStatus().getUserName();
-    String currentFileOwner = metaData.getPermissionStatus().getUserName();
+    String parentDirectoryOwner = parentMetadata.getOwner();
+    String currentFileOwner = metaData.getOwner();
 
     // Files/Folders with no owner set will not pass stickybit check
     if ((parentDirectoryOwner.equalsIgnoreCase(currentUser))
@@ -2687,7 +2642,15 @@ public class NativeAzureFileSystem extends FileSystem {
     Path absolutePath = makeAbsolute(f);
     String key = pathToKey(absolutePath);
     if (key.length() == 0) { // root always exists
-      return newDirectory(null, absolutePath);
+      return new FileStatus(
+          0,
+          true,
+          1,
+          store.getHadoopBlockSize(),
+          0,
+          0,
+          FsPermission.getDefault(), "", "",
+          absolutePath.makeQualified(getUri(), getWorkingDirectory()));
     }
 
     // The path is either a folder or a file. Retrieve metadata to
@@ -2709,7 +2672,7 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     if (meta != null) {
-      if (meta.isDir()) {
+      if (meta.isDirectory()) {
         // The path is a folder with files in it.
         //
 
@@ -2723,14 +2686,14 @@ public class NativeAzureFileSystem extends FileSystem {
         }
 
         // Return reference to the directory object.
-        return newDirectory(meta, absolutePath);
+        return updateFileStatusPath(meta, absolutePath);
       }
 
       // The path is a file.
       LOG.debug("Found the path: {} as a file.", f.toString());
 
       // Return with reference to a file object.
-      return newFile(meta, absolutePath);
+      return updateFileStatusPath(meta, absolutePath);
     }
 
     // File not found. Throw exception no such file or directory.
@@ -2787,7 +2750,7 @@ public class NativeAzureFileSystem extends FileSystem {
     performAuthCheck(absolutePath, WasbAuthorizationOperations.READ, "liststatus", absolutePath);
 
     String key = pathToKey(absolutePath);
-    Set<FileStatus> status = new TreeSet<FileStatus>();
+
     FileMetadata meta = null;
     try {
       meta = store.retrieveMetadata(key);
@@ -2804,101 +2767,93 @@ public class NativeAzureFileSystem extends FileSystem {
       throw ex;
     }
 
-    if (meta != null) {
-      if (!meta.isDir()) {
-
-        LOG.debug("Found path as a file");
-
-        return new FileStatus[] { newFile(meta, absolutePath) };
-      }
-
-      String partialKey = null;
-      PartialListing listing = null;
-
-      try {
-        listing  = store.list(key, AZURE_LIST_ALL, 1, partialKey);
-      } catch (IOException ex) {
-
-        Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
-
-        if (innerException instanceof StorageException
-            && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+    if (meta == null) {
+      // There is no metadata found for the path.
+      LOG.debug("Did not find any metadata for path: {}", key);
+      throw new FileNotFoundException(f + " is not found");
+    }
 
-            throw new FileNotFoundException(String.format("%s is not found", key));
-        }
+    if (!meta.isDirectory()) {
+      LOG.debug("Found path as a file");
+      return new FileStatus[] { updateFileStatusPath(meta, absolutePath) };
+    }
 
-        throw ex;
-      }
-      // NOTE: We don't check for Null condition as the Store API should return
-      // an empty list if there are not listing.
+    FileMetadata[] listing;
 
-      // For any -RenamePending.json files in the listing,
-      // push the rename forward.
-      boolean renamed = conditionalRedoFolderRenames(listing);
+    listing = listWithErrorHandling(key, AZURE_LIST_ALL, 1);
 
-      // If any renames were redone, get another listing,
-      // since the current one may have changed due to the redo.
-      if (renamed) {
-       listing = null;
-       try {
-         listing = store.list(key, AZURE_LIST_ALL, 1, partialKey);
-       } catch (IOException ex) {
-         Throwable innerException = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+    // NOTE: We don't check for Null condition as the Store API should return
+    // an empty list if there is no listing.
 
-         if (innerException instanceof StorageException
-             && NativeAzureFileSystemHelper.isFileNotFoundException((StorageException) innerException)) {
+    // For any -RenamePending.json files in the listing,
+    // push the rename forward.
+    boolean renamed = conditionalRedoFolderRenames(listing);
 
-           throw new FileNotFoundException(String.format("%s is not found", key));
-         }
+    // If any renames were redone, get another listing,
+    // since the current one may have changed due to the redo.
+    if (renamed) {
+      listing = listWithErrorHandling(key, AZURE_LIST_ALL, 1);
+    }
 
-         throw ex;
-       }
-      }
+    // We only need to check for AZURE_TEMP_FOLDER if the key is the root,
+    // and if it is not the root we also know the exact size of the array
+    // of FileStatus.
 
-      // NOTE: We don't check for Null condition as the Store API should return
-      // and empty list if there are not listing.
+    FileMetadata[] result = null;
 
-      for (FileMetadata fileMetadata : listing.getFiles()) {
-        Path subpath = keyToPath(fileMetadata.getKey());
+    if (key.equals("/")) {
+      ArrayList<FileMetadata> status = new ArrayList<>(listing.length);
 
-        // Test whether the metadata represents a file or directory and
-        // add the appropriate metadata object.
-        //
-        // Note: There was a very old bug here where directories were added
-        // to the status set as files flattening out recursive listings
-        // using "-lsr" down the file system hierarchy.
-        if (fileMetadata.isDir()) {
+      for (FileMetadata fileMetadata : listing) {
+        if (fileMetadata.isDirectory()) {
           // Make sure we hide the temp upload folder
           if (fileMetadata.getKey().equals(AZURE_TEMP_FOLDER)) {
             // Don't expose that.
             continue;
           }
-          status.add(newDirectory(fileMetadata, subpath));
+          status.add(updateFileStatusPath(fileMetadata, fileMetadata.getPath()));
         } else {
-          status.add(newFile(fileMetadata, subpath));
+          status.add(updateFileStatusPath(fileMetadata, fileMetadata.getPath()));
         }
       }
+      result = status.toArray(new FileMetadata[0]);
+    } else {
+      for (int i = 0; i < listing.length; i++) {
+        FileMetadata fileMetadata = listing[i];
+          listing[i] = updateFileStatusPath(fileMetadata, fileMetadata.getPath());
+      }
+      result = listing;
+    }
 
-      LOG.debug("Found path as a directory with {}"
-          + " files in it.", status.size());
+    LOG.debug("Found path as a directory with {}"
+        + " files in it.", result.length);
 
-    } else {
-      // There is no metadata found for the path.
-      LOG.debug("Did not find any metadata for path: {}", key);
+    return result;
+  }
 
-      throw new FileNotFoundException(f + " is not found");
+  private FileMetadata[] listWithErrorHandling(String prefix, final int maxListingCount,
+                                              final int maxListingDepth) throws IOException {
+    try {
+      return store.list(prefix, maxListingCount, maxListingDepth);
+    } catch (IOException ex) {
+      Throwable innerException
+          = NativeAzureFileSystemHelper.checkForAzureStorageException(ex);
+      if (innerException instanceof StorageException
+          && NativeAzureFileSystemHelper.isFileNotFoundException(
+          (StorageException) innerException)) {
+        throw new FileNotFoundException(String.format("%s is not found", prefix));
+      }
+      throw ex;
     }
-
-    return status.toArray(new FileStatus[0]);
   }
 
   // Redo any folder renames needed if there are rename pending files in the
   // directory listing. Return true if one or more redo operations were done.
-  private boolean conditionalRedoFolderRenames(PartialListing listing)
+  private boolean conditionalRedoFolderRenames(FileMetadata[] listing)
       throws IllegalArgumentException, IOException {
     boolean renamed = false;
-    for (FileMetadata fileMetadata : listing.getFiles()) {
-      Path subpath = keyToPath(fileMetadata.getKey());
+    for (FileMetadata fileMetadata : listing) {
+      Path subpath = fileMetadata.getPath();
       if (isRenamePendingFile(subpath)) {
         FolderRenamePending pending =
             new FolderRenamePending(subpath, this);
@@ -2914,32 +2869,11 @@ public class NativeAzureFileSystem extends FileSystem {
     return path.toString().endsWith(FolderRenamePending.SUFFIX);
   }
 
-  private FileStatus newFile(FileMetadata meta, Path path) {
-    return new FileStatus (
-        meta.getLength(),
-        false,
-        1,
-        blockSize,
-        meta.getLastModified(),
-        0,
-        meta.getPermissionStatus().getPermission(),
-        meta.getPermissionStatus().getUserName(),
-        meta.getPermissionStatus().getGroupName(),
-        path.makeQualified(getUri(), getWorkingDirectory()));
-  }
-
-  private FileStatus newDirectory(FileMetadata meta, Path path) {
-    return new FileStatus (
-        0,
-        true,
-        1,
-        blockSize,
-        meta == null ? 0 : meta.getLastModified(),
-        0,
-        meta == null ? FsPermission.getDefault() : meta.getPermissionStatus().getPermission(),
-        meta == null ? "" : meta.getPermissionStatus().getUserName(),
-        meta == null ? "" : meta.getPermissionStatus().getGroupName(),
-        path.makeQualified(getUri(), getWorkingDirectory()));
+  private FileMetadata updateFileStatusPath(FileMetadata meta, Path path) {
+    meta.setPath(path.makeQualified(getUri(), getWorkingDirectory()));
+    // reduce memory use by setting the internal-only key to null
+    meta.removeKey();
+    return meta;
   }
 
   private static enum UMaskApplyMode {
@@ -3000,8 +2934,8 @@ public class NativeAzureFileSystem extends FileSystem {
 
       String currentKey = pathToKey(current);
       FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
-      if (currentMetadata != null && currentMetadata.isDir()) {
-        Path ancestor = keyToPath(currentMetadata.getKey());
+      if (currentMetadata != null && currentMetadata.isDirectory()) {
+        Path ancestor = currentMetadata.getPath();
         LOG.debug("Found ancestor {}, for path: {}", ancestor.toString(), f.toString());
         return ancestor;
       }
@@ -3052,7 +2986,7 @@ public class NativeAzureFileSystem extends FileSystem {
         current = parent, parent = current.getParent()) {
       String currentKey = pathToKey(current);
       FileMetadata currentMetadata = store.retrieveMetadata(currentKey);
-      if (currentMetadata != null && !currentMetadata.isDir()) {
+      if (currentMetadata != null && !currentMetadata.isDirectory()) {
         throw new FileAlreadyExistsException("Cannot create directory " + f + " because "
             + current + " is an existing file.");
       } else if (currentMetadata == null) {
@@ -3099,7 +3033,7 @@ public class NativeAzureFileSystem extends FileSystem {
     if (meta == null) {
       throw new FileNotFoundException(f.toString());
     }
-    if (meta.isDir()) {
+    if (meta.isDirectory()) {
       throw new FileNotFoundException(f.toString()
           + " is a directory not a file.");
     }
@@ -3120,7 +3054,7 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     return new FSDataInputStream(new BufferedFSInputStream(
-        new NativeAzureFsInputStream(inputStream, key, meta.getLength()), bufferSize));
+        new NativeAzureFsInputStream(inputStream, key, meta.getLen()), bufferSize));
   }
 
   @Override
@@ -3196,7 +3130,7 @@ public class NativeAzureFileSystem extends FileSystem {
       }
     }
 
-    if (dstMetadata != null && dstMetadata.isDir()) {
+    if (dstMetadata != null && dstMetadata.isDirectory()) {
       // It's an existing directory.
       performAuthCheck(absoluteDstPath, WasbAuthorizationOperations.WRITE, "rename",
           absoluteDstPath);
@@ -3232,7 +3166,7 @@ public class NativeAzureFileSystem extends FileSystem {
         LOG.debug("Parent of the destination {}"
             + " doesn't exist, failing the rename.", dst);
         return false;
-      } else if (!parentOfDestMetadata.isDir()) {
+      } else if (!parentOfDestMetadata.isDirectory()) {
         LOG.debug("Parent of the destination {}"
             + " is a file, failing the rename.", dst);
         return false;
@@ -3261,7 +3195,7 @@ public class NativeAzureFileSystem extends FileSystem {
       // Source doesn't exist
       LOG.debug("Source {} doesn't exist, failing the rename.", src);
       return false;
-    } else if (!srcMetadata.isDir()) {
+    } else if (!srcMetadata.isDirectory()) {
       LOG.debug("Source {} found as a file, renaming.", src);
       try {
         // HADOOP-15086 - file rename must ensure that the destination does
@@ -3335,7 +3269,7 @@ public class NativeAzureFileSystem extends FileSystem {
       // single file. In this case, the parent folder no longer exists if the
       // file is renamed; so we can safely ignore the null pointer case.
       if (parentMetadata != null) {
-        if (parentMetadata.isDir()
+        if (parentMetadata.isDirectory()
             && parentMetadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
           store.storeEmptyFolder(parentKey,
               createPermissionStatus(FsPermission.getDefault()));
@@ -3511,7 +3445,7 @@ public class NativeAzureFileSystem extends FileSystem {
           && !isAllowedUser(currentUgi.getShortUserName(), daemonUsers)) {
 
         //Check if the user is the owner of the file.
-        String owner = metadata.getPermissionStatus().getUserName();
+        String owner = metadata.getOwner();
         if (!currentUgi.getShortUserName().equals(owner)) {
           throw new WasbAuthorizationException(
               String.format("user '%s' does not have the privilege to "
@@ -3522,16 +3456,16 @@ public class NativeAzureFileSystem extends FileSystem {
     }
 
     permission = applyUMask(permission,
-        metadata.isDir() ? UMaskApplyMode.ChangeExistingDirectory
+        metadata.isDirectory() ? UMaskApplyMode.ChangeExistingDirectory
             : UMaskApplyMode.ChangeExistingFile);
     if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
       // It's an implicit folder, need to materialize it.
       store.storeEmptyFolder(key, createPermissionStatus(permission));
-    } else if (!metadata.getPermissionStatus().getPermission().
+    } else if (!metadata.getPermission().
         equals(permission)) {
       store.changePermissionStatus(key, new PermissionStatus(
-          metadata.getPermissionStatus().getUserName(),
-          metadata.getPermissionStatus().getGroupName(),
+          metadata.getOwner(),
+          metadata.getGroup(),
           permission));
     }
   }
@@ -3579,10 +3513,10 @@ public class NativeAzureFileSystem extends FileSystem {
 
     PermissionStatus newPermissionStatus = new PermissionStatus(
         username == null ?
-            metadata.getPermissionStatus().getUserName() : username,
+            metadata.getOwner() : username,
         groupname == null ?
-            metadata.getPermissionStatus().getGroupName() : groupname,
-        metadata.getPermissionStatus().getPermission());
+            metadata.getGroup() : groupname,
+        metadata.getPermission());
     if (metadata.getBlobMaterialization() == BlobMaterialization.Implicit) {
       // It's an implicit folder, need to materialize it.
       store.storeEmptyFolder(key, newPermissionStatus);
@@ -3778,30 +3712,26 @@ public class NativeAzureFileSystem extends FileSystem {
             AZURE_TEMP_EXPIRY_DEFAULT) * 1000;
     // Go over all the blobs under the given root and look for blobs to
     // recover.
-    String priorLastKey = null;
-    do {
-      PartialListing listing = store.listAll(pathToKey(root), AZURE_LIST_ALL,
-          AZURE_UNBOUNDED_DEPTH, priorLastKey);
-
-      for (FileMetadata file : listing.getFiles()) {
-        if (!file.isDir()) { // We don't recover directory blobs
-          // See if this blob has a link in it (meaning it's a place-holder
-          // blob for when the upload to the temp blob is complete).
-          String link = store.getLinkInFileMetadata(file.getKey());
-          if (link != null) {
-            // It has a link, see if the temp blob it is pointing to is
-            // existent and old enough to be considered dangling.
-            FileMetadata linkMetadata = store.retrieveMetadata(link);
-            if (linkMetadata != null
-                && linkMetadata.getLastModified() >= cutoffForDangling) {
-              // Found one!
-              handler.handleFile(file, linkMetadata);
-            }
+    FileMetadata[] listing = store.list(pathToKey(root), AZURE_LIST_ALL,
+        AZURE_UNBOUNDED_DEPTH);
+
+    for (FileMetadata file : listing) {
+      if (!file.isDirectory()) { // We don't recover directory blobs
+        // See if this blob has a link in it (meaning it's a place-holder
+        // blob for when the upload to the temp blob is complete).
+        String link = store.getLinkInFileMetadata(file.getKey());
+        if (link != null) {
+          // It has a link, see if the temp blob it is pointing to is
+          // existent and old enough to be considered dangling.
+          FileMetadata linkMetadata = store.retrieveMetadata(link);
+          if (linkMetadata != null
+              && linkMetadata.getModificationTime() >= cutoffForDangling) {
+            // Found one!
+            handler.handleFile(file, linkMetadata);
           }
         }
       }
-      priorLastKey = listing.getPriorLastKey();
-    } while (priorLastKey != null);
+    }
   }
 
   /**
@@ -3888,7 +3818,7 @@ public class NativeAzureFileSystem extends FileSystem {
       meta = store.retrieveMetadata(key);
 
       if (meta != null) {
-        owner = meta.getPermissionStatus().getUserName();
+        owner = meta.getOwner();
         LOG.debug("Retrieved '{}' as owner for path - {}", owner, absolutePath);
       } else {
         // meta will be null if file/folder doen not exist

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45d9568a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
index b67ab1b..36e3819 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java
@@ -58,20 +58,21 @@ interface NativeFileSystemStore {
 
   boolean isAtomicRenameKey(String key);
 
+  /**
+   * Returns the file block size.  This is a fake value used for integration
+   * of the Azure store with Hadoop.
+   * @return The file block size.
+   */
+  long getHadoopBlockSize();
+
   void storeEmptyLinkFile(String key, String tempBlobKey,
       PermissionStatus permissionStatus) throws AzureException;
 
   String getLinkInFileMetadata(String key) throws AzureException;
 
-  PartialListing list(String prefix, final int maxListingCount,
+  FileMetadata[] list(String prefix, final int maxListingCount,
       final int maxListingDepth) throws IOException;
 
-  PartialListing list(String prefix, final int maxListingCount,
-      final int maxListingDepth, String priorLastKey) throws IOException;
-
-  PartialListing listAll(String prefix, final int maxListingCount,
-      final int maxListingDepth, String priorLastKey) throws IOException;
-
   void changePermissionStatus(String key, PermissionStatus newPermission)
       throws AzureException;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45d9568a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
deleted file mode 100644
index 4a80d2e..0000000
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/PartialListing.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azure;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * <p>
- * Holds information on a directory listing for a {@link NativeFileSystemStore}.
- * This includes the {@link FileMetadata files} and directories (their names)
- * contained in a directory.
- * </p>
- * <p>
- * This listing may be returned in chunks, so a <code>priorLastKey</code> is
- * provided so that the next chunk may be requested.
- * </p>
- *
- * @see NativeFileSystemStore#list(String, int, String)
- */
-@InterfaceAudience.Private
-class PartialListing {
-
-  private final String priorLastKey;
-  private final FileMetadata[] files;
-  private final String[] commonPrefixes;
-
-  public PartialListing(String priorLastKey, FileMetadata[] files,
-      String[] commonPrefixes) {
-    this.priorLastKey = priorLastKey;
-    this.files = files;
-    this.commonPrefixes = commonPrefixes;
-  }
-
-  public FileMetadata[] getFiles() {
-    return files;
-  }
-
-  public String[] getCommonPrefixes() {
-    return commonPrefixes;
-  }
-
-  public String getPriorLastKey() {
-    return priorLastKey;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/45d9568a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestListPerformance.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestListPerformance.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestListPerformance.java
new file mode 100644
index 0000000..e7a3fa8
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestListPerformance.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import org.junit.Assume;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.azure.integration.AbstractAzureScaleTest;
+import org.apache.hadoop.fs.azure.integration.AzureTestUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+
+/**
+ * Scale test measuring the performance of listing and deleting a directory
+ * that holds many blobs. The test methods run in name order: create the
+ * files, list them, then delete them.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ITestListPerformance extends AbstractAzureScaleTest {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ITestListPerformance.class);
+
+  private static final Path TEST_DIR_PATH = new Path(
+      "DirectoryWithManyFiles");
+
+  /** Default number of writer threads. */
+  private static final int NUMBER_OF_THREADS = 10;
+
+  /** Default number of files written by each thread. */
+  private static final int NUMBER_OF_FILES_PER_THREAD = 1000;
+
+  /** Number of writer threads; read from the test configuration. */
+  private int threads;
+
+  /** Files written per thread; read from the test configuration. */
+  private int filesPerThread;
+
+  /** Total number of files expected: threads * filesPerThread. */
+  private int expectedFileCount;
+
+  /**
+   * Reads the thread and file counts from the test configuration and sets
+   * the retry and delete-thread options used by the tests.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    Configuration conf = getConfiguration();
+    threads = AzureTestUtils.getTestPropertyInt(conf,
+        "fs.azure.scale.test.list.performance.threads", NUMBER_OF_THREADS);
+    filesPerThread = AzureTestUtils.getTestPropertyInt(conf,
+        "fs.azure.scale.test.list.performance.files",
+        NUMBER_OF_FILES_PER_THREAD);
+    expectedFileCount = threads * filesPerThread;
+    LOG.info("Thread = {}, Files per Thread = {}, expected files = {}",
+        threads, filesPerThread, expectedFileCount);
+    // fail fast
+    conf.set("fs.azure.io.retry.max.retries", "1");
+    conf.set("fs.azure.delete.threads", "16");
+    // NOTE(review): the returned account is discarded; confirm whether
+    // super.setUp() already creates one, which would make this redundant.
+    createTestAccount();
+  }
+
+  /**
+   * Creates the Azure test account, requesting creation of its container.
+   */
+  @Override
+  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
+    return AzureBlobStorageTestAccount.create(
+        "itestlistperformance",
+        EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
+        null,
+        true);
+  }
+
+  /**
+   * Populates the test directory by uploading empty blobs from several
+   * threads in parallel. Skipped when the directory already exists.
+   */
+  @Test
+  public void test_0101_CreateDirectoryWithFiles() throws Exception {
+    Assume.assumeFalse("Test path exists; skipping", fs.exists(TEST_DIR_PATH));
+
+    CloudBlobContainer container = testAccount.getRealContainer();
+
+    // substring(1) drops the leading character (presumably "/") of the path
+    String basePath = (fs.getWorkingDirectory().toUri().getPath()
+        + "/" + TEST_DIR_PATH + "/").substring(1);
+
+    List<Callable<Integer>> tasks = new ArrayList<>(threads);
+    fs.mkdirs(TEST_DIR_PATH);
+    ExecutorService executorService = Executors.newFixedThreadPool(threads);
+    try {
+      ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+      for (int i = 0; i < threads; i++) {
+        tasks.add(createWriterTask(container, basePath));
+      }
+
+      List<Future<Integer>> futures = executorService.invokeAll(tasks,
+          getTestTimeoutMillis(), TimeUnit.MILLISECONDS);
+      long elapsedMs = timer.elapsedTimeMs();
+      LOG.info("time to create files: {} millis", elapsedMs);
+
+      for (Future<Integer> future : futures) {
+        // invokeAll() cancels tasks that did not finish before the timeout,
+        // and isDone() is true for cancelled futures too, so a timeout can
+        // only be detected through isCancelled().
+        assertTrue("Future timed out", !future.isCancelled());
+        assertEquals("Future did not write all files",
+            filesPerThread, future.get().intValue());
+      }
+    } finally {
+      // release the worker threads even when an assertion fails
+      executorService.shutdownNow();
+    }
+  }
+
+  /**
+   * Builds a task that uploads {@link #filesPerThread} empty, randomly
+   * named blobs and stops at the first failed upload.
+   * @param container container to upload into
+   * @param basePath prefix prepended to every blob name
+   * @return a task returning the number of blobs written
+   */
+  private Callable<Integer> createWriterTask(
+      final CloudBlobContainer container, final String basePath) {
+    return new Callable<Integer>() {
+      @Override
+      public Integer call() {
+        int written = 0;
+        for (int j = 0; j < filesPerThread; j++) {
+          String blobName = basePath + UUID.randomUUID().toString();
+          try {
+            CloudBlockBlob blob = container.getBlockBlobReference(blobName);
+            blob.uploadText("");
+            written++;
+          } catch (Exception e) {
+            LOG.error("Failed to write {}", blobName, e);
+            break;
+          }
+        }
+        LOG.info("Thread completed with {} files written", written);
+        return written;
+      }
+    };
+  }
+
+  /**
+   * Times listStatus() on the test directory and checks the entry count,
+   * then times a recursive listFiles() and checks that it returns the same
+   * paths as listStatus().
+   */
+  @Test
+  public void test_0200_ListStatusPerformance() throws Exception {
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    FileStatus[] fileList = fs.listStatus(TEST_DIR_PATH);
+    long elapsedMs = timer.elapsedTimeMs();
+    LOG.info("files={}, elapsedMs={}", fileList.length, elapsedMs);
+    Map<Path, FileStatus> foundInList = new HashMap<>(expectedFileCount);
+
+    for (FileStatus fileStatus : fileList) {
+      foundInList.put(fileStatus.getPath(), fileStatus);
+      LOG.info("{}: {}", fileStatus.getPath(),
+          fileStatus.isDirectory() ? "dir" : "file");
+    }
+    assertEquals("Mismatch between expected files and actual",
+        expectedFileCount, fileList.length);
+
+    // now do a listFiles() recursive
+    ContractTestUtils.NanoTimer initialStatusCallTimer
+        = new ContractTestUtils.NanoTimer();
+    RemoteIterator<LocatedFileStatus> listing
+        = fs.listFiles(TEST_DIR_PATH, true);
+    long initialListTime = initialStatusCallTimer.elapsedTimeMs();
+    timer = new ContractTestUtils.NanoTimer();
+    while (listing.hasNext()) {
+      FileStatus fileStatus = listing.next();
+      Path path = fileStatus.getPath();
+      FileStatus removed = foundInList.remove(path);
+      assertNotNull("Did not find " + path + " in the previous listing",
+          removed);
+    }
+    elapsedMs = timer.elapsedTimeMs();
+    LOG.info("time for listFiles() initial call: {} millis;"
+        + " time to iterate: {} millis", initialListTime, elapsedMs);
+    assertEquals("Not all files from listStatus() were found in listFiles()",
+        0, foundInList.size());
+  }
+
+  /**
+   * Times the recursive delete of the test directory.
+   */
+  @Test
+  public void test_0300_BulkDeletePerformance() throws Exception {
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    fs.delete(TEST_DIR_PATH, true);
+    long elapsedMs = timer.elapsedTimeMs();
+    LOG.info("time for delete(): {} millis; {} nanoS per file",
+        elapsedMs, timer.nanosPerOperation(expectedFileCount));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org