You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by mt...@apache.org on 2022/03/11 07:36:26 UTC
[hadoop] branch trunk updated: HADOOP-18112: Implement paging during multi object delete. (#4045)
This is an automated email from the ASF dual-hosted git repository.
mthakur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 672e380 HADOOP-18112: Implement paging during multi object delete. (#4045)
672e380 is described below
commit 672e380c4f6ffcb0a6fee6d8263166e16b4323c2
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Fri Mar 11 13:05:45 2022 +0530
HADOOP-18112: Implement paging during multi object delete. (#4045)
Multi-object delete requests containing more than 1000 keys are not supported
by S3 and fail with a MalformedXML error. Requests are therefore paged so that
each request stays within that limit. The page size can be configured using
"fs.s3a.bulk.delete.page.size".
Contributed by: Mukund Thakur
---
.../main/java/org/apache/hadoop/util/Lists.java | 24 +++++
.../java/org/apache/hadoop/util/TestLists.java | 44 +++++++++
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 78 ++++++---------
.../apache/hadoop/fs/s3a/api/RequestFactory.java | 5 +-
.../apache/hadoop/fs/s3a/impl/DeleteOperation.java | 45 ++-------
.../hadoop/fs/s3a/impl/OperationCallbacks.java | 12 +--
.../apache/hadoop/fs/s3a/impl/RenameOperation.java | 8 +-
.../hadoop/fs/s3a/impl/RequestFactoryImpl.java | 5 +-
.../org/apache/hadoop/fs/s3a/tools/MarkerTool.java | 2 +-
.../hadoop/fs/s3a/tools/MarkerToolOperations.java | 9 +-
.../fs/s3a/tools/MarkerToolOperationsImpl.java | 10 +-
.../hadoop/fs/s3a/ITestS3AFailureHandling.java | 38 +++++++-
.../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 107 ++++++++++++++++++++-
.../fs/s3a/impl/ITestPartialRenamesDeletes.java | 105 --------------------
.../hadoop/fs/s3a/impl/TestRequestFactory.java | 2 +-
.../fs/s3a/scale/ITestS3ADeleteManyFiles.java | 2 +-
.../fs/s3a/test/MinimalOperationCallbacks.java | 9 +-
17 files changed, 273 insertions(+), 232 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Lists.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Lists.java
index b6d74ee..5d9cc05 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Lists.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Lists.java
@@ -232,4 +232,28 @@ public final class Lists {
return addAll(addTo, elementsToAdd.iterator());
}
+ /**
+ * Returns consecutive sub-lists of a list, each of the same size
+ * (the final list may be smaller).
+ * @param originalList original big list.
+ * @param pageSize desired size of each sublist ( last one
+ * may be smaller)
+ * @return a list of sub lists.
+ */
+ public static <T> List<List<T>> partition(List<T> originalList, int pageSize) {
+
+ Preconditions.checkArgument(originalList != null && originalList.size() > 0,
+ "Invalid original list");
+ Preconditions.checkArgument(pageSize > 0, "Page size should " +
+ "be greater than 0 for performing partition");
+
+ List<List<T>> result = new ArrayList<>();
+ int i=0;
+ while (i < originalList.size()) {
+ result.add(originalList.subList(i,
+ Math.min(i + pageSize, originalList.size())));
+ i = i + pageSize;
+ }
+ return result;
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLists.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLists.java
index 537e378..53241da 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLists.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLists.java
@@ -18,9 +18,11 @@
package org.apache.hadoop.util;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -80,6 +82,48 @@ public class TestLists {
}
@Test
+ public void testListsPartition() {
+ List<String> list = new ArrayList<>();
+ list.add("a");
+ list.add("b");
+ list.add("c");
+ list.add("d");
+ list.add("e");
+ List<List<String>> res = Lists.
+ partition(list, 2);
+ Assertions.assertThat(res)
+ .describedAs("Number of partitions post partition")
+ .hasSize(3);
+ Assertions.assertThat(res.get(0))
+ .describedAs("Number of elements in first partition")
+ .hasSize(2);
+ Assertions.assertThat(res.get(2))
+ .describedAs("Number of elements in last partition")
+ .hasSize(1);
+
+ List<List<String>> res2 = Lists.
+ partition(list, 1);
+ Assertions.assertThat(res2)
+ .describedAs("Number of partitions post partition")
+ .hasSize(5);
+ Assertions.assertThat(res2.get(0))
+ .describedAs("Number of elements in first partition")
+ .hasSize(1);
+ Assertions.assertThat(res2.get(4))
+ .describedAs("Number of elements in last partition")
+ .hasSize(1);
+
+ List<List<String>> res3 = Lists.
+ partition(list, 6);
+ Assertions.assertThat(res3)
+ .describedAs("Number of partitions post partition")
+ .hasSize(1);
+ Assertions.assertThat(res3.get(0))
+ .describedAs("Number of elements in first partition")
+ .hasSize(5);
+ }
+
+ @Test
public void testArrayListWithSize() {
List<String> list = Lists.newArrayListWithCapacity(3);
list.add("record1");
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index c8a73d9..86da70e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -135,6 +135,7 @@ import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
+import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
@@ -225,6 +226,7 @@ import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDura
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator;
/**
@@ -550,6 +552,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
+ checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
+ "page size out of range: %s", pageSize);
listing = new Listing(listingOperationCallbacks, createStoreContext());
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
@@ -2026,14 +2030,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
@Override
- public DeleteObjectsResult removeKeys(
- final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
- final boolean deleteFakeDir,
- final boolean quiet)
+ public void removeKeys(
+ final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+ final boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException, IOException {
auditSpan.activate();
- return S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir,
- quiet);
+ S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir);
}
@Override
@@ -2818,10 +2820,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* @param keysToDelete collection of keys to delete on the s3-backend.
* if empty, no request is made of the object store.
* @param deleteFakeDir indicates whether this is for deleting fake dirs
- * @param quiet should a bulk query be quiet, or should its result list
- * all deleted keys?
- * @return the deletion result if a multi object delete was invoked
- * and it returned without a failure.
* @throws InvalidRequestException if the request was rejected due to
* a mistaken attempt to delete the root directory.
* @throws MultiObjectDeleteException one or more of the keys could not
@@ -2831,10 +2829,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* @throws AmazonClientException other amazon-layer failure.
*/
@Retries.RetryRaw
- private DeleteObjectsResult removeKeysS3(
- List<DeleteObjectsRequest.KeyVersion> keysToDelete,
- boolean deleteFakeDir,
- boolean quiet)
+ private void removeKeysS3(
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+ boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException,
IOException {
if (LOG.isDebugEnabled()) {
@@ -2847,16 +2844,28 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
if (keysToDelete.isEmpty()) {
// exit fast if there are no keys to delete
- return null;
+ return;
}
for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
blockRootDelete(keyVersion.getKey());
}
- DeleteObjectsResult result = null;
try {
if (enableMultiObjectsDelete) {
- result = deleteObjects(
- getRequestFactory().newBulkDeleteRequest(keysToDelete, quiet));
+ if (keysToDelete.size() <= pageSize) {
+ deleteObjects(getRequestFactory()
+ .newBulkDeleteRequest(keysToDelete));
+ } else {
+ // Multi object deletion of more than 1000 keys is not supported
+ // by s3. So we are paging the keys by page size.
+ LOG.debug("Partitioning the keys to delete as it is more than " +
+ "page size. Number of keys: {}, Page size: {}",
+ keysToDelete.size(), pageSize);
+ for (List<DeleteObjectsRequest.KeyVersion> batchOfKeysToDelete :
+ Lists.partition(keysToDelete, pageSize)) {
+ deleteObjects(getRequestFactory()
+ .newBulkDeleteRequest(batchOfKeysToDelete));
+ }
+ }
} else {
for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
deleteObject(keyVersion.getKey());
@@ -2872,7 +2881,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
throw ex;
}
noteDeleted(keysToDelete.size(), deleteFakeDir);
- return result;
}
/**
@@ -2889,7 +2897,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
/**
- * Invoke {@link #removeKeysS3(List, boolean, boolean)}.
+ * Invoke {@link #removeKeysS3(List, boolean)}.
* If a {@code MultiObjectDeleteException} is raised, the
* relevant statistics are updated.
*
@@ -2910,35 +2918,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
final boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException,
IOException {
- removeKeys(keysToDelete, deleteFakeDir,
- true);
- }
-
- /**
- * Invoke {@link #removeKeysS3(List, boolean, boolean)}.
- * @param keysToDelete collection of keys to delete on the s3-backend.
- * if empty, no request is made of the object store.
- * @param deleteFakeDir indicates whether this is for deleting fake dirs.
- * @param quiet should a bulk query be quiet, or should its result list
- * all deleted keys
- * @return the deletion result if a multi object delete was invoked
- * and it returned without a failure, else null.
- * @throws InvalidRequestException if the request was rejected due to
- * a mistaken attempt to delete the root directory.
- * @throws MultiObjectDeleteException one or more of the keys could not
- * be deleted in a multiple object delete operation.
- * @throws AmazonClientException amazon-layer failure.
- * @throws IOException other IO Exception.
- */
- @Retries.RetryRaw
- private DeleteObjectsResult removeKeys(
- final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
- final boolean deleteFakeDir,
- final boolean quiet)
- throws MultiObjectDeleteException, AmazonClientException, IOException {
try (DurationInfo ignored = new DurationInfo(LOG, false,
- "Deleting %d keys", keysToDelete.size())) {
- return removeKeysS3(keysToDelete, deleteFakeDir, quiet);
+ "Deleting %d keys", keysToDelete.size())) {
+ removeKeysS3(keysToDelete, deleteFakeDir);
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
index ee57286..97a15d9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java
@@ -291,12 +291,9 @@ public interface RequestFactory {
/**
* Bulk delete request.
* @param keysToDelete list of keys to delete.
- * @param quiet should a bulk query be quiet, or should its result list
- * all deleted keys?
* @return the request
*/
DeleteObjectsRequest newBulkDeleteRequest(
- List<DeleteObjectsRequest.KeyVersion> keysToDelete,
- boolean quiet);
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java
index 3d2ab22..a45bfe4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java
@@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
@@ -365,8 +364,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
callableWithinAuditSpan(
getAuditSpan(), () -> {
asyncDeleteAction(
- keyList,
- LOG.isDebugEnabled());
+ keyList);
return null;
}));
}
@@ -376,20 +374,16 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
* the keys from S3 and paths from S3Guard.
*
* @param keyList keys to delete.
- * @param auditDeletedKeys should the results be audited and undeleted
* entries logged?
* @throws IOException failure
*/
@Retries.RetryTranslated
private void asyncDeleteAction(
- final List<DeleteEntry> keyList,
- final boolean auditDeletedKeys)
+ final List<DeleteEntry> keyList)
throws IOException {
- List<DeleteObjectsResult.DeletedObject> deletedObjects = new ArrayList<>();
try (DurationInfo ignored =
new DurationInfo(LOG, false,
"Delete page of %d keys", keyList.size())) {
- DeleteObjectsResult result;
if (!keyList.isEmpty()) {
// first delete the files.
List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
@@ -397,15 +391,12 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
.map(e -> e.keyVersion)
.collect(Collectors.toList());
LOG.debug("Deleting of {} file objects", files.size());
- result = Invoker.once("Remove S3 Files",
+ Invoker.once("Remove S3 Files",
status.getPath().toString(),
() -> callbacks.removeKeys(
files,
- false,
- !auditDeletedKeys));
- if (result != null) {
- deletedObjects.addAll(result.getDeletedObjects());
- }
+ false
+ ));
// now the dirs
List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
.filter(e -> e.isDirMarker)
@@ -413,32 +404,12 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
.collect(Collectors.toList());
LOG.debug("Deleting of {} directory markers", dirs.size());
// This is invoked with deleteFakeDir.
- result = Invoker.once("Remove S3 Dir Markers",
+ Invoker.once("Remove S3 Dir Markers",
status.getPath().toString(),
() -> callbacks.removeKeys(
dirs,
- true,
- !auditDeletedKeys));
- if (result != null) {
- deletedObjects.addAll(result.getDeletedObjects());
- }
- }
- if (auditDeletedKeys) {
- // audit the deleted keys
- if (deletedObjects.size() != keyList.size()) {
- // size mismatch
- LOG.warn("Size mismatch in deletion operation. "
- + "Expected count of deleted files: {}; "
- + "actual: {}",
- keyList.size(), deletedObjects.size());
- // strip out the deleted keys
- for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
- keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
- }
- for (DeleteEntry kv : keyList) {
- LOG.debug("{}", kv.getKey());
- }
- }
+ true
+ ));
}
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
index a72dc7e..ecfe2c0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java
@@ -24,7 +24,6 @@ import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.transfer.model.CopyResult;
@@ -138,10 +137,6 @@ public interface OperationCallbacks {
* @param keysToDelete collection of keys to delete on the s3-backend.
* if empty, no request is made of the object store.
* @param deleteFakeDir indicates whether this is for deleting fake dirs.
- * @param quiet should a bulk query be quiet, or should its result list
- * all deleted keys
- * @return the deletion result if a multi object delete was invoked
- * and it returned without a failure, else null.
* @throws InvalidRequestException if the request was rejected due to
* a mistaken attempt to delete the root directory.
* @throws MultiObjectDeleteException one or more of the keys could not
@@ -150,10 +145,9 @@ public interface OperationCallbacks {
* @throws IOException other IO Exception.
*/
@Retries.RetryRaw
- DeleteObjectsResult removeKeys(
- List<DeleteObjectsRequest.KeyVersion> keysToDelete,
- boolean deleteFakeDir,
- boolean quiet)
+ void removeKeys(
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+ boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException,
IOException;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
index c1700ef..bc9ad66 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java
@@ -49,6 +49,7 @@ import static org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinA
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.RENAME_PARALLEL_LIMIT;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
/**
* A parallelized rename operation.
@@ -155,6 +156,9 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
this.destKey = destKey;
this.destStatus = destStatus;
this.callbacks = callbacks;
+ checkArgument(pageSize > 0
+ && pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
+ "page size out of range: %s", pageSize);
this.pageSize = pageSize;
}
@@ -586,8 +590,8 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
sourcePath.toString(), () ->
callbacks.removeKeys(
keys,
- false,
- true));
+ false
+ ));
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
index 04cff49..fa58323 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
@@ -575,12 +575,11 @@ public class RequestFactoryImpl implements RequestFactory {
@Override
public DeleteObjectsRequest newBulkDeleteRequest(
- List<DeleteObjectsRequest.KeyVersion> keysToDelete,
- boolean quiet) {
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete) {
return prepareRequest(
new DeleteObjectsRequest(bucket)
.withKeys(keysToDelete)
- .withQuiet(quiet));
+ .withQuiet(true));
}
@Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerTool.java
index bd09ca6..230f077 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerTool.java
@@ -817,7 +817,7 @@ public final class MarkerTool extends S3GuardTool {
end);
once("Remove S3 Keys",
tracker.getBasePath().toString(), () ->
- operations.removeKeys(page, true, false));
+ operations.removeKeys(page, true));
summary.deleteRequests++;
// and move to the start of the next page
start = end;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperations.java
index 7d7627d..a701f86 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperations.java
@@ -23,7 +23,6 @@ import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import org.apache.hadoop.fs.InvalidRequestException;
@@ -58,10 +57,7 @@ public interface MarkerToolOperations {
* @param keysToDelete collection of keys to delete on the s3-backend.
* if empty, no request is made of the object store.
* @param deleteFakeDir indicates whether this is for deleting fake dirs.
- * @param quiet should a bulk query be quiet, or should its result list
* all deleted keys
- * @return the deletion result if a multi object delete was invoked
- * and it returned without a failure, else null.
* @throws InvalidRequestException if the request was rejected due to
* a mistaken attempt to delete the root directory.
* @throws MultiObjectDeleteException one or more of the keys could not
@@ -70,10 +66,9 @@ public interface MarkerToolOperations {
* @throws IOException other IO Exception.
*/
@Retries.RetryMixed
- DeleteObjectsResult removeKeys(
+ void removeKeys(
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
- boolean deleteFakeDir,
- boolean quiet)
+ boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException,
IOException;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperationsImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperationsImpl.java
index 7ccbc41..ccf80e1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperationsImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperationsImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import org.apache.hadoop.fs.Path;
@@ -55,13 +54,12 @@ public class MarkerToolOperationsImpl implements MarkerToolOperations {
}
@Override
- public DeleteObjectsResult removeKeys(
+ public void removeKeys(
final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
- final boolean deleteFakeDir,
- final boolean quiet)
+ final boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException, IOException {
- return operationCallbacks.removeKeys(keysToDelete, deleteFakeDir,
- quiet);
+ operationCallbacks.removeKeys(keysToDelete, deleteFakeDir
+ );
}
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
index c34ba22..c0f6a4b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
@@ -20,10 +20,13 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.store.audit.AuditSpan;
@@ -37,9 +40,11 @@ import java.util.List;
import java.nio.file.AccessDeniedException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.createFiles;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.failIf;
-import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.*;
import static org.apache.hadoop.test.LambdaTestUtils.*;
+import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator;
+import static org.apache.hadoop.util.functional.RemoteIterators.toList;
/**
* ITest for failure handling, primarily multipart deletion.
@@ -72,6 +77,37 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
removeKeys(getFileSystem(), "ITestS3AFailureHandling/missingFile");
}
+ /**
+ * See HADOOP-18112.
+ */
+ @Test
+ public void testMultiObjectDeleteLargeNumKeys() throws Exception {
+ S3AFileSystem fs = getFileSystem();
+ Path path = path("largeDir");
+ mkdirs(path);
+ createFiles(fs, path, 1, 1005, 0);
+ RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator =
+ fs.listFiles(path, false);
+ List<String> keys = toList(mappingRemoteIterator(locatedFileStatusRemoteIterator,
+ locatedFileStatus -> fs.pathToKey(locatedFileStatus.getPath())));
+ // After implementation of paging during multi object deletion,
+ // no exception is encountered.
+ Long bulkDeleteReqBefore = getNumberOfBulkDeleteRequestsMadeTillNow(fs);
+ try (AuditSpan span = span()) {
+ fs.removeKeys(buildDeleteRequest(keys.toArray(new String[0])), false);
+ }
+ Long bulkDeleteReqAfter = getNumberOfBulkDeleteRequestsMadeTillNow(fs);
+ // number of delete requests is 5 as we have default page size of 250.
+ Assertions.assertThat(bulkDeleteReqAfter - bulkDeleteReqBefore)
+ .describedAs("Number of batched bulk delete requests")
+ .isEqualTo(5);
+ }
+
+ private Long getNumberOfBulkDeleteRequestsMadeTillNow(S3AFileSystem fs) {
+ return fs.getIOStatistics().counters()
+ .get(StoreStatisticNames.OBJECT_BULK_DELETE_REQUEST);
+ }
+
private void removeKeys(S3AFileSystem fileSystem, String... keys)
throws IOException {
try (AuditSpan span = span()) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 55ddba9..d965e6e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -52,7 +52,11 @@ import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
@@ -70,6 +74,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -78,6 +83,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
+import static org.apache.hadoop.test.GenericTestUtils.buildPaths;
import static org.apache.hadoop.util.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
@@ -95,8 +104,23 @@ import static org.junit.Assert.*;
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class S3ATestUtils {
+
private static final Logger LOG = LoggerFactory.getLogger(
- S3ATestUtils.class);
+ S3ATestUtils.class);
+
+ /** Many threads for scale performance: {@value}. */
+ public static final int EXECUTOR_THREAD_COUNT = 64;
+ /**
+ * For submitting work.
+ */
+ private static final ListeningExecutorService EXECUTOR =
+ MoreExecutors.listeningDecorator(
+ BlockingThreadPoolExecutorService.newInstance(
+ EXECUTOR_THREAD_COUNT,
+ EXECUTOR_THREAD_COUNT * 2,
+ 30, TimeUnit.SECONDS,
+ "test-operations"));
+
/**
* Value to set a system property to (in maven) to declare that
@@ -822,6 +846,87 @@ public final class S3ATestUtils {
}
/**
+ * Write the text to a file asynchronously. Logs the operation duration.
+ * @param fs filesystem
+ * @param path path
+ * @return future to the patch created.
+ */
+ private static CompletableFuture<Path> put(FileSystem fs,
+ Path path, String text) {
+ return submit(EXECUTOR, () -> {
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, false, "Creating %s", path)) {
+ createFile(fs, path, true, text.getBytes(Charsets.UTF_8));
+ return path;
+ }
+ });
+ }
+
+ /**
+ * Build a set of files in a directory tree.
+ * @param fs filesystem
+ * @param destDir destination
+ * @param depth file depth
+ * @param fileCount number of files to create.
+ * @param dirCount number of dirs to create at each level
+ * @return the list of files created.
+ */
+ public static List<Path> createFiles(final FileSystem fs,
+ final Path destDir,
+ final int depth,
+ final int fileCount,
+ final int dirCount) throws IOException {
+ return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
+ new ArrayList<>(fileCount),
+ new ArrayList<>(dirCount));
+ }
+
+ /**
+ * Build a set of files in a directory tree.
+ * @param fs filesystem
+ * @param destDir destination
+ * @param depth file depth
+ * @param fileCount number of files to create.
+ * @param dirCount number of dirs to create at each level
+ * @param paths [out] list of file paths created
+ * @param dirs [out] list of directory paths created.
+ * @return the list of files created.
+ */
+ public static List<Path> createDirsAndFiles(final FileSystem fs,
+ final Path destDir,
+ final int depth,
+ final int fileCount,
+ final int dirCount,
+ final List<Path> paths,
+ final List<Path> dirs) throws IOException {
+ buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
+ List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
+ + dirs.size());
+
+ // create directories. With dir marker retention, that adds more entries
+ // to cause deletion issues
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
+ for (Path path : dirs) {
+ futures.add(submit(EXECUTOR, () ->{
+ fs.mkdirs(path);
+ return path;
+ }));
+ }
+ waitForCompletion(futures);
+ }
+
+ try (DurationInfo ignore =
+ new DurationInfo(LOG, "Creating %d files", paths.size())) {
+ for (Path path : paths) {
+ futures.add(put(fs, path, path.getName()));
+ }
+ waitForCompletion(futures);
+ return paths;
+ }
+ }
+
+ /**
* Helper class to do diffs of metrics.
*/
public static final class MetricDiff {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
index 068b7b2..378f4a7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java
@@ -26,14 +26,9 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
-import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
-import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
-import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,13 +37,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.DurationInfo;
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
@@ -69,13 +62,10 @@ import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.bindRolePolicyStatemen
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.forbidden;
import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.newAssumedRoleConfig;
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.DELEGATION_TOKEN_BINDING;
-import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
-import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertFileCount;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
-import static org.apache.hadoop.test.GenericTestUtils.buildPaths;
import static org.apache.hadoop.test.LambdaTestUtils.eval;
/**
@@ -112,20 +102,6 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
private static final Statement STATEMENT_ALL_BUCKET_READ_ACCESS
= statement(true, S3_ALL_BUCKETS, S3_BUCKET_READ_OPERATIONS);
- /** Many threads for scale performance: {@value}. */
- public static final int EXECUTOR_THREAD_COUNT = 64;
-
- /**
- * For submitting work.
- */
- private static final ListeningExecutorService EXECUTOR =
- MoreExecutors.listeningDecorator(
- BlockingThreadPoolExecutorService.newInstance(
- EXECUTOR_THREAD_COUNT,
- EXECUTOR_THREAD_COUNT * 2,
- 30, TimeUnit.SECONDS,
- "test-operations"));
-
/**
* The number of files in a non-scaled test.
@@ -743,87 +719,6 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
}
/**
- * Write the text to a file asynchronously. Logs the operation duration.
- * @param fs filesystem
- * @param path path
- * @return future to the patch created.
- */
- private static CompletableFuture<Path> put(FileSystem fs,
- Path path, String text) {
- return submit(EXECUTOR, () -> {
- try (DurationInfo ignore =
- new DurationInfo(LOG, false, "Creating %s", path)) {
- createFile(fs, path, true, text.getBytes(Charsets.UTF_8));
- return path;
- }
- });
- }
-
- /**
- * Build a set of files in a directory tree.
- * @param fs filesystem
- * @param destDir destination
- * @param depth file depth
- * @param fileCount number of files to create.
- * @param dirCount number of dirs to create at each level
- * @return the list of files created.
- */
- public static List<Path> createFiles(final FileSystem fs,
- final Path destDir,
- final int depth,
- final int fileCount,
- final int dirCount) throws IOException {
- return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
- new ArrayList<Path>(fileCount),
- new ArrayList<Path>(dirCount));
- }
-
- /**
- * Build a set of files in a directory tree.
- * @param fs filesystem
- * @param destDir destination
- * @param depth file depth
- * @param fileCount number of files to create.
- * @param dirCount number of dirs to create at each level
- * @param paths [out] list of file paths created
- * @param dirs [out] list of directory paths created.
- * @return the list of files created.
- */
- public static List<Path> createDirsAndFiles(final FileSystem fs,
- final Path destDir,
- final int depth,
- final int fileCount,
- final int dirCount,
- final List<Path> paths,
- final List<Path> dirs) throws IOException {
- buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
- List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
- + dirs.size());
-
- // create directories. With dir marker retention, that adds more entries
- // to cause deletion issues
- try (DurationInfo ignore =
- new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
- for (Path path : dirs) {
- futures.add(submit(EXECUTOR, () ->{
- fs.mkdirs(path);
- return path;
- }));
- }
- waitForCompletion(futures);
- }
-
- try (DurationInfo ignore =
- new DurationInfo(LOG, "Creating %d files", paths.size())) {
- for (Path path : paths) {
- futures.add(put(fs, path, path.getName()));
- }
- waitForCompletion(futures);
- return paths;
- }
- }
-
- /**
* Verifies that s3:DeleteObjectVersion is not required for rename.
* <p></p>
* See HADOOP-17621.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
index dbd89b9..9bc3aef 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
@@ -164,7 +164,7 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
new ArrayList<>()));
a(factory.newCopyObjectRequest(path, path2, md));
a(factory.newDeleteObjectRequest(path));
- a(factory.newBulkDeleteRequest(new ArrayList<>(), true));
+ a(factory.newBulkDeleteRequest(new ArrayList<>()));
a(factory.newDirectoryMarkerRequest(path));
a(factory.newGetObjectRequest(path));
a(factory.newGetObjectMetadataRequest(path));
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
index d5862bc..dbdd8b5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
@@ -38,7 +38,7 @@ import static org.apache.hadoop.fs.s3a.Constants.BULK_DELETE_PAGE_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
-import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.createFiles;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.createFiles;
import static org.apache.hadoop.test.GenericTestUtils.filenameOfIndex;
/**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalOperationCallbacks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalOperationCallbacks.java
index a2aebc827..fa1ad2d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalOperationCallbacks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalOperationCallbacks.java
@@ -23,7 +23,6 @@ import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.transfer.model.CopyResult;
@@ -99,13 +98,11 @@ public class MinimalOperationCallbacks
}
@Override
- public DeleteObjectsResult removeKeys(
- List<DeleteObjectsRequest.KeyVersion> keysToDelete,
- boolean deleteFakeDir,
- boolean quiet)
+ public void removeKeys(
+ List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+ boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException,
IOException {
- return null;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org