You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this text.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/09/29 10:29:44 UTC
[7/8] hadoop git commit: HADOOP-13345 S3Guard: Improved Consistency
for S3A. Contributed by: Chris Nauroth, Aaron Fabbri, Mingliang Liu,
Lei (Eddy) Xu, Sean Mackrory, Steve Loughran and others.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
index b0f08e3..be08afe 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.Path;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AFileStatus extends FileStatus {
- private boolean isEmptyDirectory;
+ private Tristate isEmptyDirectory;
/**
* Create a directory status.
@@ -42,6 +42,18 @@ public class S3AFileStatus extends FileStatus {
public S3AFileStatus(boolean isemptydir,
Path path,
String owner) {
+ this(Tristate.fromBool(isemptydir), path, owner);
+ }
+
+ /**
+ * Create a directory status.
+ * @param isemptydir is this an empty directory?
+ * @param path the path
+ * @param owner the owner
+ */
+ public S3AFileStatus(Tristate isemptydir,
+ Path path,
+ String owner) {
super(0, true, 1, 0, 0, path);
isEmptyDirectory = isemptydir;
setOwner(owner);
@@ -59,12 +71,37 @@ public class S3AFileStatus extends FileStatus {
public S3AFileStatus(long length, long modification_time, Path path,
long blockSize, String owner) {
super(length, false, 1, blockSize, modification_time, path);
- isEmptyDirectory = false;
+ isEmptyDirectory = Tristate.FALSE;
setOwner(owner);
setGroup(owner);
}
- public boolean isEmptyDirectory() {
+ /**
+ * Convenience constructor for creating from a vanilla FileStatus plus
+ * an isEmptyDirectory flag.
+ * @param source FileStatus to convert to S3AFileStatus
+ * @param isEmptyDirectory TRUE/FALSE if known to be / not be an empty
+ * directory, UNKNOWN if that information was not computed.
+ * @return a new S3AFileStatus
+ */
+ public static S3AFileStatus fromFileStatus(FileStatus source,
+ Tristate isEmptyDirectory) {
+ if (source.isDirectory()) {
+ return new S3AFileStatus(isEmptyDirectory, source.getPath(),
+ source.getOwner());
+ } else {
+ return new S3AFileStatus(source.getLen(), source.getModificationTime(),
+ source.getPath(), source.getBlockSize(), source.getOwner());
+ }
+ }
+
+
+ /**
+ * @return FALSE if status is not a directory, or it's a directory known to
+ * not be empty. TRUE if it is an empty directory. UNKNOWN if it is a
+ * directory, but we have not computed whether or not it is empty.
+ */
+ public Tristate isEmptyDirectory() {
return isEmptyDirectory;
}
@@ -110,7 +147,7 @@ public class S3AFileStatus extends FileStatus {
@Override
public String toString() {
return super.toString() +
- String.format(" isEmptyDirectory=%s", isEmptyDirectory());
+ String.format(" isEmptyDirectory=%s", isEmptyDirectory().name());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 9733179..1cce86a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -25,12 +25,16 @@ import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
+import java.util.Set;
import java.util.Objects;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -92,6 +96,11 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
@@ -149,6 +158,8 @@ public class S3AFileSystem extends FileSystem {
private long readAhead;
private S3AInputPolicy inputPolicy;
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private MetadataStore metadataStore;
+ private boolean allowAuthoritative;
// The maximum number of entries that can be deleted in any call to s3
private static final int MAX_ENTRIES_TO_DELETE = 1000;
@@ -277,6 +288,10 @@ public class S3AFileSystem extends FileSystem {
} else {
LOG.debug("Using S3AOutputStream");
}
+
+ metadataStore = S3Guard.getMetadataStore(this);
+ allowAuthoritative = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
+ DEFAULT_METADATASTORE_AUTHORITATIVE);
} catch (AmazonClientException e) {
throw translateException("initializing ", new Path(name), e);
}
@@ -388,12 +403,35 @@ public class S3AFileSystem extends FileSystem {
* Returns the S3 client used by this filesystem.
* @return AmazonS3Client
*/
- @VisibleForTesting
AmazonS3 getAmazonS3Client() {
return s3;
}
/**
+ * Get the region of a bucket.
+ * @return the region in which a bucket is located
+ * @throws IOException on any failure.
+ */
+ public String getBucketLocation() throws IOException {
+ return getBucketLocation(bucket);
+ }
+
+ /**
+ * Get the region of a bucket.
+ * @param bucketName the name of the bucket
+ * @return the region in which a bucket is located
+ * @throws IOException on any failure.
+ */
+ public String getBucketLocation(String bucketName) throws IOException {
+ try {
+ return s3.getBucketLocation(bucketName);
+ } catch (AmazonClientException e) {
+ throw translateException("getBucketLocation()",
+ bucketName, e);
+ }
+ }
+
+ /**
* Returns the read ahead range value used by this filesystem
* @return
*/
@@ -457,7 +495,7 @@ public class S3AFileSystem extends FileSystem {
* @return a key excluding the leading "/", or, if it is the root path, ""
*/
@VisibleForTesting
- String pathToKey(Path path) {
+ public String pathToKey(Path path) {
if (!path.isAbsolute()) {
path = new Path(workingDir, path);
}
@@ -508,7 +546,7 @@ public class S3AFileSystem extends FileSystem {
* @param path path to qualify
* @return a qualified path.
*/
- Path qualify(Path path) {
+ public Path qualify(Path path) {
return path.makeQualified(uri, workingDir);
}
@@ -578,7 +616,7 @@ public class S3AFileSystem extends FileSystem {
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
String key = pathToKey(f);
- S3AFileStatus status = null;
+ FileStatus status = null;
try {
// get the status or throw an FNFE
status = getFileStatus(f);
@@ -705,8 +743,8 @@ public class S3AFileSystem extends FileSystem {
* the description of the operation.
* This operation throws an exception on any failure which needs to be
* reported and downgraded to a failure. That is: if a rename
- * @param src path to be renamed
- * @param dst new path after rename
+ * @param source path to be renamed
+ * @param dest new path after rename
* @throws RenameFailedException if some criteria for a state changing
* rename was not met. This means work didn't happen; it's not something
* which is reported upstream to the FileSystem APIs, for which the semantics
@@ -715,9 +753,12 @@ public class S3AFileSystem extends FileSystem {
* @throws IOException on IO failure.
* @throws AmazonClientException on failures inside the AWS SDK
*/
- private boolean innerRename(Path src, Path dst)
+ private boolean innerRename(Path source, Path dest)
throws RenameFailedException, FileNotFoundException, IOException,
AmazonClientException {
+ Path src = qualify(source);
+ Path dst = qualify(dest);
+
LOG.debug("Rename path {} to {}", src, dst);
incrementStatistic(INVOCATION_RENAME);
@@ -733,7 +774,7 @@ public class S3AFileSystem extends FileSystem {
// get the source file status; this raises a FNFE if there is no source
// file.
- S3AFileStatus srcStatus = getFileStatus(src);
+ S3AFileStatus srcStatus = innerGetFileStatus(src, true);
if (srcKey.equals(dstKey)) {
LOG.debug("rename: src and dest refer to the same file or directory: {}",
@@ -745,7 +786,7 @@ public class S3AFileSystem extends FileSystem {
S3AFileStatus dstStatus = null;
try {
- dstStatus = getFileStatus(dst);
+ dstStatus = innerGetFileStatus(dst, true);
// if there is no destination entry, an exception is raised.
// hence this code sequence can assume that there is something
// at the end of the path; the only detail being what it is and
@@ -755,7 +796,7 @@ public class S3AFileSystem extends FileSystem {
throw new RenameFailedException(src, dst,
"source is a directory and dest is a file")
.withExitCode(srcStatus.isFile());
- } else if (!dstStatus.isEmptyDirectory()) {
+ } else if (dstStatus.isEmptyDirectory() != Tristate.TRUE) {
throw new RenameFailedException(src, dst,
"Destination is a non-empty directory")
.withExitCode(false);
@@ -777,7 +818,8 @@ public class S3AFileSystem extends FileSystem {
Path parent = dst.getParent();
if (!pathToKey(parent).isEmpty()) {
try {
- S3AFileStatus dstParentStatus = getFileStatus(dst.getParent());
+ S3AFileStatus dstParentStatus = innerGetFileStatus(dst.getParent(),
+ false);
if (!dstParentStatus.isDirectory()) {
throw new RenameFailedException(src, dst,
"destination parent is not a directory");
@@ -789,9 +831,20 @@ public class S3AFileSystem extends FileSystem {
}
}
+ // If we have a MetadataStore, track deletions/creations.
+ Collection<Path> srcPaths = null;
+ List<PathMetadata> dstMetas = null;
+ if (hasMetadataStore()) {
+ srcPaths = new HashSet<>(); // srcPaths need fast look up before put
+ dstMetas = new ArrayList<>();
+ }
+ // TODO S3Guard HADOOP-13761: retries when source paths are not visible yet
+ // TODO S3Guard: performance: mark destination dirs as authoritative
+
// Ok! Time to start
if (srcStatus.isFile()) {
LOG.debug("rename: renaming file {} to {}", src, dst);
+ long length = srcStatus.getLen();
if (dstStatus != null && dstStatus.isDirectory()) {
String newDstKey = dstKey;
if (!newDstKey.endsWith("/")) {
@@ -800,9 +853,14 @@ public class S3AFileSystem extends FileSystem {
String filename =
srcKey.substring(pathToKey(src.getParent()).length()+1);
newDstKey = newDstKey + filename;
- copyFile(srcKey, newDstKey, srcStatus.getLen());
+ copyFile(srcKey, newDstKey, length);
+ S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src,
+ keyToQualifiedPath(newDstKey), length, getDefaultBlockSize(dst),
+ username);
} else {
copyFile(srcKey, dstKey, srcStatus.getLen());
+ S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src, dst,
+ length, getDefaultBlockSize(dst), username);
}
innerDelete(srcStatus, false);
} else {
@@ -824,42 +882,66 @@ public class S3AFileSystem extends FileSystem {
}
List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
- if (dstStatus != null && dstStatus.isEmptyDirectory()) {
+ if (dstStatus != null && dstStatus.isEmptyDirectory() == Tristate.TRUE) {
// delete unnecessary fake directory.
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
}
- ListObjectsRequest request = new ListObjectsRequest();
- request.setBucketName(bucket);
- request.setPrefix(srcKey);
- request.setMaxKeys(maxKeys);
-
- ObjectListing objects = listObjects(request);
-
- while (true) {
- for (S3ObjectSummary summary : objects.getObjectSummaries()) {
- keysToDelete.add(
- new DeleteObjectsRequest.KeyVersion(summary.getKey()));
- String newDstKey =
- dstKey + summary.getKey().substring(srcKey.length());
- copyFile(summary.getKey(), newDstKey, summary.getSize());
-
- if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
- removeKeys(keysToDelete, true, false);
+ Path parentPath = keyToPath(srcKey);
+ RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories(
+ parentPath, true);
+ while (iterator.hasNext()) {
+ LocatedFileStatus status = iterator.next();
+ long length = status.getLen();
+ String key = pathToKey(status.getPath());
+ if (status.isDirectory() && !key.endsWith("/")) {
+ key += "/";
+ }
+ keysToDelete
+ .add(new DeleteObjectsRequest.KeyVersion(key));
+ String newDstKey =
+ dstKey + key.substring(srcKey.length());
+ copyFile(key, newDstKey, length);
+
+ if (hasMetadataStore()) {
+ // with a metadata store, the object entries need to be updated,
+ // including, potentially, the ancestors
+ Path childSrc = keyToQualifiedPath(key);
+ Path childDst = keyToQualifiedPath(newDstKey);
+ if (objectRepresentsDirectory(key, length)) {
+ S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
+ childDst, username);
+ } else {
+ S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc,
+ childDst, length, getDefaultBlockSize(childDst), username);
}
+ // Ancestor directories may not be listed, so we explicitly add them
+ S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
+ keyToQualifiedPath(srcKey), childSrc, childDst, username);
}
- if (objects.isTruncated()) {
- objects = continueListObjects(objects);
- } else {
- if (!keysToDelete.isEmpty()) {
- removeKeys(keysToDelete, false, false);
- }
- break;
+ if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
+ removeKeys(keysToDelete, true, false);
}
}
+ if (!keysToDelete.isEmpty()) {
+ removeKeys(keysToDelete, false, false);
+ }
+
+ // We moved all the children, now move the top-level dir
+ // Empty directory should have been added as the object summary
+ if (hasMetadataStore()
+ && srcPaths != null
+ && !srcPaths.contains(src)) {
+ LOG.debug("To move the non-empty top-level dir src={} and dst={}",
+ src, dst);
+ S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, src, dst,
+ username);
+ }
}
+ metadataStore.move(srcPaths, dstMetas);
+
if (src.getParent() != dst.getParent()) {
deleteUnnecessaryFakeDirectories(dst.getParent());
createFakeDirectoryIfNecessary(src.getParent());
@@ -879,6 +961,31 @@ public class S3AFileSystem extends FileSystem {
}
/**
+ * Does this Filesystem have a metadata store?
+ * @return true iff the FS has been instantiated with a metadata store
+ */
+ public boolean hasMetadataStore() {
+ return !S3Guard.isNullMetadataStore(metadataStore);
+ }
+
+ /**
+ * Get the metadata store.
+ * This will always be non-null, but may be bound to the
+ * {@code NullMetadataStore}.
+ * @return the metadata store of this FS instance
+ */
+ @VisibleForTesting
+ MetadataStore getMetadataStore() {
+ return metadataStore;
+ }
+
+ /** For testing only. See ITestS3GuardEmptyDirs. */
+ @VisibleForTesting
+ void setMetadataStore(MetadataStore ms) {
+ metadataStore = ms;
+ }
+
+ /**
* Increment a statistic by 1.
* @param statistic The operation to increment
*/
@@ -1062,8 +1169,9 @@ public class S3AFileSystem extends FileSystem {
* @param inputStream source data.
* @return the request
*/
- private PutObjectRequest newPutObjectRequest(String key,
- ObjectMetadata metadata, InputStream inputStream) {
+ PutObjectRequest newPutObjectRequest(String key,
+ ObjectMetadata metadata,
+ InputStream inputStream) {
Preconditions.checkNotNull(inputStream);
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
inputStream, metadata);
@@ -1114,7 +1222,7 @@ public class S3AFileSystem extends FileSystem {
* @param putObjectRequest the request
* @return the upload initiated
*/
- public Upload putObject(PutObjectRequest putObjectRequest) {
+ public UploadInfo putObject(PutObjectRequest putObjectRequest) {
long len;
if (putObjectRequest.getFile() != null) {
len = putObjectRequest.getFile().length();
@@ -1125,7 +1233,7 @@ public class S3AFileSystem extends FileSystem {
try {
Upload upload = transfers.upload(putObjectRequest);
incrementPutCompletedStatistics(true, len);
- return upload;
+ return new UploadInfo(upload, len);
} catch (AmazonClientException e) {
incrementPutCompletedStatistics(false, len);
throw e;
@@ -1141,14 +1249,10 @@ public class S3AFileSystem extends FileSystem {
* @return the upload initiated
* @throws AmazonClientException on problems
*/
- public PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
+ PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest)
throws AmazonClientException {
- long len;
- if (putObjectRequest.getFile() != null) {
- len = putObjectRequest.getFile().length();
- } else {
- len = putObjectRequest.getMetadata().getContentLength();
- }
+ long len = getPutRequestLength(putObjectRequest);
+ LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey());
incrementPutStartStatistics(len);
try {
PutObjectResult result = s3.putObject(putObjectRequest);
@@ -1161,6 +1265,23 @@ public class S3AFileSystem extends FileSystem {
}
/**
+ * Get the length of the PUT, verifying that the length is known.
+ * @param putObjectRequest a request bound to a file or a stream.
+ * @return the request length
+ * @throws IllegalStateException if the length is negative
+ */
+ private long getPutRequestLength(PutObjectRequest putObjectRequest) {
+ long len;
+ if (putObjectRequest.getFile() != null) {
+ len = putObjectRequest.getFile().length();
+ } else {
+ len = putObjectRequest.getMetadata().getContentLength();
+ }
+ Preconditions.checkState(len >= 0, "Cannot PUT object of unknown length");
+ return len;
+ }
+
+ /**
* Upload part of a multi-partition file.
* Increments the write and put counters.
* <i>Important: this call does not close any input stream in the request.</i>
@@ -1287,7 +1408,7 @@ public class S3AFileSystem extends FileSystem {
*/
public boolean delete(Path f, boolean recursive) throws IOException {
try {
- return innerDelete(getFileStatus(f), recursive);
+ return innerDelete(innerGetFileStatus(f, true), recursive);
} catch (FileNotFoundException e) {
LOG.debug("Couldn't delete {} - does not exist", f);
instrumentation.errorIgnored();
@@ -1317,6 +1438,9 @@ public class S3AFileSystem extends FileSystem {
if (status.isDirectory()) {
LOG.debug("delete: Path is a directory: {}", f);
+ Preconditions.checkArgument(
+ status.isEmptyDirectory() != Tristate.UNKNOWN,
+ "File status must have directory emptiness computed");
if (!key.endsWith("/")) {
key = key + "/";
@@ -1326,13 +1450,15 @@ public class S3AFileSystem extends FileSystem {
return rejectRootDirectoryDelete(status, recursive);
}
- if (!recursive && !status.isEmptyDirectory()) {
+ if (!recursive && status.isEmptyDirectory() == Tristate.FALSE) {
throw new PathIsNotEmptyDirectoryException(f.toString());
}
- if (status.isEmptyDirectory()) {
+ if (status.isEmptyDirectory() == Tristate.TRUE) {
LOG.debug("Deleting fake empty directory {}", key);
+ // HADOOP-13761 S3Guard: retries here
deleteObject(key);
+ metadataStore.delete(f);
instrumentation.directoryDeleted();
} else {
LOG.debug("Getting objects for directory prefix {} to delete", key);
@@ -1348,6 +1474,7 @@ public class S3AFileSystem extends FileSystem {
LOG.debug("Got object to delete {}", summary.getKey());
if (keys.size() == MAX_ENTRIES_TO_DELETE) {
+ // TODO: HADOOP-13761 S3Guard: retries
removeKeys(keys, true, false);
}
}
@@ -1356,16 +1483,19 @@ public class S3AFileSystem extends FileSystem {
objects = continueListObjects(objects);
} else {
if (!keys.isEmpty()) {
+ // TODO: HADOOP-13761 S3Guard: retries
removeKeys(keys, false, false);
}
break;
}
}
}
+ metadataStore.deleteSubtree(f);
} else {
LOG.debug("delete: Path is a file");
instrumentation.fileDeleted(1);
deleteObject(key);
+ metadataStore.delete(f);
}
Path parent = f.getParent();
@@ -1389,7 +1519,7 @@ public class S3AFileSystem extends FileSystem {
private boolean rejectRootDirectoryDelete(S3AFileStatus status,
boolean recursive) throws IOException {
LOG.info("s3a delete the {} root directory of {}", bucket, recursive);
- boolean emptyRoot = status.isEmptyDirectory();
+ boolean emptyRoot = status.isEmptyDirectory() == Tristate.TRUE;
if (emptyRoot) {
return true;
}
@@ -1404,7 +1534,7 @@ public class S3AFileSystem extends FileSystem {
private void createFakeDirectoryIfNecessary(Path f)
throws IOException, AmazonClientException {
String key = pathToKey(f);
- if (!key.isEmpty() && !exists(f)) {
+ if (!key.isEmpty() && !s3Exists(f)) {
LOG.debug("Creating new fake directory at {}", f);
createFakeDirectory(key);
}
@@ -1453,6 +1583,11 @@ public class S3AFileSystem extends FileSystem {
key = key + '/';
}
+ DirListingMetadata dirMeta = metadataStore.listChildren(path);
+ if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
+ return S3Guard.dirMetaToStatuses(dirMeta);
+ }
+
ListObjectsRequest request = createListObjectsRequest(key, "/");
LOG.debug("listStatus: doing listObjects for directory {}", key);
@@ -1465,7 +1600,8 @@ public class S3AFileSystem extends FileSystem {
while (files.hasNext()) {
result.add(files.next());
}
- return result.toArray(new FileStatus[result.size()]);
+ return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
+ allowAuthoritative);
} else {
LOG.debug("Adding: rd (not a dir): {}", path);
FileStatus[] stats = new FileStatus[1];
@@ -1481,7 +1617,8 @@ public class S3AFileSystem extends FileSystem {
* @param delimiter any delimiter
* @return the request
*/
- private ListObjectsRequest createListObjectsRequest(String key,
+ @VisibleForTesting
+ ListObjectsRequest createListObjectsRequest(String key,
String delimiter) {
ListObjectsRequest request = new ListObjectsRequest();
request.setBucketName(bucket);
@@ -1540,23 +1677,30 @@ public class S3AFileSystem extends FileSystem {
throw translateException("innerMkdirs", path, e);
}
}
+
/**
*
* Make the given path and all non-existent parents into
* directories.
* See {@link #mkdirs(Path, FsPermission)}
- * @param f path to create
+ * @param p path to create
* @param permission to apply to f
- * @return true if a directory was created
+ * @return true if a directory was created or already existed
* @throws FileAlreadyExistsException there is a file at the path specified
* @throws IOException other IO problems
* @throws AmazonClientException on failures inside the AWS SDK
*/
- private boolean innerMkdirs(Path f, FsPermission permission)
+ private boolean innerMkdirs(Path p, FsPermission permission)
throws IOException, FileAlreadyExistsException, AmazonClientException {
+ Path f = qualify(p);
LOG.debug("Making directory: {}", f);
incrementStatistic(INVOCATION_MKDIRS);
FileStatus fileStatus;
+ List<Path> metadataStoreDirs = null;
+ if (hasMetadataStore()) {
+ metadataStoreDirs = new ArrayList<>();
+ }
+
try {
fileStatus = getFileStatus(f);
@@ -1566,8 +1710,12 @@ public class S3AFileSystem extends FileSystem {
throw new FileAlreadyExistsException("Path is a file: " + f);
}
} catch (FileNotFoundException e) {
+ // Walk path to root, ensuring closest ancestor is a directory, not file
Path fPart = f.getParent();
- do {
+ if (metadataStoreDirs != null) {
+ metadataStoreDirs.add(f);
+ }
+ while (fPart != null) {
try {
fileStatus = getFileStatus(fPart);
if (fileStatus.isDirectory()) {
@@ -1580,12 +1728,17 @@ public class S3AFileSystem extends FileSystem {
}
} catch (FileNotFoundException fnfe) {
instrumentation.errorIgnored();
+ // We create all missing directories in MetadataStore; it does not
+ // infer directories exist by prefix like S3.
+ if (metadataStoreDirs != null) {
+ metadataStoreDirs.add(fPart);
+ }
}
fPart = fPart.getParent();
- } while (fPart != null);
-
+ }
String key = pathToKey(f);
createFakeDirectory(key);
+ S3Guard.makeDirsOrdered(metadataStore, metadataStoreDirs, username, true);
// this is complicated because getParent(a/b/c/) returns a/b/c, but
// we want a/b. See HADOOP-14428 for more details.
deleteUnnecessaryFakeDirectories(new Path(f.toString()).getParent());
@@ -1597,21 +1750,93 @@ public class S3AFileSystem extends FileSystem {
* Return a file status object that represents the path.
* @param f The path we want information from
* @return a FileStatus object
- * @throws java.io.FileNotFoundException when the path does not exist;
+ * @throws FileNotFoundException when the path does not exist
* @throws IOException on other problems.
*/
- public S3AFileStatus getFileStatus(final Path f) throws IOException {
+ public FileStatus getFileStatus(final Path f) throws IOException {
+ return innerGetFileStatus(f, false);
+ }
+
+ /**
+ * Internal version of {@link #getFileStatus(Path)}.
+ * @param f The path we want information from
+ * @param needEmptyDirectoryFlag if true, implementation will calculate
+ * a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
+ * @return an S3AFileStatus object
+ * @throws FileNotFoundException when the path does not exist
+ * @throws IOException on other problems.
+ */
+ @VisibleForTesting
+ S3AFileStatus innerGetFileStatus(final Path f,
+ boolean needEmptyDirectoryFlag) throws IOException {
incrementStatistic(INVOCATION_GET_FILE_STATUS);
final Path path = qualify(f);
String key = pathToKey(path);
- LOG.debug("Getting path status for {} ({})", path , key);
+ LOG.debug("Getting path status for {} ({})", path, key);
+
+ // Check MetadataStore, if any.
+ PathMetadata pm = metadataStore.get(path, needEmptyDirectoryFlag);
+ Set<Path> tombstones = Collections.emptySet();
+ if (pm != null) {
+ if (pm.isDeleted()) {
+ throw new FileNotFoundException("Path " + f + " is recorded as " +
+ "deleted by S3Guard");
+ }
+
+ FileStatus msStatus = pm.getFileStatus();
+ if (needEmptyDirectoryFlag && msStatus.isDirectory()) {
+ if (pm.isEmptyDirectory() != Tristate.UNKNOWN) {
+ // We have a definitive true / false from MetadataStore, we are done.
+ return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
+ } else {
+ DirListingMetadata children = metadataStore.listChildren(path);
+ if (children != null) {
+ tombstones = children.listTombstones();
+ }
+ LOG.debug("MetadataStore doesn't know if dir is empty, using S3.");
+ }
+ } else {
+ // Either this is not a directory, or the caller didn't ask about emptiness
+ return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
+ }
+
+ // If the MetadataStore has this directory but S3 does not list it yet,
+ // assume the directory is empty.
+ S3AFileStatus s3FileStatus;
+ try {
+ s3FileStatus = s3GetFileStatus(path, key, tombstones);
+ } catch (FileNotFoundException e) {
+ return S3AFileStatus.fromFileStatus(msStatus, Tristate.TRUE);
+ }
+ // entry was found, save in S3Guard
+ return S3Guard.putAndReturn(metadataStore, s3FileStatus, instrumentation);
+ } else {
+ // there was no entry in S3Guard
+ // retrieve the data and update the metadata store in the process.
+ return S3Guard.putAndReturn(metadataStore,
+ s3GetFileStatus(path, key, tombstones), instrumentation);
+ }
+ }
+
+ /**
+ * Raw {@code getFileStatus} that talks direct to S3.
+ * Used to implement {@link #innerGetFileStatus(Path, boolean)},
+ * and for direct management of empty directory blobs.
+ * @param path Qualified path
+ * @param key Key string for the path
+ * @return Status
+ * @throws FileNotFoundException when the path does not exist
+ * @throws IOException on other problems.
+ */
+ private S3AFileStatus s3GetFileStatus(final Path path, String key,
+ Set<Path> tombstones) throws IOException {
if (!key.isEmpty()) {
try {
ObjectMetadata meta = getObjectMetadata(key);
if (objectRepresentsDirectory(key, meta.getContentLength())) {
LOG.debug("Found exact file: fake directory");
- return new S3AFileStatus(true, path, username);
+ return new S3AFileStatus(Tristate.TRUE, path, username);
} else {
LOG.debug("Found exact file: normal file");
return new S3AFileStatus(meta.getContentLength(),
@@ -1636,16 +1861,16 @@ public class S3AFileSystem extends FileSystem {
if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
LOG.debug("Found file (with /): fake directory");
- return new S3AFileStatus(true, path, username);
+ return new S3AFileStatus(Tristate.TRUE, path, username);
} else {
LOG.warn("Found file (with /): real file? should not happen: {}",
key);
return new S3AFileStatus(meta.getContentLength(),
- dateToLong(meta.getLastModified()),
- path,
- getDefaultBlockSize(path),
- username);
+ dateToLong(meta.getLastModified()),
+ path,
+ getDefaultBlockSize(path),
+ username);
}
} catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) {
@@ -1667,25 +1892,26 @@ public class S3AFileSystem extends FileSystem {
ObjectListing objects = listObjects(request);
- if (!objects.getCommonPrefixes().isEmpty()
- || !objects.getObjectSummaries().isEmpty()) {
+ Collection<String> prefixes = objects.getCommonPrefixes();
+ Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
+ if (!isEmptyOfKeys(prefixes, tombstones) ||
+ !isEmptyOfObjects(summaries, tombstones)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Found path as directory (with /): {}/{}",
- objects.getCommonPrefixes().size() ,
- objects.getObjectSummaries().size());
+ prefixes.size(), summaries.size());
- for (S3ObjectSummary summary : objects.getObjectSummaries()) {
+ for (S3ObjectSummary summary : summaries) {
LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize());
}
- for (String prefix : objects.getCommonPrefixes()) {
+ for (String prefix : prefixes) {
LOG.debug("Prefix: {}", prefix);
}
}
- return new S3AFileStatus(false, path, username);
+ return new S3AFileStatus(Tristate.FALSE, path, username);
} else if (key.isEmpty()) {
LOG.debug("Found root directory");
- return new S3AFileStatus(true, path, username);
+ return new S3AFileStatus(Tristate.TRUE, path, username);
}
} catch (AmazonServiceException e) {
if (e.getStatusCode() != 404) {
@@ -1700,6 +1926,64 @@ public class S3AFileSystem extends FileSystem {
}
/**
+ * Helper function to determine if a collection of paths is empty
+ * after accounting for tombstone markers (if provided).
+ * @param keys Collection of paths (prefixes / directories or keys).
+ * @param tombstones Set of tombstone markers, or null if not applicable.
+ * @return false if keys contains an entry whose qualified path is not
+ * accounted for by tombstones.
+ */
+ private boolean isEmptyOfKeys(Collection<String> keys, Set<Path>
+ tombstones) {
+ if (tombstones == null) {
+ return keys.isEmpty();
+ }
+ for (String key : keys) {
+ Path qualified = keyToQualifiedPath(key);
+ if (!tombstones.contains(qualified)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Helper function to determine if a collection of object summaries is empty
+ * after accounting for tombstone markers (if provided).
+ * @param summaries Collection of objects as returned by listObjects.
+ * @param tombstones Set of tombstone markers, or null if not applicable.
+ * @return false if summaries contains objects not accounted for by
+ * tombstones.
+ */
+ private boolean isEmptyOfObjects(Collection<S3ObjectSummary> summaries,
+ Set<Path> tombstones) {
+ if (tombstones == null) {
+ return summaries.isEmpty();
+ }
+ Collection<String> stringCollection = new ArrayList<>(summaries.size());
+ for (S3ObjectSummary summary : summaries) {
+ stringCollection.add(summary.getKey());
+ }
+ return isEmptyOfKeys(stringCollection, tombstones);
+ }
+
+ /**
+ * Raw version of {@link FileSystem#exists(Path)} which uses S3 only:
+ * S3Guard MetadataStore, if any, will be skipped.
+ * @return true if path exists in S3
+ */
+ private boolean s3Exists(final Path f) throws IOException {
+ Path path = qualify(f);
+ String key = pathToKey(path);
+ try {
+ s3GetFileStatus(path, key, null);
+ return true;
+ } catch (FileNotFoundException e) {
+ return false;
+ }
+ }
+
+ /**
* The src file is on the local disk. Add it to FS at
* the given dst name.
*
@@ -1761,12 +2045,13 @@ public class S3AFileSystem extends FileSystem {
final ObjectMetadata om = newObjectMetadata(srcfile.length());
PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
- Upload up = putObject(putObjectRequest);
+ UploadInfo info = putObject(putObjectRequest);
+ Upload upload = info.getUpload();
ProgressableProgressListener listener = new ProgressableProgressListener(
- this, key, up, null);
- up.addProgressListener(listener);
+ this, key, upload, null);
+ upload.addProgressListener(listener);
try {
- up.waitForUploadResult();
+ upload.waitForUploadResult();
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted copying " + src
+ " to " + dst + ", cancelling");
@@ -1774,7 +2059,7 @@ public class S3AFileSystem extends FileSystem {
listener.uploadCompleted();
// This will delete unnecessary fake parent directories
- finishedWrite(key);
+ finishedWrite(key, info.getLength());
if (delSrc) {
local.delete(src, false);
@@ -1798,6 +2083,10 @@ public class S3AFileSystem extends FileSystem {
transfers.shutdownNow(true);
transfers = null;
}
+ if (metadataStore != null) {
+ metadataStore.close();
+ metadataStore = null;
+ }
}
}
@@ -1940,11 +2229,38 @@ public class S3AFileSystem extends FileSystem {
/**
* Perform post-write actions.
+ * This operation MUST be called after any PUT/multipart PUT completes
+ * successfully.
+ * This includes
+ * <ol>
+ * <li>Calling {@link #deleteUnnecessaryFakeDirectories(Path)}</li>
+ * <li>Updating any metadata store with details on the newly created
+ * object.</li>
+ * </ol>
+ * The length is validated before either action runs. Failures while
+ * updating the MetadataStore are logged and counted as ignored errors;
+ * they are not rethrown.
* @param key key written to
+ * @param length total length of file written
+ * @throws IllegalArgumentException if {@code length} is negative
*/
- public void finishedWrite(String key) {
- LOG.debug("Finished write to {}", key);
- deleteUnnecessaryFakeDirectories(keyToPath(key).getParent());
+ @InterfaceAudience.Private
+ void finishedWrite(String key, long length) {
+ // Validate before any side effect, so that a rejected call leaves the
+ // fake parent directories and the MetadataStore untouched.
+ Preconditions.checkArgument(length >= 0, "content length is negative");
+ LOG.debug("Finished write to {}, len {}", key, length);
+ Path p = keyToQualifiedPath(key);
+ deleteUnnecessaryFakeDirectories(p.getParent());
+
+ // See note about failure semantics in S3Guard documentation
+ try {
+ if (hasMetadataStore()) {
+ S3Guard.addAncestors(metadataStore, p, username);
+ S3AFileStatus status = createUploadFileStatus(p,
+ S3AUtils.objectRepresentsDirectory(key, length), length,
+ getDefaultBlockSize(p), username);
+ S3Guard.putAndReturn(metadataStore, status, instrumentation);
+ }
+ } catch (IOException e) {
+ // Best effort: the PUT has already succeeded, so a MetadataStore
+ // failure is logged and counted rather than failing the write.
+ LOG.error("S3Guard: Error updating MetadataStore for write to {}:",
+ key, e);
+ instrumentation.errorIgnored();
+ }
}
/**
@@ -1999,9 +2315,9 @@ public class S3AFileSystem extends FileSystem {
PutObjectRequest putObjectRequest = newPutObjectRequest(objectName,
newObjectMetadata(0L),
im);
- Upload upload = putObject(putObjectRequest);
+ UploadInfo info = putObject(putObjectRequest);
try {
- upload.waitForUploadResult();
+ info.getUpload().waitForUploadResult();
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted creating " + objectName);
}
@@ -2107,6 +2423,8 @@ public class S3AFileSystem extends FileSystem {
if (blockFactory != null) {
sb.append(", blockFactory=").append(blockFactory);
}
+ sb.append(", metastore=").append(metadataStore);
+ sb.append(", authoritative=").append(allowAuthoritative);
sb.append(", boundedExecutor=").append(boundedThreadPool);
sb.append(", unboundedExecutor=").append(unboundedThreadPool);
sb.append(", statistics {")
@@ -2223,6 +2541,18 @@ public class S3AFileSystem extends FileSystem {
@Override
public RemoteIterator<LocatedFileStatus> listFiles(Path f,
boolean recursive) throws FileNotFoundException, IOException {
+ // Delegate to the shared implementation; the acceptor decides which
+ // listed entries are returned to the caller.
+ return innerListFiles(f, recursive,
+ new Listing.AcceptFilesOnly(qualify(f)));
+ }
+
+ /**
+ * Variant of {@link #listFiles(Path, boolean)} which uses
+ * {@code Listing.AcceptAllButS3nDirs} as the acceptor instead of
+ * {@code Listing.AcceptFilesOnly}.
+ * NOTE(review): the method name implies empty directory markers are
+ * returned as well as files; confirm against AcceptAllButS3nDirs.
+ * @param f path to list
+ * @param recursive true to list the whole tree under {@code f}
+ * @return an iterator over the accepted entries
+ * @throws IOException on a listing failure
+ */
+ public RemoteIterator<LocatedFileStatus> listFilesAndEmptyDirectories(Path f,
+ boolean recursive) throws IOException {
+ return innerListFiles(f, recursive,
+ new Listing.AcceptAllButS3nDirs());
+ }
+
+ private RemoteIterator<LocatedFileStatus> innerListFiles(Path f, boolean
+ recursive, Listing.FileStatusAcceptor acceptor) throws IOException {
incrementStatistic(INVOCATION_LIST_FILES);
Path path = qualify(f);
LOG.debug("listFiles({}, {})", path, recursive);
@@ -2240,13 +2570,42 @@ public class S3AFileSystem extends FileSystem {
String delimiter = recursive ? null : "/";
LOG.debug("Requesting all entries under {} with delimiter '{}'",
key, delimiter);
- return listing.createLocatedFileStatusIterator(
- listing.createFileStatusListingIterator(path,
- createListObjectsRequest(key, delimiter),
- ACCEPT_ALL,
- new Listing.AcceptFilesOnly(path)));
+ final RemoteIterator<FileStatus> cachedFilesIterator;
+ final Set<Path> tombstones;
+ if (recursive) {
+ final PathMetadata pm = metadataStore.get(path, true);
+ // shouldn't need to check pm.isDeleted() because that will have
+ // been caught by getFileStatus above.
+ MetadataStoreListFilesIterator metadataStoreListFilesIterator =
+ new MetadataStoreListFilesIterator(metadataStore, pm,
+ allowAuthoritative);
+ tombstones = metadataStoreListFilesIterator.listTombstones();
+ cachedFilesIterator = metadataStoreListFilesIterator;
+ } else {
+ DirListingMetadata meta = metadataStore.listChildren(path);
+ if (meta != null) {
+ tombstones = meta.listTombstones();
+ } else {
+ tombstones = null;
+ }
+ cachedFilesIterator = listing.createProvidedFileStatusIterator(
+ S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
+ if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
+ // metadata listing is authoritative, so return it directly
+ return listing.createLocatedFileStatusIterator(cachedFilesIterator);
+ }
+ }
+ return listing.createTombstoneReconcilingIterator(
+ listing.createLocatedFileStatusIterator(
+ listing.createFileStatusListingIterator(path,
+ createListObjectsRequest(key, delimiter),
+ ACCEPT_ALL,
+ acceptor,
+ cachedFilesIterator)),
+ tombstones);
}
} catch (AmazonClientException e) {
+ // TODO S3Guard: retry on file not found exception
throw translateException("listFiles", path, e);
}
}
@@ -2291,12 +2650,21 @@ public class S3AFileSystem extends FileSystem {
filter.accept(path) ? toLocatedFileStatus(fileStatus) : null);
} else {
// directory: trigger a lookup
- String key = maybeAddTrailingSlash(pathToKey(path));
- return listing.createLocatedFileStatusIterator(
- listing.createFileStatusListingIterator(path,
- createListObjectsRequest(key, "/"),
- filter,
- new Listing.AcceptAllButSelfAndS3nDirs(path)));
+ final String key = maybeAddTrailingSlash(pathToKey(path));
+ final Listing.FileStatusAcceptor acceptor =
+ new Listing.AcceptAllButSelfAndS3nDirs(path);
+ DirListingMetadata meta = metadataStore.listChildren(path);
+ final RemoteIterator<FileStatus> cachedFileStatusIterator =
+ listing.createProvidedFileStatusIterator(
+ S3Guard.dirMetaToStatuses(meta), filter, acceptor);
+ return (allowAuthoritative && meta != null && meta.isAuthoritative())
+ ? listing.createLocatedFileStatusIterator(cachedFileStatusIterator)
+ : listing.createLocatedFileStatusIterator(
+ listing.createFileStatusListingIterator(path,
+ createListObjectsRequest(key, "/"),
+ filter,
+ acceptor,
+ cachedFileStatusIterator));
}
} catch (AmazonClientException e) {
throw translateException("listLocatedStatus", path, e);
@@ -2371,8 +2739,8 @@ public class S3AFileSystem extends FileSystem {
/**
* Callback on a successful write.
+ * @param length length of the data written; passed on to
+ * {@code finishedWrite(key, length)}.
*/
- void writeSuccessful() {
- finishedWrite(key);
+ void writeSuccessful(long length) {
+ finishedWrite(key, length);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index d2e7a88..da1fc5a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.metrics2.MetricStringBuilder;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.Interns;
@@ -30,6 +31,7 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.apache.hadoop.metrics2.lib.MutableMetric;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import java.io.Closeable;
import java.net.URI;
@@ -38,7 +40,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.fs.FileSystem.Statistics;
import static org.apache.hadoop.fs.s3a.Statistic.*;
@@ -90,6 +91,10 @@ public class S3AInstrumentation {
private final Map<String, MutableCounterLong> streamMetrics =
new HashMap<>(30);
+ /**
+ * S3Guard instrumentation, instantiated eagerly without caring whether or
+ * not S3Guard is enabled; returned by {@link #getS3GuardInstrumentation()}.
+ */
+ private final S3GuardInstrumentation s3GuardInstrumentation
+ = new S3GuardInstrumentation();
+
private static final Statistic[] COUNTERS_TO_CREATE = {
INVOCATION_COPY_FROM_LOCAL_FILE,
INVOCATION_EXISTS,
@@ -117,6 +122,8 @@ public class S3AInstrumentation {
STREAM_WRITE_BLOCK_UPLOADS_ABORTED,
STREAM_WRITE_TOTAL_TIME,
STREAM_WRITE_TOTAL_DATA,
+ S3GUARD_METADATASTORE_PUT_PATH_REQUEST,
+ S3GUARD_METADATASTORE_INITIALIZATION
};
@@ -171,6 +178,9 @@ public class S3AInstrumentation {
for (Statistic statistic : GAUGES_TO_CREATE) {
gauge(statistic.getSymbol(), statistic.getDescription());
}
+ //todo need a config for the quantiles interval?
+ quantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY,
+ "ops", "latency", 1);
}
/**
@@ -227,6 +237,22 @@ public class S3AInstrumentation {
}
/**
+ * Create a quantiles metric in the registry. The statistic's symbol is
+ * used as the metric name and its description as the metric description.
+ * @param op statistic to collect
+ * @param sampleName sample name of the quantiles
+ * @param valueName value name of the quantiles
+ * @param interval interval of the quantiles in seconds
+ * @return the created quantiles metric
+ */
+ protected final MutableQuantiles quantiles(Statistic op,
+ String sampleName,
+ String valueName,
+ int interval) {
+ return registry.newQuantiles(op.getSymbol(), op.getDescription(),
+ sampleName, valueName, interval);
+ }
+
+ /**
* Get the metrics registry.
* @return the registry
*/
@@ -311,6 +337,20 @@ public class S3AInstrumentation {
}
/**
+ * Look up a quantiles metric by name.
+ * @param name quantiles name
+ * @return the quantiles, or null if no metric of that name is registered
+ * @throws ClassCastException if the metric exists but is not a Quantiles.
+ */
+ public MutableQuantiles lookupQuantiles(String name) {
+ MutableMetric metric = lookupMetric(name);
+ if (metric == null) {
+ // Absence is only logged at debug level; callers must handle null.
+ LOG.debug("No quantiles {}", name);
+ }
+ return (MutableQuantiles) metric;
+ }
+
+ /**
* Look up a metric from both the registered set and the lighter weight
* stream entries.
* @param name metric name
@@ -391,6 +431,21 @@ public class S3AInstrumentation {
counter.incr(count);
}
}
+
+ /**
+ * Add a value to a quantiles statistic. No-op if no quantiles metric is
+ * registered under {@code op.getSymbol()}.
+ * @param op operation to look up.
+ * @param value value to add.
+ * @throws ClassCastException if the metric is not a Quantiles.
+ */
+ public void addValueToQuantiles(Statistic op, long value) {
+ MutableQuantiles quantiles = lookupQuantiles(op.getSymbol());
+ if (quantiles != null) {
+ quantiles.add(value);
+ }
+ }
+
/**
* Increment a specific counter.
* No-op if not defined.
@@ -442,6 +497,15 @@ public class S3AInstrumentation {
}
/**
+ * Get the S3Guard instrumentation instance.
+ * No new instance is created: the same object, built when this
+ * instrumentation was constructed, is returned on every call, whether or
+ * not S3Guard is enabled.
+ * @return the S3Guard instrumentation point.
+ */
+ public S3GuardInstrumentation getS3GuardInstrumentation() {
+ return s3GuardInstrumentation;
+ }
+
+ /**
* Merge in the statistics of a single input stream into
* the filesystem-wide statistics.
* @param statistics stream statistics
@@ -840,4 +904,19 @@ public class S3AInstrumentation {
return sb.toString();
}
}
+
+ /**
+ * Instrumentation exported to S3Guard.
+ * This is a non-static inner class so that its methods can update the
+ * counters of the enclosing S3AInstrumentation.
+ */
+ public final class S3GuardInstrumentation {
+
+ /**
+ * Initialized event: increments the
+ * {@code S3GUARD_METADATASTORE_INITIALIZATION} counter by one.
+ */
+ public void initialized() {
+ incrementCounter(S3GUARD_METADATASTORE_INITIALIZATION, 1);
+ }
+
+ /**
+ * Store closed event. Currently a no-op: no statistic is updated.
+ */
+ public void storeClosed() {
+
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
index 6ebc9e4..e723b75 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.transfer.Upload;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -101,19 +100,20 @@ public class S3AOutputStream extends OutputStream {
try {
final ObjectMetadata om = fs.newObjectMetadata(backupFile.length());
- Upload upload = fs.putObject(
+ UploadInfo info = fs.putObject(
fs.newPutObjectRequest(
key,
om,
backupFile));
ProgressableProgressListener listener =
- new ProgressableProgressListener(fs, key, upload, progress);
- upload.addProgressListener(listener);
+ new ProgressableProgressListener(fs, key, info.getUpload(), progress);
+ info.getUpload().addProgressListener(listener);
- upload.waitForUploadResult();
+ info.getUpload().waitForUploadResult();
listener.uploadCompleted();
- // This will delete unnecessary fake parent directories
- fs.finishedWrite(key);
+ // This will delete unnecessary fake parent directories, update any
+ // MetadataStore
+ fs.finishedWrite(key, info.getLength());
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException(e.toString())
.initCause(e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index d57a0c6..ae9dd79 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -294,12 +294,38 @@ public final class S3AUtils {
S3ObjectSummary summary,
long blockSize,
String owner) {
- if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
- return new S3AFileStatus(true, keyPath, owner);
+ long size = summary.getSize();
+ return createFileStatus(keyPath,
+ objectRepresentsDirectory(summary.getKey(), size),
+ size, summary.getLastModified(), blockSize, owner);
+ }
+
+ /**
+ * Create a file status for an object we just uploaded. For files, we use
+ * the current time as modification time, since s3a uses S3's service-based
+ * modification time, which will not be available until we do a
+ * getFileStatus() later on. Directories get no modification time.
+ * @param keyPath path for created object
+ * @param isDir true iff directory
+ * @param size file length
+ * @param blockSize block size for file status
+ * @param owner Hadoop username
+ * @return a status entry
+ */
+ public static S3AFileStatus createUploadFileStatus(Path keyPath,
+ boolean isDir, long size, long blockSize, String owner) {
+ // No timestamp for directories; createFileStatus ignores it for them.
+ Date date = isDir ? null : new Date();
+ return createFileStatus(keyPath, isDir, size, date, blockSize, owner);
+ }
+
+ /**
+ * Build a status entry for either a directory or a file.
+ * Directory entries are created with {@link Tristate#UNKNOWN} emptiness.
+ * @param keyPath path of the object
+ * @param isDir true iff the entry is a directory
+ * @param size file length; ignored when {@code isDir} is true
+ * @param modified modification time; ignored when {@code isDir} is true
+ * @param blockSize block size for file entries; ignored for directories
+ * @param owner Hadoop username
+ * @return a status entry
+ */
+ private static S3AFileStatus createFileStatus(Path keyPath, boolean isDir,
+ long size, Date modified, long blockSize, String owner) {
+ if (isDir) {
+ return new S3AFileStatus(Tristate.UNKNOWN, keyPath, owner);
} else {
- return new S3AFileStatus(summary.getSize(),
- dateToLong(summary.getLastModified()), keyPath,
- blockSize, owner);
+ return new S3AFileStatus(size, dateToLong(modified), keyPath, blockSize,
+ owner);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 7ccdc06..e7603d9 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -18,33 +18,20 @@
package org.apache.hadoop.fs.s3a;
-import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;
-
import java.io.IOException;
import java.net.URI;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
-import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.S3ClientOptions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.VersionInfo;
-
-import org.slf4j.Logger;
/**
- * Factory for creation of S3 client instances to be used by {@link S3Store}.
+ * Factory for creation of {@link AmazonS3} client instances.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-interface S3ClientFactory {
+public interface S3ClientFactory {
/**
* Creates a new {@link AmazonS3} client. This method accepts the S3A file
@@ -57,182 +44,4 @@ interface S3ClientFactory {
*/
AmazonS3 createS3Client(URI name) throws IOException;
- /**
- * The default factory implementation, which calls the AWS SDK to configure
- * and create an {@link AmazonS3Client} that communicates with the S3 service.
- */
- static class DefaultS3ClientFactory extends Configured
- implements S3ClientFactory {
-
- private static final Logger LOG = S3AFileSystem.LOG;
-
- @Override
- public AmazonS3 createS3Client(URI name) throws IOException {
- Configuration conf = getConf();
- AWSCredentialsProvider credentials =
- createAWSCredentialProviderSet(name, conf);
- ClientConfiguration awsConf = new ClientConfiguration();
- initConnectionSettings(conf, awsConf);
- initProxySupport(conf, awsConf);
- initUserAgent(conf, awsConf);
- return createAmazonS3Client(conf, credentials, awsConf);
- }
-
- /**
- * Initializes all AWS SDK settings related to connection management.
- *
- * @param conf Hadoop configuration
- * @param awsConf AWS SDK configuration
- */
- private static void initConnectionSettings(Configuration conf,
- ClientConfiguration awsConf) {
- awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
- DEFAULT_MAXIMUM_CONNECTIONS, 1));
- boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
- DEFAULT_SECURE_CONNECTIONS);
- awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
- awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
- DEFAULT_MAX_ERROR_RETRIES, 0));
- awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
- DEFAULT_ESTABLISH_TIMEOUT, 0));
- awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
- DEFAULT_SOCKET_TIMEOUT, 0));
- int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
- DEFAULT_SOCKET_SEND_BUFFER, 2048);
- int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
- DEFAULT_SOCKET_RECV_BUFFER, 2048);
- awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
- String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
- if (!signerOverride.isEmpty()) {
- LOG.debug("Signer override = {}", signerOverride);
- awsConf.setSignerOverride(signerOverride);
- }
- }
-
- /**
- * Initializes AWS SDK proxy support if configured.
- *
- * @param conf Hadoop configuration
- * @param awsConf AWS SDK configuration
- * @throws IllegalArgumentException if misconfigured
- */
- private static void initProxySupport(Configuration conf,
- ClientConfiguration awsConf)
- throws IllegalArgumentException, IOException {
- String proxyHost = conf.getTrimmed(PROXY_HOST, "");
- int proxyPort = conf.getInt(PROXY_PORT, -1);
- if (!proxyHost.isEmpty()) {
- awsConf.setProxyHost(proxyHost);
- if (proxyPort >= 0) {
- awsConf.setProxyPort(proxyPort);
- } else {
- if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
- LOG.warn("Proxy host set without port. Using HTTPS default 443");
- awsConf.setProxyPort(443);
- } else {
- LOG.warn("Proxy host set without port. Using HTTP default 80");
- awsConf.setProxyPort(80);
- }
- }
- String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
- String proxyPassword = null;
- char[] proxyPass = conf.getPassword(PROXY_PASSWORD);
- if (proxyPass != null) {
- proxyPassword = new String(proxyPass).trim();
- }
- if ((proxyUsername == null) != (proxyPassword == null)) {
- String msg = "Proxy error: " + PROXY_USERNAME + " or " +
- PROXY_PASSWORD + " set without the other.";
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
- awsConf.setProxyUsername(proxyUsername);
- awsConf.setProxyPassword(proxyPassword);
- awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
- awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using proxy server {}:{} as user {} on " +
- "domain {} as workstation {}", awsConf.getProxyHost(),
- awsConf.getProxyPort(),
- String.valueOf(awsConf.getProxyUsername()),
- awsConf.getProxyDomain(),
- awsConf.getProxyWorkstation());
- }
- } else if (proxyPort >= 0) {
- String msg =
- "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
- }
-
- /**
- * Initializes the User-Agent header to send in HTTP requests to the S3
- * back-end. We always include the Hadoop version number. The user also
- * may set an optional custom prefix to put in front of the Hadoop version
- * number. The AWS SDK interally appends its own information, which seems
- * to include the AWS SDK version, OS and JVM version.
- *
- * @param conf Hadoop configuration
- * @param awsConf AWS SDK configuration
- */
- private static void initUserAgent(Configuration conf,
- ClientConfiguration awsConf) {
- String userAgent = "Hadoop " + VersionInfo.getVersion();
- String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
- if (!userAgentPrefix.isEmpty()) {
- userAgent = userAgentPrefix + ", " + userAgent;
- }
- LOG.debug("Using User-Agent: {}", userAgent);
- awsConf.setUserAgentPrefix(userAgent);
- }
-
- /**
- * Creates an {@link AmazonS3Client} from the established configuration.
- *
- * @param conf Hadoop configuration
- * @param credentials AWS credentials
- * @param awsConf AWS SDK configuration
- * @return S3 client
- * @throws IllegalArgumentException if misconfigured
- */
- private static AmazonS3 createAmazonS3Client(Configuration conf,
- AWSCredentialsProvider credentials, ClientConfiguration awsConf)
- throws IllegalArgumentException {
- AmazonS3 s3 = new AmazonS3Client(credentials, awsConf);
- String endPoint = conf.getTrimmed(ENDPOINT, "");
- if (!endPoint.isEmpty()) {
- try {
- s3.setEndpoint(endPoint);
- } catch (IllegalArgumentException e) {
- String msg = "Incorrect endpoint: " + e.getMessage();
- LOG.error(msg);
- throw new IllegalArgumentException(msg, e);
- }
- }
- enablePathStyleAccessIfRequired(s3, conf);
- return s3;
- }
-
- /**
- * Enables path-style access to S3 buckets if configured. By default, the
- * behavior is to use virtual hosted-style access with URIs of the form
- * http://bucketname.s3.amazonaws.com. Enabling path-style access and a
- * region-specific endpoint switches the behavior to use URIs of the form
- * http://s3-eu-west-1.amazonaws.com/bucketname.
- *
- * @param s3 S3 client
- * @param conf Hadoop configuration
- */
- private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
- Configuration conf) {
- final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
- if (pathStyleAccess) {
- LOG.debug("Enabling path style access!");
- s3.setS3ClientOptions(S3ClientOptions.builder()
- .setPathStyleAccess(true)
- .build());
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 789c6d7..777c161 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -140,7 +140,18 @@ public enum Statistic {
STREAM_WRITE_TOTAL_DATA("stream_write_total_data",
"Count of total data uploaded in block output"),
STREAM_WRITE_QUEUE_DURATION("stream_write_queue_duration",
- "Total queue duration of all block uploads");
+ "Total queue duration of all block uploads"),
+
+ // S3Guard stats
+ /** Count of MetadataStore put-path requests; registered as a counter. */
+ S3GUARD_METADATASTORE_PUT_PATH_REQUEST(
+ "s3guard_metadatastore_put_path_request",
+ "s3guard metadata store put one metadata path request"),
+ /** Latency of MetadataStore put-path requests; registered as quantiles. */
+ S3GUARD_METADATASTORE_PUT_PATH_LATENCY(
+ "s3guard_metadatastore_put_path_latency",
+ "s3guard metadata store put one metadata path latency"),
+ /** Count of MetadataStore initializations. */
+ S3GUARD_METADATASTORE_INITIALIZATION("s3guard_metadatastore_initialization",
+ "s3guard metadata store initialization times");
+
private static final Map<String, Statistic> SYMBOL_MAP =
new HashMap<>(Statistic.values().length);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java
new file mode 100644
index 0000000..0462ccf
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * Simple enum to express {true, false, don't know}.
+ */
+public enum Tristate {
+ // Do not add additional values here. Logic will assume there are exactly
+ // three possibilities.
+ TRUE, FALSE, UNKNOWN;
+
+ /**
+ * Convert a boolean into the matching definite value.
+ * @param v boolean to convert
+ * @return {@link #TRUE} if {@code v} is true, otherwise {@link #FALSE};
+ * never {@link #UNKNOWN}
+ */
+ public static Tristate fromBool(boolean v) {
+ return v ? TRUE : FALSE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/UploadInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/UploadInfo.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/UploadInfo.java
new file mode 100644
index 0000000..238cd97
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/UploadInfo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.transfer.Upload;
+
+/**
+ * Simple struct that contains information about a S3 upload: the
+ * transfer-manager {@link Upload} together with the length of the data
+ * being uploaded. Both fields are final.
+ */
+public class UploadInfo {
+ private final Upload upload;
+ private final long length;
+
+ /**
+ * @param upload the upload handle; callers wait on it for completion
+ * @param length length of the uploaded data (presumably in bytes);
+ * callers pass it on as the written length when finishing the write
+ */
+ public UploadInfo(Upload upload, long length) {
+ this.upload = upload;
+ this.length = length;
+ }
+
+ /** @return the upload handle supplied at construction. */
+ public Upload getUpload() {
+ return upload;
+ }
+
+ /** @return the upload length supplied at construction. */
+ public long getLength() {
+ return length;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
new file mode 100644
index 0000000..dcee358
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+/**
+ * {@code DescendantsIterator} is a {@link RemoteIterator} that implements
+ * pre-ordering breadth-first traversal (BFS) of a path and all of its
+ * descendants recursively. After visiting each path, that path's direct
+ * children are discovered by calling {@link MetadataStore#listChildren(Path)}.
+ * Each iteration returns the next queued entry, and if that entry is a
+ * directory, also appends its children to the queue to be visited later.
+ *
+ * Tombstone (deleted) entries in the listings are skipped. If the starting
+ * path is the root, the root itself is not returned; iteration starts with
+ * its children. The relative order of siblings is whatever order the
+ * {@link DirListingMetadata} listing returns them in.
+ *
+ * For example, assume the consistent store contains metadata representing this
+ * file system structure:
+ *
+ * <pre>
+ * /dir1
+ * |-- dir2
+ * |   |-- file1
+ * |   `-- file2
+ * `-- dir3
+ *     |-- dir4
+ *     |   `-- file3
+ *     |-- dir5
+ *     |   `-- file4
+ *     `-- dir6
+ * </pre>
+ *
+ * Consider this code sample:
+ * <pre>
+ * final PathMetadata dir1 = metadataStore.get(new Path("/dir1"));
+ * for (DescendantsIterator descendants =
+ *          new DescendantsIterator(metadataStore, dir1);
+ *      descendants.hasNext(); ) {
+ *   final FileStatus status = descendants.next();
+ *   System.out.printf("%s %s%n", status.isDirectory() ? 'D' : 'F',
+ *       status.getPath());
+ * }
+ * </pre>
+ *
+ * Assuming each listing returns siblings in name order, the output is:
+ * <pre>
+ * D /dir1
+ * D /dir1/dir2
+ * D /dir1/dir3
+ * F /dir1/dir2/file1
+ * F /dir1/dir2/file2
+ * D /dir1/dir3/dir4
+ * D /dir1/dir3/dir5
+ * D /dir1/dir3/dir6
+ * F /dir1/dir3/dir4/file3
+ * F /dir1/dir3/dir5/file4
+ * </pre>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DescendantsIterator implements RemoteIterator<FileStatus> {
+
+  private final MetadataStore metadataStore;
+
+  /** Entries discovered but not yet returned, in FIFO (BFS) order. */
+  private final Queue<PathMetadata> queue = new LinkedList<>();
+
+  /**
+   * Creates a new {@code DescendantsIterator}.
+   *
+   * @param ms the associated {@link MetadataStore}
+   * @param meta base path for descendants iteration, which will be the first
+   * returned during iteration (except root, whose children are returned
+   * instead). Null makes an empty iterator.
+   * @throws IOException if errors happen during metadata store listing
+   */
+  public DescendantsIterator(MetadataStore ms, PathMetadata meta)
+      throws IOException {
+    Preconditions.checkNotNull(ms);
+    this.metadataStore = ms;
+
+    if (meta != null) {
+      final Path path = meta.getFileStatus().getPath();
+      if (path.isRoot()) {
+        // The root entry itself is never returned; start from its children.
+        enqueueChildren(path);
+      } else {
+        queue.add(meta);
+      }
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return !queue.isEmpty();
+  }
+
+  /**
+   * Returns the next entry in BFS order, queueing its children if it is a
+   * directory.
+   * @return the status of the next descendant
+   * @throws NoSuchElementException if there are no more descendants
+   * @throws IOException if listing the entry's children fails
+   */
+  @Override
+  public FileStatus next() throws IOException {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more descendants.");
+    }
+    final PathMetadata next = queue.poll();
+    final FileStatus status = next.getFileStatus();
+    if (status.isDirectory()) {
+      enqueueChildren(status.getPath());
+    }
+    return status;
+  }
+
+  /**
+   * Appends the non-tombstone children of a directory to the queue.
+   * Does nothing if the metadata store returns no listing for the path.
+   * @param path directory whose children are to be queued
+   * @throws IOException if the metadata store listing fails
+   */
+  private void enqueueChildren(Path path) throws IOException {
+    final DirListingMetadata listing = metadataStore.listChildren(path);
+    if (listing != null) {
+      queue.addAll(listing.withoutTombstones().getListing());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
new file mode 100644
index 0000000..e5b4fb5
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Tristate;
+
+/**
+ * {@code DirListingMetadata} models a directory listing stored in a
+ * {@link MetadataStore}. Instances of this class are mutable and thread-safe:
+ * entries live in a {@link ConcurrentHashMap} and the authoritative flag is
+ * {@code volatile}. Compound sequences of calls are not atomic.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DirListingMetadata {
+
+  /**
+   * Convenience parameter for passing into constructor.
+   */
+  public static final Collection<PathMetadata> EMPTY_DIR =
+      Collections.emptyList();
+
+  private final Path path;
+
+  /** Using a map for fast find / remove with large directories. */
+  private final Map<Path, PathMetadata> listMap;
+
+  /**
+   * Whether this listing is full and authoritative. Volatile so that
+   * {@link #setAuthoritative(boolean)} is visible to other threads.
+   */
+  private volatile boolean isAuthoritative;
+
+  /**
+   * Create a directory listing metadata container.
+   *
+   * @param path Path of the directory. If this path has a host component, then
+   * all paths added later via {@link #put(FileStatus)} must also have
+   * the same host.
+   * @param listing Entries in the directory.
+   * @param isAuthoritative true iff listing is the full contents of the
+   * directory, and the calling client reports that this may be cached as
+   * the full and authoritative listing of all files in the directory.
+   */
+  public DirListingMetadata(Path path, Collection<PathMetadata> listing,
+      boolean isAuthoritative) {
+
+    checkPathAbsolute(path);
+    this.path = path;
+    this.listMap = new ConcurrentHashMap<>();
+
+    if (listing != null) {
+      for (PathMetadata entry : listing) {
+        Path childPath = entry.getFileStatus().getPath();
+        checkChildPath(childPath);
+        listMap.put(childPath, entry);
+      }
+    }
+    this.isAuthoritative = isAuthoritative;
+  }
+
+  /**
+   * Copy constructor.
+   * @param d the existing {@link DirListingMetadata} object.
+   */
+  public DirListingMetadata(DirListingMetadata d) {
+    path = d.path;
+    isAuthoritative = d.isAuthoritative;
+    listMap = new ConcurrentHashMap<>(d.listMap);
+  }
+
+  /**
+   * @return {@code Path} of the directory that contains this listing.
+   */
+  public Path getPath() {
+    return path;
+  }
+
+  /**
+   * @return entries in the directory, including tombstones; this is an
+   * unmodifiable view of the live entry map.
+   */
+  public Collection<PathMetadata> getListing() {
+    return Collections.unmodifiableCollection(listMap.values());
+  }
+
+  /**
+   * @return a new set containing the paths of all tombstone (deleted)
+   * entries in this listing.
+   */
+  public Set<Path> listTombstones() {
+    Set<Path> tombstones = new HashSet<>();
+    for (PathMetadata meta : listMap.values()) {
+      if (meta.isDeleted()) {
+        tombstones.add(meta.getFileStatus().getPath());
+      }
+    }
+    return tombstones;
+  }
+
+  /**
+   * @return a new listing for the same path and authoritative flag that
+   * contains only the non-tombstone entries of this one.
+   */
+  public DirListingMetadata withoutTombstones() {
+    Collection<PathMetadata> filteredList = new ArrayList<>();
+    for (PathMetadata meta : listMap.values()) {
+      if (!meta.isDeleted()) {
+        filteredList.add(meta);
+      }
+    }
+    return new DirListingMetadata(path, filteredList, isAuthoritative);
+  }
+
+  /**
+   * @return number of entries tracked. This is not the same as the number
+   * of entries in the actual directory unless {@link #isAuthoritative()} is
+   * true.
+   */
+  public int numEntries() {
+    return listMap.size();
+  }
+
+  /**
+   * @return true iff this directory listing is full and authoritative within
+   * the scope of the {@code MetadataStore} that returned it.
+   */
+  public boolean isAuthoritative() {
+    return isAuthoritative;
+  }
+
+
+  /**
+   * Is the underlying directory known to be empty?
+   * Tombstone entries record deleted children, so they do not count as
+   * children here.
+   * @return FALSE if directory is known to have a live (non-tombstone) child
+   * entry, TRUE if the listing is authoritative and has no live entries,
+   * UNKNOWN otherwise.
+   */
+  public Tristate isEmpty() {
+    for (PathMetadata meta : listMap.values()) {
+      if (!meta.isDeleted()) {
+        // There exists at least one live child, dir not empty.
+        return Tristate.FALSE;
+      }
+    }
+    // No live entries: only a full listing proves the directory is empty.
+    return isAuthoritative() ? Tristate.TRUE : Tristate.UNKNOWN;
+  }
+
+  /**
+   * Marks this directory listing as full and authoritative.
+   * @param authoritative see {@link #isAuthoritative()}.
+   */
+  public void setAuthoritative(boolean authoritative) {
+    this.isAuthoritative = authoritative;
+  }
+
+  /**
+   * Lookup entry within this directory listing. This may return null if the
+   * {@code MetadataStore} only tracks a partial set of the directory entries.
+   * In the case where {@link #isAuthoritative()} is true, however, this
+   * function returns null iff the directory is known not to contain the listing
+   * at given path (within the scope of the {@code MetadataStore} that returned
+   * it).
+   *
+   * @param childPath path of entry to look for.
+   * @return entry, or null if it is not present or not being tracked.
+   */
+  public PathMetadata get(Path childPath) {
+    checkChildPath(childPath);
+    return listMap.get(childPath);
+  }
+
+  /**
+   * Replace an entry with a tombstone.
+   * @param childPath path of entry to replace.
+   */
+  public void markDeleted(Path childPath) {
+    checkChildPath(childPath);
+    listMap.put(childPath, PathMetadata.tombstone(childPath));
+  }
+
+  /**
+   * Remove entry from this directory.
+   *
+   * @param childPath path of entry to remove.
+   */
+  public void remove(Path childPath) {
+    checkChildPath(childPath);
+    listMap.remove(childPath);
+  }
+
+  /**
+   * Add an entry to the directory listing. If this listing already contains a
+   * {@code FileStatus} with the same path, it will be replaced.
+   *
+   * @param childFileStatus entry to add to this directory listing.
+   * @return true if the status was added or replaced with a new value. False
+   * if the same FileStatus value was already present.
+   */
+  public boolean put(FileStatus childFileStatus) {
+    Preconditions.checkNotNull(childFileStatus,
+        "childFileStatus must be non-null");
+    Path childPath = childStatusToPathKey(childFileStatus);
+    PathMetadata newValue = new PathMetadata(childFileStatus);
+    PathMetadata oldValue = listMap.put(childPath, newValue);
+    return oldValue == null || !oldValue.equals(newValue);
+  }
+
+  @Override
+  public String toString() {
+    return "DirListingMetadata{" +
+        "path=" + path +
+        ", listMap=" + listMap +
+        ", isAuthoritative=" + isAuthoritative +
+        '}';
+  }
+
+  /**
+   * Log contents to supplied StringBuilder in a pretty fashion.
+   * @param sb target StringBuilder
+   */
+  public void prettyPrint(StringBuilder sb) {
+    sb.append(String.format("DirMeta %-20s %-18s",
+        path.toString(),
+        isAuthoritative ? "Authoritative" : "Not Authoritative"));
+    for (Map.Entry<Path, PathMetadata> entry : listMap.entrySet()) {
+      sb.append("\n   key: ").append(entry.getKey()).append(": ");
+      entry.getValue().prettyPrint(sb);
+    }
+    sb.append("\n");
+  }
+
+  /**
+   * @return the output of {@link #prettyPrint(StringBuilder)} as a string.
+   */
+  public String prettyPrint() {
+    StringBuilder sb = new StringBuilder();
+    prettyPrint(sb);
+    return sb.toString();
+  }
+
+  /**
+   * Checks that child path is valid.
+   * @param childPath path to check.
+   */
+  private void checkChildPath(Path childPath) {
+    checkPathAbsolute(childPath);
+
+    // If this dir's path has host (and thus scheme), so must its children
+    URI parentUri = path.toUri();
+    if (parentUri.getHost() != null) {
+      URI childUri = childPath.toUri();
+      Preconditions.checkNotNull(childUri.getHost(), "Expected non-null URI " +
+          "host: %s", childUri);
+      Preconditions.checkArgument(
+          childUri.getHost().equals(parentUri.getHost()),
+          "childUri %s and parentUri %s must have the same host",
+          childUri, parentUri);
+      Preconditions.checkNotNull(childUri.getScheme(), "No scheme in path %s",
+          childUri);
+    }
+    Preconditions.checkArgument(!childPath.isRoot(),
+        "childPath cannot be the root path: %s", childPath);
+    Preconditions.checkArgument(childPath.getParent().equals(path),
+        "childPath %s must be a child of %s", childPath, path);
+  }
+
+  /**
+   * For Paths that are handed in directly, we assert they are in consistent
+   * format with checkPath(). For paths that are supplied embedded in
+   * FileStatus, we attempt to fill in missing scheme and host, when this
+   * DirListingMetadata is associated with one.
+   *
+   * The parent check is applied to the qualified path. If it were applied to
+   * the raw path, a hostless child could never equal a host-qualified parent
+   * and the fill-in branch would be unreachable.
+   *
+   * @return Path suitable for consistent hashtable lookups
+   * @throws NullPointerException null status argument
+   * @throws IllegalArgumentException bad status values or failure to
+   * create a URI.
+   */
+  private Path childStatusToPathKey(FileStatus status) {
+    Path p = status.getPath();
+    Preconditions.checkNotNull(p, "Child status' path cannot be null");
+    Preconditions.checkArgument(!p.isRoot(),
+        "childPath cannot be the root path: %s", p);
+    URI uri = p.toUri();
+    URI parentUri = path.toUri();
+    Path key = p;
+    // If FileStatus' path is missing host, but should have one, add it.
+    if (uri.getHost() == null && parentUri.getHost() != null) {
+      try {
+        key = new Path(new URI(parentUri.getScheme(), parentUri.getHost(),
+            uri.getPath(), uri.getFragment()));
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException("FileStatus path invalid with" +
+            " added " + parentUri.getScheme() + "://" + parentUri.getHost(),
+            e);
+      }
+    }
+    Preconditions.checkArgument(key.getParent().equals(path),
+        "childPath %s must be a child of %s", p, path);
+    return key;
+  }
+
+  private void checkPathAbsolute(Path p) {
+    Preconditions.checkNotNull(p, "path must be non-null");
+    Preconditions.checkArgument(p.isAbsolute(), "path must be absolute: %s", p);
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org