Posted to common-commits@hadoop.apache.org by st...@apache.org on 2018/01/18 13:14:19 UTC
hadoop git commit: HADOOP-13974. S3Guard CLI to support list/purge of
pending multipart commits. Contributed by Aaron Fabbri
Repository: hadoop
Updated Branches:
refs/heads/trunk de630708d -> 1093a7368
HADOOP-13974. S3Guard CLI to support list/purge of pending multipart commits.
Contributed by Aaron Fabbri
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1093a736
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1093a736
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1093a736
Branch: refs/heads/trunk
Commit: 1093a73689912f78547e6d23023be2fd1c7ddc85
Parents: de63070
Author: Steve Loughran <st...@apache.org>
Authored: Thu Jan 18 13:13:58 2018 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Jan 18 13:13:58 2018 +0000
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/s3a/Invoker.java | 7 +-
.../apache/hadoop/fs/s3a/MultipartUtils.java | 214 ++++++++++++++
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 30 +-
.../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 3 +-
.../hadoop/fs/s3a/WriteOperationHelper.java | 5 +-
.../hadoop/fs/s3a/commit/CommitOperations.java | 2 +-
.../fs/s3a/commit/MagicCommitIntegration.java | 2 +-
.../hadoop/fs/s3a/s3guard/S3GuardTool.java | 287 +++++++++++++++++--
.../src/site/markdown/tools/hadoop-aws/index.md | 7 +-
.../site/markdown/tools/hadoop-aws/s3guard.md | 35 ++-
.../hadoop/fs/s3a/ITestS3AMultipartUtils.java | 126 ++++++++
.../apache/hadoop/fs/s3a/MockS3AFileSystem.java | 7 +
.../hadoop/fs/s3a/MultipartTestUtils.java | 184 ++++++++++++
.../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 21 +-
.../fs/s3a/commit/AbstractCommitITest.java | 3 +-
.../commit/magic/ITestS3AHugeMagicCommits.java | 2 +-
.../fs/s3a/s3guard/ITestS3GuardToolLocal.java | 187 ++++++++++++
17 files changed, 1064 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
index 107a247..875948e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Optional;
+import javax.annotation.Nullable;
import com.amazonaws.AmazonClientException;
import com.amazonaws.SdkBaseException;
@@ -222,7 +223,7 @@ public class Invoker {
*/
@Retries.RetryTranslated
public <T> T retry(String action,
- String path,
+ @Nullable String path,
boolean idempotent,
Operation<T> operation)
throws IOException {
@@ -247,7 +248,7 @@ public class Invoker {
@Retries.RetryTranslated
public <T> T retry(
String action,
- String path,
+ @Nullable String path,
boolean idempotent,
Retried retrying,
Operation<T> operation)
@@ -413,7 +414,7 @@ public class Invoker {
* @param path path (may be null or empty)
* @return string for logs
*/
- private static String toDescription(String action, String path) {
+ private static String toDescription(String action, @Nullable String path) {
return action +
(StringUtils.isNotEmpty(path) ? (" on " + path) : "");
}
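The `@Nullable` annotations document that bucket-level operations may retry without an associated path. A minimal sketch of the call pattern this supports, assuming an initialized `invoker`, an `AmazonS3` client `s3`, and a prepared `ListMultipartUploadsRequest` `req`:

```java
// Bucket-wide listing: there is no single object path, so null is legal.
MultipartUploadListing listing = invoker.retry(
    "listMultipartUploads",              // action name used in log messages
    null,                                // path: may be null for bucket-level calls
    true,                                // idempotent, hence safe to retry
    () -> s3.listMultipartUploads(req)); // the wrapped SDK operation
```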
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java
new file mode 100644
index 0000000..6eb490f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
+import com.amazonaws.services.s3.model.MultipartUpload;
+import com.amazonaws.services.s3.model.MultipartUploadListing;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.RemoteIterator;
+
+
+/**
+ * Multipart upload-specific functions for use by S3AFileSystem and the
+ * Hadoop CLI.
+ */
+public final class MultipartUtils {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MultipartUtils.class);
+
+ /** Not instantiated. */
+ private MultipartUtils() { }
+
+ /**
+ * List outstanding multipart uploads.
+ * Package private: S3AFileSystem and tests are the users of this.
+ * @param s3 AmazonS3 client to use.
+ * @param invoker invoker to use for retrying the listMultipartUploads call.
+ * @param bucketName name of S3 bucket to use.
+ * @param maxKeys maximum batch size to request at a time from S3.
+ * @param prefix optional key prefix to narrow search. If null then whole
+ * bucket will be searched.
+ * @return an iterator of matching uploads
+ * @throws IOException on failure to list the initial batch of uploads.
+ */
+ static MultipartUtils.UploadIterator listMultipartUploads(AmazonS3 s3,
+ Invoker invoker, String bucketName, int maxKeys, @Nullable String prefix)
+ throws IOException {
+ return new MultipartUtils.UploadIterator(s3, invoker, bucketName, maxKeys,
+ prefix);
+ }
+
+ /**
+ * Simple RemoteIterator wrapper for the AWS `listMultipartUploads` API.
+ * Iterates over batches of multipart upload metadata listings.
+ */
+ static class ListingIterator implements
+ RemoteIterator<MultipartUploadListing> {
+
+ private final String bucketName;
+ private final String prefix;
+ private final int maxKeys;
+ private final AmazonS3 s3;
+ private final Invoker invoker;
+
+ /**
+ * Most recent listing results.
+ */
+ private MultipartUploadListing listing;
+
+ /**
+ * Indicator that this is the first listing.
+ */
+ private boolean firstListing = true;
+
+ private int listCount = 1;
+
+ ListingIterator(AmazonS3 s3, Invoker invoker, String bucketName,
+ int maxKeys, @Nullable String prefix) throws IOException {
+ this.s3 = s3;
+ this.bucketName = bucketName;
+ this.maxKeys = maxKeys;
+ this.prefix = prefix;
+ this.invoker = invoker;
+
+ requestNextBatch();
+ }
+
+ /**
+ * Iterator has data if this is either the initial iteration, or
+ * the last listing obtained was incomplete.
+ * @throws IOException not thrown by this implementation.
+ */
+ @Override
+ public boolean hasNext() throws IOException {
+ if (listing == null) {
+ // shouldn't happen, but don't trust AWS SDK
+ return false;
+ } else {
+ return firstListing || listing.isTruncated();
+ }
+ }
+
+ /**
+ * Get next listing. On the first call this returns the initial set
+ * (possibly empty) obtained from S3. Subsequent calls may block on I/O
+ * or fail.
+ * @return next upload listing.
+ * @throws IOException if S3 operation fails.
+ * @throws NoSuchElementException if there are no more uploads.
+ */
+ @Override
+ @Retries.RetryTranslated
+ public MultipartUploadListing next() throws IOException {
+ if (firstListing) {
+ firstListing = false;
+ } else {
+ if (listing == null || !listing.isTruncated()) {
+ // nothing more to request: fail.
+ throw new NoSuchElementException("No more uploads under " + prefix);
+ }
+ // need to request a new set of objects.
+ requestNextBatch();
+ }
+ return listing;
+ }
+
+ @Override
+ public String toString() {
+ return "Upload iterator: prefix " + prefix + "; list count " +
+ listCount + "; isTruncated=" + listing.isTruncated();
+ }
+
+ @Retries.RetryTranslated
+ private void requestNextBatch() throws IOException {
+ ListMultipartUploadsRequest req =
+ new ListMultipartUploadsRequest(bucketName);
+ if (prefix != null) {
+ req.setPrefix(prefix);
+ }
+ if (!firstListing) {
+ req.setKeyMarker(listing.getNextKeyMarker());
+ req.setUploadIdMarker(listing.getNextUploadIdMarker());
+ }
+ req.setMaxUploads(maxKeys);
+
+ LOG.debug("[{}], Requesting next {} uploads prefix {}, " +
+ "next key {}, next upload id {}", listCount, maxKeys, prefix,
+ req.getKeyMarker(), req.getUploadIdMarker());
+ listCount++;
+
+ listing = invoker.retry("listMultipartUploads", prefix, true,
+ () -> s3.listMultipartUploads(req));
+ LOG.debug("New listing state: {}", this);
+ }
+ }
+
+ /**
+ * Iterator over multipart uploads. Similar to
+ * {@link org.apache.hadoop.fs.s3a.Listing.FileStatusListingIterator}, but
+ * iterates over pending uploads instead of existing objects.
+ */
+ public static class UploadIterator
+ implements RemoteIterator<MultipartUpload> {
+
+ private ListingIterator lister;
+ /** Current listing: the last upload listing we fetched. */
+ private MultipartUploadListing listing;
+ /** Iterator over the current listing. */
+ private ListIterator<MultipartUpload> batchIterator;
+
+ @Retries.RetryTranslated
+ public UploadIterator(AmazonS3 s3, Invoker invoker, String bucketName,
+ int maxKeys, @Nullable String prefix)
+ throws IOException {
+
+ lister = new ListingIterator(s3, invoker, bucketName, maxKeys, prefix);
+ requestNextBatch();
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return (batchIterator.hasNext() || requestNextBatch());
+ }
+
+ @Override
+ public MultipartUpload next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return batchIterator.next();
+ }
+
+ private boolean requestNextBatch() throws IOException {
+ if (lister.hasNext()) {
+ listing = lister.next();
+ batchIterator = listing.getMultipartUploads().listIterator();
+ return batchIterator.hasNext();
+ }
+ return false;
+ }
+ }
+}
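Taken together, `ListingIterator` pages through the raw AWS listings while `UploadIterator` flattens them into individual uploads. A short sketch of how a caller might drive it, assuming an initialized `S3AFileSystem` and an illustrative prefix:

```java
import java.io.IOException;

import com.amazonaws.services.s3.model.MultipartUpload;

import org.apache.hadoop.fs.s3a.MultipartUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

/** Print every pending multipart upload under a prefix. */
static void printPendingUploads(S3AFileSystem fs, String prefix)
    throws IOException {
  // listUploads() is the public entry point added to S3AFileSystem below.
  MultipartUtils.UploadIterator uploads = fs.listUploads(prefix);
  while (uploads.hasNext()) {
    MultipartUpload upload = uploads.next();
    System.out.printf("pending: %s (upload id %s)%n",
        upload.getKey(), upload.getUploadId());
  }
}
```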
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 92863c3..fced494 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -43,6 +43,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
@@ -195,6 +196,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
private String blockOutputBuffer;
private S3ADataBlocks.BlockFactory blockFactory;
private int blockOutputActiveBlocks;
+ private WriteOperationHelper writeHelper;
private boolean useListV1;
private MagicCommitIntegration committerIntegration;
@@ -248,6 +250,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
.createS3Client(name);
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
+ writeHelper = new WriteOperationHelper(this, getConf());
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
listing = new Listing(this);
@@ -763,13 +766,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
partSize,
blockFactory,
instrumentation.newOutputStreamStatistics(statistics),
- createWriteOperationHelper(),
+ getWriteOperationHelper(),
putTracker),
null);
}
/**
- * Create a new {@code WriteOperationHelper} instance.
+ * Get a {@code WriteOperationHelper} instance.
*
* This class permits other low-level operations against the store.
* It is unstable and
@@ -778,8 +781,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
* @return a new helper.
*/
@InterfaceAudience.Private
- public WriteOperationHelper createWriteOperationHelper() {
- return new WriteOperationHelper(this);
+ public WriteOperationHelper getWriteOperationHelper() {
+ return writeHelper;
}
/**
@@ -3154,7 +3157,25 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
}
/**
+ * List any pending multipart uploads whose keys begin with prefix, using
+ * an iterator that can handle an unlimited number of entries.
+ * See {@link #listMultipartUploads(String)} for a non-iterator version of
+ * this.
+ *
+ * @param prefix optional key prefix to search
+ * @return Iterator over multipart uploads.
+ * @throws IOException on failure
+ */
+ public MultipartUtils.UploadIterator listUploads(@Nullable String prefix)
+ throws IOException {
+ return MultipartUtils.listMultipartUploads(s3, invoker, bucket, maxKeys,
+ prefix);
+ }
+
+ /**
* Listing all multipart uploads; limited to the first few hundred.
+ * See {@link #listUploads(String)} for an iterator-based version that does
+ * not limit the number of entries returned.
* Retry policy: retry, translated.
* @return a listing of multipart uploads.
* @param prefix prefix to scan for, "" for none
@@ -3241,5 +3262,4 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
return false;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index e38c165..4dd6ed1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -51,6 +51,7 @@ import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -153,7 +154,7 @@ public final class S3AUtils {
* @return an IOE which wraps the caught exception.
*/
@SuppressWarnings("ThrowableInstanceNeverThrown")
- public static IOException translateException(String operation,
+ public static IOException translateException(@Nullable String operation,
String path,
SdkBaseException exception) {
String message = String.format("%s%s: %s",
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
index 930c005..46ca65c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
@@ -38,6 +38,7 @@ import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,9 +85,9 @@ public class WriteOperationHelper {
* @param conf Configuration object
*
*/
- protected WriteOperationHelper(S3AFileSystem owner) {
+ protected WriteOperationHelper(S3AFileSystem owner, Configuration conf) {
this.owner = owner;
- this.invoker = new Invoker(new S3ARetryPolicy(owner.getConf()),
+ this.invoker = new Invoker(new S3ARetryPolicy(conf),
this::operationRetried);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
index 1338d2e..f6e12f4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
@@ -101,7 +101,7 @@ public class CommitOperations {
Preconditions.checkArgument(fs != null, "null fs");
this.fs = fs;
statistics = fs.newCommitterStatistics();
- writeOperations = fs.createWriteOperationHelper();
+ writeOperations = fs.getWriteOperationHelper();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
index a07b5c9..7f9dadf 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
@@ -101,7 +101,7 @@ public class MagicCommitIntegration {
key,
destKey,
pendingsetPath,
- owner.createWriteOperationHelper());
+ owner.getWriteOperationHelper());
LOG.debug("Created {}", tracker);
} else {
LOG.warn("File being created has a \"magic\" path, but the filesystem"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index ace043b..e764021 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -23,14 +23,17 @@ import java.io.IOException;
import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import com.amazonaws.services.s3.model.MultipartUpload;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
@@ -44,6 +47,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.MultipartUtils;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
@@ -55,6 +59,7 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT;
import static org.apache.hadoop.service.launcher.LauncherExitCodes.*;
/**
@@ -79,6 +84,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
"\t" + Destroy.NAME + " - " + Destroy.PURPOSE + "\n" +
"\t" + Import.NAME + " - " + Import.PURPOSE + "\n" +
"\t" + BucketInfo.NAME + " - " + BucketInfo.PURPOSE + "\n" +
+ "\t" + Uploads.NAME + " - " + Uploads.PURPOSE + "\n" +
"\t" + Diff.NAME + " - " + Diff.PURPOSE + "\n" +
"\t" + Prune.NAME + " - " + Prune.PURPOSE + "\n" +
"\t" + SetCapacity.NAME + " - " +SetCapacity.PURPOSE + "\n";
@@ -100,10 +106,14 @@ public abstract class S3GuardTool extends Configured implements Tool {
private final CommandFormat commandFormat;
public static final String META_FLAG = "meta";
+
+ // These are common to the prune and uploads subcommands.
public static final String DAYS_FLAG = "days";
public static final String HOURS_FLAG = "hours";
public static final String MINUTES_FLAG = "minutes";
public static final String SECONDS_FLAG = "seconds";
+ public static final String AGE_OPTIONS_USAGE = "[-days <days>] "
+ + "[-hours <hours>] [-minutes <minutes>] [-seconds <seconds>]";
public static final String REGION_FLAG = "region";
public static final String READ_FLAG = "read";
@@ -177,6 +187,36 @@ public abstract class S3GuardTool extends Configured implements Tool {
"config, or S3 bucket");
}
+ private long getDeltaComponent(TimeUnit unit, String arg) {
+ String raw = getCommandFormat().getOptValue(arg);
+ if (raw == null || raw.isEmpty()) {
+ return 0;
+ }
+ long parsed = Long.parseLong(raw);
+ return unit.toMillis(parsed);
+ }
+
+ /**
+ * Convert all age options supplied to total milliseconds of time.
+ * @return Sum of all age options, or zero if none were given.
+ */
+ long ageOptionsToMsec() {
+ long cliDelta = 0;
+ cliDelta += getDeltaComponent(TimeUnit.DAYS, DAYS_FLAG);
+ cliDelta += getDeltaComponent(TimeUnit.HOURS, HOURS_FLAG);
+ cliDelta += getDeltaComponent(TimeUnit.MINUTES, MINUTES_FLAG);
+ cliDelta += getDeltaComponent(TimeUnit.SECONDS, SECONDS_FLAG);
+ return cliDelta;
+ }
+
+ protected void addAgeOptions() {
+ CommandFormat format = getCommandFormat();
+ format.addOptionWithValue(DAYS_FLAG);
+ format.addOptionWithValue(HOURS_FLAG);
+ format.addOptionWithValue(MINUTES_FLAG);
+ format.addOptionWithValue(SECONDS_FLAG);
+ }
+
/**
* Parse metadata store from command line option or HDFS configuration.
*
@@ -867,7 +907,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
"Common options:\n" +
" -" + META_FLAG + " URL - Metadata repository details " +
"(implementation-specific)\n" +
- "\n" +
+ "Age options. Any combination of these integer-valued options:\n" +
+ AGE_OPTIONS_USAGE + "\n" +
"Amazon DynamoDB-specific options:\n" +
" -" + REGION_FLAG + " REGION - Service region for connections\n" +
"\n" +
@@ -877,12 +918,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
Prune(Configuration conf) {
super(conf);
-
- CommandFormat format = getCommandFormat();
- format.addOptionWithValue(DAYS_FLAG);
- format.addOptionWithValue(HOURS_FLAG);
- format.addOptionWithValue(MINUTES_FLAG);
- format.addOptionWithValue(SECONDS_FLAG);
+ addAgeOptions();
}
@VisibleForTesting
@@ -901,15 +937,6 @@ public abstract class S3GuardTool extends Configured implements Tool {
return USAGE;
}
- private long getDeltaComponent(TimeUnit unit, String arg) {
- String raw = getCommandFormat().getOptValue(arg);
- if (raw == null || raw.isEmpty()) {
- return 0;
- }
- Long parsed = Long.parseLong(raw);
- return unit.toMillis(parsed);
- }
-
public int run(String[] args, PrintStream out) throws
InterruptedException, IOException {
List<String> paths = parseArgs(args);
@@ -924,11 +951,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
Configuration conf = getConf();
long confDelta = conf.getLong(S3GUARD_CLI_PRUNE_AGE, 0);
- long cliDelta = 0;
- cliDelta += getDeltaComponent(TimeUnit.DAYS, "days");
- cliDelta += getDeltaComponent(TimeUnit.HOURS, "hours");
- cliDelta += getDeltaComponent(TimeUnit.MINUTES, "minutes");
- cliDelta += getDeltaComponent(TimeUnit.SECONDS, "seconds");
+ long cliDelta = ageOptionsToMsec();
if (confDelta <= 0 && cliDelta <= 0) {
errorln("You must specify a positive age for metadata to prune.");
@@ -1080,6 +1103,214 @@ public abstract class S3GuardTool extends Configured implements Tool {
}
+ /**
+ * Command to list / abort pending multipart uploads.
+ */
+ static class Uploads extends S3GuardTool {
+ public static final String NAME = "uploads";
+ public static final String ABORT = "abort";
+ public static final String LIST = "list";
+ public static final String EXPECT = "expect";
+ public static final String VERBOSE = "verbose";
+ public static final String FORCE = "force";
+
+ public static final String PURPOSE = "list or abort pending " +
+ "multipart uploads";
+ private static final String USAGE = NAME + " [OPTIONS] " +
+ "s3a://BUCKET[/path]\n"
+ + "\t" + PURPOSE + "\n\n"
+ + "Common options:\n"
+ + " (-" + LIST + " | -" + EXPECT +" <num-uploads> | -" + ABORT
+ + ") [-" + VERBOSE +"] "
+ + "[<age-options>] [-force]\n"
+ + "\t - Under given path, list or delete all uploads," +
+ " or only those \n"
+ + "older than specified by <age-options>\n"
+ + "<age-options> are any combination of the integer-valued options:\n"
+ + "\t" + AGE_OPTIONS_USAGE + "\n"
+ + "-" + EXPECT + " is similar to list, except no output is printed,\n"
+ + "\tbut the exit code will be an error if the provided number\n"
+ + "\tis different that the number of uploads found by the command.\n"
+ + "-" + FORCE + " option prevents the \"Are you sure\" prompt when\n"
+ + "\tusing -" + ABORT;
+
+ /** Constant used for output and parsed by tests. */
+ public static final String TOTAL = "Total";
+
+ /** Runs in one of three modes. */
+ private enum Mode { LIST, EXPECT, ABORT };
+ private Mode mode = null;
+
+ /** For Mode == EXPECT, expected listing size. */
+ private int expectedCount;
+
+ /** List/abort uploads older than this many milliseconds. */
+ private long ageMsec = 0;
+
+ /** Verbose output flag. */
+ private boolean verbose = false;
+
+ /** Whether to delete without the "are you sure" prompt. */
+ private boolean force = false;
+
+ /** Path prefix to use when searching multipart uploads. */
+ private String prefix;
+
+ Uploads(Configuration conf) {
+ super(conf, ABORT, LIST, VERBOSE, FORCE);
+ addAgeOptions();
+ getCommandFormat().addOptionWithValue(EXPECT);
+ }
+
+ @Override
+ String getName() {
+ return NAME;
+ }
+
+ @Override
+ public String getUsage() {
+ return USAGE;
+ }
+
+ public int run(String[] args, PrintStream out)
+ throws InterruptedException, IOException {
+ List<String> paths = parseArgs(args);
+ if (paths.isEmpty()) {
+ errorln(getUsage());
+ throw invalidArgs("No options specified");
+ }
+ processArgs(paths, out);
+ promptBeforeAbort(out);
+ processUploads(out);
+
+ out.flush();
+ return SUCCESS;
+ }
+
+ private void promptBeforeAbort(PrintStream out) throws IOException {
+ if (mode != Mode.ABORT || force) {
+ return;
+ }
+ Scanner scanner = new Scanner(System.in, "UTF-8");
+ out.println("Are you sure you want to delete any pending " +
+ "uploads? (yes/no) >");
+ String response = scanner.nextLine();
+ if (!"yes".equalsIgnoreCase(response)) {
+ throw S3GuardTool.userAborted("User did not answer yes, quitting.");
+ }
+ }
+
+ private void processUploads(PrintStream out) throws IOException {
+ MultipartUtils.UploadIterator uploads;
+ uploads = getFilesystem().listUploads(prefix);
+
+ int count = 0;
+ while (uploads.hasNext()) {
+ MultipartUpload upload = uploads.next();
+ if (!olderThan(upload, ageMsec)) {
+ continue;
+ }
+ count++;
+ if (mode == Mode.ABORT || mode == Mode.LIST || verbose) {
+ println(out, "%s%s %s", mode == Mode.ABORT ? "Deleting: " : "",
+ upload.getKey(), upload.getUploadId());
+ }
+ if (mode == Mode.ABORT) {
+ getFilesystem().getWriteOperationHelper()
+ .abortMultipartUpload(upload.getKey(), upload.getUploadId(),
+ LOG_EVENT);
+ }
+ }
+ if (mode != Mode.EXPECT || verbose) {
+ println(out, "%s %d uploads %s.", TOTAL, count,
+ mode == Mode.ABORT ? "deleted" : "found");
+ }
+ if (mode == Mode.EXPECT) {
+ if (count != expectedCount) {
+ throw badState("Expected %d uploads, found %d", expectedCount, count);
+ }
+ }
+ }
+
+ /**
+ * Check if upload is at least as old as given age.
+ * @param u upload to check
+ * @param msec age in milliseconds
+ * @return true iff u was initiated at least msec milliseconds ago.
+ */
+ private boolean olderThan(MultipartUpload u, long msec) {
+ Date ageDate = new Date(System.currentTimeMillis() - msec);
+ return ageDate.compareTo(u.getInitiated()) >= 0;
+ }
+
+ private void processArgs(List<String> args, PrintStream out)
+ throws IOException {
+ CommandFormat commands = getCommandFormat();
+ String err = "Can only specify one of -" + LIST + ", -" + ABORT
+ + ", and -" + EXPECT;
+
+ // Three mutually-exclusive options
+ if (commands.getOpt(LIST)) {
+ mode = Mode.LIST;
+ }
+ if (commands.getOpt(ABORT)) {
+ if (mode != null) {
+ throw invalidArgs(err);
+ }
+ mode = Mode.ABORT;
+ }
+
+ String expectVal = commands.getOptValue(EXPECT);
+ if (expectVal != null) {
+ if (mode != null) {
+ throw invalidArgs(err);
+ }
+ mode = Mode.EXPECT;
+ expectedCount = Integer.parseInt(expectVal);
+ }
+
+ // Default to list
+ if (mode == null) {
+ vprintln(out, "No mode specified, defaulting to -" + LIST);
+ mode = Mode.LIST;
+ }
+
+ // Other flags
+ if (commands.getOpt(VERBOSE)) {
+ verbose = true;
+ }
+ if (commands.getOpt(FORCE)) {
+ force = true;
+ }
+ ageMsec = ageOptionsToMsec();
+
+ String s3Path = args.get(0);
+ URI uri = S3GuardTool.toUri(s3Path);
+ prefix = uri.getPath();
+ if (prefix.length() > 0) {
+ prefix = prefix.substring(1);
+ }
+ vprintln(out, "Command: %s, age %d msec, path %s (prefix \"%s\")",
+ mode.name(), ageMsec, s3Path, prefix);
+
+ initS3AFileSystem(s3Path);
+ }
+
+ /**
+ * If verbose flag is set, print a formatted string followed by a newline
+ * to the output stream.
+ * @param out destination
+ * @param format format string
+ * @param args optional arguments
+ */
+ private void vprintln(PrintStream out, String format, Object...
+ args) {
+ if (verbose) {
+ out.println(String.format(format, args));
+ }
+ }
+ }
+
private static S3GuardTool command;
/**
@@ -1183,6 +1414,17 @@ public abstract class S3GuardTool extends Configured implements Tool {
}
/**
+ * Build the exception to raise on user-aborted action.
+ * @param format string format
+ * @param args optional arguments for the string
+ * @return a new exception to throw
+ */
+ protected static ExitUtil.ExitException userAborted(
+ String format, Object...args) {
+ return new ExitUtil.ExitException(ERROR, String.format(format, args));
+ }
+
+ /**
* Execute the command with the given arguments.
*
* @param conf Hadoop configuration.
@@ -1224,6 +1466,9 @@ public abstract class S3GuardTool extends Configured implements Tool {
case SetCapacity.NAME:
command = new SetCapacity(conf);
break;
+ case Uploads.NAME:
+ command = new Uploads(conf);
+ break;
default:
printHelp();
throw new ExitUtil.ExitException(E_USAGE,
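For reference, the shared age options parsed by `ageOptionsToMsec()` are additive: each unit is converted to milliseconds via `TimeUnit` and summed. A worked example with assumed values of `-days 1 -hours 2`:

```java
import java.util.concurrent.TimeUnit;

// Equivalent of parsing "-days 1 -hours 2" (values are illustrative):
long ageMsec = TimeUnit.DAYS.toMillis(1)   // 86_400_000 ms
    + TimeUnit.HOURS.toMillis(2);          //  7_200_000 ms
// ageMsec == 93_600_000; uploads initiated before (now - ageMsec) match.
```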
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index c413e19..0e03100 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1531,8 +1531,13 @@ from VMs running on EC2.
</property>
```
-### <a name="multipart_purge"></a>Cleaning up after partial Upload Failures: `fs.s3a.multipart.purge`
+### <a name="multipart_purge"></a>Cleaning up after partial Upload Failures
+There are two mechanisms for cleaning up after leftover multipart
+uploads:
+- Hadoop s3guard CLI commands for listing and deleting uploads by their
+age, documented in the [S3Guard](./s3guard.html) section.
+- The configuration parameter `fs.s3a.multipart.purge`, covered below.
If a large stream write operation is interrupted, there may be
intermediate partitions uploaded to S3: data which will be billed for.
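For completeness, a sketch of enabling the purge mechanism; the age value here is an assumption for illustration, not a recommendation (`fs.s3a.multipart.purge.age` is measured in seconds):

```xml
<property>
  <name>fs.s3a.multipart.purge</name>
  <value>true</value>
</property>
<property>
  <name>fs.s3a.multipart.purge.age</name>
  <value>86400</value>
</property>
```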
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
index e2cb549..1050f8a 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
@@ -515,9 +515,42 @@ hadoop s3guard bucket-info -guarded -auth s3a://landsat-pds
Require the bucket to be using S3Guard in authoritative mode. This will normally
fail against this specific bucket.
+### List or Delete Leftover Multipart Uploads: `s3guard uploads`
-### Delete a table: `s3guard destroy`
+Lists or deletes all pending (uncompleted) multipart uploads older than a
+given age.
+
+```bash
+hadoop s3guard uploads (-list | -abort | -expect <num-uploads>) [-verbose] \
+ [-days <days>] [-hours <hours>] [-minutes <minutes>] [-seconds <seconds>] \
+ [-force] s3a://bucket/prefix
+```
+The command lists or deletes all multipart uploads which are older than
+the given age, and that match the prefix supplied, if any.
+
+For example, to delete all uncompleted multipart uploads older than two
+days in the folder at `s3a://my-bucket/path/to/stuff`, use the following
+command:
+
+```bash
+hadoop s3guard uploads -abort -days 2 s3a://my-bucket/path/to/stuff
+```
+
+We recommend running with `-list` first to confirm the parts shown
+are those that you wish to delete. Note that the command will ask for
+confirmation with an "Are you sure?" prompt unless you specify the
+`-force` option. This is to safeguard against accidental deletion of
+data, which is especially risky without a long age parameter, as it can
+affect in-flight uploads.
+
+The `-expect` option is similar to `-list`, except that it is silent by
+default and terminates with a success or failure exit code depending on
+whether the supplied number matches the number of uploads found that
+match the given options (path, age).
+
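+For example, the following run succeeds only when exactly three matching
+uploads are found under the path; the count of three is illustrative:
+
+```bash
+hadoop s3guard uploads -expect 3 s3a://my-bucket/path/to/stuff
+```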
+
+### Delete a table: `s3guard destroy`
Deletes a metadata store. With DynamoDB as the store, this means
the specific DynamoDB table used to store the metadata.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java
new file mode 100644
index 0000000..4746ad5
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMultipartUtils.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.MultipartUpload;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ * Tests for {@link MultipartUtils}.
+ */
+public class ITestS3AMultipartUtils extends AbstractS3ATestBase {
+
+ private static final int UPLOAD_LEN = 1024;
+ private static final String PART_FILENAME_BASE = "pending-part";
+ private static final int LIST_BATCH_SIZE = 2;
+ private static final int NUM_KEYS = 5;
+
+
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ // Forces listings to come back in multiple batches to test that part of
+ // the iterators.
+ conf.setInt(Constants.MAX_PAGING_KEYS, LIST_BATCH_SIZE);
+ return conf;
+ }
+
+ /**
+ * Main test case for upload part listing and iterator paging.
+ * @throws Exception on failure.
+ */
+ @Test
+ public void testListMultipartUploads() throws Exception {
+ S3AFileSystem fs = getFileSystem();
+ Set<MultipartTestUtils.IdKey> keySet = new HashSet<>();
+ try {
+ // 1. Create NUM_KEYS pending upload parts
+ for (int i = 0; i < NUM_KEYS; i++) {
+ Path filePath = getPartFilename(i);
+ String key = fs.pathToKey(filePath);
+ describe("creating upload part with key %s", key);
+ // create a multipart upload
+ MultipartTestUtils.IdKey idKey = MultipartTestUtils
+ .createPartUpload(fs, key, UPLOAD_LEN,
+ 1);
+ keySet.add(idKey);
+ }
+
+ // 2. Verify all uploads are found listing by prefix
+ describe("Verifying upload list by prefix");
+ MultipartUtils.UploadIterator uploads = fs.listUploads(getPartPrefix(fs));
+ assertUploadsPresent(uploads, keySet);
+
+ // 3. Verify all uploads are found listing without prefix
+ describe("Verifying list all uploads");
+ uploads = fs.listUploads(null);
+ assertUploadsPresent(uploads, keySet);
+
+ } finally {
+ // 4. Delete all uploads we created
+ MultipartTestUtils.cleanupParts(fs, keySet);
+ }
+ }
+
+ /**
+ * Assert that all provided multipart uploads are contained in the upload
+ * iterator's results.
+ * @param list upload iterator
+ * @param ourUploads set of uploads that should be present
+ * @throws IOException on I/O error
+ */
+ private void assertUploadsPresent(MultipartUtils.UploadIterator list,
+ Set<MultipartTestUtils.IdKey> ourUploads) throws IOException {
+
+ // Don't modify passed-in set, use copy.
+ Set<MultipartTestUtils.IdKey> uploads = new HashSet<>(ourUploads);
+ while (list.hasNext()) {
+ MultipartTestUtils.IdKey listing = toIdKey(list.next());
+ if (uploads.contains(listing)) {
+ LOG.debug("Matched: {},{}", listing.getKey(), listing.getUploadId());
+ uploads.remove(listing);
+ } else {
+ LOG.debug("Not our upload {},{}", listing.getKey(),
+ listing.getUploadId());
+ }
+ }
+ assertTrue("Not all our uploads were listed", uploads.isEmpty());
+ }
+
+ private MultipartTestUtils.IdKey toIdKey(MultipartUpload mu) {
+ return new MultipartTestUtils.IdKey(mu.getKey(), mu.getUploadId());
+ }
+
+ private Path getPartFilename(int index) throws IOException {
+ return path(String.format("%s-%d", PART_FILENAME_BASE, index));
+ }
+
+ private String getPartPrefix(S3AFileSystem fs) throws IOException {
+ return fs.pathToKey(path("blah").getParent());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
index 55e3e37..4952580 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3AFileSystem.java
@@ -78,6 +78,7 @@ public class MockS3AFileSystem extends S3AFileSystem {
private final S3AInstrumentation instrumentation =
new S3AInstrumentation(FS_URI);
private Configuration conf;
+ private WriteOperationHelper writeHelper;
public MockS3AFileSystem(S3AFileSystem mock,
Pair<StagingTestBase.ClientResults, StagingTestBase.ClientErrors> outcome) {
@@ -125,6 +126,12 @@ public class MockS3AFileSystem extends S3AFileSystem {
public void initialize(URI name, Configuration originalConf)
throws IOException {
conf = originalConf;
+ writeHelper = new WriteOperationHelper(this, conf);
+ }
+
+ @Override
+ public WriteOperationHelper getWriteOperationHelper() {
+ return writeHelper;
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java
new file mode 100644
index 0000000..8be3ff7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MultipartTestUtils.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.MultipartUpload;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Invoker.LOG_EVENT;
+
+/**
+ * Utilities for S3A multipart upload tests.
+ */
+public final class MultipartTestUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ MultipartTestUtils.class);
+
+ /** Not instantiated. */
+ private MultipartTestUtils() { }
+
+ /**
+ * Clean up all provided uploads.
+ * @param keySet set of uploads to abort
+ */
+ static void cleanupParts(S3AFileSystem fs, Set <IdKey> keySet) {
+ boolean anyFailure = false;
+ for (IdKey ik : keySet) {
+ try {
+ LOG.debug("aborting upload id {}", ik.getUploadId());
+ fs.abortMultipartUpload(ik.getKey(), ik.getUploadId());
+ } catch (Exception e) {
+ LOG.error(String.format("Failure aborting upload %s, continuing.",
+ ik.getKey()), e);
+ anyFailure = true;
+ }
+ }
+ Assert.assertFalse("Failure aborting multipart upload(s), see log.",
+ anyFailure);
+ }
+
+ public static IdKey createPartUpload(S3AFileSystem fs, String key, int len,
+ int partNo) throws IOException {
+ WriteOperationHelper writeHelper = fs.getWriteOperationHelper();
+ byte[] data = dataset(len, 'a', 'z');
+ InputStream in = new ByteArrayInputStream(data);
+ String uploadId = writeHelper.initiateMultiPartUpload(key);
+ UploadPartRequest req = writeHelper.newUploadPartRequest(key, uploadId,
+ partNo, len, in, null, 0L);
+ PartETag partEtag = fs.uploadPart(req).getPartETag();
+ LOG.debug("uploaded part etag {}, upid {}", partEtag.getETag(), uploadId);
+ return new IdKey(key, uploadId);
+ }
+
+ /** Delete any uploads under given path (recursive). Silent on failure. */
+ public static void clearAnyUploads(S3AFileSystem fs, Path path) {
+ try {
+ String key = fs.pathToKey(path);
+ MultipartUtils.UploadIterator uploads = fs.listUploads(key);
+ while (uploads.hasNext()) {
+ MultipartUpload upload = uploads.next();
+ fs.getWriteOperationHelper().abortMultipartUpload(upload.getKey(),
+ upload.getUploadId(), LOG_EVENT);
+ LOG.debug("Cleaning up upload: {} {}", upload.getKey(),
+ truncatedUploadId(upload.getUploadId()));
+ }
+ } catch (IOException ioe) {
+ LOG.info("Ignoring exception: ", ioe);
+ }
+ }
+
+ /** Assert that there are not any upload parts at given path. */
+ public static void assertNoUploadsAt(S3AFileSystem fs, Path path) throws
+ Exception {
+ String key = fs.pathToKey(path);
+ MultipartUtils.UploadIterator uploads = fs.listUploads(key);
+ while (uploads.hasNext()) {
+ MultipartUpload upload = uploads.next();
+ Assert.fail("Found unexpected upload " + upload.getKey() + " " +
+ truncatedUploadId(upload.getUploadId()));
+ }
+ }
+
+ /** Get number of part uploads under given path. */
+ public static int countUploadsAt(S3AFileSystem fs, Path path) throws
+ IOException {
+ String key = fs.pathToKey(path);
+ MultipartUtils.UploadIterator uploads = fs.listUploads(key);
+ int count = 0;
+ while (uploads.hasNext()) {
+ MultipartUpload upload = uploads.next();
+ count++;
+ }
+ return count;
+ }
+
+ /**
+ * Get a list of all pending uploads under a prefix, in a printable form.
+ * @param prefix prefix to look under
+ * @return possibly empty list
+ * @throws IOException IO failure.
+ */
+ public static List<String> listMultipartUploads(S3AFileSystem fs,
+ String prefix) throws IOException {
+
+ return fs
+ .listMultipartUploads(prefix).stream()
+ .map(upload -> String.format("Upload to %s with ID %s; initiated %s",
+ upload.getKey(),
+ upload.getUploadId(),
+ S3ATestUtils.LISTING_FORMAT.format(upload.getInitiated())))
+ .collect(Collectors.toList());
+ }
+
+
+ private static String truncatedUploadId(String fullId) {
+ return fullId.substring(0, 12) + " ...";
+ }
+
+ /** Struct of object key, upload ID. */
+ static class IdKey {
+ private String key;
+ private String uploadId;
+
+ IdKey(String key, String uploadId) {
+ this.key = key;
+ this.uploadId = uploadId;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getUploadId() {
+ return uploadId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ IdKey key1 = (IdKey) o;
+ return Objects.equals(key, key1.key) &&
+ Objects.equals(uploadId, key1.uploadId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, uploadId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 9dc4b8e..d6533bf 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -45,7 +45,6 @@ import java.net.URISyntaxException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.List;
-import java.util.stream.Collectors;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
@@ -822,28 +821,10 @@ public final class S3ATestUtils {
/**
* Date format used for mapping upload initiation time to human string.
*/
- private static final DateFormat LISTING_FORMAT = new SimpleDateFormat(
+ public static final DateFormat LISTING_FORMAT = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
/**
- * Get a list of all pending uploads under a prefix, one which can be printed.
- * @param prefix prefix to look under
- * @return possibly empty list
- * @throws IOException IO failure.
- */
- public static List<String> listMultipartUploads(S3AFileSystem fs,
- String prefix) throws IOException {
-
- return fs
- .listMultipartUploads(prefix).stream()
- .map(upload -> String.format("Upload to %s with ID %s; initiated %s",
- upload.getKey(),
- upload.getUploadId(),
- LISTING_FORMAT.format(upload.getInitiated())))
- .collect(Collectors.toList());
- }
-
- /**
* Skip a test if the FS isn't marked as supporting magic commits.
* @param fs filesystem
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index 267d4df..04676db 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@@ -247,7 +248,7 @@ public abstract class AbstractCommitITest extends AbstractS3ATestBase {
S3AFileSystem fs = getFileSystem();
if (fs != null && path != null) {
String key = fs.pathToKey(path);
- WriteOperationHelper writeOps = fs.createWriteOperationHelper();
+ WriteOperationHelper writeOps = fs.getWriteOperationHelper();
int count = writeOps.abortMultipartUploadsUnderPath(key);
if (count > 0) {
log().info("Multipart uploads deleted: {}", count);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
index e3a295b..0722959 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestS3AHugeMagicCommits.java
@@ -40,8 +40,8 @@ import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles;
+import static org.apache.hadoop.fs.s3a.MultipartTestUtils.listMultipartUploads;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1093a736/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java
index 43cbe93..64a2b13 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java
@@ -24,10 +24,15 @@ import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -35,15 +40,20 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Diff;
+import static org.apache.hadoop.fs.s3a.MultipartTestUtils.*;
import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test S3Guard related CLI commands against a LocalMetadataStore.
+ * Also responsible for testing the non-S3Guard-specific commands that, for
+ * now, live under the s3guard CLI command.
*/
public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
private static final String LOCAL_METADATA = "local://metadata";
+ private static final String[] ABORT_FORCE_OPTIONS = new String[] {"-abort",
+ "-force", "-verbose"};
@Override
protected MetadataStore newMetadataStore() {
@@ -261,5 +271,182 @@ public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
LOG.info("Exec output=\n{}", output);
}
+ private final static String UPLOAD_PREFIX = "test-upload-prefix";
+ private final static String UPLOAD_NAME = "test-upload";
+ @Test
+ public void testUploads() throws Throwable {
+ S3AFileSystem fs = getFileSystem();
+ Path path = path(UPLOAD_PREFIX + "/" + UPLOAD_NAME);
+
+ describe("Cleaning up any leftover uploads from previous runs.");
+ // 1. Make sure key doesn't already exist
+ clearAnyUploads(fs, path);
+
+ // 2. Confirm no uploads are listed via API
+ assertNoUploadsAt(fs, path.getParent());
+
+ // 3. Confirm no uploads are listed via CLI
+ describe("Confirming CLI lists nothing.");
+ assertNumUploads(path, 0);
+
+ // 4. Create an upload part
+ describe("Uploading single part.");
+ createPartUpload(fs, fs.pathToKey(path), 128, 1);
+
+ try {
+ // 5. Confirm it exists via the API.
+ LambdaTestUtils.eventually(5000, /* 5 seconds until failure */
+ 1000, /* one second retry interval */
+ () -> {
+ assertEquals("Should be one upload", 1, countUploadsAt(fs, path));
+ });
+
+ // 6. Confirm part exists via CLI, direct path and parent path
+ describe("Confirming CLI lists one part");
+ LambdaTestUtils.eventually(5000, 1000,
+ () -> { assertNumUploads(path, 1); });
+ LambdaTestUtils.eventually(5000, 1000,
+ () -> { assertNumUploads(path.getParent(), 1); });
+
+ // 7. Use CLI to delete part, assert it worked
+ describe("Deleting part via CLI");
+ assertNumDeleted(fs, path, 1);
+
+ // 8. Confirm deletion via API
+ describe("Confirming deletion via API");
+ assertEquals("Should be no uploads", 0, countUploadsAt(fs, path));
+
+ // 9. Confirm no uploads are listed via CLI
+ describe("Confirming CLI lists nothing.");
+ assertNumUploads(path, 0);
+
+ } catch (Throwable t) {
+ // Clean up on intermediate failure
+ clearAnyUploads(fs, path);
+ throw t;
+ }
+ }
+
+ @Test
+ public void testUploadListByAge() throws Throwable {
+ S3AFileSystem fs = getFileSystem();
+ Path path = path(UPLOAD_PREFIX + "/" + UPLOAD_NAME);
+
+ describe("Cleaning up any leftover uploads from previous runs.");
+ // 1. Make sure key doesn't already exist
+ clearAnyUploads(fs, path);
+
+ // 2. Create an upload part
+ describe("Uploading single part.");
+ createPartUpload(fs, fs.pathToKey(path), 128, 1);
+
+ try {
+ // 3. Confirm it exists via the API, retrying with
+ // LambdaTestUtils.eventually() to absorb listing delay.
+ LambdaTestUtils.eventually(5000, 1000,
+ () -> {
+ assertEquals("Should be one upload", 1, countUploadsAt(fs, path));
+ });
+
+ // 4. Confirm part does not appear in listing with a long age filter
+ describe("Confirming CLI older age doesn't list");
+ assertNumUploadsAge(path, 0, 600);
+
+ // 5. Confirm part does not get deleted with long age filter
+ describe("Confirming CLI older age doesn't delete");
+ uploadCommandAssertCount(fs, ABORT_FORCE_OPTIONS, path, 0,
+ 600);
+
+ // 6. Wait a second and then assert the part is in listing of things at
+ // least a second old
+ describe("Sleeping 1 second then confirming upload still there");
+ Thread.sleep(1000);
+ LambdaTestUtils.eventually(5000, 1000,
+ () -> { assertNumUploadsAge(path, 1, 1); });
+
+ // 7. Assert deletion works when age filter matches
+ describe("Doing aged deletion");
+ uploadCommandAssertCount(fs, ABORT_FORCE_OPTIONS, path, 1, 1);
+ describe("Confirming age deletion happened");
+ assertEquals("Should be no uploads", 0, countUploadsAt(fs, path));
+ } catch (Throwable t) {
+ // Clean up on intermediate failure
+ clearAnyUploads(fs, path);
+ throw t;
+ }
+ }
+
+ @Test
+ public void testUploadNegativeExpect() throws Throwable {
+ runToFailure(E_BAD_STATE, Uploads.NAME, "-expect", "1",
+ path("/we/are/almost/postive/this/doesnt/exist/fhfsadfoijew")
+ .toString());
+ }
+
+ private void assertNumUploads(Path path, int numUploads) throws Exception {
+ assertNumUploadsAge(path, numUploads, 0);
+ }
+
+ private void assertNumUploadsAge(Path path, int numUploads, int ageSeconds)
+ throws Exception {
+ if (ageSeconds > 0) {
+ run(Uploads.NAME, "-expect", String.valueOf(numUploads), "-seconds",
+ String.valueOf(ageSeconds), path.toString());
+ } else {
+ run(Uploads.NAME, "-expect", String.valueOf(numUploads), path.toString());
+ }
+ }
+
+ private void assertNumDeleted(S3AFileSystem fs, Path path, int numDeleted)
+ throws Exception {
+ uploadCommandAssertCount(fs, ABORT_FORCE_OPTIONS, path,
+ numDeleted, 0);
+ }
+
+ /**
+ * Run uploads cli command and assert the reported count (listed or
+ * deleted) matches.
+ * @param fs S3AFileSystem
+ * @param options main command options
+ * @param path path of part(s)
+ * @param numUploads expected number of listed/deleted parts
+ * @param ageSeconds optional seconds of age to specify to CLI, or zero to
+ * search all parts
+ * @throws Exception on failure
+ */
+ private void uploadCommandAssertCount(S3AFileSystem fs, String[] options,
+ Path path, int numUploads, int ageSeconds)
+ throws Exception {
+ List<String> allOptions = new ArrayList<>();
+ List<String> output = new ArrayList<>();
+ S3GuardTool.Uploads cmd = new S3GuardTool.Uploads(fs.getConf());
+ ByteArrayOutputStream buf = new ByteArrayOutputStream();
+ allOptions.add(cmd.getName());
+ allOptions.addAll(Arrays.asList(options));
+ if (ageSeconds > 0) {
+ allOptions.add("-" + Uploads.SECONDS_FLAG);
+ allOptions.add(String.valueOf(ageSeconds));
+ }
+ allOptions.add(path.toString());
+ exec(cmd, buf, allOptions.toArray(new String[0]));
+
+ try (BufferedReader reader = new BufferedReader(
+ new InputStreamReader(new ByteArrayInputStream(buf.toByteArray())))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ String[] fields = line.split("\\s");
+ if (fields.length == 4 && fields[0].equals(Uploads.TOTAL)) {
+ int parsedUploads = Integer.parseInt(fields[1]);
+ LOG.debug("Matched CLI output: {} {} {} {}", fields);
+ assertEquals("Unexpected number of uploads", numUploads,
+ parsedUploads);
+ return;
+ }
+ LOG.debug("Not matched: {}", line);
+ output.add(line);
+ }
+ }
+ fail("Command output did not match: \n" + StringUtils.join("\n", output));
+ }
}
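For reference, the `Total` line which `uploadCommandAssertCount()` parses has exactly four whitespace-separated fields; with one aborted upload it would read (count illustrative):

```
Total 1 uploads deleted.
```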