Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2016/06/03 16:33:34 UTC
[6/6] hadoop git commit: HADOOP-13171. Add StorageStatistics to S3A;
instrument some more operations. Contributed by Steve Loughran.
HADOOP-13171. Add StorageStatistics to S3A; instrument some more operations. Contributed by Steve Loughran.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b8216c10
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b8216c10
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b8216c10
Branch: refs/heads/branch-2.8
Commit: b8216c10d8f13b147916e62003af246c3cf9f50b
Parents: d0dc5aa
Author: Chris Nauroth <cn...@apache.org>
Authored: Fri Jun 3 08:56:36 2016 -0700
Committer: Chris Nauroth <cn...@apache.org>
Committed: Fri Jun 3 08:56:36 2016 -0700
----------------------------------------------------------------------
.../hadoop/fs/contract/ContractTestUtils.java | 420 +++++++++++++++
.../fs/s3a/ProgressableProgressListener.java | 94 ++++
.../hadoop/fs/s3a/S3AFastOutputStream.java | 65 +--
.../org/apache/hadoop/fs/s3a/S3AFileStatus.java | 7 +
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 507 ++++++++++++++-----
.../hadoop/fs/s3a/S3AInstrumentation.java | 218 +++++---
.../apache/hadoop/fs/s3a/S3AOutputStream.java | 98 +---
.../hadoop/fs/s3a/S3AStorageStatistics.java | 104 ++++
.../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 48 ++
.../org/apache/hadoop/fs/s3a/Statistic.java | 143 ++++++
.../src/site/markdown/tools/hadoop-aws/index.md | 12 +-
.../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 153 ++++++
.../hadoop/fs/s3a/TestS3AFileOperationCost.java | 192 +++++++
.../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 154 ++----
.../fs/s3a/scale/TestS3ADeleteManyFiles.java | 10 +-
.../s3a/scale/TestS3ADirectoryPerformance.java | 189 +++++++
.../scale/TestS3AInputStreamPerformance.java | 6 +-
.../src/test/resources/log4j.properties | 4 +-
18 files changed, 1985 insertions(+), 439 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8216c10/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 6343d40..20ba075 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -22,7 +22,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
import org.junit.internal.AssumptionViolatedException;
@@ -34,8 +36,14 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
import java.util.Properties;
+import java.util.Set;
import java.util.UUID;
/**
@@ -892,4 +900,416 @@ public class ContractTestUtils extends Assert {
fs.delete(objectPath, false);
}
}
+
+ /**
+ * Make times more readable by adding a "," every three digits.
+ * @param nanos nanos or other large number
+ * @return a string for logging
+ */
+ public static String toHuman(long nanos) {
+ return String.format(Locale.ENGLISH, "%,d", nanos);
+ }
+
+ /**
+ * Log the bandwidth of a timer as inferred from the number of
+ * bytes processed.
+ * @param timer timer
+ * @param bytes bytes processed in the time period
+ */
+ public static void bandwidth(NanoTimer timer, long bytes) {
+ LOG.info("Bandwidth = {} MB/S",
+ timer.bandwidthDescription(bytes));
+ }
+
+ /**
+ * Work out the bandwidth in MB/s.
+ * @param bytes bytes
+ * @param durationNS duration in nanos
+ * @return the number of megabytes/second of the recorded operation
+ */
+ public static double bandwidthMBs(long bytes, long durationNS) {
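+ // bytes/ns * 10^9 ns/s = bytes/s; divided by 10^6 bytes/MB this is bytes * 1000 / ns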
+ return (bytes * 1000.0) / durationNS;
+ }
+
+ /**
+ * Recursively create a directory tree.
+ * Return the details about the created tree. The files and directories
+ * counted are those created under the path, not the base directory
+ * itself; that is retrievable via {@link TreeScanResults#getBasePath()}.
+ * @param fs filesystem
+ * @param current parent dir
+ * @param depth depth of directory tree
+ * @param width width: subdirs per entry
+ * @param files number of files per entry
+ * @param filesize size of files to create in bytes.
+ * @return the details about the created tree.
+ * @throws IOException IO Problems
+ */
+ public static TreeScanResults createSubdirs(FileSystem fs,
+ Path current,
+ int depth,
+ int width,
+ int files,
+ int filesize) throws IOException {
+ return createSubdirs(fs, current, depth, width, files,
+ filesize, "dir-", "file-", "0");
+ }
+
+ /**
+ * Recursively create a directory tree.
+ * @param fs filesystem
+ * @param current the current dir in the walk
+ * @param depth depth of directory tree
+ * @param width width: subdirs per entry
+ * @param files number of files per entry
+ * @param filesize size of files to create in bytes.
+ * @param dirPrefix prefix for directory entries
+ * @param filePrefix prefix for file entries
+ * @param marker string which is slowly built up to uniquely name things
+ * @return the details about the created tree.
+ * @throws IOException IO Problems
+ */
+ public static TreeScanResults createSubdirs(FileSystem fs,
+ Path current,
+ int depth,
+ int width,
+ int files,
+ int filesize,
+ String dirPrefix,
+ String filePrefix,
+ String marker) throws IOException {
+ fs.mkdirs(current);
+ TreeScanResults results = new TreeScanResults(current);
+ if (depth > 0) {
+ byte[] data = dataset(filesize, 'a', 'z');
+ for (int i = 0; i < files; i++) {
+ String name = String.format("%s-%s-%04d.txt", filePrefix, marker, i);
+ Path path = new Path(current, name);
+ createFile(fs, path, true, data);
+ results.add(fs, path);
+ }
+ for (int w = 0; w < width; w++) {
+ String marker2 = String.format("%s-%04d", marker, w);
+ Path child = new Path(current, dirPrefix + marker2);
+ results.add(createSubdirs(fs, child, depth - 1, width, files,
+ filesize, dirPrefix, filePrefix, marker2));
+ results.add(fs, child);
+ }
+ }
+ return results;
+ }
+
+ /**
+ * Predicate to determine if two lists are equivalent, that is, they
+ * contain the same entries.
+ * @param left first collection of paths
+ * @param right second collection of paths
+ * @return true if all entries are in each collection of paths.
+ */
+ public static boolean collectionsEquivalent(Collection<Path> left,
+ Collection<Path> right) {
+ Set<Path> leftSet = new HashSet<>(left);
+ Set<Path> rightSet = new HashSet<>(right);
+ return leftSet.containsAll(right) && rightSet.containsAll(left);
+ }
+
+ /**
+ * Predicate to determine if two lists are equivalent, that is, they
+ * contain the same entries and neither collection has any duplicates.
+ * @param left first collection of paths
+ * @param right second collection of paths
+ * @return true if the collections are equivalent and duplicate-free.
+ */
+ public static boolean collectionsEquivalentNoDuplicates(Collection<Path> left,
+ Collection<Path> right) {
+ return collectionsEquivalent(left, right) &&
+ !containsDuplicates(left) && !containsDuplicates(right);
+ }
+
+ /**
+ * Predicate to test for a collection of paths containing duplicate entries.
+ * @param paths collection of paths
+ * @return true if there are duplicates.
+ */
+ public static boolean containsDuplicates(Collection<Path> paths) {
+ return new HashSet<>(paths).size() != paths.size();
+ }
+
+ /**
+ * Recursively list all entries, with a depth first traversal of the
+ * directory tree.
+ * @param fs filesystem
+ * @param path path
+ * @return the scan results: every file and directory found
+ * @throws IOException IO problems
+ */
+ public static TreeScanResults treeWalk(FileSystem fs, Path path)
+ throws IOException {
+ TreeScanResults dirsAndFiles = new TreeScanResults();
+
+ FileStatus[] statuses = fs.listStatus(path);
+ for (FileStatus status : statuses) {
+ LOG.info("{}{}", status.getPath(), status.isDirectory() ? "*" : "");
+ }
+ for (FileStatus status : statuses) {
+ dirsAndFiles.add(status);
+ if (status.isDirectory()) {
+ dirsAndFiles.add(treeWalk(fs, status.getPath()));
+ }
+ }
+ return dirsAndFiles;
+ }
+
+ /**
+ * Results of recursive directory creation/scan operations.
+ */
+ public static final class TreeScanResults {
+
+ private Path basePath;
+ private final List<Path> files = new ArrayList<>();
+ private final List<Path> directories = new ArrayList<>();
+ private final List<Path> other = new ArrayList<>();
+
+ public TreeScanResults() {
+ }
+
+ public TreeScanResults(Path basePath) {
+ this.basePath = basePath;
+ }
+
+ /**
+ * Build from a located file status iterator.
+ * @param results results of the listFiles/listStatus call.
+ * @throws IOException IO problems during the iteration.
+ */
+ public TreeScanResults(RemoteIterator<LocatedFileStatus> results)
+ throws IOException {
+ while (results.hasNext()) {
+ add(results.next());
+ }
+ }
+
+ /**
+ * Construct results from an array of statistics.
+ * @param stats statistics array. Must not be null.
+ */
+ public TreeScanResults(FileStatus[] stats) {
+ assertNotNull("Null file status array", stats);
+ for (FileStatus stat : stats) {
+ add(stat);
+ }
+ }
+
+ /**
+ * Add all paths in the other set of results to this instance.
+ * @param that the other instance
+ * @return this instance
+ */
+ public TreeScanResults add(TreeScanResults that) {
+ files.addAll(that.files);
+ directories.addAll(that.directories);
+ other.addAll(that.other);
+ return this;
+ }
+
+ /**
+ * Increment the counters based on the file status.
+ * @param status path status to count.
+ */
+ public void add(FileStatus status) {
+ if (status.isFile()) {
+ files.add(status.getPath());
+ } else if (status.isDirectory()) {
+ directories.add(status.getPath());
+ } else {
+ other.add(status.getPath());
+ }
+ }
+
+ public void add(FileSystem fs, Path path) throws IOException {
+ add(fs.getFileStatus(path));
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%d director%s and %d file%s",
+ getDirCount(),
+ getDirCount() == 1 ? "y" : "ies",
+ getFileCount(),
+ getFileCount() == 1 ? "" : "s");
+ }
+
+ /**
+ * Assert that a listing has the expected number of files,
+ * directories and other entries. The error text will include
+ * the {@code text} param, the field in question, and the entire object's
+ * string value.
+ * @param text text prefix for assertions.
+ * @param f expected file count
+ * @param d expected directory count
+ * @param o expected count of other entries.
+ */
+ public void assertSizeEquals(String text, long f, long d, long o) {
+ String self = toString();
+ Assert.assertEquals(text + ": file count in " + self,
+ f, getFileCount());
+ Assert.assertEquals(text + ": directory count in " + self,
+ d, getDirCount());
+ Assert.assertEquals(text + ": 'other' count in " + self,
+ o, getOtherCount());
+ }
+
+ /**
+ * Assert that the trees are equivalent: that every list matches (and
+ * that neither has any duplicates).
+ * @param that the other entry
+ */
+ public void assertEquivalent(TreeScanResults that) {
+ assertFieldsEquivalent("files", that, files, that.files);
+ assertFieldsEquivalent("directories", that,
+ directories, that.directories);
+ assertFieldsEquivalent("other", that, other, that.other);
+ }
+
+ /**
+ * Assert that a field in two instances is equivalent.
+ * @param fieldname field name for error messages
+ * @param that the other instance to scan
+ * @param ours our field's contents
+ * @param theirs the other instance's field contents
+ */
+ public void assertFieldsEquivalent(String fieldname,
+ TreeScanResults that,
+ List<Path> ours, List<Path> theirs) {
+ assertFalse("Duplicate " + fieldname + " in " + this,
+ containsDuplicates(ours));
+ assertFalse("Duplicate " + fieldname + " in other " + that,
+ containsDuplicates(theirs));
+ assertTrue(fieldname + " mismatch: between {" + this + "}" +
+ " and {" + that + "}",
+ collectionsEquivalent(ours, theirs));
+ }
+
+ public List<Path> getFiles() {
+ return files;
+ }
+
+ public List<Path> getDirectories() {
+ return directories;
+ }
+
+ public List<Path> getOther() {
+ return other;
+ }
+
+ public Path getBasePath() {
+ return basePath;
+ }
+
+ public long getFileCount() {
+ return files.size();
+ }
+
+ public long getDirCount() {
+ return directories.size();
+ }
+
+ public long getOtherCount() {
+ return other.size();
+ }
+
+ /**
+ * Total count of entries.
+ * @return the total number of entries
+ */
+ public long totalCount() {
+ return getFileCount() + getDirCount() + getOtherCount();
+ }
+
+ }
+
+ /**
+ * A simple class for timing operations in nanoseconds, and for
+ * printing some useful results in the process.
+ */
+ public static final class NanoTimer {
+ private final long startTime;
+ private long endTime;
+
+ public NanoTimer() {
+ startTime = now();
+ }
+
+ /**
+ * End the operation.
+ * @return the duration of the operation
+ */
+ public long end() {
+ endTime = now();
+ return duration();
+ }
+
+ /**
+ * End the operation; log the duration.
+ * @param format message
+ * @param args any arguments
+ * @return the duration of the operation
+ */
+ public long end(String format, Object... args) {
+ long d = end();
+ LOG.info("Duration of {}: {} nS",
+ String.format(format, args), toHuman(d));
+ return d;
+ }
+
+ public long now() {
+ return System.nanoTime();
+ }
+
+ public long duration() {
+ return endTime - startTime;
+ }
+
+ public double bandwidth(long bytes) {
+ return bandwidthMBs(bytes, duration());
+ }
+
+ /**
+ * Bandwidth as bytes per nanosecond, matching the nanosecond duration.
+ * @param bytes bytes in
+ * @return the number of bytes processed per nanosecond of this operation.
+ */
+ public double bandwidthBytes(long bytes) {
+ return (bytes * 1.0) / duration();
+ }
+
+ /**
+ * How many nanoseconds per IOP, byte, etc.
+ * @param operations operations processed in this time period
+ * @return the number of nanoseconds each operation took
+ */
+ public long nanosPerOperation(long operations) {
+ return duration() / operations;
+ }
+
+ /**
+ * Get a description of the bandwidth, even down to fractions of
+ * a MB/s.
+ * @param bytes bytes processed
+ * @return bandwidth
+ */
+ public String bandwidthDescription(long bytes) {
+ return String.format("%,.6f", bandwidth(bytes));
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+ }
}
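
The helpers above compose naturally: time a bulk operation with NanoTimer, then use TreeScanResults to verify that a recursive listing finds exactly what was created. A minimal sketch of such a test, assuming an initialized FileSystem "fs" and a writable path "base" (both illustrative, not part of this patch):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.contract.ContractTestUtils;

    public class TreeScanExample {
      /** Time a tree creation, then verify a recursive walk finds it all. */
      public static void demo(FileSystem fs, Path base) throws IOException {
        // depth 2, 2 subdirs per level, 4 files of 1KB per directory
        ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
        ContractTestUtils.TreeScanResults created =
            ContractTestUtils.createSubdirs(fs, base, 2, 2, 4, 1024);
        timer.end("creation of tree under %s", base);
        ContractTestUtils.bandwidth(timer, created.getFileCount() * 1024);

        // Assuming base started empty, the walk must find exactly what
        // was created: same files, same directories, no duplicates.
        created.assertEquivalent(ContractTestUtils.treeWalk(fs, base));
      }
    }
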
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8216c10/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java
new file mode 100644
index 0000000..0ce022a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressEventType;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.transfer.Upload;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+
+import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
+import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
+
+/**
+ * Listener for AWS transfer progress events: relays them to any supplied
+ * Hadoop {@code Progressable} and updates the filesystem statistics as
+ * bytes are transferred.
+ */
+public class ProgressableProgressListener implements ProgressListener {
+ private static final Logger LOG = S3AFileSystem.LOG;
+ private final S3AFileSystem fs;
+ private final String key;
+ private final Progressable progress;
+ private long lastBytesTransferred;
+ private final Upload upload;
+
+ /**
+ * Instantiate.
+ * @param fs filesystem: will be invoked with statistics updates
+ * @param key key for the upload
+ * @param upload source of events
+ * @param progress optional callback for progress.
+ */
+ public ProgressableProgressListener(S3AFileSystem fs,
+ String key,
+ Upload upload,
+ Progressable progress) {
+ this.fs = fs;
+ this.key = key;
+ this.upload = upload;
+ this.progress = progress;
+ this.lastBytesTransferred = 0;
+ }
+
+ @Override
+ public void progressChanged(ProgressEvent progressEvent) {
+ if (progress != null) {
+ progress.progress();
+ }
+
+ // There are 3 http ops here, but this should be close enough for now
+ ProgressEventType pet = progressEvent.getEventType();
+ if (pet == TRANSFER_PART_STARTED_EVENT ||
+ pet == TRANSFER_COMPLETED_EVENT) {
+ fs.incrementWriteOperations();
+ }
+
+ long transferred = upload.getProgress().getBytesTransferred();
+ long delta = transferred - lastBytesTransferred;
+ fs.incrementPutProgressStatistics(key, delta);
+ lastBytesTransferred = transferred;
+ }
+
+ /**
+ * Method to invoke after the upload has completed.
+ * This accounts for bytes transferred after the last progress
+ * notification was processed.
+ * @return the number of bytes which were transferred after the last
+ * notification
+ */
+ public long uploadCompleted() {
+ long delta = upload.getProgress().getBytesTransferred() -
+ lastBytesTransferred;
+ if (delta > 0) {
+ LOG.debug("S3A write delta changed after finished: {} bytes", delta);
+ fs.incrementPutProgressStatistics(key, delta);
+ }
+ return delta;
+ }
+
+}
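
The intended calling sequence, which innerCopyFromLocalFile() in S3AFileSystem follows later in this patch, is to attach the listener to an in-flight upload, wait for completion, then invoke uploadCompleted() to pick up any bytes reported after the final progress event. A sketch, assuming "fs", "key", "putObjectRequest" and "progress" are in scope:

    Upload up = fs.putObject(putObjectRequest);
    ProgressableProgressListener listener =
        new ProgressableProgressListener(fs, key, up, progress);
    up.addProgressListener(listener);
    up.waitForUploadResult();   // declares InterruptedException
    listener.uploadCompleted(); // count bytes reported after the last event
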
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8216c10/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
index 61a83d4..7a985c6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -35,10 +35,8 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
@@ -54,6 +52,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
/**
* Upload files/parts asap directly from a memory buffer (instead of buffering
@@ -77,8 +76,6 @@ public class S3AFastOutputStream extends OutputStream {
private final int multiPartThreshold;
private final S3AFileSystem fs;
private final CannedAccessControlList cannedACL;
- private final FileSystem.Statistics statistics;
- private final String serverSideEncryptionAlgorithm;
private final ProgressListener progressListener;
private final ListeningExecutorService executorService;
private MultiPartUpload multiPartUpload;
@@ -98,28 +95,28 @@ public class S3AFastOutputStream extends OutputStream {
* @param bucket S3 bucket name
* @param key S3 key name
* @param progress report progress in order to prevent timeouts
- * @param statistics track FileSystem.Statistics on the performed operations
* @param cannedACL used CannedAccessControlList
- * @param serverSideEncryptionAlgorithm algorithm for server side encryption
* @param partSize size of a single part in a multi-part upload (except
* last part)
* @param multiPartThreshold files at least this size use multi-part upload
* @param threadPoolExecutor thread factory
* @throws IOException on any problem
*/
- public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs,
- String bucket, String key, Progressable progress,
- FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
- String serverSideEncryptionAlgorithm, long partSize,
- long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor)
+ public S3AFastOutputStream(AmazonS3Client client,
+ S3AFileSystem fs,
+ String bucket,
+ String key,
+ Progressable progress,
+ CannedAccessControlList cannedACL,
+ long partSize,
+ long multiPartThreshold,
+ ThreadPoolExecutor threadPoolExecutor)
throws IOException {
this.bucket = bucket;
this.key = key;
this.client = client;
this.fs = fs;
this.cannedACL = cannedACL;
- this.statistics = statistics;
- this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
//Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE
if (partSize > Integer.MAX_VALUE) {
this.partSize = Integer.MAX_VALUE;
@@ -246,16 +243,17 @@ public class S3AFastOutputStream extends OutputStream {
if (multiPartUpload == null) {
putObject();
} else {
- if (buffer.size() > 0) {
+ int size = buffer.size();
+ if (size > 0) {
+ fs.incrementPutStartStatistics(size);
//send last part
multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
- .toByteArray()), buffer.size());
+ .toByteArray()), size);
}
final List<PartETag> partETags = multiPartUpload
.waitForAllPartUploads();
multiPartUpload.complete(partETags);
}
- statistics.incrementWriteOps(1);
// This will delete unnecessary fake parent directories
fs.finishedWrite(key);
LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
@@ -265,18 +263,19 @@ public class S3AFastOutputStream extends OutputStream {
}
}
+ /**
+ * Create the default metadata for a multipart upload operation.
+ * @return the metadata to use/extend.
+ */
private ObjectMetadata createDefaultMetadata() {
- ObjectMetadata om = new ObjectMetadata();
- if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
- om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
- }
- return om;
+ return fs.newObjectMetadata();
}
private MultiPartUpload initiateMultiPartUpload() throws IOException {
- final ObjectMetadata om = createDefaultMetadata();
final InitiateMultipartUploadRequest initiateMPURequest =
- new InitiateMultipartUploadRequest(bucket, key, om);
+ new InitiateMultipartUploadRequest(bucket,
+ key,
+ createDefaultMetadata());
initiateMPURequest.setCannedACL(cannedACL);
try {
return new MultiPartUpload(
@@ -290,15 +289,18 @@ public class S3AFastOutputStream extends OutputStream {
LOG.debug("Executing regular upload for bucket '{}' key '{}'",
bucket, key);
final ObjectMetadata om = createDefaultMetadata();
- om.setContentLength(buffer.size());
- final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
- new ByteArrayInputStream(buffer.toByteArray()), om);
- putObjectRequest.setCannedAcl(cannedACL);
+ final int size = buffer.size();
+ om.setContentLength(size);
+ final PutObjectRequest putObjectRequest =
+ fs.newPutObjectRequest(key,
+ om,
+ new ByteArrayInputStream(buffer.toByteArray()));
putObjectRequest.setGeneralProgressListener(progressListener);
ListenableFuture<PutObjectResult> putObjectResult =
executorService.submit(new Callable<PutObjectResult>() {
@Override
public PutObjectResult call() throws Exception {
+ fs.incrementPutStartStatistics(size);
return client.putObject(putObjectRequest);
}
});
@@ -306,7 +308,7 @@ public class S3AFastOutputStream extends OutputStream {
try {
putObjectResult.get();
} catch (InterruptedException ie) {
- LOG.warn("Interrupted object upload:" + ie, ie);
+ LOG.warn("Interrupted object upload: {}", ie, ie);
Thread.currentThread().interrupt();
} catch (ExecutionException ee) {
throw extractException("regular upload", key, ee);
@@ -339,7 +341,7 @@ public class S3AFastOutputStream extends OutputStream {
public PartETag call() throws Exception {
LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
uploadId);
- return client.uploadPart(request).getPartETag();
+ return fs.uploadPart(request).getPartETag();
}
});
partETagsFutures.add(partETagFuture);
@@ -349,7 +351,7 @@ public class S3AFastOutputStream extends OutputStream {
try {
return Futures.allAsList(partETagsFutures).get();
} catch (InterruptedException ie) {
- LOG.warn("Interrupted partUpload:" + ie, ie);
+ LOG.warn("Interrupted partUpload: {}", ie, ie);
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException ee) {
@@ -382,11 +384,12 @@ public class S3AFastOutputStream extends OutputStream {
public void abort() {
LOG.warn("Aborting multi-part upload with id '{}'", uploadId);
try {
+ fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket,
key, uploadId));
} catch (Exception e2) {
LOG.warn("Unable to abort multipart upload, you may need to purge " +
- "uploaded parts: " + e2, e2);
+ "uploaded parts: {}", e2, e2);
}
}
}
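
The fast output stream is only selected when the corresponding option is enabled; S3AFileSystem.create() below checks the FAST_UPLOAD flag before choosing it. A sketch of enabling it, assuming the branch-2.8 configuration key "fs.s3a.fast.upload" and an illustrative bucket name:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FastUploadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Constants.FAST_UPLOAD; default is off
        conf.setBoolean("fs.s3a.fast.upload", true);
        FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
        try (FSDataOutputStream out = fs.create(new Path("/example/data.bin"))) {
          // Data is buffered in memory and uploaded as a single PUT or as
          // multipart parts, now instrumented via incrementPutStartStatistics().
          out.write(new byte[64 * 1024]);
        }
      }
    }
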
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8216c10/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
index 9ecca33..75a6500 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
@@ -93,4 +93,11 @@ public class S3AFileStatus extends FileStatus {
return super.getModificationTime();
}
}
+
+ @Override
+ public String toString() {
+ return super.toString() +
+ String.format(" isEmptyDirectory=%s", isEmptyDirectory());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8216c10/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index a4c0c25..d392d8e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -45,7 +45,6 @@ import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
-import com.amazonaws.services.s3.model.DeleteObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
@@ -53,6 +52,8 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.Copy;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
@@ -71,8 +72,13 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.util.Progressable;
@@ -80,6 +86,7 @@ import org.apache.hadoop.util.VersionInfo;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,6 +125,7 @@ public class S3AFileSystem extends FileSystem {
private CannedAccessControlList cannedACL;
private String serverSideEncryptionAlgorithm;
private S3AInstrumentation instrumentation;
+ private S3AStorageStatistics storageStatistics;
private long readAhead;
// The maximum number of entries that can be deleted in any call to s3
@@ -237,6 +245,15 @@ public class S3AFileSystem extends FileSystem {
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
+ storageStatistics = (S3AStorageStatistics)
+ GlobalStorageStatistics.INSTANCE
+ .put(S3AStorageStatistics.NAME,
+ new GlobalStorageStatistics.StorageStatisticsProvider() {
+ @Override
+ public StorageStatistics provide() {
+ return new S3AStorageStatistics();
+ }
+ });
int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0);
int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0);
@@ -346,6 +363,14 @@ public class S3AFileSystem extends FileSystem {
}
/**
+ * Get S3A Instrumentation. For test purposes.
+ * @return this instance's instrumentation.
+ */
+ public S3AInstrumentation getInstrumentation() {
+ return instrumentation;
+ }
+
+ /**
* Initializes the User-Agent header to send in HTTP requests to the S3
* back-end. We always include the Hadoop version number. The user also may
* set an optional custom prefix to put in front of the Hadoop version number.
@@ -621,23 +646,26 @@ public class S3AFileSystem extends FileSystem {
}
instrumentation.fileCreated();
if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
- return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
- key, progress, statistics, cannedACL,
- serverSideEncryptionAlgorithm, partSize, multiPartThreshold,
- threadPoolExecutor), statistics);
+ return new FSDataOutputStream(
+ new S3AFastOutputStream(s3,
+ this,
+ bucket,
+ key,
+ progress,
+ cannedACL,
+ partSize,
+ multiPartThreshold,
+ threadPoolExecutor),
+ statistics);
}
// We pass null to FSDataOutputStream so it won't count writes that
// are being buffered to a file
return new FSDataOutputStream(
new S3AOutputStream(getConf(),
- transfers,
this,
- bucket,
key,
- progress,
- cannedACL,
- statistics,
- serverSideEncryptionAlgorithm),
+ progress
+ ),
null);
}
@@ -693,6 +721,7 @@ public class S3AFileSystem extends FileSystem {
private boolean innerRename(Path src, Path dst) throws IOException,
AmazonClientException {
LOG.debug("Rename path {} to {}", src, dst);
+ incrementStatistic(INVOCATION_RENAME);
String srcKey = pathToKey(src);
String dstKey = pathToKey(dst);
@@ -793,8 +822,7 @@ public class S3AFileSystem extends FileSystem {
request.setPrefix(srcKey);
request.setMaxKeys(maxKeys);
- ObjectListing objects = s3.listObjects(request);
- statistics.incrementReadOps(1);
+ ObjectListing objects = listObjects(request);
while (true) {
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
@@ -808,8 +836,7 @@ public class S3AFileSystem extends FileSystem {
}
if (objects.isTruncated()) {
- objects = s3.listNextBatchOfObjects(objects);
- statistics.incrementReadOps(1);
+ objects = continueListObjects(objects);
} else {
if (!keysToDelete.isEmpty()) {
removeKeys(keysToDelete, false);
@@ -838,17 +865,223 @@ public class S3AFileSystem extends FileSystem {
}
/**
+ * Increment a statistic by 1.
+ * @param statistic The operation to increment
+ */
+ protected void incrementStatistic(Statistic statistic) {
+ incrementStatistic(statistic, 1);
+ }
+
+ /**
+ * Increment a statistic by a specific value.
+ * @param statistic The operation to increment
+ * @param count the count to increment
+ */
+ protected void incrementStatistic(Statistic statistic, long count) {
+ instrumentation.incrementCounter(statistic, count);
+ storageStatistics.incrementCounter(statistic, count);
+ }
+
+ /**
* Request object metadata; increments counters in the process.
* @param key key
* @return the metadata
*/
- private ObjectMetadata getObjectMetadata(String key) {
+ protected ObjectMetadata getObjectMetadata(String key) {
+ incrementStatistic(OBJECT_METADATA_REQUESTS);
ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
- statistics.incrementReadOps(1);
+ incrementReadOperations();
return meta;
}
/**
+ * Initiate a {@code listObjects} operation, incrementing metrics
+ * in the process.
+ * @param request request to initiate
+ * @return the results
+ */
+ protected ObjectListing listObjects(ListObjectsRequest request) {
+ incrementStatistic(OBJECT_LIST_REQUESTS);
+ incrementReadOperations();
+ return s3.listObjects(request);
+ }
+
+ /**
+ * List the next set of objects.
+ * @param objects paged result
+ * @return the next result object
+ */
+ protected ObjectListing continueListObjects(ObjectListing objects) {
+ incrementStatistic(OBJECT_LIST_REQUESTS);
+ incrementReadOperations();
+ return s3.listNextBatchOfObjects(objects);
+ }
+
+ /**
+ * Increment read operations.
+ */
+ public void incrementReadOperations() {
+ statistics.incrementReadOps(1);
+ }
+
+ /**
+ * Increment the write operation counter.
+ * This is somewhat inaccurate, as it appears to be invoked more
+ * often than needed in progress callbacks.
+ */
+ public void incrementWriteOperations() {
+ statistics.incrementWriteOps(1);
+ }
+
+ /**
+ * Delete an object.
+ * Increments the {@code OBJECT_DELETE_REQUESTS} and write
+ * operation statistics.
+ * @param key key to blob to delete.
+ */
+ private void deleteObject(String key) {
+ incrementWriteOperations();
+ incrementStatistic(OBJECT_DELETE_REQUESTS);
+ s3.deleteObject(bucket, key);
+ }
+
+ /**
+ * Perform a bulk object delete operation.
+ * Increments the {@code OBJECT_DELETE_REQUESTS} and write
+ * operation statistics.
+ * @param deleteRequest keys to delete on the s3-backend
+ */
+ private void deleteObjects(DeleteObjectsRequest deleteRequest) {
+ incrementWriteOperations();
+ incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
+ s3.deleteObjects(deleteRequest);
+ }
+
+ /**
+ * Create a putObject request.
+ * Adds the ACL and metadata.
+ * @param key key of object
+ * @param metadata metadata header
+ * @param srcfile source file
+ * @return the request
+ */
+ public PutObjectRequest newPutObjectRequest(String key,
+ ObjectMetadata metadata, File srcfile) {
+ PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
+ srcfile);
+ putObjectRequest.setCannedAcl(cannedACL);
+ putObjectRequest.setMetadata(metadata);
+ return putObjectRequest;
+ }
+
+ /**
+ * Create a {@link PutObjectRequest} request.
+ * The metadata is assumed to have been configured with the size of the
+ * operation.
+ * @param key key of object
+ * @param metadata metadata header
+ * @param inputStream source data.
+ * @return the request
+ */
+ PutObjectRequest newPutObjectRequest(String key,
+ ObjectMetadata metadata, InputStream inputStream) {
+ PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
+ inputStream, metadata);
+ putObjectRequest.setCannedAcl(cannedACL);
+ return putObjectRequest;
+ }
+
+ /**
+ * Create a new object metadata instance.
+ * Any standard metadata headers are added here, for example:
+ * encryption.
+ * @return a new metadata instance
+ */
+ public ObjectMetadata newObjectMetadata() {
+ final ObjectMetadata om = new ObjectMetadata();
+ if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+ om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
+ }
+ return om;
+ }
+
+ /**
+ * Create a new object metadata instance.
+ * Any standard metadata headers are added here, for example:
+ * encryption.
+ *
+ * @param length length of data to set in header.
+ * @return a new metadata instance
+ */
+ public ObjectMetadata newObjectMetadata(long length) {
+ final ObjectMetadata om = newObjectMetadata();
+ om.setContentLength(length);
+ return om;
+ }
+
+ /**
+ * PUT an object, incrementing the put requests and put bytes
+ * counters.
+ * It does not update the other counters,
+ * as existing code does that as progress callbacks come in.
+ * Byte length is calculated from the file length, or, if there is no
+ * file, from the content length of the header.
+ * @param putObjectRequest the request
+ * @return the upload initiated
+ */
+ public Upload putObject(PutObjectRequest putObjectRequest) {
+ long len;
+ if (putObjectRequest.getFile() != null) {
+ len = putObjectRequest.getFile().length();
+ } else {
+ len = putObjectRequest.getMetadata().getContentLength();
+ }
+ incrementPutStartStatistics(len);
+ return transfers.upload(putObjectRequest);
+ }
+
+ /**
+ * Upload one part of a multipart upload.
+ * Increments the write and put counters.
+ * @param request request
+ * @return the result of the operation.
+ */
+ public UploadPartResult uploadPart(UploadPartRequest request) {
+ incrementPutStartStatistics(request.getPartSize());
+ return s3.uploadPart(request);
+ }
+
+ /**
+ * At the start of a put/multipart upload operation, update the
+ * relevant counters.
+ *
+ * @param bytes bytes in the request.
+ */
+ public void incrementPutStartStatistics(long bytes) {
+ LOG.debug("PUT start {} bytes", bytes);
+ incrementWriteOperations();
+ incrementStatistic(OBJECT_PUT_REQUESTS);
+ if (bytes > 0) {
+ incrementStatistic(OBJECT_PUT_BYTES, bytes);
+ }
+ }
+
+ /**
+ * Callback for use in progress callbacks from put/multipart upload events.
+ * Increments those statistics which are expected to be updated during
+ * the ongoing upload operation.
+ * @param key key to file that is being written (for logging)
+ * @param bytes bytes successfully uploaded.
+ */
+ public void incrementPutProgressStatistics(String key, long bytes) {
+ LOG.debug("PUT {}: {} bytes", key, bytes);
+ incrementWriteOperations();
+ if (bytes > 0) {
+ statistics.incrementBytesWritten(bytes);
+ }
+ }
+
+ /**
* A helper method to delete a list of keys on a s3-backend.
*
* @param keysToDelete collection of keys to delete on the s3-backend
@@ -858,21 +1091,13 @@ public class S3AFileSystem extends FileSystem {
private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean clearKeys) throws AmazonClientException {
if (enableMultiObjectsDelete) {
- DeleteObjectsRequest deleteRequest
- = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
- s3.deleteObjects(deleteRequest);
+ deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keysToDelete));
instrumentation.fileDeleted(keysToDelete.size());
- statistics.incrementWriteOps(1);
} else {
- int writeops = 0;
-
for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
- s3.deleteObject(
- new DeleteObjectRequest(bucket, keyVersion.getKey()));
- writeops++;
+ deleteObject(keyVersion.getKey());
}
instrumentation.fileDeleted(keysToDelete.size());
- statistics.incrementWriteOps(writeops);
}
if (clearKeys) {
keysToDelete.clear();
@@ -942,9 +1167,8 @@ public class S3AFileSystem extends FileSystem {
if (status.isEmptyDirectory()) {
LOG.debug("Deleting fake empty directory {}", key);
- s3.deleteObject(bucket, key);
+ deleteObject(key);
instrumentation.directoryDeleted();
- statistics.incrementWriteOps(1);
} else {
LOG.debug("Getting objects for directory prefix {} to delete", key);
@@ -955,9 +1179,9 @@ public class S3AFileSystem extends FileSystem {
//request.setDelimiter("/");
request.setMaxKeys(maxKeys);
- List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>();
- ObjectListing objects = s3.listObjects(request);
- statistics.incrementReadOps(1);
+ ObjectListing objects = listObjects(request);
+ List<DeleteObjectsRequest.KeyVersion> keys =
+ new ArrayList<>(objects.getObjectSummaries().size());
while (true) {
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
@@ -969,8 +1193,7 @@ public class S3AFileSystem extends FileSystem {
}
if (objects.isTruncated()) {
- objects = s3.listNextBatchOfObjects(objects);
- statistics.incrementReadOps(1);
+ objects = continueListObjects(objects);
} else {
if (!keys.isEmpty()) {
removeKeys(keys, false);
@@ -981,13 +1204,11 @@ public class S3AFileSystem extends FileSystem {
}
} else {
LOG.debug("delete: Path is a file");
- s3.deleteObject(bucket, key);
instrumentation.fileDeleted(1);
- statistics.incrementWriteOps(1);
+ deleteObject(key);
}
createFakeDirectoryIfNecessary(f.getParent());
-
return true;
}
@@ -996,7 +1217,7 @@ public class S3AFileSystem extends FileSystem {
String key = pathToKey(f);
if (!key.isEmpty() && !exists(f)) {
LOG.debug("Creating new fake directory at {}", f);
- createFakeDirectory(bucket, key);
+ createFakeDirectory(key);
}
}
@@ -1032,6 +1253,7 @@ public class S3AFileSystem extends FileSystem {
IOException, AmazonClientException {
String key = pathToKey(f);
LOG.debug("List status for path: {}", f);
+ incrementStatistic(INVOCATION_LIST_STATUS);
final List<FileStatus> result = new ArrayList<FileStatus>();
final FileStatus fileStatus = getFileStatus(f);
@@ -1049,8 +1271,7 @@ public class S3AFileSystem extends FileSystem {
LOG.debug("listStatus: doing listObjects for directory {}", key);
- ObjectListing objects = s3.listObjects(request);
- statistics.incrementReadOps(1);
+ ObjectListing objects = listObjects(request);
Path fQualified = f.makeQualified(uri, workingDir);
@@ -1061,33 +1282,25 @@ public class S3AFileSystem extends FileSystem {
if (keyPath.equals(fQualified) ||
summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
LOG.debug("Ignoring: {}", keyPath);
- continue;
- }
-
- if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
- result.add(new S3AFileStatus(true, true, keyPath));
- LOG.debug("Adding: fd: {}", keyPath);
} else {
- result.add(new S3AFileStatus(summary.getSize(),
- dateToLong(summary.getLastModified()), keyPath,
- getDefaultBlockSize(fQualified)));
- LOG.debug("Adding: fi: {}", keyPath);
+ S3AFileStatus status = createFileStatus(keyPath, summary,
+ getDefaultBlockSize(keyPath));
+ result.add(status);
+ LOG.debug("Adding: {}", status);
}
}
for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
- if (keyPath.equals(f)) {
- continue;
+ if (!keyPath.equals(f)) {
+ result.add(new S3AFileStatus(true, false, keyPath));
+ LOG.debug("Adding: rd: {}", keyPath);
}
- result.add(new S3AFileStatus(true, false, keyPath));
- LOG.debug("Adding: rd: {}", keyPath);
}
if (objects.isTruncated()) {
LOG.debug("listStatus: list truncated - getting next batch");
- objects = s3.listNextBatchOfObjects(objects);
- statistics.incrementReadOps(1);
+ objects = continueListObjects(objects);
} else {
break;
}
@@ -1100,8 +1313,6 @@ public class S3AFileSystem extends FileSystem {
return result.toArray(new FileStatus[result.size()]);
}
-
-
/**
* Set the current working directory for the given file system. All relative
* paths will be resolved relative to it.
@@ -1123,7 +1334,7 @@ public class S3AFileSystem extends FileSystem {
/**
*
* Make the given path and all non-existent parents into
- * directories. Has the semantics of Unix @{code 'mkdir -p'}.
+ * directories. Has the semantics of Unix {@code 'mkdir -p'}.
* Existence of the directory hierarchy is not an error.
* @param path path to create
* @param permission to apply to f
@@ -1158,7 +1369,7 @@ public class S3AFileSystem extends FileSystem {
private boolean innerMkdirs(Path f, FsPermission permission)
throws IOException, FileAlreadyExistsException, AmazonClientException {
LOG.debug("Making directory: {}", f);
-
+ incrementStatistic(INVOCATION_MKDIRS);
try {
FileStatus fileStatus = getFileStatus(f);
@@ -1187,7 +1398,7 @@ public class S3AFileSystem extends FileSystem {
} while (fPart != null);
String key = pathToKey(f);
- createFakeDirectory(bucket, key);
+ createFakeDirectory(key);
return true;
}
}
@@ -1201,12 +1412,12 @@ public class S3AFileSystem extends FileSystem {
*/
public S3AFileStatus getFileStatus(Path f) throws IOException {
String key = pathToKey(f);
+ incrementStatistic(INVOCATION_GET_FILE_STATUS);
LOG.debug("Getting path status for {} ({})", f , key);
if (!key.isEmpty()) {
try {
- ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
- statistics.incrementReadOps(1);
+ ObjectMetadata meta = getObjectMetadata(key);
if (objectRepresentsDirectory(key, meta.getContentLength())) {
LOG.debug("Found exact file: fake directory");
@@ -1231,8 +1442,7 @@ public class S3AFileSystem extends FileSystem {
if (!key.endsWith("/")) {
String newKey = key + "/";
try {
- ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey);
- statistics.incrementReadOps(1);
+ ObjectMetadata meta = getObjectMetadata(newKey);
if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
LOG.debug("Found file (with /): fake directory");
@@ -1265,8 +1475,7 @@ public class S3AFileSystem extends FileSystem {
request.setDelimiter("/");
request.setMaxKeys(1);
- ObjectListing objects = s3.listObjects(request);
- statistics.incrementReadOps(1);
+ ObjectListing objects = listObjects(request);
if (!objects.getCommonPrefixes().isEmpty()
|| !objects.getObjectSummaries().isEmpty()) {
@@ -1349,7 +1558,8 @@ public class S3AFileSystem extends FileSystem {
private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException, FileAlreadyExistsException, AmazonClientException {
- String key = pathToKey(dst);
+ incrementStatistic(INVOCATION_COPY_FROM_LOCAL_FILE);
+ final String key = pathToKey(dst);
if (!overwrite && exists(dst)) {
throw new FileAlreadyExistsException(dst + " already exists");
@@ -1360,35 +1570,19 @@ public class S3AFileSystem extends FileSystem {
LocalFileSystem local = getLocal(getConf());
File srcfile = local.pathToFile(src);
- final ObjectMetadata om = new ObjectMetadata();
- if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
- om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
- }
- PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile);
- putObjectRequest.setCannedAcl(cannedACL);
- putObjectRequest.setMetadata(om);
-
- ProgressListener progressListener = new ProgressListener() {
- public void progressChanged(ProgressEvent progressEvent) {
- switch (progressEvent.getEventType()) {
- case TRANSFER_PART_COMPLETED_EVENT:
- statistics.incrementWriteOps(1);
- break;
- default:
- break;
- }
- }
- };
-
- statistics.incrementWriteOps(1);
- Upload up = transfers.upload(putObjectRequest);
- up.addProgressListener(progressListener);
+ final ObjectMetadata om = newObjectMetadata();
+ PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
+ Upload up = putObject(putObjectRequest);
+ ProgressableProgressListener listener = new ProgressableProgressListener(
+ this, key, up, null);
+ up.addProgressListener(listener);
try {
up.waitForUploadResult();
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted copying " + src
+ " to " + dst + ", cancelling");
}
+ listener.uploadCompleted();
// This will delete unnecessary fake parent directories
finishedWrite(key);
@@ -1437,7 +1631,7 @@ public class S3AFileSystem extends FileSystem {
LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
try {
- ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
+ ObjectMetadata srcom = getObjectMetadata(srcKey);
ObjectMetadata dstom = cloneObjectMetadata(srcom);
if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
@@ -1451,7 +1645,7 @@ public class S3AFileSystem extends FileSystem {
public void progressChanged(ProgressEvent progressEvent) {
switch (progressEvent.getEventType()) {
case TRANSFER_PART_COMPLETED_EVENT:
- statistics.incrementWriteOps(1);
+ incrementWriteOperations();
break;
default:
break;
@@ -1463,7 +1657,7 @@ public class S3AFileSystem extends FileSystem {
copy.addProgressListener(progressListener);
try {
copy.waitForCopyResult();
- statistics.incrementWriteOps(1);
+ incrementWriteOperations();
instrumentation.filesCopied(1, size);
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted copying " + srcKey
@@ -1475,26 +1669,12 @@ public class S3AFileSystem extends FileSystem {
}
}
- private boolean objectRepresentsDirectory(final String name, final long size) {
- return !name.isEmpty()
- && name.charAt(name.length() - 1) == '/'
- && size == 0L;
- }
-
- // Handles null Dates that can be returned by AWS
- private static long dateToLong(final Date date) {
- if (date == null) {
- return 0L;
- }
-
- return date.getTime();
- }
-
/**
* Perform post-write actions.
* @param key key written to
*/
public void finishedWrite(String key) {
+ LOG.debug("Finished write to {}", key);
deleteUnnecessaryFakeDirectories(keyToPath(key).getParent());
}
@@ -1516,8 +1696,7 @@ public class S3AFileSystem extends FileSystem {
if (status.isDirectory() && status.isEmptyDirectory()) {
LOG.debug("Deleting fake directory {}/", key);
- s3.deleteObject(bucket, key + "/");
- statistics.incrementWriteOps(1);
+ deleteObject(key + "/");
}
} catch (IOException | AmazonClientException e) {
LOG.debug("While deleting key {} ", key, e);
@@ -1533,18 +1712,20 @@ public class S3AFileSystem extends FileSystem {
}
- private void createFakeDirectory(final String bucketName, final String objectName)
- throws AmazonClientException, AmazonServiceException {
+ private void createFakeDirectory(final String objectName)
+ throws AmazonClientException, AmazonServiceException,
+ InterruptedIOException {
if (!objectName.endsWith("/")) {
- createEmptyObject(bucketName, objectName + "/");
+ createEmptyObject(objectName + "/");
} else {
- createEmptyObject(bucketName, objectName);
+ createEmptyObject(objectName);
}
}
// Used to create an empty file that represents an empty directory
- private void createEmptyObject(final String bucketName, final String objectName)
- throws AmazonClientException, AmazonServiceException {
+ private void createEmptyObject(final String objectName)
+ throws AmazonClientException, AmazonServiceException,
+ InterruptedIOException {
final InputStream im = new InputStream() {
@Override
public int read() throws IOException {
@@ -1552,16 +1733,16 @@ public class S3AFileSystem extends FileSystem {
}
};
- final ObjectMetadata om = new ObjectMetadata();
- om.setContentLength(0L);
- if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
- om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
+ PutObjectRequest putObjectRequest = newPutObjectRequest(objectName,
+ newObjectMetadata(0L),
+ im);
+ Upload upload = putObject(putObjectRequest);
+ try {
+ upload.waitForUploadResult();
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException("Interrupted creating " + objectName);
}
- PutObjectRequest putObjectRequest =
- new PutObjectRequest(bucketName, objectName, im, om);
- putObjectRequest.setCannedAcl(cannedACL);
- s3.putObject(putObjectRequest);
- statistics.incrementWriteOps(1);
+ incrementPutProgressStatistics(objectName, 0);
instrumentation.directoryCreated();
}
@@ -1576,10 +1757,7 @@ public class S3AFileSystem extends FileSystem {
// This approach may be too brittle, especially if
// in future there are new attributes added to ObjectMetadata
// that we do not explicitly call to set here
- ObjectMetadata ret = new ObjectMetadata();
-
- // Non null attributes
- ret.setContentLength(source.getContentLength());
+ ObjectMetadata ret = newObjectMetadata(source.getContentLength());
// Possibly null attributes
// Allowing nulls to pass breaks it during later use
@@ -1689,6 +1867,75 @@ public class S3AFileSystem extends FileSystem {
}
/**
+ * Override superclass so as to add statistic collection.
+ * {@inheritDoc}
+ */
+ @Override
+ public FileStatus[] globStatus(Path pathPattern) throws IOException {
+ incrementStatistic(INVOCATION_GLOB_STATUS);
+ return super.globStatus(pathPattern);
+ }
+
+ /**
+ * Override superclass so as to add statistic collection.
+ * {@inheritDoc}
+ */
+ @Override
+ public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
+ throws IOException {
+ incrementStatistic(INVOCATION_GLOB_STATUS);
+ return super.globStatus(pathPattern, filter);
+ }
+
+ /**
+ * Override superclass so as to add statistic collection.
+ * {@inheritDoc}
+ */
+ @Override
+ public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
+ throws FileNotFoundException, IOException {
+ incrementStatistic(INVOCATION_LIST_LOCATED_STATUS);
+ return super.listLocatedStatus(f);
+ }
+
+ @Override
+ public RemoteIterator<LocatedFileStatus> listFiles(Path f,
+ boolean recursive) throws FileNotFoundException, IOException {
+ incrementStatistic(INVOCATION_LIST_FILES);
+ return super.listFiles(f, recursive);
+ }
+
+ /**
+ * Override superclass so as to add statistic collection.
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean exists(Path f) throws IOException {
+ incrementStatistic(INVOCATION_EXISTS);
+ return super.exists(f);
+ }
+
+ /**
+ * Override superclass so as to add statistic collection.
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isDirectory(Path f) throws IOException {
+ incrementStatistic(INVOCATION_IS_DIRECTORY);
+ return super.isDirectory(f);
+ }
+
+ /**
+ * Override superclass so as to add statistic collection.
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isFile(Path f) throws IOException {
+ incrementStatistic(INVOCATION_IS_FILE);
+ return super.isFile(f);
+ }
+
+ /**
+ * Get an integer option >= the minimum allowed value.
* @param conf configuration
* @param key key to look up
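
Every counter incremented above also lands in the global statistics registry that initialize() populates, so it can be read back through the stock StorageStatistics API. A sketch (the registry name is whatever S3AStorageStatistics.NAME defines elsewhere in this patch):

    import java.util.Iterator;
    import org.apache.hadoop.fs.GlobalStorageStatistics;
    import org.apache.hadoop.fs.StorageStatistics;
    import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
    import org.apache.hadoop.fs.s3a.S3AStorageStatistics;

    public class DumpS3AStatistics {
      /** Print every registered S3A statistic, e.g. the INVOCATION_* counters. */
      public static void dump() {
        StorageStatistics stats =
            GlobalStorageStatistics.INSTANCE.get(S3AStorageStatistics.NAME);
        if (stats == null) {
          return; // no S3A filesystem has been initialized yet
        }
        Iterator<LongStatistic> it = stats.getLongStatistics();
        while (it.hasNext()) {
          LongStatistic stat = it.next();
          System.out.println(stat.getName() + " = " + stat.getValue());
        }
      }
    }
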
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8216c10/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 285f228..8892f0e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.s3a;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricStringBuilder;
@@ -26,49 +27,30 @@ import org.apache.hadoop.metrics2.lib.Interns;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableMetric;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+
/**
* Instrumentation of S3a.
- * Derived from the {@code AzureFileSystemInstrumentation}
+ * Derived from the {@code AzureFileSystemInstrumentation}.
+ *
+ * Counters and metrics are generally addressed in code by their name or
+ * {@link Statistic} key. There <i>may</i> be some Statistics which do
+ * not have an entry here. So that attempts to access such counters do not
+ * fail, the operations which increment or query metric values are designed
+ * to tolerate lookup failures.
*/
@Metrics(about = "Metrics for S3a", context = "S3AFileSystem")
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AInstrumentation {
public static final String CONTEXT = "S3AFileSystem";
-
- public static final String STREAM_OPENED = "streamOpened";
- public static final String STREAM_CLOSE_OPERATIONS = "streamCloseOperations";
- public static final String STREAM_CLOSED = "streamClosed";
- public static final String STREAM_ABORTED = "streamAborted";
- public static final String STREAM_READ_EXCEPTIONS = "streamReadExceptions";
- public static final String STREAM_SEEK_OPERATIONS = "streamSeekOperations";
- public static final String STREAM_FORWARD_SEEK_OPERATIONS
- = "streamForwardSeekOperations";
- public static final String STREAM_BACKWARD_SEEK_OPERATIONS
- = "streamBackwardSeekOperations";
- public static final String STREAM_SEEK_BYTES_SKIPPED =
- "streamBytesSkippedOnSeek";
- public static final String STREAM_SEEK_BYTES_BACKWARDS =
- "streamBytesBackwardsOnSeek";
- public static final String STREAM_SEEK_BYTES_READ = "streamBytesRead";
- public static final String STREAM_READ_OPERATIONS = "streamReadOperations";
- public static final String STREAM_READ_FULLY_OPERATIONS
- = "streamReadFullyOperations";
- public static final String STREAM_READ_OPERATIONS_INCOMPLETE
- = "streamReadOperationsIncomplete";
- public static final String FILES_CREATED = "files_created";
- public static final String FILES_COPIED = "files_copied";
- public static final String FILES_COPIED_BYTES = "files_copied_bytes";
- public static final String FILES_DELETED = "files_deleted";
- public static final String DIRECTORIES_CREATED = "directories_created";
- public static final String DIRECTORIES_DELETED = "directories_deleted";
- public static final String IGNORED_ERRORS = "ignored_errors";
private final MetricsRegistry registry =
new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
private final MutableCounterLong streamOpenOperations;
@@ -95,6 +77,27 @@ public class S3AInstrumentation {
private final MutableCounterLong numberOfDirectoriesDeleted;
private final Map<String, MutableCounterLong> streamMetrics = new HashMap<>();
+ private static final Statistic[] COUNTERS_TO_CREATE = {
+ INVOCATION_COPY_FROM_LOCAL_FILE,
+ INVOCATION_EXISTS,
+ INVOCATION_GET_FILE_STATUS,
+ INVOCATION_GLOB_STATUS,
+ INVOCATION_IS_DIRECTORY,
+ INVOCATION_IS_FILE,
+ INVOCATION_LIST_FILES,
+ INVOCATION_LIST_LOCATED_STATUS,
+ INVOCATION_LIST_STATUS,
+ INVOCATION_MKDIRS,
+ INVOCATION_RENAME,
+ OBJECT_COPY_REQUESTS,
+ OBJECT_DELETE_REQUESTS,
+ OBJECT_LIST_REQUESTS,
+ OBJECT_METADATA_REQUESTS,
+ OBJECT_MULTIPART_UPLOAD_ABORTED,
+ OBJECT_PUT_BYTES,
+ OBJECT_PUT_REQUESTS
+ };
+
public S3AInstrumentation(URI name) {
UUID fileSystemInstanceId = UUID.randomUUID();
registry.tag("FileSystemId",
@@ -103,50 +106,35 @@ public class S3AInstrumentation {
registry.tag("fsURI",
"URI of this filesystem",
name.toString());
- streamOpenOperations = streamCounter(STREAM_OPENED,
- "Total count of times an input stream to object store was opened");
- streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS,
- "Total count of times an attempt to close a data stream was made");
- streamClosed = streamCounter(STREAM_CLOSED,
- "Count of times the TCP stream was closed");
- streamAborted = streamCounter(STREAM_ABORTED,
- "Count of times the TCP stream was aborted");
- streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS,
- "Number of seek operations invoked on input streams");
- streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS,
- "Number of read exceptions caught and attempted to recovered from");
- streamForwardSeekOperations = streamCounter(STREAM_FORWARD_SEEK_OPERATIONS,
- "Number of executed seek operations which went forward in a stream");
- streamBackwardSeekOperations = streamCounter(
- STREAM_BACKWARD_SEEK_OPERATIONS,
- "Number of executed seek operations which went backwards in a stream");
- streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED,
- "Count of bytes skipped during forward seek operations");
- streamBytesBackwardsOnSeek = streamCounter(STREAM_SEEK_BYTES_BACKWARDS,
- "Count of bytes moved backwards during seek operations");
- streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ,
- "Count of bytes read during seek() in stream operations");
- streamReadOperations = streamCounter(STREAM_READ_OPERATIONS,
- "Count of read() operations in streams");
- streamReadFullyOperations = streamCounter(STREAM_READ_FULLY_OPERATIONS,
- "Count of readFully() operations in streams");
- streamReadsIncomplete = streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE,
- "Count of incomplete read() operations in streams");
-
- numberOfFilesCreated = counter(FILES_CREATED,
- "Total number of files created through the object store.");
- numberOfFilesCopied = counter(FILES_COPIED,
- "Total number of files copied within the object store.");
- bytesOfFilesCopied = counter(FILES_COPIED_BYTES,
- "Total number of bytes copied within the object store.");
- numberOfFilesDeleted = counter(FILES_DELETED,
- "Total number of files deleted through from the object store.");
- numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED,
- "Total number of directories created through the object store.");
- numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED,
- "Total number of directories deleted through the object store.");
- ignoredErrors = counter(IGNORED_ERRORS,
- "Total number of errors caught and ingored.");
+ streamOpenOperations = streamCounter(STREAM_OPENED);
+ streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS);
+ streamClosed = streamCounter(STREAM_CLOSED);
+ streamAborted = streamCounter(STREAM_ABORTED);
+ streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS);
+ streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS);
+ streamForwardSeekOperations =
+ streamCounter(STREAM_FORWARD_SEEK_OPERATIONS);
+ streamBackwardSeekOperations =
+ streamCounter(STREAM_BACKWARD_SEEK_OPERATIONS);
+ streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED);
+ streamBytesBackwardsOnSeek =
+ streamCounter(STREAM_SEEK_BYTES_BACKWARDS);
+ streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ);
+ streamReadOperations = streamCounter(STREAM_READ_OPERATIONS);
+ streamReadFullyOperations =
+ streamCounter(STREAM_READ_FULLY_OPERATIONS);
+ streamReadsIncomplete =
+ streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE);
+ numberOfFilesCreated = counter(FILES_CREATED);
+ numberOfFilesCopied = counter(FILES_COPIED);
+ bytesOfFilesCopied = counter(FILES_COPIED_BYTES);
+ numberOfFilesDeleted = counter(FILES_DELETED);
+ numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED);
+ numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED);
+ ignoredErrors = counter(IGNORED_ERRORS);
+ for (Statistic statistic : COUNTERS_TO_CREATE) {
+ counter(statistic);
+ }
}
/**
@@ -174,6 +162,25 @@ public class S3AInstrumentation {
}
/**
+ * Create a counter in the registry.
+ * @param op statistic to count
+ * @return a new counter
+ */
+ protected final MutableCounterLong counter(Statistic op) {
+ return counter(op.getSymbol(), op.getDescription());
+ }
+
+ /**
+ * Create a counter in the stream map: these are not registered in the
+ * public metrics.
+ * @param op statistic to count
+ * @return a new counter
+ */
+ protected final MutableCounterLong streamCounter(Statistic op) {
+ return streamCounter(op.getSymbol(), op.getDescription());
+ }
+
+ /**
* Create a gauge in the registry.
* @param name name gauge name
* @param desc description
@@ -216,6 +223,58 @@ public class S3AInstrumentation {
}
/**
+ * Get the value of a counter.
+ * @param statistic the operation
+ * @return its value, or 0 if not found.
+ */
+ public long getCounterValue(Statistic statistic) {
+ return getCounterValue(statistic.getSymbol());
+ }
+
+ /**
+ * Get the value of a counter.
+ * If the counter is null, return 0.
+ * @param name the name of the counter
+ * @return its value.
+ */
+ public long getCounterValue(String name) {
+ MutableCounterLong counter = lookupCounter(name);
+ return counter == null ? 0 : counter.value();
+ }
+
+ /**
+ * Look up a counter by name; return null if it is not known.
+ * @param name counter name
+ * @return the counter, or null if not found
+ */
+ private MutableCounterLong lookupCounter(String name) {
+ MutableMetric metric = lookupMetric(name);
+ if (metric == null) {
+ return null;
+ }
+ if (!(metric instanceof MutableCounterLong)) {
+ throw new IllegalStateException("Metric " + name
+ + " is not a MutableCounterLong: " + metric);
+ }
+ return (MutableCounterLong) metric;
+ }
+
+ /**
+ * Look up a metric from both the registered set and the lighter-weight
+ * stream entries.
+ * @param name metric name
+ * @return the metric or null
+ */
+ public MutableMetric lookupMetric(String name) {
+ MutableMetric metric = getRegistry().get(name);
+ if (metric == null) {
+ metric = streamMetrics.get(name);
+ }
+ return metric;
+ }
+
+ /**
* Indicate that S3A created a file.
*/
public void fileCreated() {
@@ -263,6 +322,19 @@ public class S3AInstrumentation {
}
/**
+ * Increment a specific counter.
+ * No-op if not defined.
+ * @param op operation
+ * @param count increment value
+ */
+ public void incrementCounter(Statistic op, long count) {
+ MutableCounterLong counter = lookupCounter(op.getSymbol());
+ if (counter != null) {
+ counter.incr(count);
+ }
+ }
+
+ /**
* Create a stream input statistics instance.
* @return the new instance
*/
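Taken together, counter(Statistic), getCounterValue(Statistic) and the null-tolerant lookup methods give the forgiving behaviour the class javadoc promises. A short usage sketch, with an arbitrary filesystem URI:

    // Counters registered at construction can be incremented and read back;
    // a Statistic with no backing counter is silently skipped on increment
    // and reads back as 0 instead of failing.
    S3AInstrumentation instrumentation =
        new S3AInstrumentation(URI.create("s3a://example-bucket/"));
    instrumentation.incrementCounter(Statistic.INVOCATION_EXISTS, 1);
    long exists = instrumentation.getCounterValue(Statistic.INVOCATION_EXISTS);
    // exists == 1 here; an unknown statistic would have returned 0.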
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8216c10/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
index 593e9e8..23ba682 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -19,19 +19,11 @@
package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
-import com.amazonaws.event.ProgressEvent;
-import com.amazonaws.event.ProgressEventType;
-import com.amazonaws.event.ProgressListener;
-import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.util.Progressable;
@@ -44,8 +36,6 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
-import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
-import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
@@ -59,32 +49,20 @@ public class S3AOutputStream extends OutputStream {
private File backupFile;
private boolean closed;
private String key;
- private String bucket;
- private TransferManager transfers;
private Progressable progress;
private long partSize;
private long partSizeThreshold;
private S3AFileSystem fs;
- private CannedAccessControlList cannedACL;
- private FileSystem.Statistics statistics;
private LocalDirAllocator lDirAlloc;
- private String serverSideEncryptionAlgorithm;
public static final Logger LOG = S3AFileSystem.LOG;
- public S3AOutputStream(Configuration conf, TransferManager transfers,
- S3AFileSystem fs, String bucket, String key, Progressable progress,
- CannedAccessControlList cannedACL, FileSystem.Statistics statistics,
- String serverSideEncryptionAlgorithm)
+ public S3AOutputStream(Configuration conf,
+ S3AFileSystem fs, String key, Progressable progress)
throws IOException {
- this.bucket = bucket;
this.key = key;
- this.transfers = transfers;
this.progress = progress;
this.fs = fs;
- this.cannedACL = cannedACL;
- this.statistics = statistics;
- this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
partSize = fs.getPartitionSize();
partSizeThreshold = fs.getMultiPartThreshold();
@@ -124,30 +102,18 @@ public class S3AOutputStream extends OutputStream {
try {
- final ObjectMetadata om = new ObjectMetadata();
- if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
- om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
- }
- PutObjectRequest putObjectRequest =
- new PutObjectRequest(bucket, key, backupFile);
- putObjectRequest.setCannedAcl(cannedACL);
- putObjectRequest.setMetadata(om);
-
- Upload upload = transfers.upload(putObjectRequest);
-
- ProgressableProgressListener listener =
- new ProgressableProgressListener(upload, progress, statistics);
+ final ObjectMetadata om = fs.newObjectMetadata();
+ Upload upload = fs.putObject(
+ fs.newPutObjectRequest(
+ key,
+ om,
+ backupFile));
+ ProgressableProgressListener listener =
+ new ProgressableProgressListener(fs, key, upload, progress);
upload.addProgressListener(listener);
upload.waitForUploadResult();
-
- long delta = upload.getProgress().getBytesTransferred() -
- listener.getLastBytesTransferred();
- if (statistics != null && delta != 0) {
- LOG.debug("S3A write delta changed after finished: {} bytes", delta);
- statistics.incrementBytesWritten(delta);
- }
-
+ listener.uploadCompleted();
// This will delete unnecessary fake parent directories
fs.finishedWrite(key);
} catch (InterruptedException e) {
@@ -175,46 +141,4 @@ public class S3AOutputStream extends OutputStream {
backupStream.write(b, off, len);
}
- /**
- * Listener to progress from AWS regarding transfers.
- */
- public static class ProgressableProgressListener implements ProgressListener {
- private Progressable progress;
- private FileSystem.Statistics statistics;
- private long lastBytesTransferred;
- private Upload upload;
-
- public ProgressableProgressListener(Upload upload, Progressable progress,
- FileSystem.Statistics statistics) {
- this.upload = upload;
- this.progress = progress;
- this.statistics = statistics;
- this.lastBytesTransferred = 0;
- }
-
- public void progressChanged(ProgressEvent progressEvent) {
- if (progress != null) {
- progress.progress();
- }
-
- // There are 3 http ops here, but this should be close enough for now
- ProgressEventType pet = progressEvent.getEventType();
- if (pet == TRANSFER_PART_STARTED_EVENT ||
- pet == TRANSFER_COMPLETED_EVENT) {
- statistics.incrementWriteOps(1);
- }
-
- long transferred = upload.getProgress().getBytesTransferred();
- long delta = transferred - lastBytesTransferred;
- if (statistics != null && delta != 0) {
- statistics.incrementBytesWritten(delta);
- }
-
- lastBytesTransferred = transferred;
- }
-
- public long getLastBytesTransferred() {
- return lastBytesTransferred;
- }
- }
}
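The per-upload delta accounting deleted from close() above moves into the new ProgressableProgressListener class (added elsewhere in this patch). A sketch of what its uploadCompleted() presumably does to preserve the removed behaviour; the statistics helper on S3AFileSystem is a hypothetical name, not confirmed by this excerpt:

    // Sketch only: after waitForUploadResult(), reconcile any bytes the last
    // progress event did not report, mirroring the deleted delta logic.
    public void uploadCompleted() {
      long delta =
          upload.getProgress().getBytesTransferred() - lastBytesTransferred;
      if (delta != 0) {
        LOG.debug("S3A write delta changed after finished: {} bytes", delta);
        fs.incrementWriteBytes(delta);  // hypothetical helper on S3AFileSystem
      }
    }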
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8216c10/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
new file mode 100644
index 0000000..f69159a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageStatistics;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Storage statistics for S3A.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class S3AStorageStatistics extends StorageStatistics {
+ private static final Logger LOG = S3AFileSystem.LOG;
+
+ public static final String NAME = "S3AStorageStatistics";
+ private final Map<Statistic, AtomicLong> opsCount =
+ new EnumMap<>(Statistic.class);
+
+ public S3AStorageStatistics() {
+ super(NAME);
+ for (Statistic opType : Statistic.values()) {
+ opsCount.put(opType, new AtomicLong(0));
+ }
+ }
+
+ /**
+ * Increment a specific counter.
+ * @param op operation
+ * @param count increment value
+ * @return the new value
+ */
+ public long incrementCounter(Statistic op, long count) {
+ long updated = opsCount.get(op).addAndGet(count);
+ LOG.debug("{} += {} -> {}", op, count, updated);
+ return updated;
+ }
+
+ private class LongIterator implements Iterator<LongStatistic> {
+ private Iterator<Map.Entry<Statistic, AtomicLong>> iterator =
+ Collections.unmodifiableSet(opsCount.entrySet()).iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public LongStatistic next() {
+ if (!iterator.hasNext()) {
+ throw new NoSuchElementException();
+ }
+ final Map.Entry<Statistic, AtomicLong> entry = iterator.next();
+ return new LongStatistic(entry.getKey().name(), entry.getValue().get());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Override
+ public Iterator<LongStatistic> getLongStatistics() {
+ return new LongIterator();
+ }
+
+ @Override
+ public Long getLong(String key) {
+ final Statistic type = Statistic.fromSymbol(key);
+ return type == null ? null : opsCount.get(type).get();
+ }
+
+ @Override
+ public boolean isTracked(String key) {
+ return Statistic.fromSymbol(key) != null;
+ }
+
+}
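Everything the class exposes is visible above, so a usage sketch is fully grounded; only the printed formatting is illustrative:

    // Every Statistic gets a zero-initialized AtomicLong at construction, so
    // getLong() never NPEs for a known symbol, and returns null otherwise.
    S3AStorageStatistics stats = new S3AStorageStatistics();
    stats.incrementCounter(Statistic.INVOCATION_EXISTS, 1);
    Long value = stats.getLong(Statistic.INVOCATION_EXISTS.getSymbol());  // 1
    Iterator<StorageStatistics.LongStatistic> it = stats.getLongStatistics();
    while (it.hasNext()) {
      StorageStatistics.LongStatistic s = it.next();
      System.out.println(s.getName() + " = " + s.getValue());
    }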
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b8216c10/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 12d14e2..062fca4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
@@ -29,6 +30,7 @@ import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.AccessDeniedException;
+import java.util.Date;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -186,4 +188,50 @@ public final class S3AUtils {
}
return builder.toString();
}
+
+ /**
+ * Create a file status instance from a listing.
+ * @param keyPath path to entry
+ * @param summary summary from AWS
+ * @param blockSize block size to declare.
+ * @return a status entry
+ */
+ public static S3AFileStatus createFileStatus(Path keyPath,
+ S3ObjectSummary summary,
+ long blockSize) {
+ if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
+ return new S3AFileStatus(true, true, keyPath);
+ } else {
+ return new S3AFileStatus(summary.getSize(),
+ dateToLong(summary.getLastModified()), keyPath,
+ blockSize);
+ }
+ }
+
+ /**
+ * Predicate: does the object represent a directory?
+ * @param name object name
+ * @param size object size
+ * @return true if it meets the criteria for representing a directory
+ */
+ public static boolean objectRepresentsDirectory(final String name,
+ final long size) {
+ return !name.isEmpty()
+ && name.charAt(name.length() - 1) == '/'
+ && size == 0L;
+ }
+
+ /**
+ * Date to long conversion.
+ * Handles the null Dates that AWS can return by mapping them to 0.
+ * @param date date from AWS query
+ * @return timestamp of the object
+ */
+ public static long dateToLong(final Date date) {
+ if (date == null) {
+ return 0L;
+ }
+
+ return date.getTime();
+ }
}
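A brief sketch of how these helpers classify a listing entry; the key and size are illustrative values, and S3ObjectSummary is the AWS SDK type imported above:

    // A zero-byte object whose key ends in '/' is the S3A convention for a
    // directory marker; anything else becomes a file status entry.
    S3ObjectSummary summary = new S3ObjectSummary();
    summary.setKey("data/logs/");
    summary.setSize(0L);
    boolean isDir = S3AUtils.objectRepresentsDirectory(
        summary.getKey(), summary.getSize());               // true
    // dateToLong() guards against the null Date AWS can return:
    long modified = S3AUtils.dateToLong(summary.getLastModified());  // 0L here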