You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/09/08 20:57:45 UTC
[13/45] hadoop git commit: HADOOP-13421. Switch to v2 of the S3 List
Objects API in S3A. Contributed by Aaron Fabbri
HADOOP-13421. Switch to v2 of the S3 List Objects API in S3A.
Contributed by Aaron Fabbri
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5bbca804
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5bbca804
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5bbca804
Branch: refs/heads/HDFS-10467
Commit: 5bbca80428ffbe776650652de86a3bba885edb31
Parents: ab8368d
Author: Steve Loughran <st...@apache.org>
Authored: Fri Sep 8 12:07:02 2017 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Fri Sep 8 12:07:02 2017 +0100
----------------------------------------------------------------------
.../src/main/resources/core-default.xml | 9 ++
.../org/apache/hadoop/fs/s3a/Constants.java | 9 ++
.../fs/s3a/InconsistentAmazonS3Client.java | 143 ++++++++++++++++---
.../java/org/apache/hadoop/fs/s3a/Listing.java | 22 +--
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 91 ++++++++----
.../org/apache/hadoop/fs/s3a/S3ListRequest.java | 69 +++++++++
.../org/apache/hadoop/fs/s3a/S3ListResult.java | 97 +++++++++++++
.../src/site/markdown/tools/hadoop-aws/index.md | 9 ++
.../ITestS3AContractGetFileStatusV1List.java | 59 ++++++++
.../fs/s3a/ITestS3GuardListConsistency.java | 22 +--
.../hadoop/fs/s3a/TestS3AGetFileStatus.java | 41 +++---
11 files changed, 492 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 9e2c553..23739b0 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1428,6 +1428,15 @@
<description>The implementation class of the S3A AbstractFileSystem.</description>
</property>
+<property>
+ <name>fs.s3a.list.version</name>
+ <value>2</value>
+ <description>
+ Select which version of the S3 SDK's List Objects API to use. Currently
+ supported values are 2 (default) and 1 (older API).
+ </description>
+</property>
+
<!-- Azure file system properties -->
<property>
<name>fs.wasb.impl</name>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 1a464d0..4e2af3a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -451,4 +451,13 @@ public final class Constants {
public static final String FAIL_INJECT_INCONSISTENCY_PROBABILITY =
"fs.s3a.failinject.inconsistency.probability";
+ /**
+ * S3 API level parameters.
+ */
+ @InterfaceStability.Unstable
+ public static final String LIST_VERSION = "fs.s3a.list.version";
+
+ @InterfaceStability.Unstable
+ public static final int DEFAULT_LIST_VERSION = 2;
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
index 5e9cb3f..6476f5d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -28,6 +28,8 @@ import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
@@ -109,8 +111,10 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
}
}
- /** Map of key to delay -> time it was deleted + object summary (object
- * summary is null for prefixes. */
+ /**
+ * Map of key to delay -> time it was deleted + object summary (object summary
+ * is null for prefixes).
+ */
private Map<String, Delete> delayedDeletes = new HashMap<>();
/** Map of key to delay -> time it was created. */
@@ -196,17 +200,29 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
return super.putObject(putObjectRequest);
}
- /* We should only need to override this version of listObjects() */
+ /* We should only need to override these versions of listObjects() */
@Override
public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
throws AmazonClientException, AmazonServiceException {
LOG.debug("prefix {}", listObjectsRequest.getPrefix());
ObjectListing listing = super.listObjects(listObjectsRequest);
- listing = filterListObjects(listObjectsRequest, listing);
+ listing = filterListObjects(listing);
listing = restoreListObjects(listObjectsRequest, listing);
return listing;
}
+ /* We should only need to override these versions of listObjects() */
+ @Override
+ public ListObjectsV2Result listObjectsV2(ListObjectsV2Request request)
+ throws AmazonClientException, AmazonServiceException {
+ LOG.debug("prefix {}", request.getPrefix());
+ ListObjectsV2Result listing = super.listObjectsV2(request);
+ listing = filterListObjectsV2(listing);
+ listing = restoreListObjectsV2(request, listing);
+ return listing;
+ }
+
+
private void addSummaryIfNotPresent(List<S3ObjectSummary> list,
S3ObjectSummary item) {
// Behavior of S3ObjectSummary
@@ -282,21 +298,58 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
// recursive list has no delimiter, returns everything that matches a
// prefix.
boolean recursiveObjectList = !("/".equals(request.getDelimiter()));
+ String prefix = request.getPrefix();
+
+ restoreDeleted(outputList, outputPrefixes, recursiveObjectList, prefix);
+ return new CustomObjectListing(rawListing, outputList, outputPrefixes);
+ }
+
+ /**
+ * V2 list API variant of
+ * {@link #restoreListObjects(ListObjectsRequest, ObjectListing)}.
+ * @param request original v2 list request
+ * @param result raw s3 result
+ */
+ private ListObjectsV2Result restoreListObjectsV2(ListObjectsV2Request request,
+ ListObjectsV2Result result) {
+ List<S3ObjectSummary> outputList = result.getObjectSummaries();
+ List<String> outputPrefixes = result.getCommonPrefixes();
+ // recursive list has no delimiter, returns everything that matches a
+ // prefix.
+ boolean recursiveObjectList = !("/".equals(request.getDelimiter()));
+ String prefix = request.getPrefix();
+
+ restoreDeleted(outputList, outputPrefixes, recursiveObjectList, prefix);
+ return new CustomListObjectsV2Result(result, outputList, outputPrefixes);
+ }
+
+
+ /**
+ * Main logic for
+ * {@link #restoreListObjects(ListObjectsRequest, ObjectListing)} and
+ * the v2 variant above.
+ * @param summaries object summary list to modify.
+ * @param prefixes prefix list to modify
+ * @param recursive true if recursive list request
+ * @param prefix prefix for original list request
+ */
+ private void restoreDeleted(List<S3ObjectSummary> summaries,
+ List<String> prefixes, boolean recursive, String prefix) {
// Go through all deleted keys
for (String key : new HashSet<>(delayedDeletes.keySet())) {
Delete delete = delayedDeletes.get(key);
if (isKeyDelayed(delete.time(), key)) {
- if (isDescendant(request.getPrefix(), key, recursiveObjectList)) {
+ if (isDescendant(prefix, key, recursive)) {
if (delete.summary() != null) {
- addSummaryIfNotPresent(outputList, delete.summary());
+ addSummaryIfNotPresent(summaries, delete.summary());
}
}
// Non-recursive list has delimiter: will return rolled-up prefixes for
// all keys that are not direct children
- if (!recursiveObjectList) {
- if (isDescendant(request.getPrefix(), key, true)) {
- addPrefixIfNotPresent(outputPrefixes, request.getPrefix(), key);
+ if (!recursive) {
+ if (isDescendant(prefix, key, true)) {
+ addPrefixIfNotPresent(prefixes, prefix, key);
}
}
} else {
@@ -304,31 +357,52 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
delayedDeletes.remove(key);
}
}
+ }
+
+ private ObjectListing filterListObjects(ObjectListing rawListing) {
+
+ // Filter object listing
+ List<S3ObjectSummary> outputList = filterSummaries(
+ rawListing.getObjectSummaries());
+
+ // Filter prefixes (directories)
+ List<String> outputPrefixes = filterPrefixes(
+ rawListing.getCommonPrefixes());
return new CustomObjectListing(rawListing, outputList, outputPrefixes);
}
- private ObjectListing filterListObjects(ListObjectsRequest request,
- ObjectListing rawListing) {
-
+ private ListObjectsV2Result filterListObjectsV2(ListObjectsV2Result raw) {
// Filter object listing
+ List<S3ObjectSummary> outputList = filterSummaries(
+ raw.getObjectSummaries());
+
+ // Filter prefixes (directories)
+ List<String> outputPrefixes = filterPrefixes(raw.getCommonPrefixes());
+
+ return new CustomListObjectsV2Result(raw, outputList, outputPrefixes);
+ }
+
+ private List<S3ObjectSummary> filterSummaries(
+ List<S3ObjectSummary> summaries) {
List<S3ObjectSummary> outputList = new ArrayList<>();
- for (S3ObjectSummary s : rawListing.getObjectSummaries()) {
+ for (S3ObjectSummary s : summaries) {
String key = s.getKey();
if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
outputList.add(s);
}
}
+ return outputList;
+ }
- // Filter prefixes (directories)
+ private List<String> filterPrefixes(List<String> prefixes) {
List<String> outputPrefixes = new ArrayList<>();
- for (String key : rawListing.getCommonPrefixes()) {
+ for (String key : prefixes) {
if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
outputPrefixes.add(key);
}
}
-
- return new CustomObjectListing(rawListing, outputList, outputPrefixes);
+ return outputPrefixes;
}
private boolean isKeyDelayed(Long enqueueTime, String key) {
@@ -342,7 +416,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
delayedDeletes.remove(key);
LOG.debug("no longer delaying {}", key);
return false;
- } else {
+ } else {
LOG.info("delaying {}", key);
return true;
}
@@ -431,4 +505,37 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
return customPrefixes;
}
}
+
+ private static class CustomListObjectsV2Result extends ListObjectsV2Result {
+
+ private final List<S3ObjectSummary> customListing;
+ private final List<String> customPrefixes;
+
+ CustomListObjectsV2Result(ListObjectsV2Result raw,
+ List<S3ObjectSummary> customListing, List<String> customPrefixes) {
+ super();
+ this.customListing = customListing;
+ this.customPrefixes = customPrefixes;
+
+ this.setBucketName(raw.getBucketName());
+ this.setCommonPrefixes(raw.getCommonPrefixes());
+ this.setDelimiter(raw.getDelimiter());
+ this.setEncodingType(raw.getEncodingType());
+ this.setStartAfter(raw.getStartAfter());
+ this.setMaxKeys(raw.getMaxKeys());
+ this.setContinuationToken(raw.getContinuationToken());
+ this.setPrefix(raw.getPrefix());
+ this.setTruncated(raw.isTruncated());
+ }
+
+ @Override
+ public List<S3ObjectSummary> getObjectSummaries() {
+ return customListing;
+ }
+
+ @Override
+ public List<String> getCommonPrefixes() {
+ return customPrefixes;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 8efa218..d9f059b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileStatus;
@@ -90,7 +88,7 @@ public class Listing {
*/
FileStatusListingIterator createFileStatusListingIterator(
Path listPath,
- ListObjectsRequest request,
+ S3ListRequest request,
PathFilter filter,
Listing.FileStatusAcceptor acceptor) throws IOException {
return createFileStatusListingIterator(listPath, request, filter, acceptor,
@@ -112,7 +110,7 @@ public class Listing {
*/
FileStatusListingIterator createFileStatusListingIterator(
Path listPath,
- ListObjectsRequest request,
+ S3ListRequest request,
PathFilter filter,
Listing.FileStatusAcceptor acceptor,
RemoteIterator<FileStatus> providedStatus) throws IOException {
@@ -432,7 +430,7 @@ public class Listing {
* @param objects the next object listing
* @return true if this added any entries after filtering
*/
- private boolean buildNextStatusBatch(ObjectListing objects) {
+ private boolean buildNextStatusBatch(S3ListResult objects) {
// counters for debug logs
int added = 0, ignored = 0;
// list to fill in with results. Initial size will be list maximum.
@@ -512,13 +510,16 @@ public class Listing {
*
* Thread safety: none.
*/
- class ObjectListingIterator implements RemoteIterator<ObjectListing> {
+ class ObjectListingIterator implements RemoteIterator<S3ListResult> {
/** The path listed. */
private final Path listPath;
/** The most recent listing results. */
- private ObjectListing objects;
+ private S3ListResult objects;
+
+ /** The most recent listing request. */
+ private S3ListRequest request;
/** Indicator that this is the first listing. */
private boolean firstListing = true;
@@ -542,10 +543,11 @@ public class Listing {
* */
ObjectListingIterator(
Path listPath,
- ListObjectsRequest request) {
+ S3ListRequest request) {
this.listPath = listPath;
this.maxKeys = owner.getMaxKeys();
this.objects = owner.listObjects(request);
+ this.request = request;
}
/**
@@ -569,7 +571,7 @@ public class Listing {
* @throws NoSuchElementException if there is no more data to list.
*/
@Override
- public ObjectListing next() throws IOException {
+ public S3ListResult next() throws IOException {
if (firstListing) {
// on the first listing, don't request more data.
// Instead just clear the firstListing flag so that it future calls
@@ -585,7 +587,7 @@ public class Listing {
// need to request a new set of objects.
LOG.debug("[{}], Requesting next {} objects under {}",
listingCount, maxKeys, listPath);
- objects = owner.continueListObjects(objects);
+ objects = owner.continueListObjects(request, objects);
listingCount++;
LOG.debug("New listing status: {}", this);
} catch (AmazonClientException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index c22383a..e76ef0b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -53,8 +53,8 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
-import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
@@ -167,6 +167,7 @@ public class S3AFileSystem extends FileSystem {
private String blockOutputBuffer;
private S3ADataBlocks.BlockFactory blockFactory;
private int blockOutputActiveBlocks;
+ private boolean useListV1;
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
@@ -261,6 +262,13 @@ public class S3AFileSystem extends FileSystem {
BlockingThreadPoolExecutorService.newDaemonThreadFactory(
"s3a-transfer-unbounded"));
+ int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
+ if (listVersion < 1 || listVersion > 2) {
+ LOG.warn("Configured fs.s3a.list.version {} is invalid, forcing " +
+ "version 2", listVersion);
+ }
+ useListV1 = (listVersion == 1);
+
initTransferManager();
initCannedAcls(conf);
@@ -1056,21 +1064,37 @@ public class S3AFileSystem extends FileSystem {
* @param request request to initiate
* @return the results
*/
- protected ObjectListing listObjects(ListObjectsRequest request) {
+ protected S3ListResult listObjects(S3ListRequest request) {
incrementStatistic(OBJECT_LIST_REQUESTS);
incrementReadOperations();
- return s3.listObjects(request);
+ if (useListV1) {
+ Preconditions.checkArgument(request.isV1());
+ return S3ListResult.v1(s3.listObjects(request.getV1()));
+ } else {
+ Preconditions.checkArgument(!request.isV1());
+ return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
+ }
}
/**
* List the next set of objects.
- * @param objects paged result
+ * @param request last list objects request to continue
+ * @param prevResult last paged result to continue from
* @return the next result object
*/
- protected ObjectListing continueListObjects(ObjectListing objects) {
+ protected S3ListResult continueListObjects(S3ListRequest request,
+ S3ListResult prevResult) {
incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS);
incrementReadOperations();
- return s3.listNextBatchOfObjects(objects);
+ if (useListV1) {
+ Preconditions.checkArgument(request.isV1());
+ return S3ListResult.v1(s3.listNextBatchOfObjects(prevResult.getV1()));
+ } else {
+ Preconditions.checkArgument(!request.isV1());
+ request.getV2().setContinuationToken(prevResult.getV2()
+ .getNextContinuationToken());
+ return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
+ }
}
/**
@@ -1464,9 +1488,9 @@ public class S3AFileSystem extends FileSystem {
} else {
LOG.debug("Getting objects for directory prefix {} to delete", key);
- ListObjectsRequest request = createListObjectsRequest(key, null);
+ S3ListRequest request = createListObjectsRequest(key, null);
- ObjectListing objects = listObjects(request);
+ S3ListResult objects = listObjects(request);
List<DeleteObjectsRequest.KeyVersion> keys =
new ArrayList<>(objects.getObjectSummaries().size());
while (true) {
@@ -1481,7 +1505,7 @@ public class S3AFileSystem extends FileSystem {
}
if (objects.isTruncated()) {
- objects = continueListObjects(objects);
+ objects = continueListObjects(request, objects);
} else {
if (!keys.isEmpty()) {
// TODO: HADOOP-13761 S3Guard: retries
@@ -1589,7 +1613,7 @@ public class S3AFileSystem extends FileSystem {
return S3Guard.dirMetaToStatuses(dirMeta);
}
- ListObjectsRequest request = createListObjectsRequest(key, "/");
+ S3ListRequest request = createListObjectsRequest(key, "/");
LOG.debug("listStatus: doing listObjects for directory {}", key);
Listing.FileStatusListingIterator files =
@@ -1619,16 +1643,38 @@ public class S3AFileSystem extends FileSystem {
* @return the request
*/
@VisibleForTesting
- ListObjectsRequest createListObjectsRequest(String key,
+ S3ListRequest createListObjectsRequest(String key,
String delimiter) {
- ListObjectsRequest request = new ListObjectsRequest();
- request.setBucketName(bucket);
- request.setMaxKeys(maxKeys);
- request.setPrefix(key);
- if (delimiter != null) {
- request.setDelimiter(delimiter);
+ return createListObjectsRequest(key, delimiter, null);
+ }
+
+ private S3ListRequest createListObjectsRequest(String key,
+ String delimiter, Integer overrideMaxKeys) {
+ if (!useListV1) {
+ ListObjectsV2Request request =
+ new ListObjectsV2Request().withBucketName(bucket)
+ .withMaxKeys(maxKeys)
+ .withPrefix(key);
+ if (delimiter != null) {
+ request.setDelimiter(delimiter);
+ }
+ if (overrideMaxKeys != null) {
+ request.setMaxKeys(overrideMaxKeys);
+ }
+ return S3ListRequest.v2(request);
+ } else {
+ ListObjectsRequest request = new ListObjectsRequest();
+ request.setBucketName(bucket);
+ request.setMaxKeys(maxKeys);
+ request.setPrefix(key);
+ if (delimiter != null) {
+ request.setDelimiter(delimiter);
+ }
+ if (overrideMaxKeys != null) {
+ request.setMaxKeys(overrideMaxKeys);
+ }
+ return S3ListRequest.v1(request);
}
- return request;
}
/**
@@ -1885,13 +1931,9 @@ public class S3AFileSystem extends FileSystem {
try {
key = maybeAddTrailingSlash(key);
- ListObjectsRequest request = new ListObjectsRequest();
- request.setBucketName(bucket);
- request.setPrefix(key);
- request.setDelimiter("/");
- request.setMaxKeys(1);
+ S3ListRequest request = createListObjectsRequest(key, "/", 1);
- ObjectListing objects = listObjects(request);
+ S3ListResult objects = listObjects(request);
Collection<String> prefixes = objects.getCommonPrefixes();
Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
@@ -2441,6 +2483,7 @@ public class S3AFileSystem extends FileSystem {
}
sb.append(", metastore=").append(metadataStore);
sb.append(", authoritative=").append(allowAuthoritative);
+ sb.append(", useListV1=").append(useListV1);
sb.append(", boundedExecutor=").append(boundedThreadPool);
sb.append(", unboundedExecutor=").append(unboundedThreadPool);
sb.append(", statistics {")
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java
new file mode 100644
index 0000000..6b3bd46
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+
+/**
+ * API version-independent container for S3 List requests.
+ */
+public class S3ListRequest {
+ private ListObjectsRequest v1Request;
+ private ListObjectsV2Request v2Request;
+
+ protected S3ListRequest(ListObjectsRequest v1, ListObjectsV2Request v2) {
+ v1Request = v1;
+ v2Request = v2;
+ }
+
+ /**
+ * Restricted constructors to ensure v1 or v2, not both.
+ * @param request v1 request
+ * @return new list request container
+ */
+ public static S3ListRequest v1(ListObjectsRequest request) {
+ return new S3ListRequest(request, null);
+ }
+
+ /**
+ * Restricted constructors to ensure v1 or v2, not both.
+ * @param request v2 request
+ * @return new list request container
+ */
+ public static S3ListRequest v2(ListObjectsV2Request request) {
+ return new S3ListRequest(null, request);
+ }
+
+ /**
+ * Is this a v1 API request or v2?
+ * @return true if v1, false if v2
+ */
+ public boolean isV1() {
+ return v1Request != null;
+ }
+
+ public ListObjectsRequest getV1() {
+ return v1Request;
+ }
+
+ public ListObjectsV2Request getV2() {
+ return v2Request;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java
new file mode 100644
index 0000000..e8aff32
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import java.util.List;
+
+/**
+ * API version-independent container for S3 List responses.
+ */
+public class S3ListResult {
+ private ObjectListing v1Result;
+ private ListObjectsV2Result v2Result;
+
+ protected S3ListResult(ObjectListing v1, ListObjectsV2Result v2) {
+ v1Result = v1;
+ v2Result = v2;
+ }
+
+ /**
+ * Restricted constructors to ensure v1 or v2, not both.
+ * @param result v1 result
+ * @return new list result container
+ */
+ public static S3ListResult v1(ObjectListing result) {
+ return new S3ListResult(result, null);
+ }
+
+ /**
+ * Restricted constructors to ensure v1 or v2, not both.
+ * @param result v2 result
+ * @return new list result container
+ */
+ public static S3ListResult v2(ListObjectsV2Result result) {
+ return new S3ListResult(null, result);
+ }
+
+ /**
+ * Is this a v1 API result or v2?
+ * @return true if v1, false if v2
+ */
+ public boolean isV1() {
+ return v1Result != null;
+ }
+
+ public ObjectListing getV1() {
+ return v1Result;
+ }
+
+ public ListObjectsV2Result getV2() {
+ return v2Result;
+ }
+
+ public List<S3ObjectSummary> getObjectSummaries() {
+ if (isV1()) {
+ return v1Result.getObjectSummaries();
+ } else {
+ return v2Result.getObjectSummaries();
+ }
+ }
+
+ public boolean isTruncated() {
+ if (isV1()) {
+ return v1Result.isTruncated();
+ } else {
+ return v2Result.isTruncated();
+ }
+ }
+
+ public List<String> getCommonPrefixes() {
+ if (isV1()) {
+ return v1Result.getCommonPrefixes();
+ } else {
+ return v2Result.getCommonPrefixes();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index b8d37c6..ffae1e9 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -895,6 +895,15 @@ from placing its declaration on the command line.
any call to setReadahead() is made to an open stream.</description>
</property>
+ <property>
+ <name>fs.s3a.list.version</name>
+ <value>2</value>
+ <description>
+ Select which version of the S3 SDK's List Objects API to use. Currently
+ supported values are 2 (default) and 1 (older API).
+ </description>
+ </property>
+
### Configuring different S3 buckets
Different S3 buckets can be accessed with different S3A client configurations.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContractGetFileStatusV1List.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContractGetFileStatusV1List.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContractGetFileStatusV1List.java
new file mode 100644
index 0000000..5275336
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContractGetFileStatusV1List.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+
+import static org.apache.hadoop.fs.s3a.Constants.LIST_VERSION;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
+
+/**
+ * S3A contract tests for getFileStatus, using the v1 List Objects API.
+ */
+public class ITestS3AContractGetFileStatusV1List
+ extends AbstractContractGetFileStatusTest {
+
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new S3AContract(conf);
+ }
+
+ @Override
+ public void teardown() throws Exception {
+ getLog().info("FS details {}", getFileSystem());
+ super.teardown();
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ disableFilesystemCaching(conf);
+ conf.setInt(Constants.MAX_PAGING_KEYS, 2);
+ maybeEnableS3Guard(conf);
+
+ // Use v1 List Objects API
+ conf.setInt(LIST_VERSION, 1);
+ return conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
index 6cff533..da7699e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.fs.s3a;
-import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.AmazonS3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -488,6 +488,10 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
@Test
public void testInconsistentS3ClientDeletes() throws Throwable {
+ // Test only implemented for v2 S3 list API
+ Assume.assumeTrue(getConfiguration()
+ .getInt(LIST_VERSION, DEFAULT_LIST_VERSION) == 2);
+
S3AFileSystem fs = getFileSystem();
Path root = path("testInconsistentClient" + DEFAULT_DELAY_KEY_SUBSTRING);
for (int i = 0; i < 3; i++) {
@@ -502,17 +506,17 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
AmazonS3 client = fs.getAmazonS3Client();
String key = fs.pathToKey(root) + "/";
- ObjectListing preDeleteDelimited = client.listObjects(
- fs.createListObjectsRequest(key, "/"));
- ObjectListing preDeleteUndelimited = client.listObjects(
- fs.createListObjectsRequest(key, null));
+ ListObjectsV2Result preDeleteDelimited = client.listObjectsV2(
+ fs.createListObjectsRequest(key, "/").getV2());
+ ListObjectsV2Result preDeleteUndelimited = client.listObjectsV2(
+ fs.createListObjectsRequest(key, null).getV2());
fs.delete(root, true);
- ObjectListing postDeleteDelimited = client.listObjects(
- fs.createListObjectsRequest(key, "/"));
- ObjectListing postDeleteUndelimited = client.listObjects(
- fs.createListObjectsRequest(key, null));
+ ListObjectsV2Result postDeleteDelimited = client.listObjectsV2(
+ fs.createListObjectsRequest(key, "/").getV2());
+ ListObjectsV2Result postDeleteUndelimited = client.listObjectsV2(
+ fs.createListObjectsRequest(key, null).getV2());
assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
"in a non-recursive listing",
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
index 58e4d30..586264d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
@@ -25,9 +25,12 @@ import static org.mockito.Mockito.*;
import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.Date;
+import java.util.List;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3ObjectSummary;
@@ -93,12 +96,7 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
when(s3.getObjectMetadata(argThat(
correctGetMetadataRequest(BUCKET, key + "/"))
)).thenThrow(NOT_FOUND);
- ObjectListing objects = mock(ObjectListing.class);
- when(objects.getCommonPrefixes()).thenReturn(
- Collections.singletonList("dir/"));
- when(objects.getObjectSummaries()).thenReturn(
- Collections.<S3ObjectSummary>emptyList());
- when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
+ setupListMocks(Collections.singletonList("dir/"), Collections.emptyList());
FileStatus stat = fs.getFileStatus(path);
assertNotNull(stat);
assertEquals(fs.makeQualified(path), stat.getPath());
@@ -118,12 +116,7 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
when(s3.getObjectMetadata(argThat(
correctGetMetadataRequest(BUCKET, key + "/")
))).thenThrow(NOT_FOUND);
- ObjectListing objects = mock(ObjectListing.class);
- when(objects.getCommonPrefixes()).thenReturn(
- Collections.<String>emptyList());
- when(objects.getObjectSummaries()).thenReturn(
- Collections.<S3ObjectSummary>emptyList());
- when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
+ setupListMocks(Collections.emptyList(), Collections.emptyList());
FileStatus stat = fs.getFileStatus(path);
assertNotNull(stat);
assertEquals(fs.makeQualified(path), stat.getPath());
@@ -140,16 +133,28 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
when(s3.getObjectMetadata(argThat(
correctGetMetadataRequest(BUCKET, key + "/")
))).thenThrow(NOT_FOUND);
- ObjectListing objects = mock(ObjectListing.class);
- when(objects.getCommonPrefixes()).thenReturn(
- Collections.<String>emptyList());
- when(objects.getObjectSummaries()).thenReturn(
- Collections.<S3ObjectSummary>emptyList());
- when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
+ setupListMocks(Collections.emptyList(), Collections.emptyList());
exception.expect(FileNotFoundException.class);
fs.getFileStatus(path);
}
+  /**
+   * Stub the mocked S3 client so that both the v1 and the v2 list calls
+   * return a result carrying the supplied common prefixes and summaries.
+   * @param prefixes common prefixes both list results should report
+   * @param summaries object summaries both list results should report
+   */
+  private void setupListMocks(List<String> prefixes,
+      List<S3ObjectSummary> summaries) {
+
+    // Result handed back by the v2 list call.
+    final ListObjectsV2Result v2Listing = mock(ListObjectsV2Result.class);
+    when(v2Listing.getObjectSummaries()).thenReturn(summaries);
+    when(v2Listing.getCommonPrefixes()).thenReturn(prefixes);
+    when(s3.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(v2Listing);
+
+    // Result handed back by the v1 list call.
+    final ObjectListing v1Listing = mock(ObjectListing.class);
+    when(v1Listing.getObjectSummaries()).thenReturn(summaries);
+    when(v1Listing.getCommonPrefixes()).thenReturn(prefixes);
+    when(s3.listObjects(any(ListObjectsRequest.class)))
+        .thenReturn(v1Listing);
+  }
+
private Matcher<GetObjectMetadataRequest> correctGetMetadataRequest(
String bucket, String key) {
return new BaseMatcher<GetObjectMetadataRequest>() {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org