Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/09/29 10:29:43 UTC
[6/8] hadoop git commit: HADOOP-13345 S3Guard: Improved Consistency
for S3A. Contributed by: Chris Nauroth, Aaron Fabbri, Mingliang Liu,
Lei (Eddy) Xu, Sean Mackrory, Steve Loughran and others.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
new file mode 100644
index 0000000..66ada49
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
+
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
+import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
+
+/**
+ * Interface to create a DynamoDB client.
+ *
+ * Implementations should be configurable, supporting setting and getting
+ * their configuration.
+ */
+@InterfaceAudience.Private
+public interface DynamoDBClientFactory extends Configurable {
+ Logger LOG = LoggerFactory.getLogger(DynamoDBClientFactory.class);
+
+ /**
+ * Create a DynamoDB client object from configuration.
+ *
+ * The DynamoDB client created does not have to be associated with any S3
+ * bucket. All information needed to create the client comes from the Hadoop
+ * configuration. Specifically, if the region is not configured, the provided
+ * region parameter is used. If the region is neither configured nor
+ * provided, an error is raised.
+ *
+ * @param defaultRegion the default region of the AmazonDynamoDB client
+ * @return a new DynamoDB client
+ * @throws IOException if any IO error happens
+ */
+ AmazonDynamoDB createDynamoDBClient(String defaultRegion) throws IOException;
+
+ /**
+ * The default implementation for creating an AmazonDynamoDB.
+ */
+ class DefaultDynamoDBClientFactory extends Configured
+ implements DynamoDBClientFactory {
+ @Override
+ public AmazonDynamoDB createDynamoDBClient(String defaultRegion)
+ throws IOException {
+ Preconditions.checkNotNull(getConf(),
+ "Should have been configured before usage");
+
+ final Configuration conf = getConf();
+ final AWSCredentialsProvider credentials =
+ createAWSCredentialProviderSet(null, conf);
+ final ClientConfiguration awsConf =
+ DefaultS3ClientFactory.createAwsConf(conf);
+
+ final String region = getRegion(conf, defaultRegion);
+ LOG.debug("Creating DynamoDB client in region {}", region);
+
+ return AmazonDynamoDBClientBuilder.standard()
+ .withCredentials(credentials)
+ .withClientConfiguration(awsConf)
+ .withRegion(region)
+ .build();
+ }
+
+ /**
+ * Helper method to get and validate the AWS region for DynamoDBClient.
+ *
+ * @param conf configuration
+ * @param defaultRegion the default region
+ * @return configured region or else the provided default region
+ * @throws IOException if the region is not valid
+ */
+ static String getRegion(Configuration conf, String defaultRegion)
+ throws IOException {
+ String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
+ if (StringUtils.isEmpty(region)) {
+ region = defaultRegion;
+ }
+ try {
+ Regions.fromName(region);
+ } catch (IllegalArgumentException | NullPointerException e) {
+ throw new IOException("Invalid region specified: " + region + "; " +
+ "Region can be configured with " + S3GUARD_DDB_REGION_KEY + ": " +
+ validRegionsString());
+ }
+ return region;
+ }
+
+ private static String validRegionsString() {
+ final String delimiter = ", ";
+ Regions[] regions = Regions.values();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < regions.length; i++) {
+ if (i > 0) {
+ sb.append(delimiter);
+ }
+ sb.append(regions[i].getName());
+ }
+ return sb.toString();
+ }
+ }
+
+}
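
For illustration only, a minimal sketch of how a client might be obtained
through this factory outside of S3AFileSystem; the region values are
placeholders, and in normal use S3AFileSystem instantiates the factory
itself via the S3GUARD_DDB_CLIENT_FACTORY_IMPL setting:

  Configuration conf = new Configuration();
  conf.set("fs.s3a.s3guard.ddb.region", "us-west-2"); // optional override
  DynamoDBClientFactory factory = ReflectionUtils.newInstance(
      DynamoDBClientFactory.DefaultDynamoDBClientFactory.class, conf);
  // falls back to the defaultRegion argument if none is configured
  AmazonDynamoDB ddb = factory.createDynamoDBClient("us-east-1");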
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
new file mode 100644
index 0000000..1bed03d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -0,0 +1,1010 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.ItemCollection;
+import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
+import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
+import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
+import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
+import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
+import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.Tristate;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
+import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
+import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
+
+/**
+ * DynamoDBMetadataStore is a {@link MetadataStore} that persists
+ * file system metadata to DynamoDB.
+ *
+ * The current implementation uses a schema consisting of a single table. The
+ * name of the table can be configured by config key
+ * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_NAME_KEY}.
+ * By default, it matches the name of the S3 bucket. Each item in the table
+ * represents a single directory or file. Its path is split into separate table
+ * attributes:
+ * <ul>
+ * <li> parent (absolute path of the parent, with bucket name inserted as
+ * first path component). </li>
+ * <li> child (path of that specific child, relative to parent). </li>
+ * <li> optional boolean attribute tracking whether the path is a directory.
+ * Absence or a false value indicates the path is a file. </li>
+ * <li> optional long attribute recording the modification time of the file.
+ * This attribute is meaningful only for file items.</li>
+ * <li> optional long attribute recording the file length.
+ * This attribute is meaningful only for file items.</li>
+ * <li> optional long attribute recording the block size of the file.
+ * This attribute is meaningful only for file items.</li>
+ * </ul>
+ *
+ * The DynamoDB partition key is the parent, and the range key is the child.
+ *
+ * To allow multiple buckets to share the same DynamoDB table, the bucket
+ * name is treated as the root directory.
+ *
+ * For example, assume the consistent store contains metadata representing this
+ * file system structure:
+ *
+ * <pre>
+ * s3a://bucket/dir1
+ * |-- dir2
+ * | |-- file1
+ * | `-- file2
+ * `-- dir3
+ * |-- dir4
+ * | `-- file3
+ * |-- dir5
+ * | `-- file4
+ * `-- dir6
+ * </pre>
+ *
+ * This is persisted to a single DynamoDB table as:
+ *
+ * <pre>
+ * =========================================================================
+ * | parent | child | is_dir | mod_time | len | ... |
+ * =========================================================================
+ * | /bucket | dir1 | true | | | |
+ * | /bucket/dir1 | dir2 | true | | | |
+ * | /bucket/dir1 | dir3 | true | | | |
+ * | /bucket/dir1/dir2 | file1 | | 100 | 111 | |
+ * | /bucket/dir1/dir2 | file2 | | 200 | 222 | |
+ * | /bucket/dir1/dir3 | dir4 | true | | | |
+ * | /bucket/dir1/dir3 | dir5 | true | | | |
+ * | /bucket/dir1/dir3/dir4 | file3 | | 300 | 333 | |
+ * | /bucket/dir1/dir3/dir5 | file4 | | 400 | 444 | |
+ * | /bucket/dir1/dir3 | dir6 | true | | | |
+ * =========================================================================
+ * </pre>
+ *
+ * This choice of schema is efficient for read access patterns.
+ * {@link #get(Path)} can be served from a single item lookup.
+ * {@link #listChildren(Path)} can be served from a query against all rows
+ * matching the parent (the partition key) and the returned list is guaranteed
+ * to be sorted by child (the range key). Tracking whether or not a path is a
+ * directory helps prevent unnecessary queries during traversal of an entire
+ * sub-tree.
+ *
+ * Some mutating operations, notably {@link #deleteSubtree(Path)} and
+ * {@link #move(Collection, Collection)}, are less efficient with this schema.
+ * They require mutating multiple items in the DynamoDB table.
+ *
+ * By default, DynamoDB access is performed within the same AWS region as
+ * the S3 bucket that hosts the S3A instance. During initialization, it checks
+ * the location of the S3 bucket and creates a DynamoDB client connected to the
+ * same region. The region may also be set explicitly by setting the config
+ * parameter {@code fs.s3a.s3guard.ddb.region} to the corresponding region.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DynamoDBMetadataStore implements MetadataStore {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ DynamoDBMetadataStore.class);
+
+ /** parent/child name to use in the version marker. */
+ public static final String VERSION_MARKER = "../VERSION";
+
+ /** Current version number. */
+ public static final int VERSION = 100;
+
+ /** Error: version marker not found in table. */
+ public static final String E_NO_VERSION_MARKER
+ = "S3Guard table lacks version marker.";
+
+ /** Error: version mismatch. */
+ public static final String E_INCOMPATIBLE_VERSION
+ = "Database table is from an incompatible S3Guard version.";
+
+ /** Initial delay for retries when batched operations get throttled by
+ * DynamoDB. Value is {@value} msec. */
+ public static final long MIN_RETRY_SLEEP_MSEC = 100;
+
+ private static ValueMap deleteTrackingValueMap =
+ new ValueMap().withBoolean(":false", false);
+
+ private DynamoDB dynamoDB;
+ private String region;
+ private Table table;
+ private String tableName;
+ private Configuration conf;
+ private String username;
+
+ private RetryPolicy dataAccessRetryPolicy;
+ private S3AInstrumentation.S3GuardInstrumentation instrumentation;
+
+ /**
+ * A utility function to create DynamoDB instance.
+ * @param conf the file system configuration
+ * @param s3Region region of the associated S3 bucket (if any).
+ * @return DynamoDB instance.
+ * @throws IOException I/O error.
+ */
+ private static DynamoDB createDynamoDB(Configuration conf, String s3Region)
+ throws IOException {
+ Preconditions.checkNotNull(conf);
+ final Class<? extends DynamoDBClientFactory> cls = conf.getClass(
+ S3GUARD_DDB_CLIENT_FACTORY_IMPL,
+ S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT,
+ DynamoDBClientFactory.class);
+ LOG.debug("Creating DynamoDB client {} with S3 region {}", cls, s3Region);
+ final AmazonDynamoDB dynamoDBClient = ReflectionUtils.newInstance(cls, conf)
+ .createDynamoDBClient(s3Region);
+ return new DynamoDB(dynamoDBClient);
+ }
+
+ @Override
+ public void initialize(FileSystem fs) throws IOException {
+ Preconditions.checkArgument(fs instanceof S3AFileSystem,
+ "DynamoDBMetadataStore only supports S3A filesystem.");
+ final S3AFileSystem s3afs = (S3AFileSystem) fs;
+ instrumentation = s3afs.getInstrumentation().getS3GuardInstrumentation();
+ final String bucket = s3afs.getBucket();
+ String confRegion = s3afs.getConf().getTrimmed(S3GUARD_DDB_REGION_KEY);
+ if (!StringUtils.isEmpty(confRegion)) {
+ region = confRegion;
+ LOG.debug("Overriding S3 region with configured DynamoDB region: {}",
+ region);
+ } else {
+ region = s3afs.getBucketLocation();
+ LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region);
+ }
+ username = s3afs.getUsername();
+ conf = s3afs.getConf();
+ dynamoDB = createDynamoDB(conf, region);
+
+ // use the bucket as the DynamoDB table name if not specified in config
+ tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket);
+ setMaxRetries(conf);
+
+ initTable();
+
+ instrumentation.initialized();
+ }
+
+ /**
+ * Performs one-time initialization of the metadata store via configuration.
+ *
+ * This initialization depends on the configuration object to get AWS
+ * credentials, DynamoDBFactory implementation class, DynamoDB endpoints,
+ * DynamoDB table names, etc. After initialization, this metadata store does
+ * not explicitly relate to any S3 bucket, which may be nonexistent.
+ *
+ * This is used to operate the metadata store directly, beyond the scope of
+ * the S3AFileSystem integration, e.g. from command line tools.
+ * Generally, callers should use {@link #initialize(FileSystem)}
+ * with an initialized {@code S3AFileSystem} instance.
+ *
+ * Without a filesystem to act as a reference point, the configuration itself
+ * must declare the table name and region in the
+ * {@link Constants#S3GUARD_DDB_TABLE_NAME_KEY} and
+ * {@link Constants#S3GUARD_DDB_REGION_KEY} respectively.
+ *
+ * @see #initialize(FileSystem)
+ * @throws IOException if there is an error
+ * @throws IllegalArgumentException if the configuration is incomplete
+ */
+ @Override
+ public void initialize(Configuration config) throws IOException {
+ conf = config;
+ // there is no S3 bucket to default to here: the table name is required
+ tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY);
+ Preconditions.checkArgument(!StringUtils.isEmpty(tableName),
+ "No DynamoDB table name configured");
+ region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
+ Preconditions.checkArgument(!StringUtils.isEmpty(region),
+ "No DynamoDB region configured");
+ dynamoDB = createDynamoDB(conf, region);
+
+ username = UserGroupInformation.getCurrentUser().getShortUserName();
+ setMaxRetries(conf);
+
+ initTable();
+ }
+
+ /**
+ * Set the retry policy. This is driven by the value of
+ * {@link Constants#S3GUARD_DDB_MAX_RETRIES}, with exponential backoff
+ * between attempts starting at {@link #MIN_RETRY_SLEEP_MSEC} milliseconds.
+ * @param config configuration to read the retry count from
+ */
+ private void setMaxRetries(Configuration config) {
+ int maxRetries = config.getInt(S3GUARD_DDB_MAX_RETRIES,
+ S3GUARD_DDB_MAX_RETRIES_DEFAULT);
+ dataAccessRetryPolicy = RetryPolicies
+ .exponentialBackoffRetry(maxRetries, MIN_RETRY_SLEEP_MSEC,
+ TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void delete(Path path) throws IOException {
+ innerDelete(path, true);
+ }
+
+ @Override
+ public void forgetMetadata(Path path) throws IOException {
+ innerDelete(path, false);
+ }
+
+ /**
+ * Inner delete operation; the action depends on the {@code tombstone} flag.
+ * No tombstone: delete the entry. Tombstone: create a tombstone entry.
+ * There is no check as to whether the entry exists in the table first.
+ * @param path path to delete
+ * @param tombstone flag to create a tombstone marker
+ * @throws IOException I/O error.
+ */
+ private void innerDelete(Path path, boolean tombstone)
+ throws IOException {
+ path = checkPath(path);
+ LOG.debug("Deleting from table {} in region {}: {}",
+ tableName, region, path);
+
+ // deleting nonexistent item consumes 1 write capacity; skip it
+ if (path.isRoot()) {
+ LOG.debug("Skip deleting root directory as it does not exist in table");
+ return;
+ }
+
+ try {
+ if (tombstone) {
+ Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
+ PathMetadata.tombstone(path));
+ table.putItem(item);
+ } else {
+ table.deleteItem(pathToKey(path));
+ }
+ } catch (AmazonClientException e) {
+ throw translateException("delete", path, e);
+ }
+ }
+
+ @Override
+ public void deleteSubtree(Path path) throws IOException {
+ path = checkPath(path);
+ LOG.debug("Deleting subtree from table {} in region {}: {}",
+ tableName, region, path);
+
+ final PathMetadata meta = get(path);
+ if (meta == null || meta.isDeleted()) {
+ LOG.debug("Subtree path {} does not exist; this will be a no-op", path);
+ return;
+ }
+
+ for (DescendantsIterator desc = new DescendantsIterator(this, meta);
+ desc.hasNext();) {
+ innerDelete(desc.next().getPath(), true);
+ }
+ }
+
+ private Item getConsistentItem(PrimaryKey key) {
+ final GetItemSpec spec = new GetItemSpec()
+ .withPrimaryKey(key)
+ .withConsistentRead(true); // strictly consistent read
+ return table.getItem(spec);
+ }
+
+ @Override
+ public PathMetadata get(Path path) throws IOException {
+ return get(path, false);
+ }
+
+ @Override
+ public PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
+ throws IOException {
+ path = checkPath(path);
+ LOG.debug("Get from table {} in region {}: {}", tableName, region, path);
+
+ try {
+ final PathMetadata meta;
+ if (path.isRoot()) {
+ // Root does not persist in the table
+ meta = new PathMetadata(makeDirStatus(username, path));
+ } else {
+ final Item item = getConsistentItem(pathToKey(path));
+ meta = itemToPathMetadata(item, username);
+ LOG.debug("Get from table {} in region {} returning for {}: {}",
+ tableName, region, path, meta);
+ }
+
+ if (wantEmptyDirectoryFlag && meta != null) {
+ final FileStatus status = meta.getFileStatus();
+ // for directory, we query its direct children to determine isEmpty bit
+ if (status.isDirectory()) {
+ final QuerySpec spec = new QuerySpec()
+ .withHashKey(pathToParentKeyAttribute(path))
+ .withConsistentRead(true)
+ .withFilterExpression(IS_DELETED + " = :false")
+ .withValueMap(deleteTrackingValueMap);
+ final ItemCollection<QueryOutcome> items = table.query(spec);
+ boolean hasChildren = items.iterator().hasNext();
+ // When this class has support for authoritative
+ // (fully-cached) directory listings, we may also be able to answer
+ // TRUE here. Until then, we don't know if we have full listing or
+ // not, thus the UNKNOWN here:
+ meta.setIsEmptyDirectory(
+ hasChildren ? Tristate.FALSE : Tristate.UNKNOWN);
+ }
+ }
+
+ return meta;
+ } catch (AmazonClientException e) {
+ throw translateException("get", path, e);
+ }
+ }
+
+ /**
+ * Make a FileStatus object for a directory at given path. The FileStatus
+ * only contains what S3A needs, and omits mod time since S3A uses its own
+ * implementation which returns current system time.
+ * @param owner username of owner
+ * @param path path to dir
+ * @return new FileStatus
+ */
+ private FileStatus makeDirStatus(String owner, Path path) {
+ return new FileStatus(0, true, 1, 0, 0, 0, null,
+ owner, null, path);
+ }
+
+ @Override
+ public DirListingMetadata listChildren(Path path) throws IOException {
+ path = checkPath(path);
+ LOG.debug("Listing table {} in region {}: {}", tableName, region, path);
+
+ // find the children in the table
+ try {
+ final QuerySpec spec = new QuerySpec()
+ .withHashKey(pathToParentKeyAttribute(path))
+ .withConsistentRead(true); // strictly consistent read
+ final ItemCollection<QueryOutcome> items = table.query(spec);
+
+ final List<PathMetadata> metas = new ArrayList<>();
+ for (Item item : items) {
+ PathMetadata meta = itemToPathMetadata(item, username);
+ metas.add(meta);
+ }
+ LOG.trace("Listing table {} in region {} for {} returning {}",
+ tableName, region, path, metas);
+
+ return (metas.isEmpty() && get(path) == null)
+ ? null
+ : new DirListingMetadata(path, metas, false);
+ } catch (AmazonClientException e) {
+ // failure, including the path not being present
+ throw translateException("listChildren", path, e);
+ }
+ }
+
+ // build the list of all parent entries.
+ Collection<PathMetadata> completeAncestry(
+ Collection<PathMetadata> pathsToCreate) {
+ // Key on path to allow fast lookup
+ Map<Path, PathMetadata> ancestry = new HashMap<>();
+
+ for (PathMetadata meta : pathsToCreate) {
+ Preconditions.checkArgument(meta != null);
+ Path path = meta.getFileStatus().getPath();
+ if (path.isRoot()) {
+ break;
+ }
+ ancestry.put(path, meta);
+ Path parent = path.getParent();
+ while (!parent.isRoot() && !ancestry.containsKey(parent)) {
+ LOG.debug("auto-create ancestor path {} for child path {}",
+ parent, path);
+ final FileStatus status = makeDirStatus(parent, username);
+ ancestry.put(parent, new PathMetadata(status, Tristate.FALSE, false));
+ parent = parent.getParent();
+ }
+ }
+ return ancestry.values();
+ }
+
+ @Override
+ public void move(Collection<Path> pathsToDelete,
+ Collection<PathMetadata> pathsToCreate) throws IOException {
+ if (pathsToDelete == null && pathsToCreate == null) {
+ return;
+ }
+
+ LOG.debug("Moving paths of table {} in region {}: {} paths to delete and {}"
+ + " paths to create", tableName, region,
+ pathsToDelete == null ? 0 : pathsToDelete.size(),
+ pathsToCreate == null ? 0 : pathsToCreate.size());
+ LOG.trace("move: pathsToDelete = {}, pathsToCreate = {}", pathsToDelete,
+ pathsToCreate);
+
+ // In DynamoDBMetadataStore implementation, we assume that if a path
+ // exists, all its ancestors will also exist in the table.
+ // The following code maintains this invariant by adding all ancestor
+ // directories of the paths to create, including ancestor paths that are
+ // not explicitly listed in pathsToCreate.
+ Collection<PathMetadata> newItems = new ArrayList<>();
+ if (pathsToCreate != null) {
+ newItems.addAll(completeAncestry(pathsToCreate));
+ }
+ if (pathsToDelete != null) {
+ for (Path meta : pathsToDelete) {
+ newItems.add(PathMetadata.tombstone(meta));
+ }
+ }
+
+ try {
+ processBatchWriteRequest(null, pathMetadataToItem(newItems));
+ } catch (AmazonClientException e) {
+ throw translateException("move", (String) null, e);
+ }
+ }
+
+ /**
+ * Helper method to issue a batch write request to DynamoDB.
+ *
+ * Callers of this method should catch the {@link AmazonClientException} and
+ * translate it for better error reporting and easier debugging.
+ * @param keysToDelete primary keys to be deleted; can be null
+ * @param itemsToPut new items to be put; can be null
+ * @throws IOException if the retry limit is exceeded while retrying
+ * unprocessed writes
+ */
+ private void processBatchWriteRequest(PrimaryKey[] keysToDelete,
+ Item[] itemsToPut) throws IOException {
+ final int totalToDelete = (keysToDelete == null ? 0 : keysToDelete.length);
+ final int totalToPut = (itemsToPut == null ? 0 : itemsToPut.length);
+ int count = 0;
+ while (count < totalToDelete + totalToPut) {
+ final TableWriteItems writeItems = new TableWriteItems(tableName);
+ int numToDelete = 0;
+ if (keysToDelete != null
+ && count < totalToDelete) {
+ numToDelete = Math.min(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT,
+ totalToDelete - count);
+ writeItems.withPrimaryKeysToDelete(
+ Arrays.copyOfRange(keysToDelete, count, count + numToDelete));
+ count += numToDelete;
+ }
+
+ if (numToDelete < S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT
+ && itemsToPut != null
+ && count < totalToDelete + totalToPut) {
+ final int numToPut = Math.min(
+ S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT - numToDelete,
+ totalToDelete + totalToPut - count);
+ final int index = count - totalToDelete;
+ writeItems.withItemsToPut(
+ Arrays.copyOfRange(itemsToPut, index, index + numToPut));
+ count += numToPut;
+ }
+
+ BatchWriteItemOutcome res = dynamoDB.batchWriteItem(writeItems);
+ // Check for unprocessed keys in case of exceeding provisioned throughput
+ Map<String, List<WriteRequest>> unprocessed = res.getUnprocessedItems();
+ int retryCount = 0;
+ while (unprocessed.size() > 0) {
+ retryBackoff(retryCount++);
+ res = dynamoDB.batchWriteItemUnprocessed(unprocessed);
+ unprocessed = res.getUnprocessedItems();
+ }
+ }
+ }
+
+ /**
+ * Put the current thread to sleep to implement exponential backoff
+ * depending on retryCount. If max retries are exceeded, throws an
+ * exception instead.
+ * @param retryCount number of retries so far
+ * @throws IOException when max retryCount is exceeded.
+ */
+ private void retryBackoff(int retryCount) throws IOException {
+ try {
+ // Our RetryPolicy ignores everything but retryCount here.
+ RetryPolicy.RetryAction action = dataAccessRetryPolicy.shouldRetry(null,
+ retryCount, 0, true);
+ if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+ throw new IOException(
+ String.format("Max retries exceeded (%d) for DynamoDB",
+ retryCount));
+ } else {
+ LOG.debug("Sleeping {} msec before next retry", action.delayMillis);
+ Thread.sleep(action.delayMillis);
+ }
+ } catch (IOException e) {
+ // the "max retries exceeded" failure above: propagate unchanged
+ throw e;
+ } catch (Exception e) {
+ throw new IOException("Unexpected exception", e);
+ }
+ }
+
+ @Override
+ public void put(PathMetadata meta) throws IOException {
+ // For a deeply nested path, this method will automatically create the
+ // full ancestry and save the respective items in the DynamoDB table.
+ // So after a put operation, we maintain the invariant that if a path
+ // exists, all its ancestors also exist in the table.
+ // For performance, we generate the full list of paths to put and save
+ // them with a single batch write item request.
+ LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
+
+ Collection<PathMetadata> wrapper = new ArrayList<>(1);
+ wrapper.add(meta);
+ put(wrapper);
+ }
+
+ @Override
+ public void put(Collection<PathMetadata> metas) throws IOException {
+ LOG.debug("Saving batch to table {} in region {}", tableName, region);
+
+ processBatchWriteRequest(null, pathMetadataToItem(completeAncestry(metas)));
+ }
+
+ /**
+ * Helper method to collect a path's metadata together with any of its
+ * ancestors that do not yet exist in the table.
+ */
+ private Collection<PathMetadata> fullPathsToPut(PathMetadata meta)
+ throws IOException {
+ checkPathMetadata(meta);
+ final Collection<PathMetadata> metasToPut = new ArrayList<>();
+ // root path is not persisted
+ if (!meta.getFileStatus().getPath().isRoot()) {
+ metasToPut.add(meta);
+ }
+
+ // put all its ancestors if not present; as an optimization we stop at
+ // the first ancestor that already exists
+ Path path = meta.getFileStatus().getPath().getParent();
+ while (path != null && !path.isRoot()) {
+ final Item item = getConsistentItem(pathToKey(path));
+ if (!itemExists(item)) {
+ final FileStatus status = makeDirStatus(path, username);
+ metasToPut.add(new PathMetadata(status, Tristate.FALSE, false));
+ path = path.getParent();
+ } else {
+ break;
+ }
+ }
+ return metasToPut;
+ }
+
+ private boolean itemExists(Item item) {
+ if (item == null) {
+ return false;
+ }
+ if (item.hasAttribute(IS_DELETED) &&
+ item.getBoolean(IS_DELETED)) {
+ return false;
+ }
+ return true;
+ }
+
+ /** Create a directory FileStatus using current system time as mod time. */
+ static FileStatus makeDirStatus(Path f, String owner) {
+ return new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+ null, owner, owner, f);
+ }
+
+ @Override
+ public void put(DirListingMetadata meta) throws IOException {
+ LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
+
+ // directory path
+ PathMetadata p = new PathMetadata(makeDirStatus(meta.getPath(), username),
+ meta.isEmpty(), false);
+
+ // First add any missing ancestors...
+ final Collection<PathMetadata> metasToPut = fullPathsToPut(p);
+
+ // next add all children of the directory
+ metasToPut.addAll(meta.getListing());
+
+ try {
+ processBatchWriteRequest(null, pathMetadataToItem(metasToPut));
+ } catch (AmazonClientException e) {
+ throw translateException("put", (String) null, e);
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ if (instrumentation != null) {
+ instrumentation.storeClosed();
+ }
+ if (dynamoDB != null) {
+ LOG.debug("Shutting down {}", this);
+ dynamoDB.shutdown();
+ dynamoDB = null;
+ }
+ }
+
+ @Override
+ public void destroy() throws IOException {
+ if (table == null) {
+ LOG.info("In destroy(): no table to delete");
+ return;
+ }
+ LOG.info("Deleting DynamoDB table {} in region {}", tableName, region);
+ Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB");
+ try {
+ table.delete();
+ table.waitForDelete();
+ } catch (ResourceNotFoundException rnfe) {
+ LOG.info("ResourceNotFoundException while deleting DynamoDB table {} in "
+ + "region {}. This may indicate that the table does not exist, "
+ + "or has been deleted by another concurrent thread or process.",
+ tableName, region);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted",
+ tableName, ie);
+ throw new InterruptedIOException("Table " + tableName
+ + " in region " + region + " has not been deleted");
+ } catch (AmazonClientException e) {
+ throw translateException("destroy", (String) null, e);
+ }
+ }
+
+ private ItemCollection<ScanOutcome> expiredFiles(long modTime) {
+ String filterExpression = "mod_time < :mod_time";
+ String projectionExpression = "parent,child";
+ ValueMap map = new ValueMap().withLong(":mod_time", modTime);
+ return table.scan(filterExpression, projectionExpression, null, map);
+ }
+
+ @Override
+ public void prune(long modTime) throws IOException {
+ int itemCount = 0;
+ try {
+ Collection<Path> deletionBatch =
+ new ArrayList<>(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
+ int delay = conf.getInt(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY,
+ S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT);
+ for (Item item : expiredFiles(modTime)) {
+ PathMetadata md = PathMetadataDynamoDBTranslation
+ .itemToPathMetadata(item, username);
+ Path path = md.getFileStatus().getPath();
+ deletionBatch.add(path);
+ itemCount++;
+ if (deletionBatch.size() == S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT) {
+ Thread.sleep(delay);
+ processBatchWriteRequest(pathToKey(deletionBatch), null);
+ deletionBatch.clear();
+ }
+ }
+ if (deletionBatch.size() > 0) {
+ Thread.sleep(delay);
+ processBatchWriteRequest(pathToKey(deletionBatch), null);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new InterruptedIOException("Pruning was interrupted");
+ }
+ LOG.info("Finished pruning {} items in batches of {}", itemCount,
+ S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + '{'
+ + "region=" + region
+ + ", tableName=" + tableName
+ + '}';
+ }
+
+ /**
+ * Create a table if it does not exist and wait for it to become active.
+ *
+ * If a table with the intended name already exists, then it uses that table.
+ * Otherwise, it will automatically create the table if the config
+ * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is
+ * enabled. The DynamoDB table creation API is asynchronous; this method
+ * waits for the table to become active after sending the creation request.
+ * Overall, this method is synchronous, and the table is guaranteed to exist
+ * after it returns successfully.
+ *
+ * @throws IOException if the table does not exist and auto-creation is
+ * disabled, if the table is being deleted, or on any other I/O failure.
+ */
+ @VisibleForTesting
+ void initTable() throws IOException {
+ table = dynamoDB.getTable(tableName);
+ try {
+ try {
+ LOG.debug("Binding to table {}", tableName);
+ final String status = table.describe().getTableStatus();
+ switch (status) {
+ case "CREATING":
+ case "UPDATING":
+ LOG.debug("Table {} in region {} is being created/updated. This may"
+ + " indicate that the table is being operated by another "
+ + "concurrent thread or process. Waiting for active...",
+ tableName, region);
+ waitForTableActive(table);
+ break;
+ case "DELETING":
+ throw new FileNotFoundException("DynamoDB table "
+ + "'" + tableName + "' is being "
+ + "deleted in region " + region);
+ case "ACTIVE":
+ break;
+ default:
+ throw new IOException("Unknown DynamoDB table status " + status
+ + ": tableName='" + tableName + "', region=" + region);
+ }
+
+ final Item versionMarker = getVersionMarkerItem();
+ verifyVersionCompatibility(tableName, versionMarker);
+ Long created = extractCreationTimeFromMarker(versionMarker);
+ LOG.debug("Using existing DynamoDB table {} in region {} created {}",
+ tableName, region, (created != null) ? new Date(created) : null);
+ } catch (ResourceNotFoundException rnfe) {
+ if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) {
+ final ProvisionedThroughput capacity = new ProvisionedThroughput(
+ conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
+ S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT),
+ conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
+ S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT));
+
+ createTable(capacity);
+ } else {
+ throw new FileNotFoundException("DynamoDB table "
+ + "'" + tableName + "' does not "
+ + "exist in region " + region + "; auto-creation is turned off");
+ }
+ }
+
+ } catch (AmazonClientException e) {
+ throw translateException("initTable", (String) null, e);
+ }
+ }
+
+ /**
+ * Get the version marker item in the existing DynamoDB table.
+ *
+ * As the version marker item may be created by another concurrent thread
+ * or process, we retry a limited number of times before giving up.
+ * @return the version marker item, or null if still absent after retries
+ * @throws IOException on interruption or other unexpected failure
+ */
+ private Item getVersionMarkerItem() throws IOException {
+ final PrimaryKey versionMarkerKey =
+ createVersionMarkerPrimaryKey(VERSION_MARKER);
+ int retryCount = 0;
+ Item versionMarker = table.getItem(versionMarkerKey);
+ while (versionMarker == null) {
+ try {
+ RetryPolicy.RetryAction action = dataAccessRetryPolicy.shouldRetry(null,
+ retryCount, 0, true);
+ if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+ break;
+ } else {
+ LOG.debug("Sleeping {} ms before next retry", action.delayMillis);
+ Thread.sleep(action.delayMillis);
+ }
+ } catch (Exception e) {
+ throw new IOException("initTable: Unexpected exception", e);
+ }
+ retryCount++;
+ versionMarker = table.getItem(versionMarkerKey);
+ }
+ return versionMarker;
+ }
+
+ /**
+ * Verify that a table version is compatible with this S3Guard client.
+ * @param tableName name of the table (for error messages)
+ * @param versionMarker the version marker retrieved from the table
+ * @throws IOException on any incompatibility
+ */
+ @VisibleForTesting
+ static void verifyVersionCompatibility(String tableName,
+ Item versionMarker) throws IOException {
+ if (versionMarker == null) {
+ LOG.warn("Table {} contains no version marker", tableName);
+ throw new IOException(E_NO_VERSION_MARKER
+ + " Table: " + tableName);
+ } else {
+ final int version = extractVersionFromMarker(versionMarker);
+ if (VERSION != version) {
+ // version mismatch. Unless/until there is support for
+ // upgrading versions, treat this as an incompatible change
+ // and fail.
+ throw new IOException(E_INCOMPATIBLE_VERSION
+ + " Table "+ tableName
+ + " Expected version " + VERSION + " actual " + version);
+ }
+ }
+ }
+
+ /**
+ * Wait for the table to become active.
+ * @param t table to block on.
+ * @throws IOException IO problems
+ * @throws InterruptedIOException if the wait was interrupted
+ */
+ private void waitForTableActive(Table t) throws IOException {
+ try {
+ t.waitForActive();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while waiting for table {} in region {} active",
+ tableName, region, e);
+ Thread.currentThread().interrupt();
+ throw (IOException) new InterruptedIOException("DynamoDB table '"
+ + tableName + "' is not active yet in region " + region).initCause(e);
+ }
+ }
+
+ /**
+ * Create a table, wait for it to become active, then add the version
+ * marker.
+ * @param capacity capacity to provision
+ * @throws IOException on any failure.
+ * @throws InterruptedIOException if the wait was interrupted
+ */
+ private void createTable(ProvisionedThroughput capacity) throws IOException {
+ try {
+ LOG.info("Creating non-existent DynamoDB table {} in region {}",
+ tableName, region);
+ table = dynamoDB.createTable(new CreateTableRequest()
+ .withTableName(tableName)
+ .withKeySchema(keySchema())
+ .withAttributeDefinitions(attributeDefinitions())
+ .withProvisionedThroughput(capacity));
+ LOG.debug("Awaiting table becoming active");
+ } catch (ResourceInUseException e) {
+ LOG.warn("ResourceInUseException while creating DynamoDB table {} "
+ + "in region {}. This may indicate that the table was "
+ + "created by another concurrent thread or process.",
+ tableName, region);
+ }
+ waitForTableActive(table);
+ final Item marker = createVersionMarker(VERSION_MARKER, VERSION,
+ System.currentTimeMillis());
+ putItem(marker);
+ }
+
+ /**
+ * PUT a single item to the table.
+ * @param item item to put
+ * @return the outcome.
+ */
+ PutItemOutcome putItem(Item item) {
+ LOG.debug("Putting item {}", item);
+ return table.putItem(item);
+ }
+
+ /**
+ * Provision the table with the given read and write capacity units.
+ */
+ void provisionTable(Long readCapacity, Long writeCapacity)
+ throws IOException {
+ final ProvisionedThroughput toProvision = new ProvisionedThroughput()
+ .withReadCapacityUnits(readCapacity)
+ .withWriteCapacityUnits(writeCapacity);
+ try {
+ final ProvisionedThroughputDescription p =
+ table.updateTable(toProvision).getProvisionedThroughput();
+ LOG.info("Provision table {} in region {}: readCapacityUnits={}, "
+ + "writeCapacityUnits={}",
+ tableName, region, p.getReadCapacityUnits(),
+ p.getWriteCapacityUnits());
+ } catch (AmazonClientException e) {
+ throw translateException("provisionTable", (String) null, e);
+ }
+ }
+
+ Table getTable() {
+ return table;
+ }
+
+ String getRegion() {
+ return region;
+ }
+
+ @VisibleForTesting
+ DynamoDB getDynamoDB() {
+ return dynamoDB;
+ }
+
+ /**
+ * Validates a path object; it must be absolute, and contain a host
+ * (bucket) component.
+ */
+ private Path checkPath(Path path) {
+ Preconditions.checkNotNull(path);
+ Preconditions.checkArgument(path.isAbsolute(), "Path %s is not absolute",
+ path);
+ URI uri = path.toUri();
+ Preconditions.checkNotNull(uri.getScheme(), "Path %s missing scheme", path);
+ Preconditions.checkArgument(uri.getScheme().equals(Constants.FS_S3A),
+ "Path %s scheme must be %s", path, Constants.FS_S3A);
+ Preconditions.checkArgument(!StringUtils.isEmpty(uri.getHost()), "Path %s" +
+ " is missing bucket.", path);
+ return path;
+ }
+
+ /**
+ * Validates a path meta-data object.
+ */
+ private static void checkPathMetadata(PathMetadata meta) {
+ Preconditions.checkNotNull(meta);
+ Preconditions.checkNotNull(meta.getFileStatus());
+ Preconditions.checkNotNull(meta.getFileStatus().getPath());
+ }
+
+}
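
As a hedged illustration of the parent/child key split described in the
class javadoc (the real translation lives in PathMetadataDynamoDBTranslation;
this helper is hypothetical and ignores edge cases such as the root path):

  // s3a://bucket/dir1/dir2/file1 -> parent "/bucket/dir1/dir2", child "file1"
  static String[] toParentAndChild(Path path) {
    String bucket = path.toUri().getHost(); // bucket acts as the root dir
    String parentKey = "/" + bucket + path.getParent().toUri().getPath();
    return new String[] {parentKey, path.getName()};
  }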
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
new file mode 100644
index 0000000..1ef8b0d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Tristate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * This is a local, in-memory, implementation of MetadataStore.
+ * This is <i>not</i> a coherent cache across processes. It is only
+ * locally-coherent.
+ *
+ * The purpose of this is for unit and integration testing.
+ * It could also be used to accelerate local-only operations where only one
+ * process is operating on a given object store, or multiple processes are
+ * accessing a read-only storage bucket.
+ *
+ * This MetadataStore does not enforce filesystem rules such as disallowing
+ * non-recursive removal of non-empty directories. It is assumed the caller
+ * already performs these checks.
+ */
+public class LocalMetadataStore implements MetadataStore {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(LocalMetadataStore.class);
+ // TODO HADOOP-13649: use time instead of capacity for eviction.
+ public static final int DEFAULT_MAX_RECORDS = 128;
+
+ /**
+ * Maximum number of records.
+ */
+ public static final String CONF_MAX_RECORDS =
+ "fs.metadatastore.local.max_records";
+
+ /** Contains directories and files. */
+ private LruHashMap<Path, PathMetadata> fileHash;
+
+ /** Contains directory listings. */
+ private LruHashMap<Path, DirListingMetadata> dirHash;
+
+ private FileSystem fs;
+ /* Null iff this FS does not have an associated URI host. */
+ private String uriHost;
+
+ @Override
+ public void initialize(FileSystem fileSystem) throws IOException {
+ Preconditions.checkNotNull(fileSystem);
+ fs = fileSystem;
+ URI fsURI = fs.getUri();
+ uriHost = fsURI.getHost();
+ if (uriHost != null && uriHost.equals("")) {
+ uriHost = null;
+ }
+
+ initialize(fs.getConf());
+ }
+
+ @Override
+ public void initialize(Configuration conf) throws IOException {
+ Preconditions.checkNotNull(conf);
+ int maxRecords = conf.getInt(CONF_MAX_RECORDS, DEFAULT_MAX_RECORDS);
+ if (maxRecords < 4) {
+ maxRecords = 4;
+ }
+ // Start w/ less than max capacity. Space / time trade off.
+ fileHash = new LruHashMap<>(maxRecords/2, maxRecords);
+ dirHash = new LruHashMap<>(maxRecords/4, maxRecords);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "LocalMetadataStore{");
+ sb.append(", uriHost='").append(uriHost).append('\'');
+ sb.append('}');
+ return sb.toString();
+ }
+
+ @Override
+ public void delete(Path p) throws IOException {
+ doDelete(p, false, true);
+ }
+
+ @Override
+ public void forgetMetadata(Path p) throws IOException {
+ doDelete(p, false, false);
+ }
+
+ @Override
+ public void deleteSubtree(Path path) throws IOException {
+ doDelete(path, true, true);
+ }
+
+ private synchronized void doDelete(Path p, boolean recursive, boolean
+ tombstone) {
+
+ Path path = standardize(p);
+
+ // Delete entry from file cache, then from cached parent directory, if any
+
+ deleteHashEntries(path, tombstone);
+
+ if (recursive) {
+ // Remove all entries that have this dir as path prefix.
+ deleteHashByAncestor(path, dirHash, tombstone);
+ deleteHashByAncestor(path, fileHash, tombstone);
+ }
+ }
+
+ @Override
+ public synchronized PathMetadata get(Path p) throws IOException {
+ return get(p, false);
+ }
+
+ @Override
+ public PathMetadata get(Path p, boolean wantEmptyDirectoryFlag)
+ throws IOException {
+ Path path = standardize(p);
+ synchronized (this) {
+ PathMetadata m = fileHash.mruGet(path);
+
+ if (wantEmptyDirectoryFlag && m != null &&
+ m.getFileStatus().isDirectory()) {
+ m.setIsEmptyDirectory(isEmptyDirectory(p));
+ }
+
+ LOG.debug("get({}) -> {}", path, m == null ? "null" : m.prettyPrint());
+ return m;
+ }
+ }
+
+ /**
+ * Determine if directory is empty.
+ * Call with lock held.
+ * @param p a Path, already filtered through standardize()
+ * @return TRUE / FALSE if known empty / not-empty, UNKNOWN otherwise.
+ */
+ private Tristate isEmptyDirectory(Path p) {
+ DirListingMetadata dirMeta = dirHash.get(p);
+ if (dirMeta == null) {
+ // no cached listing for this directory: emptiness is unknown
+ return Tristate.UNKNOWN;
+ }
+ return dirMeta.withoutTombstones().isEmpty();
+ }
+
+ @Override
+ public synchronized DirListingMetadata listChildren(Path p) throws
+ IOException {
+ Path path = standardize(p);
+ DirListingMetadata listing = dirHash.mruGet(path);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("listChildren({}) -> {}", path,
+ listing == null ? "null" : listing.prettyPrint());
+ }
+ // Make a copy so callers can mutate without affecting our state
+ return listing == null ? null : new DirListingMetadata(listing);
+ }
+
+ @Override
+ public void move(Collection<Path> pathsToDelete,
+ Collection<PathMetadata> pathsToCreate) throws IOException {
+
+ Preconditions.checkNotNull(pathsToDelete, "pathsToDelete is null");
+ Preconditions.checkNotNull(pathsToCreate, "pathsToCreate is null");
+ Preconditions.checkArgument(pathsToDelete.size() == pathsToCreate.size(),
+ "Must supply same number of paths to delete/create.");
+
+ // I feel dirty for using reentrant lock. :-|
+ synchronized (this) {
+
+ // 1. Delete pathsToDelete
+ for (Path meta : pathsToDelete) {
+ LOG.debug("move: deleting metadata {}", meta);
+ delete(meta);
+ }
+
+ // 2. Create new destination path metadata
+ for (PathMetadata meta : pathsToCreate) {
+ LOG.debug("move: adding metadata {}", meta);
+ put(meta);
+ }
+
+ // 3. We now know full contents of all dirs in destination subtree
+ for (PathMetadata meta : pathsToCreate) {
+ FileStatus status = meta.getFileStatus();
+ if (status == null || status.isDirectory()) {
+ continue;
+ }
+ DirListingMetadata dir = listChildren(status.getPath());
+ if (dir != null) { // could be evicted already
+ dir.setAuthoritative(true);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void put(PathMetadata meta) throws IOException {
+
+ Preconditions.checkNotNull(meta);
+ FileStatus status = meta.getFileStatus();
+ Path path = standardize(status.getPath());
+ synchronized (this) {
+
+ /* Add entry for this file. */
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("put {} -> {}", path, meta.prettyPrint());
+ }
+ fileHash.put(path, meta);
+
+ /* Directory case:
+ * We also make sure we have an entry in the dirHash, so subsequent
+ * listStatus(path) calls at least see the directory.
+ *
+ * If we had a boolean flag argument "isNew", we would know whether this
+ * is an existing directory the client discovered via getFileStatus(),
+ * or if it is a newly-created directory. In the latter case, we would
+ * be able to mark the directory as authoritative (fully-cached),
+ * saving round trips to underlying store for subsequent listStatus()
+ */
+
+ if (status.isDirectory()) {
+ DirListingMetadata dir = dirHash.mruGet(path);
+ if (dir == null) {
+ dirHash.put(path, new DirListingMetadata(path, DirListingMetadata
+ .EMPTY_DIR, false));
+ }
+ }
+
+ /* Update cached parent dir. */
+ Path parentPath = path.getParent();
+ if (parentPath != null) {
+ DirListingMetadata parent = dirHash.mruGet(parentPath);
+ if (parent == null) {
+ /* Track this new file's listing in parent. Parent is not
+ * authoritative, since there may be other items in it we don't know
+ * about. */
+ parent = new DirListingMetadata(parentPath,
+ DirListingMetadata.EMPTY_DIR, false);
+ dirHash.put(parentPath, parent);
+ }
+ parent.put(status);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void put(DirListingMetadata meta) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("put dirMeta {}", meta.prettyPrint());
+ }
+ dirHash.put(standardize(meta.getPath()), meta);
+ }
+
+ public synchronized void put(Collection<PathMetadata> metas) throws
+ IOException {
+ for (PathMetadata meta : metas) {
+ put(meta);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void destroy() throws IOException {
+ if (fileHash != null) {
+ fileHash.clear();
+ }
+ if (dirHash != null) {
+ dirHash.clear();
+ }
+ }
+
+ @Override
+ public synchronized void prune(long modTime) throws IOException {
+ Iterator<Map.Entry<Path, PathMetadata>> files =
+ fileHash.entrySet().iterator();
+ while (files.hasNext()) {
+ Map.Entry<Path, PathMetadata> entry = files.next();
+ if (expired(entry.getValue().getFileStatus(), modTime)) {
+ files.remove();
+ }
+ }
+ Iterator<Map.Entry<Path, DirListingMetadata>> dirs =
+ dirHash.entrySet().iterator();
+ while (dirs.hasNext()) {
+ Map.Entry<Path, DirListingMetadata> entry = dirs.next();
+ Path path = entry.getKey();
+ DirListingMetadata metadata = entry.getValue();
+ Collection<PathMetadata> oldChildren = metadata.getListing();
+ Collection<PathMetadata> newChildren = new LinkedList<>();
+
+ for (PathMetadata child : oldChildren) {
+ FileStatus status = child.getFileStatus();
+ if (!expired(status, modTime)) {
+ newChildren.add(child);
+ }
+ }
+ if (newChildren.size() != oldChildren.size()) {
+ dirHash.put(path, new DirListingMetadata(path, newChildren, false));
+ if (!path.isRoot()) {
+ DirListingMetadata parent = dirHash.get(path.getParent());
+ if (parent != null) {
+ parent.setAuthoritative(false);
+ }
+ }
+ }
+ }
+ }
+
+ private boolean expired(FileStatus status, long expiry) {
+ // Note: S3 doesn't track modification time on directories, so for
+ // consistency with the DynamoDB implementation we ignore that here
+ return status.getModificationTime() < expiry && !status.isDirectory();
+ }
+
+ @VisibleForTesting
+ static <T> void deleteHashByAncestor(Path ancestor, Map<Path, T> hash,
+ boolean tombstone) {
+ for (Iterator<Map.Entry<Path, T>> it = hash.entrySet().iterator();
+ it.hasNext();) {
+ Map.Entry<Path, T> entry = it.next();
+ Path f = entry.getKey();
+ T meta = entry.getValue();
+ if (isAncestorOf(ancestor, f)) {
+ if (tombstone) {
+ if (meta instanceof PathMetadata) {
+ entry.setValue((T) PathMetadata.tombstone(f));
+ } else if (meta instanceof DirListingMetadata) {
+ it.remove();
+ } else {
+ throw new IllegalStateException("Unknown type in hash");
+ }
+ } else {
+ it.remove();
+ }
+ }
+ }
+ }
+
+ /**
+ * @return true iff 'ancestor' is an ancestor directory of path 'f'.
+ * All paths here are absolute. A dir does not count as its own ancestor.
+ */
+ private static boolean isAncestorOf(Path ancestor, Path f) {
+ String aStr = ancestor.toString();
+ if (!ancestor.isRoot()) {
+ aStr += "/";
+ }
+ String fStr = f.toString();
+ return (fStr.startsWith(aStr));
+ }
+
+ /**
+ * Update fileHash and dirHash to reflect the deletion of file 'path'.
+ * Call with the lock held.
+ */
+ private void deleteHashEntries(Path path, boolean tombstone) {
+
+ // Remove target file/dir
+ LOG.debug("delete file entry for {}", path);
+ if (tombstone) {
+ fileHash.put(path, PathMetadata.tombstone(path));
+ } else {
+ fileHash.remove(path);
+ }
+
+ // Update this and parent dir listing, if any
+
+ /* If this path is a dir, remove its listing */
+ LOG.debug("removing listing of {}", path);
+
+ dirHash.remove(path);
+
+ /* Remove this path from parent's dir listing */
+ Path parent = path.getParent();
+ if (parent != null) {
+ DirListingMetadata dir = dirHash.get(parent);
+ if (dir != null) {
+ LOG.debug("removing parent's entry for {} ", path);
+ if (tombstone) {
+ dir.markDeleted(path);
+ } else {
+ dir.remove(path);
+ }
+ }
+ }
+ }
+
+ /**
+ * Return a "standardized" version of a path so we always have a consistent
+ * hash value. Also asserts the path is absolute and contains a host
+ * component.
+ * @param p input Path
+ * @return standardized version of Path, suitable for hash key
+ */
+ private Path standardize(Path p) {
+ Preconditions.checkArgument(p.isAbsolute(), "Path must be absolute");
+ URI uri = p.toUri();
+ if (uriHost != null) {
+ Preconditions.checkArgument(!isEmpty(uri.getHost()));
+ }
+ return p;
+ }
+
+ private static boolean isEmpty(String s) {
+ return (s == null || s.isEmpty());
+ }
+}
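
A minimal usage sketch for tests, assuming the single-argument PathMetadata
constructor used elsewhere in this patch:

  MetadataStore ms = new LocalMetadataStore();
  ms.initialize(new Configuration());
  Path f = new Path("s3a://bucket/dir/file");
  ms.put(new PathMetadata(
      new FileStatus(1024, false, 1, 4096, System.currentTimeMillis(), f)));
  PathMetadata meta = ms.get(f); // coherent only within this process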
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java
new file mode 100644
index 0000000..e355095
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * LinkedHashMap that implements a maximum size and LRU eviction policy.
+ */
+public class LruHashMap<K, V> extends LinkedHashMap<K, V> {
+ private final int maxSize;
+ public LruHashMap(int initialCapacity, int maxSize) {
+ super(initialCapacity);
+ this.maxSize = maxSize;
+ }
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ return size() > maxSize;
+ }
+
+ /**
+ * get() plus the side effect of making the entry the most recently used.
+ * @param key lookup key
+ * @return the mapped value, or null if the key is absent
+ */
+ public V mruGet(K key) {
+ V val = remove(key);
+ if (val != null) {
+ put(key, val);
+ }
+ return val;
+ }
+}
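To make the eviction and mruGet semantics concrete, here is a small usage
sketch; the capacity and values are arbitrary. Because the map keeps
insertion order, mruGet refreshes an entry by removing and re-inserting it:

    import org.apache.hadoop.fs.s3a.s3guard.LruHashMap;

    public class LruDemo {
      public static void main(String[] args) {
        LruHashMap<String, Integer> map = new LruHashMap<>(4, 3);
        map.put("a", 1);
        map.put("b", 2);
        map.put("c", 3);
        map.mruGet("a");  // refreshes "a" by moving it to the tail
        map.put("d", 4);  // size would exceed 3, so the eldest ("b") is evicted
        System.out.println(map.keySet()); // [c, a, d]
      }
    }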
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
new file mode 100644
index 0000000..dd8077b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@code MetadataStore} defines the set of operations that any metadata store
+ * implementation must provide. Note that all {@link Path} objects provided
+ * to methods must be absolute, not relative paths.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface MetadataStore extends Closeable {
+
+ /**
+ * Performs one-time initialization of the metadata store.
+ *
+ * @param fs {@code FileSystem} associated with the MetadataStore
+ * @throws IOException if there is an error
+ */
+ void initialize(FileSystem fs) throws IOException;
+
+ /**
+ * Performs one-time initialization of the metadata store via configuration.
+ * @see #initialize(FileSystem)
+ * @param conf Configuration.
+ * @throws IOException if there is an error
+ */
+ void initialize(Configuration conf) throws IOException;
+
+ /**
+ * Deletes exactly one path, leaving a tombstone to prevent lingering,
+ * inconsistent copies of it from being listed.
+ *
+ * @param path the path to delete
+ * @throws IOException if there is an error
+ */
+ void delete(Path path) throws IOException;
+
+ /**
+ * Removes the record of exactly one path. Does not leave a tombstone (see
+ * {@link MetadataStore#delete(Path)}). It is currently intended for testing
+ * only, and a need to use it as part of normal FileSystem usage is not
+ * anticipated.
+ *
+ * @param path the path to delete
+ * @throws IOException if there is an error
+ */
+ @VisibleForTesting
+ void forgetMetadata(Path path) throws IOException;
+
+ /**
+ * Deletes the entire sub-tree rooted at the given path, leaving tombstones
+ * to prevent lingering, inconsistent copies of it from being listed.
+ *
+ * In addition to affecting future calls to {@link #get(Path)},
+ * implementations must also update any stored {@code DirListingMetadata}
+ * objects which track the parent of this file.
+ *
+ * @param path the root of the sub-tree to delete
+ * @throws IOException if there is an error
+ */
+ void deleteSubtree(Path path) throws IOException;
+
+ /**
+ * Gets metadata for a path.
+ *
+ * @param path the path to get
+ * @return metadata for {@code path}, {@code null} if not found
+ * @throws IOException if there is an error
+ */
+ PathMetadata get(Path path) throws IOException;
+
+ /**
+ * Gets metadata for a path. Alternate method that includes a hint
+ * whether or not the MetadataStore should do work to compute the value for
+ * {@link PathMetadata#isEmptyDirectory()}. Since determining emptiness
+ * may be an expensive operation, this can save wasted work.
+ *
+ * @param path the path to get
+ * @param wantEmptyDirectoryFlag Set to true to give a hint to the
+ * MetadataStore that it should try to compute the empty directory flag.
+ * @return metadata for {@code path}, {@code null} if not found
+ * @throws IOException if there is an error
+ */
+ PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
+ throws IOException;
+
+ /**
+ * Lists metadata for all direct children of a path.
+ *
+ * @param path the path to list
+ * @return metadata for all direct children of {@code path} which are being
+ * tracked by the MetadataStore, or {@code null} if the path was not found
+ * in the MetadataStore.
+ * @throws IOException if there is an error
+ */
+ DirListingMetadata listChildren(Path path) throws IOException;
+
+ /**
+ * Record the effects of a {@link FileSystem#rename(Path, Path)} in the
+ * MetadataStore. Clients provide explicit enumeration of the affected
+ * paths (recursively), before and after the rename.
+ *
+ * This operation is not atomic, unless specific implementations claim
+ * otherwise.
+ *
+ * On the need to provide an enumeration of directory trees instead of just
+ * source and destination paths:
+ * Since a MetadataStore does not have to track all metadata for the
+ * underlying storage system, and a new MetadataStore may be created on an
+ * existing underlying filesystem, this move() may be the first time the
+ * MetadataStore sees the affected paths. Therefore, simply providing src
+ * and destination paths may not be enough to record the deletions (under
+ * src path) and creations (at destination) that are happening during the
+ * rename().
+ *
+ * @param pathsToDelete Collection of all paths that were removed from the
+ * source directory tree of the move.
+ * @param pathsToCreate Collection of all PathMetadata for the new paths
+ * that were created at the destination of the rename().
+ * @throws IOException if there is an error
+ */
+ void move(Collection<Path> pathsToDelete,
+ Collection<PathMetadata> pathsToCreate) throws IOException;
+
+ /**
+ * Saves metadata for exactly one path.
+ *
+ * Implementations may pre-create all the path's ancestors automatically.
+ * Implementations must update any {@code DirListingMetadata} objects which
+ * track the immediate parent of this file.
+ *
+ * @param meta the metadata to save
+ * @throws IOException if there is an error
+ */
+ void put(PathMetadata meta) throws IOException;
+
+ /**
+ * Saves metadata for any number of paths.
+ *
+ * Semantics are otherwise the same as single-path puts.
+ *
+ * @param metas the metadata to save
+ * @throws IOException if there is an error
+ */
+ void put(Collection<PathMetadata> metas) throws IOException;
+
+ /**
+ * Save directory listing metadata. Callers may save a partial directory
+ * listing for a given path, or may store a complete and authoritative copy
+ * of the directory listing. {@code MetadataStore} implementations may
+ * subsequently keep track of all modifications to the directory contents at
+ * this path, and return authoritative results from subsequent calls to
+ * {@link #listChildren(Path)}. See {@link DirListingMetadata}.
+ *
+ * Any authoritative results returned are only authoritative for the scope
+ * of the {@code MetadataStore}: A per-process {@code MetadataStore}, for
+ * example, would only show results visible to that process, potentially
+ * missing metadata updates (create, delete) made to the same path by
+ * another process.
+ *
+ * @param meta Directory listing metadata.
+ * @throws IOException if there is an error
+ */
+ void put(DirListingMetadata meta) throws IOException;
+
+ /**
+ * Destroy all resources associated with the metadata store.
+ *
+ * The destroyed resources can be DynamoDB tables, MySQL databases/tables, or
+ * HDFS directories. Any operation invoked after calling this method may
+ * fail.
+ *
+ * This operation is idempotent.
+ *
+ * @throws IOException if there is an error
+ */
+ void destroy() throws IOException;
+
+ /**
+ * Clear any metadata older than a specified time from the repository.
+ * Implementations MUST clear file metadata, and MAY clear directory metadata
+ * (s3a itself does not track modification time for directories).
+ * Implementations may also choose to throw UnsupportedOperationException
+ * instead. Note that modification times should be in UTC, as returned by
+ * System.currentTimeMillis at the time of modification.
+ *
+ * @param modTime Oldest modification time to allow
+ * @throws IOException if there is an error
+ * @throws UnsupportedOperationException if not implemented
+ */
+ void prune(long modTime) throws IOException, UnsupportedOperationException;
+}
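A hedged sketch of typical client usage of this interface follows, assuming
the in-memory LocalMetadataStore implementation added elsewhere in this
patch (its no-argument constructor is assumed here); the bucket and paths
are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore;
    import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
    import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;

    public class MetadataStoreDemo {
      public static void main(String[] args) throws Exception {
        MetadataStore ms = new LocalMetadataStore(); // assumed no-arg ctor
        ms.initialize(new Configuration());          // configuration-only startup

        Path file = new Path("s3a://bucket/dir/file");
        FileStatus status = new FileStatus(1024, false, 1, 1,
            System.currentTimeMillis(), file);
        ms.put(new PathMetadata(status));            // record the file

        PathMetadata stored = ms.get(file);          // expect a hit
        System.out.println(stored.prettyPrint());

        ms.delete(file);                             // leaves a tombstone
        PathMetadata entry = ms.get(file);
        System.out.println(entry != null && entry.isDeleted()); // expect true

        ms.close();
      }
    }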
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java
new file mode 100644
index 0000000..378d109
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+/**
+ * {@code MetadataStoreListFilesIterator} is a {@link RemoteIterator} that
+ * is similar to {@code DescendantsIterator} but does not return directories
+ * that have (or may have) children, and will also provide access to the set of
+ * tombstones to allow recently deleted S3 objects to be filtered out from a
+ * corresponding request. In other words, it exposes tombstones and returns
+ * the set of objects that should actually exist in S3, namely files and
+ * empty directories, but not the intermediate directories whose existence
+ * is only inferred from them.
+ *
+ * For example, assume the consistent store contains metadata representing this
+ * file system structure:
+ *
+ * <pre>
+ * /dir1
+ * |-- dir2
+ * | |-- file1
+ * | `-- file2
+ * `-- dir3
+ * |-- dir4
+ * | `-- file3
+ * |-- dir5
+ * | `-- file4
+ * `-- dir6
+ * </pre>
+ *
+ * Consider this code sample:
+ * <pre>
+ * final MetadataStore ms = ...;
+ * final PathMetadata dir1 = ms.get(new Path("/dir1"));
+ * for (MetadataStoreListFilesIterator files =
+ * new MetadataStoreListFilesIterator(ms, dir1, true); files.hasNext(); ) {
+ * final FileStatus status = files.next().getFileStatus();
+ * System.out.printf("%s %s%n", status.isDirectory() ? 'D' : 'F',
+ * status.getPath());
+ * }
+ * </pre>
+ *
+ * The output is:
+ * <pre>
+ * F /dir1/dir2/file1
+ * F /dir1/dir2/file2
+ * F /dir1/dir3/dir4/file3
+ * F /dir1/dir3/dir5/file4
+ * D /dir1/dir3/dir6
+ * </pre>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MetadataStoreListFilesIterator implements
+ RemoteIterator<FileStatus> {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ MetadataStoreListFilesIterator.class);
+
+ private final boolean allowAuthoritative;
+ private final MetadataStore metadataStore;
+ private final Set<Path> tombstones = new HashSet<>();
+ private Iterator<FileStatus> leafNodesIterator = null;
+
+ public MetadataStoreListFilesIterator(MetadataStore ms, PathMetadata meta,
+ boolean allowAuthoritative) throws IOException {
+ Preconditions.checkNotNull(ms);
+ this.metadataStore = ms;
+ this.allowAuthoritative = allowAuthoritative;
+ prefetch(meta);
+ }
+
+ private void prefetch(PathMetadata meta) throws IOException {
+ final Queue<PathMetadata> queue = new LinkedList<>();
+ final Collection<FileStatus> leafNodes = new ArrayList<>();
+
+ if (meta != null) {
+ final Path path = meta.getFileStatus().getPath();
+ if (path.isRoot()) {
+ DirListingMetadata rootListing = metadataStore.listChildren(path);
+ if (rootListing != null) {
+ tombstones.addAll(rootListing.listTombstones());
+ queue.addAll(rootListing.withoutTombstones().getListing());
+ }
+ } else {
+ queue.add(meta);
+ }
+ }
+
+ while (!queue.isEmpty()) {
+ PathMetadata nextMetadata = queue.poll();
+ FileStatus nextStatus = nextMetadata.getFileStatus();
+ if (nextStatus.isFile()) {
+ // All files are leaf nodes by definition
+ leafNodes.add(nextStatus);
+ continue;
+ }
+ if (nextStatus.isDirectory()) {
+ final Path path = nextStatus.getPath();
+ DirListingMetadata children = metadataStore.listChildren(path);
+ if (children != null) {
+ tombstones.addAll(children.listTombstones());
+ Collection<PathMetadata> liveChildren =
+ children.withoutTombstones().getListing();
+ if (!liveChildren.isEmpty()) {
+ // A directory with live (not-all-deleted) children: add the
+ // children to the queue and move on to the next node
+ queue.addAll(liveChildren);
+ continue;
+ } else if (allowAuthoritative && children.isAuthoritative()) {
+ leafNodes.add(nextStatus);
+ }
+ }
+ }
+ // Directories that *might* be empty are ignored for now, since we
+ // cannot confirm that they are empty without incurring other costs.
+ // Users of this class can still discover empty directories via S3's
+ // fake directories, subject to the same consistency semantics as before.
+ // The only other possibility is a symlink, which is unsupported on S3A.
+ }
+ leafNodesIterator = leafNodes.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return leafNodesIterator.hasNext();
+ }
+
+ @Override
+ public FileStatus next() {
+ return leafNodesIterator.next();
+ }
+
+ public Set<Path> listTombstones() {
+ return tombstones;
+ }
+}
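One intended use of listTombstones() is to filter recently deleted paths out
of a raw S3 listing before merging results. Here is a self-contained sketch
with a hypothetical helper name (the raw listing would come from the caller):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;

    public final class TombstoneFilterDemo {
      /** Drop raw S3 listing entries whose paths are tombstoned. */
      static List<FileStatus> filterTombstoned(List<FileStatus> rawS3Listing,
          Set<Path> tombstones) {
        List<FileStatus> live = new ArrayList<>();
        for (FileStatus status : rawS3Listing) {
          if (!tombstones.contains(status.getPath())) {
            live.add(status);
          }
        }
        return live;
      }
    }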
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
new file mode 100644
index 0000000..08ae89e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A no-op implementation of MetadataStore. Clients that use this
+ * implementation should behave the same as they would without any
+ * MetadataStore.
+ */
+public class NullMetadataStore implements MetadataStore {
+
+ @Override
+ public void initialize(FileSystem fs) throws IOException {
+ }
+
+ @Override
+ public void initialize(Configuration conf) throws IOException {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void delete(Path path) throws IOException {
+ }
+
+ @Override
+ public void forgetMetadata(Path path) throws IOException {
+ }
+
+ @Override
+ public void deleteSubtree(Path path) throws IOException {
+ }
+
+ @Override
+ public PathMetadata get(Path path) throws IOException {
+ return null;
+ }
+
+ @Override
+ public PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public DirListingMetadata listChildren(Path path) throws IOException {
+ return null;
+ }
+
+ @Override
+ public void move(Collection<Path> pathsToDelete,
+ Collection<PathMetadata> pathsToCreate) throws IOException {
+ }
+
+ @Override
+ public void put(PathMetadata meta) throws IOException {
+ }
+
+ @Override
+ public void put(Collection<PathMetadata> meta) throws IOException {
+ }
+
+ @Override
+ public void put(DirListingMetadata meta) throws IOException {
+ }
+
+ @Override
+ public void destroy() throws IOException {
+ }
+
+ @Override
+ public void prune(long modTime) {
+ }
+
+ @Override
+ public String toString() {
+ return "NullMetadataStore";
+ }
+}
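The contract here is simply "always miss": every get() and listChildren()
returns null, so callers fall through to raw S3. A trivial sketch of that
behaviour, with an illustrative path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
    import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;

    public class NullStoreDemo {
      public static void main(String[] args) throws Exception {
        MetadataStore ms = new NullMetadataStore();
        ms.initialize(new Configuration());
        System.out.println(ms.get(new Path("s3a://bucket/any"))); // null
        ms.close();
      }
    }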
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1afc6aa/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
new file mode 100644
index 0000000..2a0219e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Tristate;
+
+/**
+ * {@code PathMetadata} models path metadata stored in the
+ * {@link MetadataStore}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class PathMetadata {
+
+ private final FileStatus fileStatus;
+ private Tristate isEmptyDirectory;
+ private boolean isDeleted;
+
+ /**
+ * Create a tombstone from the current time.
+ * @param path path to tombstone
+ * @return the entry.
+ */
+ public static PathMetadata tombstone(Path path) {
+ long now = System.currentTimeMillis();
+ FileStatus status = new FileStatus(0, false, 0, 0, now, path);
+ return new PathMetadata(status, Tristate.UNKNOWN, true);
+ }
+
+ /**
+ * Creates a new {@code PathMetadata} containing given {@code FileStatus}.
+ * @param fileStatus file status containing an absolute path.
+ */
+ public PathMetadata(FileStatus fileStatus) {
+ this(fileStatus, Tristate.UNKNOWN);
+ }
+
+ public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir) {
+ this(fileStatus, isEmptyDir, false);
+ }
+
+ public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir, boolean
+ isDeleted) {
+ Preconditions.checkNotNull(fileStatus, "fileStatus must be non-null");
+ Preconditions.checkNotNull(fileStatus.getPath(), "fileStatus path must be" +
+ " non-null");
+ Preconditions.checkArgument(fileStatus.getPath().isAbsolute(), "path must" +
+ " be absolute");
+ this.fileStatus = fileStatus;
+ this.isEmptyDirectory = isEmptyDir;
+ this.isDeleted = isDeleted;
+ }
+
+ /**
+ * @return {@code FileStatus} contained in this {@code PathMetadata}.
+ */
+ public final FileStatus getFileStatus() {
+ return fileStatus;
+ }
+
+ /**
+ * Query if a directory is empty.
+ * @return Tristate.TRUE if this is known to be an empty directory,
+ * Tristate.FALSE if known not to be empty, and Tristate.UNKNOWN if the
+ * MetadataStore does not have enough information to determine either way.
+ */
+ public Tristate isEmptyDirectory() {
+ return isEmptyDirectory;
+ }
+
+ void setIsEmptyDirectory(Tristate isEmptyDirectory) {
+ this.isEmptyDirectory = isEmptyDirectory;
+ }
+
+ public boolean isDeleted() {
+ return isDeleted;
+ }
+
+ void setIsDeleted(boolean isDeleted) {
+ this.isDeleted = isDeleted;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PathMetadata)) {
+ return false;
+ }
+ return this.fileStatus.equals(((PathMetadata)o).fileStatus);
+ }
+
+ @Override
+ public int hashCode() {
+ return fileStatus.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return "PathMetadata{" +
+ "fileStatus=" + fileStatus +
+ "; isEmptyDirectory=" + isEmptyDirectory +
+ "; isDeleted=" + isDeleted +
+ '}';
+ }
+
+ /**
+ * Log contents to supplied StringBuilder in a pretty fashion.
+ * @param sb target StringBuilder
+ */
+ public void prettyPrint(StringBuilder sb) {
+ sb.append(String.format("%-5s %-20s %-7d %-8s %-6s",
+ fileStatus.isDirectory() ? "dir" : "file",
+ fileStatus.getPath().toString(), fileStatus.getLen(),
+ isEmptyDirectory.name(), isDeleted));
+ sb.append(fileStatus);
+ }
+
+ public String prettyPrint() {
+ StringBuilder sb = new StringBuilder();
+ prettyPrint(sb);
+ return sb.toString();
+ }
+}
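Finally, a short sketch of live entries versus tombstones; the path is
illustrative, and note that equals() delegates to FileStatus, which Hadoop
compares by path alone:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;

    public class PathMetadataDemo {
      public static void main(String[] args) {
        Path path = new Path("s3a://bucket/dir/file");
        FileStatus status = new FileStatus(2048, false, 1, 1,
            System.currentTimeMillis(), path);

        PathMetadata live = new PathMetadata(status);
        System.out.println(live.isDeleted());        // false
        System.out.println(live.isEmptyDirectory()); // UNKNOWN for a file

        PathMetadata gone = PathMetadata.tombstone(path);
        System.out.println(gone.isDeleted());        // true
        // Same path, so the two entries compare equal.
        System.out.println(live.equals(gone));       // true
      }
    }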
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org