Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/08/04 16:13:24 UTC

[hadoop] branch branch-3.3 updated: HADOOP-17131. Refactor S3A Listing code for better isolation. (#2148)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 251d2d1  HADOOP-17131. Refactor S3A Listing code for better isolation. (#2148)
251d2d1 is described below

commit 251d2d1fa56cb5b240211105e24e4f056cc60664
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Tue Aug 4 20:30:02 2020 +0530

    HADOOP-17131. Refactor S3A Listing code for better isolation. (#2148)
    
    Contributed by Mukund Thakur.
    
    Change-Id: I79160b236a92fdd67565a4b4974f1862e600c210
---
 .../java/org/apache/hadoop/fs/s3a/Listing.java     | 191 +++++++++++++++++++--
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 158 ++++++++---------
 .../fs/s3a/impl/ListingOperationCallbacks.java     | 119 +++++++++++++
 .../apache/hadoop/fs/s3a/impl/StoreContext.java    |   4 +
 .../fs/s3a/s3guard/DumpS3GuardDynamoTable.java     |   4 +-
 .../java/org/apache/hadoop/fs/s3a/TestListing.java |   2 +-
 .../fs/s3a/impl/TestPartialDeleteFailures.java     | 149 +++++++++++++++-
 7 files changed, 521 insertions(+), 106 deletions(-)
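
The heart of the change is a dependency inversion: Listing no longer holds a
reference to its owning S3AFileSystem, but is constructed from a narrow
ListingOperationCallbacks interface plus a StoreContext, so the listing logic
can be exercised (and tested) without a live filesystem. A minimal sketch of
that isolation pattern, using hypothetical names rather than the real Hadoop
classes:

    import java.util.List;

    // Sketch of the callback-isolation pattern this commit applies.
    interface StoreCallbacks {
      List<String> listObjects(String prefix);  // one store round trip
      int getMaxKeys();                         // paging limit
    }

    class IsolatedListing {
      private final StoreCallbacks callbacks;

      IsolatedListing(StoreCallbacks callbacks) {
        this.callbacks = callbacks;
      }

      List<String> listUnder(String prefix) {
        // every store access goes through the narrow interface,
        // never through an owning filesystem reference
        return callbacks.listObjects(prefix);
      }
    }

In production S3AFileSystem supplies the real implementation; in tests a stub
returning canned results is enough, which is exactly what
TestPartialDeleteFailures does below with MinimalListingOperationCallbacks.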

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 9c2f67d..fcb4928 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -30,11 +30,22 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
+import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
+import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -46,25 +57,31 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 
 import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL;
 import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
+import static org.apache.hadoop.fs.s3a.S3AUtils.maybeAddTrailingSlash;
 import static org.apache.hadoop.fs.s3a.S3AUtils.objectRepresentsDirectory;
 import static org.apache.hadoop.fs.s3a.S3AUtils.stringify;
 import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
+import static org.apache.hadoop.fs.s3a.auth.RoleModel.pathToKey;
 
 /**
  * Place for the S3A listing classes; keeps all the small classes under control.
  */
 @InterfaceAudience.Private
-public class Listing {
+public class Listing extends AbstractStoreOperation {
 
-  private final S3AFileSystem owner;
   private static final Logger LOG = S3AFileSystem.LOG;
 
   static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
       new AcceptAllButS3nDirs();
 
-  public Listing(S3AFileSystem owner) {
-    this.owner = owner;
+  private final ListingOperationCallbacks listingOperationCallbacks;
+
+  public Listing(ListingOperationCallbacks listingOperationCallbacks,
+                 StoreContext storeContext) {
+    super(storeContext);
+    this.listingOperationCallbacks = listingOperationCallbacks;
   }
 
   /**
@@ -156,6 +173,145 @@ public class Listing {
     return new TombstoneReconcilingIterator(iterator, tombstones);
   }
 
+
+  /**
+   * List files under a path assuming the path to be a directory.
+   * @param path input path.
+   * @param recursive recursive listing?
+   * @param acceptor file status filter
+   * @param collectTombstones should tombstones be collected from S3Guard?
+   * @param forceNonAuthoritativeMS forces the metadata store to act as
+   *                                non-authoritative; useful when the
+   *                                listFiles output is used by the import tool.
+   * @return an iterator over listing.
+   * @throws IOException any exception.
+   */
+  public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
+          Path path,
+          boolean recursive, Listing.FileStatusAcceptor acceptor,
+          boolean collectTombstones,
+          boolean forceNonAuthoritativeMS) throws IOException {
+
+    String key = maybeAddTrailingSlash(pathToKey(path));
+    String delimiter = recursive ? null : "/";
+    LOG.debug("Requesting all entries under {} with delimiter '{}'",
+            key, delimiter);
+    final RemoteIterator<S3AFileStatus> cachedFilesIterator;
+    final Set<Path> tombstones;
+    boolean allowAuthoritative = listingOperationCallbacks
+            .allowAuthoritative(path);
+    if (recursive) {
+      final PathMetadata pm = getStoreContext()
+              .getMetadataStore()
+              .get(path, true);
+      if (pm != null) {
+        if (pm.isDeleted()) {
+          OffsetDateTime deletedAt = OffsetDateTime
+                  .ofInstant(Instant.ofEpochMilli(
+                          pm.getFileStatus().getModificationTime()),
+                          ZoneOffset.UTC);
+          throw new FileNotFoundException("Path " + path + " is recorded as " +
+                  "deleted by S3Guard at " + deletedAt);
+        }
+      }
+      MetadataStoreListFilesIterator metadataStoreListFilesIterator =
+              new MetadataStoreListFilesIterator(
+                      getStoreContext().getMetadataStore(),
+                      pm,
+                      allowAuthoritative);
+      tombstones = metadataStoreListFilesIterator.listTombstones();
+      // if all of the below are true:
+      //  - authoritative access is allowed for this metadata store
+      //    for this directory,
+      //  - all the directory listings are authoritative on the client,
+      //  - the caller does not force non-authoritative access,
+      // then return the listing without any further S3 access
+      if (!forceNonAuthoritativeMS &&
+              allowAuthoritative &&
+              metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
+        S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
+                metadataStoreListFilesIterator, tombstones);
+        cachedFilesIterator = createProvidedFileStatusIterator(
+                statuses, ACCEPT_ALL, acceptor);
+        return createLocatedFileStatusIterator(cachedFilesIterator);
+      }
+      cachedFilesIterator = metadataStoreListFilesIterator;
+    } else {
+      DirListingMetadata meta =
+              S3Guard.listChildrenWithTtl(
+                      getStoreContext().getMetadataStore(),
+                      path,
+                      listingOperationCallbacks.getUpdatedTtlTimeProvider(),
+                      allowAuthoritative);
+      if (meta != null) {
+        tombstones = meta.listTombstones();
+      } else {
+        tombstones = null;
+      }
+      cachedFilesIterator = createProvidedFileStatusIterator(
+              S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
+      if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
+        // metadata listing is authoritative, so return it directly
+        return createLocatedFileStatusIterator(cachedFilesIterator);
+      }
+    }
+    return createTombstoneReconcilingIterator(
+            createLocatedFileStatusIterator(
+                    createFileStatusListingIterator(path,
+                                    listingOperationCallbacks
+                                    .createListObjectsRequest(key, delimiter),
+                            ACCEPT_ALL,
+                            acceptor,
+                            cachedFilesIterator)),
+            collectTombstones ? tombstones : null);
+  }
+
+  /**
+   * Generate located file statuses for a directory.
+   * Also performs tombstone reconciliation for guarded directories.
+   * @param dir directory to check.
+   * @param filter a path filter.
+   * @return an iterator that traverses statuses of the given dir.
+   * @throws IOException in case of failure.
+   */
+  public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
+          Path dir, PathFilter filter) throws IOException {
+    final String key = maybeAddTrailingSlash(pathToKey(dir));
+    final Listing.FileStatusAcceptor acceptor =
+            new Listing.AcceptAllButSelfAndS3nDirs(dir);
+    boolean allowAuthoritative = listingOperationCallbacks
+            .allowAuthoritative(dir);
+    DirListingMetadata meta =
+            S3Guard.listChildrenWithTtl(getStoreContext().getMetadataStore(),
+                    dir,
+                    listingOperationCallbacks
+                            .getUpdatedTtlTimeProvider(),
+                    allowAuthoritative);
+    Set<Path> tombstones = meta != null
+            ? meta.listTombstones()
+            : null;
+    final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
+            createProvidedFileStatusIterator(
+                    S3Guard.dirMetaToStatuses(meta), filter, acceptor);
+    return (allowAuthoritative && meta != null
+            && meta.isAuthoritative())
+            ? createLocatedFileStatusIterator(
+            cachedFileStatusIterator)
+            : createTombstoneReconcilingIterator(
+            createLocatedFileStatusIterator(
+                    createFileStatusListingIterator(dir,
+                            listingOperationCallbacks
+                                    .createListObjectsRequest(key, "/"),
+                            filter,
+                            acceptor,
+                            cachedFileStatusIterator)),
+            tombstones);
+  }
+
+  public S3ListRequest createListObjectsRequest(String key, String delimiter) {
+    return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
+  }
+
   /**
    * Interface to implement by the logic deciding whether to accept a summary
    * entry or path as a valid file or directory.
@@ -193,9 +349,9 @@ public class Listing {
    * value.
    *
    * If the status value is null, the iterator declares that it has no data.
-   * This iterator is used to handle {@link S3AFileSystem#listStatus} calls
-   * where the path handed in refers to a file, not a directory: this is the
-   * iterator returned.
+   * This iterator is used to handle {@link S3AFileSystem#listStatus(Path)}
+   * calls where the path handed in refers to a file, not a directory:
+   * this is the iterator returned.
    */
   static final class SingleStatusRemoteIterator
       implements RemoteIterator<S3ALocatedFileStatus> {
@@ -465,14 +621,15 @@ public class Listing {
       // objects
       for (S3ObjectSummary summary : objects.getObjectSummaries()) {
         String key = summary.getKey();
-        Path keyPath = owner.keyToQualifiedPath(key);
+        Path keyPath = getStoreContext().getContextAccessors().keyToPath(key);
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: {}", keyPath, stringify(summary));
         }
         // Skip over keys that are ourselves and old S3N _$folder$ files
         if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
           S3AFileStatus status = createFileStatus(keyPath, summary,
-              owner.getDefaultBlockSize(keyPath), owner.getUsername(),
+                  listingOperationCallbacks.getDefaultBlockSize(keyPath),
+                  getStoreContext().getUsername(),
               summary.getETag(), null);
           LOG.debug("Adding: {}", status);
           stats.add(status);
@@ -485,10 +642,12 @@ public class Listing {
 
       // prefixes: always directories
       for (String prefix : objects.getCommonPrefixes()) {
-        Path keyPath = owner.keyToQualifiedPath(prefix);
+        Path keyPath = getStoreContext()
+                .getContextAccessors()
+                .keyToPath(prefix);
         if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
           S3AFileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
-              owner.getUsername());
+              getStoreContext().getUsername());
           LOG.debug("Adding directory: {}", status);
           added++;
           stats.add(status);
@@ -573,8 +732,8 @@ public class Listing {
         Path listPath,
         S3ListRequest request) throws IOException {
       this.listPath = listPath;
-      this.maxKeys = owner.getMaxKeys();
-      this.objects = owner.listObjects(request);
+      this.maxKeys = listingOperationCallbacks.getMaxKeys();
+      this.objects = listingOperationCallbacks.listObjects(request);
       this.request = request;
     }
 
@@ -616,7 +775,8 @@ public class Listing {
           // need to request a new set of objects.
           LOG.debug("[{}], Requesting next {} objects under {}",
               listingCount, maxKeys, listPath);
-          objects = owner.continueListObjects(request, objects);
+          objects = listingOperationCallbacks
+                  .continueListObjects(request, objects);
           listingCount++;
           LOG.debug("New listing status: {}", this);
         } catch (AmazonClientException e) {
@@ -716,7 +876,8 @@ public class Listing {
 
     @Override
     public S3ALocatedFileStatus next() throws IOException {
-      return owner.toLocatedFileStatus(statusIterator.next());
+      return listingOperationCallbacks
+              .toLocatedFileStatus(statusIterator.next());
     }
   }
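
With the bulk listing methods moved onto Listing, external callers reach them
through the new S3AFileSystem.getListing() accessor added below. A sketch of
driving the relocated entry point, assuming fs is an initialized
S3AFileSystem, the usual org.apache.hadoop.fs.s3a imports, and caller code in
the same package (the ACCEPT_ALL_BUT_S3N acceptor is package-private):

    // Sketch only: invoke the relocated bulk listing via getListing().
    RemoteIterator<S3ALocatedFileStatus> files = fs.getListing()
        .getListFilesAssumingDir(
            new Path("s3a://bucket/dir"),
            true,                          // recursive
            Listing.ACCEPT_ALL_BUT_S3N,    // file status filter
            true,                          // reconcile S3Guard tombstones
            false);                        // honour authoritative listings
    while (files.hasNext()) {
      S3ALocatedFileStatus status = files.next();
      // process each located file status
    }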
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 286df44..2cd2325 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
 import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
 import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
 import org.apache.hadoop.fs.s3a.impl.InternalConstants;
+import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
 import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.RenameOperation;
@@ -148,7 +149,6 @@ import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
 import org.apache.hadoop.fs.s3a.select.SelectBinding;
 import org.apache.hadoop.fs.s3a.select.SelectConstants;
 import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
-import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
@@ -293,6 +293,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   private final S3AFileSystem.OperationCallbacksImpl
       operationCallbacks = new OperationCallbacksImpl();
 
+  private final ListingOperationCallbacks listingOperationCallbacks =
+          new ListingOperationCallbacksImpl();
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -362,7 +365,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           FAIL_ON_METADATA_WRITE_ERROR_DEFAULT);
 
       maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
-      listing = new Listing(this);
       partSize = getMultipartSizeProperty(conf,
           MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
       multiPartThreshold = getMultipartSizeProperty(conf,
@@ -455,6 +457,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
 
       pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
           BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
+      listing = new Listing(listingOperationCallbacks, createStoreContext());
     } catch (AmazonClientException e) {
       // amazon client exception: stop all services then throw the translation
       stopAllServices();
@@ -590,6 +593,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   /**
+   * Get current listing instance.
+   * @return this instance's listing.
+   */
+  public Listing getListing() {
+    return listing;
+  }
+
+  /**
    * Set up the client bindings.
    * If delegation tokens are enabled, the FS first looks for a DT
    * ahead of any other bindings;.
@@ -1599,6 +1610,61 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     }
   }
 
+  protected class ListingOperationCallbacksImpl implements
+          ListingOperationCallbacks {
+
+    @Override
+    @Retries.RetryRaw
+    public S3ListResult listObjects(
+            S3ListRequest request)
+            throws IOException {
+      return S3AFileSystem.this.listObjects(request);
+    }
+
+    @Override
+    @Retries.RetryRaw
+    public S3ListResult continueListObjects(
+            S3ListRequest request,
+            S3ListResult prevResult)
+            throws IOException {
+      return S3AFileSystem.this.continueListObjects(request, prevResult);
+    }
+
+    @Override
+    public S3ALocatedFileStatus toLocatedFileStatus(
+            S3AFileStatus status)
+            throws IOException {
+      return S3AFileSystem.this.toLocatedFileStatus(status);
+    }
+
+    @Override
+    public S3ListRequest createListObjectsRequest(
+            String key,
+            String delimiter) {
+      return S3AFileSystem.this.createListObjectsRequest(key, delimiter);
+    }
+
+    @Override
+    public long getDefaultBlockSize(Path path) {
+      return S3AFileSystem.this.getDefaultBlockSize(path);
+    }
+
+    @Override
+    public int getMaxKeys() {
+      return S3AFileSystem.this.getMaxKeys();
+    }
+
+    @Override
+    public ITtlTimeProvider getUpdatedTtlTimeProvider() {
+      return S3AFileSystem.this.ttlTimeProvider;
+    }
+
+    @Override
+    public boolean allowAuthoritative(final Path p) {
+      return S3AFileSystem.this.allowAuthoritative(p);
+    }
+  }
+
   /**
    * Low-level call to get at the object metadata.
    * @param path path to the object
@@ -4216,7 +4282,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       // Assuming the path to be a directory
       // do a bulk operation.
       RemoteIterator<S3ALocatedFileStatus> listFilesAssumingDir =
-              getListFilesAssumingDir(path,
+              listing.getListFilesAssumingDir(path,
                       recursive,
                       acceptor,
                       collectTombstones,
@@ -4243,89 +4309,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   }
 
   /**
-   * List files under a path assuming the path to be a directory.
-   * @param path input path.
-   * @param recursive recursive listing?
-   * @param acceptor file status filter
-   * @param collectTombstones should tombstones be collected from S3Guard?
-   * @param forceNonAuthoritativeMS forces metadata store to act like non
-   *                                authoritative. This is useful when
-   *                                listFiles output is used by import tool.
-   * @return an iterator over listing.
-   * @throws IOException any exception.
-   */
-  private RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
-          Path path,
-          boolean recursive, Listing.FileStatusAcceptor acceptor,
-          boolean collectTombstones,
-          boolean forceNonAuthoritativeMS) throws IOException {
-
-    String key = maybeAddTrailingSlash(pathToKey(path));
-    String delimiter = recursive ? null : "/";
-    LOG.debug("Requesting all entries under {} with delimiter '{}'",
-        key, delimiter);
-    final RemoteIterator<S3AFileStatus> cachedFilesIterator;
-    final Set<Path> tombstones;
-    boolean allowAuthoritative = allowAuthoritative(path);
-    if (recursive) {
-      final PathMetadata pm = metadataStore.get(path, true);
-      if (pm != null) {
-        if (pm.isDeleted()) {
-          OffsetDateTime deletedAt = OffsetDateTime
-                  .ofInstant(Instant.ofEpochMilli(
-                          pm.getFileStatus().getModificationTime()),
-                          ZoneOffset.UTC);
-          throw new FileNotFoundException("Path " + path + " is recorded as " +
-                  "deleted by S3Guard at " + deletedAt);
-        }
-      }
-      MetadataStoreListFilesIterator metadataStoreListFilesIterator =
-          new MetadataStoreListFilesIterator(metadataStore, pm,
-              allowAuthoritative);
-      tombstones = metadataStoreListFilesIterator.listTombstones();
-      // if all of the below is true
-      //  - authoritative access is allowed for this metadatastore
-      //  for this directory,
-      //  - all the directory listings are authoritative on the client
-      //  - the caller does not force non-authoritative access
-      // return the listing without any further s3 access
-      if (!forceNonAuthoritativeMS &&
-          allowAuthoritative &&
-          metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
-        S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
-            metadataStoreListFilesIterator, tombstones);
-        cachedFilesIterator = listing.createProvidedFileStatusIterator(
-            statuses, ACCEPT_ALL, acceptor);
-        return listing.createLocatedFileStatusIterator(cachedFilesIterator);
-      }
-      cachedFilesIterator = metadataStoreListFilesIterator;
-    } else {
-      DirListingMetadata meta =
-          S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
-              allowAuthoritative);
-      if (meta != null) {
-        tombstones = meta.listTombstones();
-      } else {
-        tombstones = null;
-      }
-      cachedFilesIterator = listing.createProvidedFileStatusIterator(
-          S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
-      if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
-        // metadata listing is authoritative, so return it directly
-        return listing.createLocatedFileStatusIterator(cachedFilesIterator);
-      }
-    }
-    return listing.createTombstoneReconcilingIterator(
-        listing.createLocatedFileStatusIterator(
-            listing.createFileStatusListingIterator(path,
-                createListObjectsRequest(key, delimiter),
-                ACCEPT_ALL,
-                acceptor,
-                cachedFilesIterator)),
-        collectTombstones ? tombstones : null);
-  }
-
-  /**
    * Override superclass so as to add statistic collection.
    * {@inheritDoc}
    */
@@ -4363,7 +4346,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
             // trigger a list call directly.
             final RemoteIterator<S3ALocatedFileStatus>
                     locatedFileStatusIteratorForDir =
-                    getLocatedFileStatusIteratorForDir(path, filter);
+                    listing.getLocatedFileStatusIteratorForDir(path, filter);
 
             // If no listing is present then path might be a file.
             if (!locatedFileStatusIteratorForDir.hasNext()) {
@@ -4847,5 +4830,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     public Path makeQualified(final Path path) {
       return S3AFileSystem.this.makeQualified(path);
     }
+
   }
 }
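
ListingOperationCallbacksImpl follows the same idiom as the existing
OperationCallbacksImpl: a non-static inner class that forwards each callback
to the enclosing S3AFileSystem, keeping the filesystem's protected internals
hidden behind the interface. The idiom in miniature (hypothetical names, not
the Hadoop classes):

    // Sketch of the inner-class delegation idiom.
    interface Callbacks {
      int getPageSize();
    }

    class Store {
      private int pageSize = 5000;

      // captures Store.this; exposes only the Callbacks view of it
      private class CallbacksImpl implements Callbacks {
        @Override
        public int getPageSize() {
          return Store.this.pageSize;
        }
      }

      private final Callbacks callbacks = new CallbacksImpl();

      Callbacks getCallbacks() {
        return callbacks;
      }
    }

Collaborators receive the Callbacks view at construction time, mirroring how
Listing is now built from listingOperationCallbacks and createStoreContext()
during initialize().
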
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java
new file mode 100644
index 0000000..d89cada
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.S3ListRequest;
+import org.apache.hadoop.fs.s3a.S3ListResult;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
+
+/**
+ * These are all the callbacks which
+ * {@link org.apache.hadoop.fs.s3a.Listing} operations
+ * need, derived from the corresponding S3AFileSystem
+ * methods.
+ */
+public interface ListingOperationCallbacks {
+
+  /**
+   * Initiate a {@code listObjects} operation, incrementing metrics
+   * in the process.
+   *
+   * Retry policy: retry untranslated.
+   * @param request request to initiate
+   * @return the results
+   * @throws IOException if the retry invocation raises one (it shouldn't).
+   */
+  @Retries.RetryRaw
+  S3ListResult listObjects(
+          S3ListRequest request)
+          throws IOException;
+
+  /**
+   * List the next set of objects.
+   * Retry policy: retry untranslated.
+   * @param request last list objects request to continue
+   * @param prevResult last paged result to continue from
+   * @return the next result object
+   * @throws IOException none, just there for retryUntranslated.
+   */
+  @Retries.RetryRaw
+  S3ListResult continueListObjects(
+          S3ListRequest request,
+          S3ListResult prevResult)
+          throws IOException;
+
+  /**
+   * Build an {@link S3ALocatedFileStatus} from an {@link S3AFileStatus} instance.
+   * @param status file status
+   * @return a located status with block locations set up from this FS.
+   * @throws IOException IO Problems.
+   */
+  S3ALocatedFileStatus toLocatedFileStatus(
+          S3AFileStatus status)
+          throws IOException;
+  /**
+   * Create a {@code ListObjectsRequest} request against this bucket,
+   * with the maximum number of keys returned in a query set by
+   * {@link #getMaxKeys()}.
+   * @param key key for request
+   * @param delimiter any delimiter
+   * @return the request
+   */
+  S3ListRequest createListObjectsRequest(
+          String key,
+          String delimiter);
+
+
+  /**
+   * Return the number of bytes that large input files should optimally
+   * be split into to minimize I/O time.  The given path will be used to
+   * locate the actual filesystem.  The full path does not have to exist.
+   * @param path path of file
+   * @return the default block size for the path's filesystem
+   */
+  long getDefaultBlockSize(Path path);
+
+  /**
+   * Get the maximum key count.
+   * @return a value, valid after initialization
+   */
+  int getMaxKeys();
+
+  /**
+   * Get the updated time provider for the current fs instance.
+   * @return implementation of {@link ITtlTimeProvider}
+   */
+  ITtlTimeProvider getUpdatedTtlTimeProvider();
+
+  /**
+   * Is the path for this instance considered authoritative on the client,
+   * that is: will listing/status operations only be handled by the metastore,
+   * with no fallback to S3?
+   * @param p path
+   * @return true iff the path is authoritative on the client.
+   */
+  boolean allowAuthoritative(Path p);
+}
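
Note the retry contract here: listObjects() and continueListObjects() are
@Retries.RetryRaw, so any AmazonClientException surfaces untranslated and it
is the consumer's job to convert it, as Listing's listing iterator does above.
A sketch of that contract, assuming callbacks, request and listPath are in
scope and S3AUtils.translateException is statically imported:

    // Sketch: callers of the raw-retry callbacks translate SDK
    // exceptions into IOExceptions themselves.
    try {
      S3ListResult result = callbacks.listObjects(request);
      // ... consume the page of results ...
    } catch (AmazonClientException e) {
      throw translateException("listObjects", listPath, e);
    }
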
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
index e307c8d..cafa22f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
@@ -210,6 +210,10 @@ public class StoreContext {
     return useListV1;
   }
 
+  public ContextAccessors getContextAccessors() {
+    return contextAccessors;
+  }
+
   /**
    * Convert a key to a fully qualified path.
    * @param key input key
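
Exposing the ContextAccessors is what lets Listing convert S3 keys back into
qualified paths without its owning filesystem, replacing the
owner.keyToQualifiedPath() calls shown earlier. A one-line sketch, assuming
storeContext is a live StoreContext and the key is hypothetical:

    // Sketch: key-to-path conversion routed through the store context.
    Path keyPath = storeContext
        .getContextAccessors()
        .keyToPath("data/part-0000.csv");
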
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java
index 536481a..b5f29cc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java
@@ -348,8 +348,8 @@ public class DumpS3GuardDynamoTable extends AbstractS3GuardDynamoDBDiagnostic {
       final CsvFile csv) throws IOException {
     S3AFileSystem fs = getFilesystem();
     Path rootPath = fs.qualify(new Path("/"));
-    Listing listing = new Listing(fs);
-    S3ListRequest request = fs.createListObjectsRequest("", null);
+    Listing listing = fs.getListing();
+    S3ListRequest request = listing.createListObjectsRequest("", null);
     long count = 0;
     RemoteIterator<S3AFileStatus> st =
         listing.createFileStatusListingIterator(rootPath, request,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
index 1a533bf..3472674 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
@@ -68,7 +68,7 @@ public class TestListing extends AbstractS3AMockTest {
     Path[] allFiles = {parent, liveChild, deletedChild};
     Path[] liveFiles = {parent, liveChild};
 
-    Listing listing = new Listing(fs);
+    Listing listing = fs.getListing();
     Collection<FileStatus> statuses = new ArrayList<>();
     statuses.add(blankFileStatus(parent));
     statuses.add(blankFileStatus(liveChild));
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
index c9d872e..7fa03a1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
@@ -32,8 +32,11 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
 import com.google.common.collect.Lists;
 import org.assertj.core.api.Assertions;
 import org.junit.Before;
@@ -42,14 +45,21 @@ import org.junit.Test;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AInputPolicy;
 import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
+import org.apache.hadoop.fs.s3a.S3ListRequest;
+import org.apache.hadoop.fs.s3a.S3ListResult;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
 import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
 import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
@@ -230,6 +240,142 @@ public class TestPartialDeleteFailures {
         .build();
   }
 
+  private static class MinimalListingOperationCallbacks
+          implements ListingOperationCallbacks {
+    @Override
+    public S3ListResult listObjects(S3ListRequest request)
+            throws IOException {
+      return null;
+    }
+
+    @Override
+    public S3ListResult continueListObjects(
+            S3ListRequest request,
+            S3ListResult prevResult)
+            throws IOException {
+      return null;
+    }
+
+    @Override
+    public S3ALocatedFileStatus toLocatedFileStatus(
+            S3AFileStatus status) throws IOException {
+      return null;
+    }
+
+    @Override
+    public S3ListRequest createListObjectsRequest(
+            String key,
+            String delimiter) {
+      return null;
+    }
+
+    @Override
+    public long getDefaultBlockSize(Path path) {
+      return 0;
+    }
+
+    @Override
+    public int getMaxKeys() {
+      return 0;
+    }
+
+    @Override
+    public ITtlTimeProvider getUpdatedTtlTimeProvider() {
+      return null;
+    }
+
+    @Override
+    public boolean allowAuthoritative(Path p) {
+      return false;
+    }
+  }
+  private static class MinimalOperationCallbacks
+          implements OperationCallbacks {
+    @Override
+    public S3ObjectAttributes createObjectAttributes(
+            Path path,
+            String eTag,
+            String versionId,
+            long len) {
+      return null;
+    }
+
+    @Override
+    public S3ObjectAttributes createObjectAttributes(
+            S3AFileStatus fileStatus) {
+      return null;
+    }
+
+    @Override
+    public S3AReadOpContext createReadContext(
+            FileStatus fileStatus) {
+      return null;
+    }
+
+    @Override
+    public void finishRename(
+            Path sourceRenamed,
+            Path destCreated)
+            throws IOException {
+
+    }
+
+    @Override
+    public void deleteObjectAtPath(
+            Path path,
+            String key,
+            boolean isFile,
+            BulkOperationState operationState)
+            throws IOException {
+
+    }
+
+    @Override
+    public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
+            Path path,
+            S3AFileStatus status,
+            boolean collectTombstones,
+            boolean includeSelf)
+            throws IOException {
+      return null;
+    }
+
+    @Override
+    public CopyResult copyFile(
+            String srcKey,
+            String destKey,
+            S3ObjectAttributes srcAttributes,
+            S3AReadOpContext readContext)
+            throws IOException {
+      return null;
+    }
+
+    @Override
+    public DeleteObjectsResult removeKeys(
+            List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+            boolean deleteFakeDir,
+            List<Path> undeletedObjectsOnFailure,
+            BulkOperationState operationState,
+            boolean quiet)
+            throws MultiObjectDeleteException, AmazonClientException,
+            IOException {
+      return null;
+    }
+
+    @Override
+    public boolean allowAuthoritative(Path p) {
+      return false;
+    }
+
+    @Override
+    public RemoteIterator<S3AFileStatus> listObjects(
+            Path path,
+            String key)
+            throws IOException {
+      return null;
+    }
+  }
+
   private static class MinimalContextAccessor implements ContextAccessors {
 
     @Override
@@ -333,7 +479,8 @@ public class TestPartialDeleteFailures {
 
     @Override
     public void deletePaths(final Collection<Path> paths,
-        @Nullable final BulkOperationState operationState) throws IOException {
+        @Nullable final BulkOperationState operationState)
+            throws IOException {
       deleted.addAll(paths);
     }
 

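The two Minimal* classes above are null-object stubs: every callback returns
null, zero, or false, so an individual test overrides only the behaviour it
actually exercises. A hypothetical test fragment (not part of this patch)
showing that usage:

    // Override just one callback; inherit the no-op behaviour for the rest.
    ListingOperationCallbacks callbacks =
        new MinimalListingOperationCallbacks() {
          @Override
          public int getMaxKeys() {
            return 2;  // tiny page size to force paging logic in a test
          }
        };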

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org