Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/08/04 16:13:24 UTC
[hadoop] branch branch-3.3 updated: HADOOP-17131. Refactor S3A Listing code for better isolation. (#2148)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 251d2d1 HADOOP-17131. Refactor S3A Listing code for better isolation. (#2148)
251d2d1 is described below
commit 251d2d1fa56cb5b240211105e24e4f056cc60664
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Tue Aug 4 20:30:02 2020 +0530
HADOOP-17131. Refactor S3A Listing code for better isolation. (#2148)
Contributed by Mukund Thakur.
Change-Id: I79160b236a92fdd67565a4b4974f1862e600c210
---
.../java/org/apache/hadoop/fs/s3a/Listing.java | 191 +++++++++++++++++++--
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 158 ++++++++---------
.../fs/s3a/impl/ListingOperationCallbacks.java | 119 +++++++++++++
.../apache/hadoop/fs/s3a/impl/StoreContext.java | 4 +
.../fs/s3a/s3guard/DumpS3GuardDynamoTable.java | 4 +-
.../java/org/apache/hadoop/fs/s3a/TestListing.java | 2 +-
.../fs/s3a/impl/TestPartialDeleteFailures.java | 149 +++++++++++++++-
7 files changed, 521 insertions(+), 106 deletions(-)
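In effect, the commit inverts the dependency between Listing and S3AFileSystem: instead of holding an "owner" filesystem, Listing is now built from a StoreContext plus the narrow ListingOperationCallbacks interface, so the listing logic can be exercised in isolation. A minimal sketch of the new wiring, using only names that appear in this diff (everything here runs inside S3AFileSystem.initialize()):

    // Before this commit, Listing was coupled to the whole filesystem:
    //   listing = new Listing(this);
    // Now it sees only the callback interface plus the shared StoreContext.
    listing = new Listing(listingOperationCallbacks, createStoreContext());

The per-file diffs follow.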
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 9c2f67d..fcb4928 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -30,11 +30,22 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
+import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
+import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -46,25 +57,31 @@ import java.util.NoSuchElementException;
import java.util.Set;
import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL;
import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
+import static org.apache.hadoop.fs.s3a.S3AUtils.maybeAddTrailingSlash;
import static org.apache.hadoop.fs.s3a.S3AUtils.objectRepresentsDirectory;
import static org.apache.hadoop.fs.s3a.S3AUtils.stringify;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
+import static org.apache.hadoop.fs.s3a.auth.RoleModel.pathToKey;
/**
* Place for the S3A listing classes; keeps all the small classes under control.
*/
@InterfaceAudience.Private
-public class Listing {
+public class Listing extends AbstractStoreOperation {
- private final S3AFileSystem owner;
private static final Logger LOG = S3AFileSystem.LOG;
static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
new AcceptAllButS3nDirs();
- public Listing(S3AFileSystem owner) {
- this.owner = owner;
+ private final ListingOperationCallbacks listingOperationCallbacks;
+
+ public Listing(ListingOperationCallbacks listingOperationCallbacks,
+ StoreContext storeContext) {
+ super(storeContext);
+ this.listingOperationCallbacks = listingOperationCallbacks;
}
/**
@@ -156,6 +173,145 @@ public class Listing {
return new TombstoneReconcilingIterator(iterator, tombstones);
}
+
+ /**
+ * List files under a path assuming the path to be a directory.
+ * @param path input path.
+ * @param recursive recursive listing?
+ * @param acceptor file status filter
+ * @param collectTombstones should tombstones be collected from S3Guard?
+ * @param forceNonAuthoritativeMS forces metadata store to act like non
+ * authoritative. This is useful when
+ * listFiles output is used by import tool.
+ * @return an iterator over listing.
+ * @throws IOException any exception.
+ */
+ public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
+ Path path,
+ boolean recursive, Listing.FileStatusAcceptor acceptor,
+ boolean collectTombstones,
+ boolean forceNonAuthoritativeMS) throws IOException {
+
+ String key = maybeAddTrailingSlash(pathToKey(path));
+ String delimiter = recursive ? null : "/";
+ LOG.debug("Requesting all entries under {} with delimiter '{}'",
+ key, delimiter);
+ final RemoteIterator<S3AFileStatus> cachedFilesIterator;
+ final Set<Path> tombstones;
+ boolean allowAuthoritative = listingOperationCallbacks
+ .allowAuthoritative(path);
+ if (recursive) {
+ final PathMetadata pm = getStoreContext()
+ .getMetadataStore()
+ .get(path, true);
+ if (pm != null) {
+ if (pm.isDeleted()) {
+ OffsetDateTime deletedAt = OffsetDateTime
+ .ofInstant(Instant.ofEpochMilli(
+ pm.getFileStatus().getModificationTime()),
+ ZoneOffset.UTC);
+ throw new FileNotFoundException("Path " + path + " is recorded as " +
+ "deleted by S3Guard at " + deletedAt);
+ }
+ }
+ MetadataStoreListFilesIterator metadataStoreListFilesIterator =
+ new MetadataStoreListFilesIterator(
+ getStoreContext().getMetadataStore(),
+ pm,
+ allowAuthoritative);
+ tombstones = metadataStoreListFilesIterator.listTombstones();
+ // if all of the below is true
+ // - authoritative access is allowed for this metadatastore
+ // for this directory,
+ // - all the directory listings are authoritative on the client
+ // - the caller does not force non-authoritative access
+ // return the listing without any further s3 access
+ if (!forceNonAuthoritativeMS &&
+ allowAuthoritative &&
+ metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
+ S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
+ metadataStoreListFilesIterator, tombstones);
+ cachedFilesIterator = createProvidedFileStatusIterator(
+ statuses, ACCEPT_ALL, acceptor);
+ return createLocatedFileStatusIterator(cachedFilesIterator);
+ }
+ cachedFilesIterator = metadataStoreListFilesIterator;
+ } else {
+ DirListingMetadata meta =
+ S3Guard.listChildrenWithTtl(
+ getStoreContext().getMetadataStore(),
+ path,
+ listingOperationCallbacks.getUpdatedTtlTimeProvider(),
+ allowAuthoritative);
+ if (meta != null) {
+ tombstones = meta.listTombstones();
+ } else {
+ tombstones = null;
+ }
+ cachedFilesIterator = createProvidedFileStatusIterator(
+ S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
+ if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
+ // metadata listing is authoritative, so return it directly
+ return createLocatedFileStatusIterator(cachedFilesIterator);
+ }
+ }
+ return createTombstoneReconcilingIterator(
+ createLocatedFileStatusIterator(
+ createFileStatusListingIterator(path,
+ listingOperationCallbacks
+ .createListObjectsRequest(key, delimiter),
+ ACCEPT_ALL,
+ acceptor,
+ cachedFilesIterator)),
+ collectTombstones ? tombstones : null);
+ }
+
+ /**
+   * Generate a located file status iterator for a directory,
+   * also performing tombstone reconciliation for guarded directories.
+ * @param dir directory to check.
+ * @param filter a path filter.
+ * @return an iterator that traverses statuses of the given dir.
+ * @throws IOException in case of failure.
+ */
+ public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
+ Path dir, PathFilter filter) throws IOException {
+ final String key = maybeAddTrailingSlash(pathToKey(dir));
+ final Listing.FileStatusAcceptor acceptor =
+ new Listing.AcceptAllButSelfAndS3nDirs(dir);
+ boolean allowAuthoritative = listingOperationCallbacks
+ .allowAuthoritative(dir);
+ DirListingMetadata meta =
+ S3Guard.listChildrenWithTtl(getStoreContext().getMetadataStore(),
+ dir,
+ listingOperationCallbacks
+ .getUpdatedTtlTimeProvider(),
+ allowAuthoritative);
+ Set<Path> tombstones = meta != null
+ ? meta.listTombstones()
+ : null;
+ final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
+ createProvidedFileStatusIterator(
+ S3Guard.dirMetaToStatuses(meta), filter, acceptor);
+ return (allowAuthoritative && meta != null
+ && meta.isAuthoritative())
+ ? createLocatedFileStatusIterator(
+ cachedFileStatusIterator)
+ : createTombstoneReconcilingIterator(
+ createLocatedFileStatusIterator(
+ createFileStatusListingIterator(dir,
+ listingOperationCallbacks
+ .createListObjectsRequest(key, "/"),
+ filter,
+ acceptor,
+ cachedFileStatusIterator)),
+ tombstones);
+ }
+
+ public S3ListRequest createListObjectsRequest(String key, String delimiter) {
+ return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
+ }
+
/**
* Interface to implement by the logic deciding whether to accept a summary
* entry or path as a valid file or directory.
@@ -193,9 +349,9 @@ public class Listing {
* value.
*
* If the status value is null, the iterator declares that it has no data.
- * This iterator is used to handle {@link S3AFileSystem#listStatus} calls
- * where the path handed in refers to a file, not a directory: this is the
- * iterator returned.
+ * This iterator is used to handle {@link S3AFileSystem#listStatus(Path)}
+ * calls where the path handed in refers to a file, not a directory:
+ * this is the iterator returned.
*/
static final class SingleStatusRemoteIterator
implements RemoteIterator<S3ALocatedFileStatus> {
@@ -465,14 +621,15 @@ public class Listing {
// objects
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
String key = summary.getKey();
- Path keyPath = owner.keyToQualifiedPath(key);
+ Path keyPath = getStoreContext().getContextAccessors().keyToPath(key);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: {}", keyPath, stringify(summary));
}
// Skip over keys that are ourselves and old S3N _$folder$ files
if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
S3AFileStatus status = createFileStatus(keyPath, summary,
- owner.getDefaultBlockSize(keyPath), owner.getUsername(),
+ listingOperationCallbacks.getDefaultBlockSize(keyPath),
+ getStoreContext().getUsername(),
summary.getETag(), null);
LOG.debug("Adding: {}", status);
stats.add(status);
@@ -485,10 +642,12 @@ public class Listing {
// prefixes: always directories
for (String prefix : objects.getCommonPrefixes()) {
- Path keyPath = owner.keyToQualifiedPath(prefix);
+ Path keyPath = getStoreContext()
+ .getContextAccessors()
+ .keyToPath(prefix);
if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
S3AFileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
- owner.getUsername());
+ getStoreContext().getUsername());
LOG.debug("Adding directory: {}", status);
added++;
stats.add(status);
@@ -573,8 +732,8 @@ public class Listing {
Path listPath,
S3ListRequest request) throws IOException {
this.listPath = listPath;
- this.maxKeys = owner.getMaxKeys();
- this.objects = owner.listObjects(request);
+ this.maxKeys = listingOperationCallbacks.getMaxKeys();
+ this.objects = listingOperationCallbacks.listObjects(request);
this.request = request;
}
@@ -616,7 +775,8 @@ public class Listing {
// need to request a new set of objects.
LOG.debug("[{}], Requesting next {} objects under {}",
listingCount, maxKeys, listPath);
- objects = owner.continueListObjects(request, objects);
+ objects = listingOperationCallbacks
+ .continueListObjects(request, objects);
listingCount++;
LOG.debug("New listing status: {}", this);
} catch (AmazonClientException e) {
@@ -716,7 +876,8 @@ public class Listing {
@Override
public S3ALocatedFileStatus next() throws IOException {
- return owner.toLocatedFileStatus(statusIterator.next());
+ return listingOperationCallbacks
+ .toLocatedFileStatus(statusIterator.next());
}
}
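Because Listing now talks to the store only through StoreContext and the callback interface, it can be driven entirely by test doubles. A hedged sketch of what that enables, written as a fragment inside a JUnit test (MinimalListingOperationCallbacks comes from the TestPartialDeleteFailures diff below; the mock store-context helper and tracking store are assumed from that test's setup):

    // No S3 client anywhere: both collaborators are stubs.
    StoreContext context = createMockStoreContext(true,
        new OperationTrackingStore());          // assumed test helpers
    Listing listing = new Listing(
        new MinimalListingOperationCallbacks(), context);
    // The stub's createListObjectsRequest returns null, so this call
    // exercises only the delegation wiring, not request construction.
    S3ListRequest request = listing.createListObjectsRequest("dir/", "/");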
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 286df44..2cd2325 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
+import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.RenameOperation;
@@ -148,7 +149,6 @@ import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.select.SelectBinding;
import org.apache.hadoop.fs.s3a.select.SelectConstants;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
-import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
@@ -293,6 +293,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private final S3AFileSystem.OperationCallbacksImpl
operationCallbacks = new OperationCallbacksImpl();
+ private final ListingOperationCallbacks listingOperationCallbacks =
+ new ListingOperationCallbacksImpl();
+
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
@@ -362,7 +365,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
FAIL_ON_METADATA_WRITE_ERROR_DEFAULT);
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
- listing = new Listing(this);
partSize = getMultipartSizeProperty(conf,
MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
multiPartThreshold = getMultipartSizeProperty(conf,
@@ -455,6 +457,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
+ listing = new Listing(listingOperationCallbacks, createStoreContext());
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
stopAllServices();
@@ -590,6 +593,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
/**
+ * Get current listing instance.
+ * @return this instance's listing.
+ */
+ public Listing getListing() {
+ return listing;
+ }
+
+ /**
* Set up the client bindings.
* If delegation tokens are enabled, the FS first looks for a DT
* ahead of any other bindings;.
@@ -1599,6 +1610,61 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
}
+ protected class ListingOperationCallbacksImpl implements
+ ListingOperationCallbacks {
+
+ @Override
+ @Retries.RetryRaw
+ public S3ListResult listObjects(
+ S3ListRequest request)
+ throws IOException {
+ return S3AFileSystem.this.listObjects(request);
+ }
+
+ @Override
+ @Retries.RetryRaw
+ public S3ListResult continueListObjects(
+ S3ListRequest request,
+ S3ListResult prevResult)
+ throws IOException {
+ return S3AFileSystem.this.continueListObjects(request, prevResult);
+ }
+
+ @Override
+ public S3ALocatedFileStatus toLocatedFileStatus(
+ S3AFileStatus status)
+ throws IOException {
+ return S3AFileSystem.this.toLocatedFileStatus(status);
+ }
+
+ @Override
+ public S3ListRequest createListObjectsRequest(
+ String key,
+ String delimiter) {
+ return S3AFileSystem.this.createListObjectsRequest(key, delimiter);
+ }
+
+ @Override
+ public long getDefaultBlockSize(Path path) {
+ return S3AFileSystem.this.getDefaultBlockSize(path);
+ }
+
+ @Override
+ public int getMaxKeys() {
+ return S3AFileSystem.this.getMaxKeys();
+ }
+
+ @Override
+ public ITtlTimeProvider getUpdatedTtlTimeProvider() {
+ return S3AFileSystem.this.ttlTimeProvider;
+ }
+
+ @Override
+ public boolean allowAuthoritative(final Path p) {
+ return S3AFileSystem.this.allowAuthoritative(p);
+ }
+ }
+
/**
* Low-level call to get at the object metadata.
* @param path path to the object
@@ -4216,7 +4282,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// Assuming the path to be a directory
// do a bulk operation.
RemoteIterator<S3ALocatedFileStatus> listFilesAssumingDir =
- getListFilesAssumingDir(path,
+ listing.getListFilesAssumingDir(path,
recursive,
acceptor,
collectTombstones,
@@ -4243,89 +4309,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
/**
- * List files under a path assuming the path to be a directory.
- * @param path input path.
- * @param recursive recursive listing?
- * @param acceptor file status filter
- * @param collectTombstones should tombstones be collected from S3Guard?
- * @param forceNonAuthoritativeMS forces metadata store to act like non
- * authoritative. This is useful when
- * listFiles output is used by import tool.
- * @return an iterator over listing.
- * @throws IOException any exception.
- */
- private RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
- Path path,
- boolean recursive, Listing.FileStatusAcceptor acceptor,
- boolean collectTombstones,
- boolean forceNonAuthoritativeMS) throws IOException {
-
- String key = maybeAddTrailingSlash(pathToKey(path));
- String delimiter = recursive ? null : "/";
- LOG.debug("Requesting all entries under {} with delimiter '{}'",
- key, delimiter);
- final RemoteIterator<S3AFileStatus> cachedFilesIterator;
- final Set<Path> tombstones;
- boolean allowAuthoritative = allowAuthoritative(path);
- if (recursive) {
- final PathMetadata pm = metadataStore.get(path, true);
- if (pm != null) {
- if (pm.isDeleted()) {
- OffsetDateTime deletedAt = OffsetDateTime
- .ofInstant(Instant.ofEpochMilli(
- pm.getFileStatus().getModificationTime()),
- ZoneOffset.UTC);
- throw new FileNotFoundException("Path " + path + " is recorded as " +
- "deleted by S3Guard at " + deletedAt);
- }
- }
- MetadataStoreListFilesIterator metadataStoreListFilesIterator =
- new MetadataStoreListFilesIterator(metadataStore, pm,
- allowAuthoritative);
- tombstones = metadataStoreListFilesIterator.listTombstones();
- // if all of the below is true
- // - authoritative access is allowed for this metadatastore
- // for this directory,
- // - all the directory listings are authoritative on the client
- // - the caller does not force non-authoritative access
- // return the listing without any further s3 access
- if (!forceNonAuthoritativeMS &&
- allowAuthoritative &&
- metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
- S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
- metadataStoreListFilesIterator, tombstones);
- cachedFilesIterator = listing.createProvidedFileStatusIterator(
- statuses, ACCEPT_ALL, acceptor);
- return listing.createLocatedFileStatusIterator(cachedFilesIterator);
- }
- cachedFilesIterator = metadataStoreListFilesIterator;
- } else {
- DirListingMetadata meta =
- S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
- allowAuthoritative);
- if (meta != null) {
- tombstones = meta.listTombstones();
- } else {
- tombstones = null;
- }
- cachedFilesIterator = listing.createProvidedFileStatusIterator(
- S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
- if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
- // metadata listing is authoritative, so return it directly
- return listing.createLocatedFileStatusIterator(cachedFilesIterator);
- }
- }
- return listing.createTombstoneReconcilingIterator(
- listing.createLocatedFileStatusIterator(
- listing.createFileStatusListingIterator(path,
- createListObjectsRequest(key, delimiter),
- ACCEPT_ALL,
- acceptor,
- cachedFilesIterator)),
- collectTombstones ? tombstones : null);
- }
-
- /**
* Override superclass so as to add statistic collection.
* {@inheritDoc}
*/
@@ -4363,7 +4346,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// trigger a list call directly.
final RemoteIterator<S3ALocatedFileStatus>
locatedFileStatusIteratorForDir =
- getLocatedFileStatusIteratorForDir(path, filter);
+ listing.getLocatedFileStatusIteratorForDir(path, filter);
// If no listing is present then path might be a file.
if (!locatedFileStatusIteratorForDir.hasNext()) {
@@ -4847,5 +4830,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
public Path makeQualified(final Path path) {
return S3AFileSystem.this.makeQualified(path);
}
+
}
}
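The ListingOperationCallbacksImpl added above is pure delegation: a non-static inner class whose every method forwards to the enclosing filesystem through a qualified this-reference. The same idiom in miniature (hypothetical names, for illustration only):

    class Store {
      interface Callbacks {
        int getMaxKeys();
      }

      private final int maxKeys = 5000;
      private final Callbacks callbacks = new CallbacksImpl();

      // Non-static inner class: Store.this reaches the enclosing
      // instance's fields and methods, exactly as
      // S3AFileSystem.this.getMaxKeys() does in the real code.
      private class CallbacksImpl implements Callbacks {
        @Override
        public int getMaxKeys() {
          return Store.this.maxKeys;
        }
      }
    }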
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java
new file mode 100644
index 0000000..d89cada
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ListingOperationCallbacks.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.S3ListRequest;
+import org.apache.hadoop.fs.s3a.S3ListResult;
+import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
+
+/**
+ * These are all the callbacks which
+ * {@link org.apache.hadoop.fs.s3a.Listing} operations
+ * need, derived from the corresponding S3AFileSystem
+ * methods.
+ */
+public interface ListingOperationCallbacks {
+
+ /**
+ * Initiate a {@code listObjects} operation, incrementing metrics
+ * in the process.
+ *
+ * Retry policy: retry untranslated.
+ * @param request request to initiate
+ * @return the results
+ * @throws IOException if the retry invocation raises one (it shouldn't).
+ */
+ @Retries.RetryRaw
+ S3ListResult listObjects(
+ S3ListRequest request)
+ throws IOException;
+
+ /**
+ * List the next set of objects.
+ * Retry policy: retry untranslated.
+ * @param request last list objects request to continue
+ * @param prevResult last paged result to continue from
+ * @return the next result object
+ * @throws IOException none, just there for retryUntranslated.
+ */
+ @Retries.RetryRaw
+ S3ListResult continueListObjects(
+ S3ListRequest request,
+ S3ListResult prevResult)
+ throws IOException;
+
+ /**
+ * Build a {@link S3ALocatedFileStatus} from a {@link FileStatus} instance.
+ * @param status file status
+ * @return a located status with block locations set up from this FS.
+ * @throws IOException IO Problems.
+ */
+ S3ALocatedFileStatus toLocatedFileStatus(
+ S3AFileStatus status)
+ throws IOException;
+ /**
+ * Create a {@code ListObjectsRequest} request against this bucket,
+ * with the maximum keys returned in a query set by
+   * {@link #getMaxKeys()}.
+ * @param key key for request
+ * @param delimiter any delimiter
+ * @return the request
+ */
+ S3ListRequest createListObjectsRequest(
+ String key,
+ String delimiter);
+
+
+ /**
+   * Return the number of bytes that large input files should optimally
+ * be split into to minimize I/O time. The given path will be used to
+ * locate the actual filesystem. The full path does not have to exist.
+ * @param path path of file
+ * @return the default block size for the path's filesystem
+ */
+ long getDefaultBlockSize(Path path);
+
+ /**
+ * Get the maximum key count.
+ * @return a value, valid after initialization
+ */
+ int getMaxKeys();
+
+ /**
+ * Get the updated time provider for the current fs instance.
+ * @return implementation of {@link ITtlTimeProvider}
+ */
+ ITtlTimeProvider getUpdatedTtlTimeProvider();
+
+ /**
+ * Is the path for this instance considered authoritative on the client,
+ * that is: will listing/status operations only be handled by the metastore,
+ * with no fallback to S3.
+ * @param p path
+ * @return true iff the path is authoritative on the client.
+ */
+ boolean allowAuthoritative(Path p);
+}
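A side benefit of funnelling all listing traffic through this interface is that implementations can be wrapped without touching Listing itself. A hedged sketch of a counting decorator (a hypothetical class, not part of this commit), useful for asserting how many LIST requests a listing operation issues:

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.S3AFileStatus;
    import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
    import org.apache.hadoop.fs.s3a.S3ListRequest;
    import org.apache.hadoop.fs.s3a.S3ListResult;
    import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
    import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;

    /** Hypothetical decorator: counts LIST calls, delegates everything. */
    public class CountingListingCallbacks implements ListingOperationCallbacks {

      private final ListingOperationCallbacks inner;
      private final AtomicLong listCalls = new AtomicLong();

      public CountingListingCallbacks(ListingOperationCallbacks inner) {
        this.inner = inner;
      }

      /** Number of LIST requests (initial + continuation) seen so far. */
      public long getListCalls() {
        return listCalls.get();
      }

      @Override
      public S3ListResult listObjects(S3ListRequest request)
          throws IOException {
        listCalls.incrementAndGet();
        return inner.listObjects(request);
      }

      @Override
      public S3ListResult continueListObjects(S3ListRequest request,
          S3ListResult prevResult) throws IOException {
        listCalls.incrementAndGet();
        return inner.continueListObjects(request, prevResult);
      }

      @Override
      public S3ALocatedFileStatus toLocatedFileStatus(S3AFileStatus status)
          throws IOException {
        return inner.toLocatedFileStatus(status);
      }

      @Override
      public S3ListRequest createListObjectsRequest(String key,
          String delimiter) {
        return inner.createListObjectsRequest(key, delimiter);
      }

      @Override
      public long getDefaultBlockSize(Path path) {
        return inner.getDefaultBlockSize(path);
      }

      @Override
      public int getMaxKeys() {
        return inner.getMaxKeys();
      }

      @Override
      public ITtlTimeProvider getUpdatedTtlTimeProvider() {
        return inner.getUpdatedTtlTimeProvider();
      }

      @Override
      public boolean allowAuthoritative(Path p) {
        return inner.allowAuthoritative(p);
      }
    }

A wrapped instance could then be handed to the Listing constructor in place of the filesystem's own callbacks.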
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
index e307c8d..cafa22f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/StoreContext.java
@@ -210,6 +210,10 @@ public class StoreContext {
return useListV1;
}
+ public ContextAccessors getContextAccessors() {
+ return contextAccessors;
+ }
+
/**
* Convert a key to a fully qualified path.
* @param key input key
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java
index 536481a..b5f29cc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DumpS3GuardDynamoTable.java
@@ -348,8 +348,8 @@ public class DumpS3GuardDynamoTable extends AbstractS3GuardDynamoDBDiagnostic {
final CsvFile csv) throws IOException {
S3AFileSystem fs = getFilesystem();
Path rootPath = fs.qualify(new Path("/"));
- Listing listing = new Listing(fs);
- S3ListRequest request = fs.createListObjectsRequest("", null);
+ Listing listing = fs.getListing();
+ S3ListRequest request = listing.createListObjectsRequest("", null);
long count = 0;
RemoteIterator<S3AFileStatus> st =
listing.createFileStatusListingIterator(rootPath, request,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
index 1a533bf..3472674 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
@@ -68,7 +68,7 @@ public class TestListing extends AbstractS3AMockTest {
Path[] allFiles = {parent, liveChild, deletedChild};
Path[] liveFiles = {parent, liveChild};
- Listing listing = new Listing(fs);
+ Listing listing = fs.getListing();
Collection<FileStatus> statuses = new ArrayList<>();
statuses.add(blankFileStatus(parent));
statuses.add(blankFileStatus(liveChild));
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
index c9d872e..7fa03a1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestPartialDeleteFailures.java
@@ -32,8 +32,11 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
+import com.amazonaws.services.s3.transfer.model.CopyResult;
import com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.Before;
@@ -42,14 +45,21 @@ import org.junit.Test;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
+import org.apache.hadoop.fs.s3a.S3ListRequest;
+import org.apache.hadoop.fs.s3a.S3ListResult;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
@@ -230,6 +240,142 @@ public class TestPartialDeleteFailures {
.build();
}
+ private static class MinimalListingOperationCallbacks
+ implements ListingOperationCallbacks {
+ @Override
+ public S3ListResult listObjects(S3ListRequest request)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public S3ListResult continueListObjects(
+ S3ListRequest request,
+ S3ListResult prevResult)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public S3ALocatedFileStatus toLocatedFileStatus(
+ S3AFileStatus status) throws IOException {
+ return null;
+ }
+
+ @Override
+ public S3ListRequest createListObjectsRequest(
+ String key,
+ String delimiter) {
+ return null;
+ }
+
+ @Override
+ public long getDefaultBlockSize(Path path) {
+ return 0;
+ }
+
+ @Override
+ public int getMaxKeys() {
+ return 0;
+ }
+
+ @Override
+ public ITtlTimeProvider getUpdatedTtlTimeProvider() {
+ return null;
+ }
+
+ @Override
+ public boolean allowAuthoritative(Path p) {
+ return false;
+ }
+ }
+ private static class MinimalOperationCallbacks
+ implements OperationCallbacks {
+ @Override
+ public S3ObjectAttributes createObjectAttributes(
+ Path path,
+ String eTag,
+ String versionId,
+ long len) {
+ return null;
+ }
+
+ @Override
+ public S3ObjectAttributes createObjectAttributes(
+ S3AFileStatus fileStatus) {
+ return null;
+ }
+
+ @Override
+ public S3AReadOpContext createReadContext(
+ FileStatus fileStatus) {
+ return null;
+ }
+
+ @Override
+ public void finishRename(
+ Path sourceRenamed,
+ Path destCreated)
+ throws IOException {
+
+ }
+
+ @Override
+ public void deleteObjectAtPath(
+ Path path,
+ String key,
+ boolean isFile,
+ BulkOperationState operationState)
+ throws IOException {
+
+ }
+
+ @Override
+ public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
+ Path path,
+ S3AFileStatus status,
+ boolean collectTombstones,
+ boolean includeSelf)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public CopyResult copyFile(
+ String srcKey,
+ String destKey,
+ S3ObjectAttributes srcAttributes,
+ S3AReadOpContext readContext)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public DeleteObjectsResult removeKeys(
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+ boolean deleteFakeDir,
+ List<Path> undeletedObjectsOnFailure,
+ BulkOperationState operationState,
+ boolean quiet)
+ throws MultiObjectDeleteException, AmazonClientException,
+ IOException {
+ return null;
+ }
+
+ @Override
+ public boolean allowAuthoritative(Path p) {
+ return false;
+ }
+
+ @Override
+ public RemoteIterator<S3AFileStatus> listObjects(
+ Path path,
+ String key)
+ throws IOException {
+ return null;
+ }
+ }
+
private static class MinimalContextAccessor implements ContextAccessors {
@Override
@@ -333,7 +479,8 @@ public class TestPartialDeleteFailures {
@Override
public void deletePaths(final Collection<Path> paths,
- @Nullable final BulkOperationState operationState) throws IOException {
+ @Nullable final BulkOperationState operationState)
+ throws IOException {
deleted.addAll(paths);
}
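For code outside S3AFileSystem, the net effect is that listing helpers are reached via fs.getListing() rather than package-private filesystem methods. A hedged usage sketch (bucket and paths are placeholders; ACCEPT_ALL_BUT_S3N is package-private in this version, so as written this must live in org.apache.hadoop.fs.s3a):

    package org.apache.hadoop.fs.s3a;   // ACCEPT_ALL_BUT_S3N is package-private

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    public class ListUnderPrefix {
      public static void main(String[] args) throws Exception {
        S3AFileSystem fs = (S3AFileSystem) FileSystem.get(
            URI.create("s3a://example-bucket/"), new Configuration());
        Listing listing = fs.getListing();

        // Recursive listing under a directory, reconciled against any
        // S3Guard tombstones.
        RemoteIterator<S3ALocatedFileStatus> it =
            listing.getListFilesAssumingDir(
                new Path("/data"),
                true,                          // recursive
                Listing.ACCEPT_ALL_BUT_S3N,    // skip S3N _$folder$ markers
                true,                          // collect S3Guard tombstones
                false);                        // allow authoritative listing
        while (it.hasNext()) {
          System.out.println(it.next().getPath());
        }
      }
    }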