You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/09/01 20:01:53 UTC

[65/74] [abbrv] hadoop git commit: HADOOP-13345 HS3Guard: Improved Consistency for S3A. Contributed by: Chris Nauroth, Aaron Fabbri, Mingliang Liu, Lei (Eddy) Xu, Sean Mackrory, Steve Loughran and others.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
new file mode 100644
index 0000000..66ada49
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
+
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
+import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
+
+/**
+ * Interface to create a DynamoDB client.
+ *
+ * Implementation should be configured for setting and getting configuration.
+ */
+@InterfaceAudience.Private
+public interface DynamoDBClientFactory extends Configurable {
+  Logger LOG = LoggerFactory.getLogger(DynamoDBClientFactory.class);
+
+  /**
+   * Create a DynamoDB client object from configuration.
+   *
+   * The DynamoDB client to create does not have to relate to any S3 buckets.
+   * All information needed to create a DynamoDB client is from the hadoop
+   * configuration. Specifically, if the region is not configured, it will use
+   * the provided region parameter. If region is neither configured nor
+   * provided, it will indicate an error.
+   *
+   * @param defaultRegion the default region of the AmazonDynamoDB client
+   * @return a new DynamoDB client
+   * @throws IOException if any IO error happens
+   */
+  AmazonDynamoDB createDynamoDBClient(String defaultRegion) throws IOException;
+
+  /**
+   * The default implementation for creating an AmazonDynamoDB.
+   */
+  class DefaultDynamoDBClientFactory extends Configured
+      implements DynamoDBClientFactory {
+    /**
+     * Build a client from the configured credentials, AWS client settings
+     * and region.
+     *
+     * @param defaultRegion region to fall back to when
+     *        {@code S3GUARD_DDB_REGION_KEY} is not set in the configuration
+     * @return a new DynamoDB client bound to the resolved region
+     * @throws IOException if the resolved region is invalid or another IO
+     *         error happens
+     * @throws NullPointerException if this factory has no configuration
+     */
+    @Override
+    public AmazonDynamoDB createDynamoDBClient(String defaultRegion)
+        throws IOException {
+      Preconditions.checkNotNull(getConf(),
+          "Should have been configured before usage");
+
+      final Configuration conf = getConf();
+      final AWSCredentialsProvider credentials =
+          createAWSCredentialProviderSet(null, conf);
+      final ClientConfiguration awsConf =
+          DefaultS3ClientFactory.createAwsConf(conf);
+
+      final String region = getRegion(conf, defaultRegion);
+      LOG.debug("Creating DynamoDB client in region {}", region);
+
+      return AmazonDynamoDBClientBuilder.standard()
+          .withCredentials(credentials)
+          .withClientConfiguration(awsConf)
+          .withRegion(region)
+          .build();
+    }
+
+    /**
+     * Helper method to get and validate the AWS region for DynamoDBClient.
+     *
+     * @param conf configuration
+     * @param defaultRegion the default region
+     * @return configured region or else the provided default region
+     * @throws IOException if the region is not valid; the original lookup
+     *         failure is attached as the cause
+     */
+    static String getRegion(Configuration conf, String defaultRegion)
+        throws IOException {
+      String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
+      if (StringUtils.isEmpty(region)) {
+        region = defaultRegion;
+      }
+      try {
+        // only used for validation: the lookup fails on unknown names
+        Regions.fromName(region);
+      } catch (IllegalArgumentException | NullPointerException e) {
+        // keep the underlying failure as the cause for easier debugging
+        throw new IOException("Invalid region specified: " + region + "; " +
+            "Region can be configured with " + S3GUARD_DDB_REGION_KEY + ": " +
+            validRegionsString(), e);
+      }
+      return region;
+    }
+
+    /**
+     * List the names of all regions known to the AWS SDK.
+     *
+     * @return the region names joined by {@code ", "}
+     */
+    private static String validRegionsString() {
+      final String delimiter = ", ";
+      Regions[] regions = Regions.values();
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < regions.length; i++) {
+        if (i > 0) {
+          sb.append(delimiter);
+        }
+        sb.append(regions[i].getName());
+      }
+      return sb.toString();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
new file mode 100644
index 0000000..1bed03d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -0,0 +1,1010 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.ItemCollection;
+import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
+import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
+import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
+import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
+import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
+import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.Tristate;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
+import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
+import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
+
+/**
+ * DynamoDBMetadataStore is a {@link MetadataStore} that persists
+ * file system metadata to DynamoDB.
+ *
+ * The current implementation uses a schema consisting of a single table.  The
+ * name of the table can be configured by config key
+ * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_NAME_KEY}.
+ * By default, it matches the name of the S3 bucket.  Each item in the table
+ * represents a single directory or file.  Its path is split into separate table
+ * attributes:
+ * <ul>
+ * <li> parent (absolute path of the parent, with bucket name inserted as
+ * first path component). </li>
+ * <li> child (path of that specific child, relative to parent). </li>
+ * <li> optional boolean attribute tracking whether the path is a directory.
+ *      Absence or a false value indicates the path is a file. </li>
+ * <li> optional long attribute revealing modification time of file.
+ *      This attribute is meaningful only to file items.</li>
+ * <li> optional long attribute revealing file length.
+ *      This attribute is meaningful only to file items.</li>
+ * <li> optional long attribute revealing block size of the file.
+ *      This attribute is meaningful only to file items.</li>
+ * </ul>
+ *
+ * The DynamoDB partition key is the parent, and the range key is the child.
+ *
+ * To allow multiple buckets to share the same DynamoDB table, the bucket
+ * name is treated as the root directory.
+ *
+ * For example, assume the consistent store contains metadata representing this
+ * file system structure:
+ *
+ * <pre>
+ * s3a://bucket/dir1
+ * |-- dir2
+ * |   |-- file1
+ * |   `-- file2
+ * `-- dir3
+ *     |-- dir4
+ *     |   `-- file3
+ *     |-- dir5
+ *     |   `-- file4
+ *     `-- dir6
+ * </pre>
+ *
+ * This is persisted to a single DynamoDB table as:
+ *
+ * <pre>
+ * =========================================================================
+ * | parent                 | child | is_dir | mod_time | len |     ...    |
+ * =========================================================================
+ * | /bucket                | dir1  | true   |          |     |            |
+ * | /bucket/dir1           | dir2  | true   |          |     |            |
+ * | /bucket/dir1           | dir3  | true   |          |     |            |
+ * | /bucket/dir1/dir2      | file1 |        |   100    | 111 |            |
+ * | /bucket/dir1/dir2      | file2 |        |   200    | 222 |            |
+ * | /bucket/dir1/dir3      | dir4  | true   |          |     |            |
+ * | /bucket/dir1/dir3      | dir5  | true   |          |     |            |
+ * | /bucket/dir1/dir3/dir4 | file3 |        |   300    | 333 |            |
+ * | /bucket/dir1/dir3/dir5 | file4 |        |   400    | 444 |            |
+ * | /bucket/dir1/dir3      | dir6  | true   |          |     |            |
+ * =========================================================================
+ * </pre>
+ *
+ * This choice of schema is efficient for read access patterns.
+ * {@link #get(Path)} can be served from a single item lookup.
+ * {@link #listChildren(Path)} can be served from a query against all rows
+ * matching the parent (the partition key) and the returned list is guaranteed
+ * to be sorted by child (the range key).  Tracking whether or not a path is a
+ * directory helps prevent unnecessary queries during traversal of an entire
+ * sub-tree.
+ *
+ * Some mutating operations, notably {@link #deleteSubtree(Path)} and
+ * {@link #move(Collection, Collection)}, are less efficient with this schema.
+ * They require mutating multiple items in the DynamoDB table.
+ *
+ * By default, DynamoDB access is performed within the same AWS region as
+ * the S3 bucket that hosts the S3A instance.  During initialization, it checks
+ * the location of the S3 bucket and creates a DynamoDB client connected to the
+ * same region. The region may also be set explicitly by setting the config
+ * parameter {@code fs.s3a.s3guard.ddb.region} to the corresponding region.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DynamoDBMetadataStore implements MetadataStore {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DynamoDBMetadataStore.class);
+
+  /** parent/child name to use in the version marker. */
+  public static final String VERSION_MARKER = "../VERSION";
+
+  /** Current version number. */
+  public static final int VERSION = 100;
+
+  /** Error: version marker not found in table. */
+  public static final String E_NO_VERSION_MARKER
+      = "S3Guard table lacks version marker.";
+
+  /** Error: version mismatch. */
+  public static final String E_INCOMPATIBLE_VERSION
+      = "Database table is from an incompatible S3Guard version.";
+
+  /** Initial delay for retries when batched operations get throttled by
+   * DynamoDB. Value is {@value} msec. */
+  public static final long MIN_RETRY_SLEEP_MSEC = 100;
+
+  /** Binds the {@code :false} placeholder to boolean false; used with the
+   * {@code IS_DELETED} filter expression in {@link #get(Path, boolean)}. */
+  private static ValueMap deleteTrackingValueMap =
+      new ValueMap().withBoolean(":false", false);
+
+  /** DynamoDB document client, created by both {@code initialize} methods. */
+  private DynamoDB dynamoDB;
+  /** AWS region: the configured DynamoDB region, or else the location of the
+   * S3 bucket when initialized from a filesystem. */
+  private String region;
+  /** Table that reads and writes go through.
+   * NOTE(review): not assigned in the code visible here; presumably set by
+   * {@code initTable()} - confirm. */
+  private Table table;
+  /** Name of the DynamoDB table, read from configuration during
+   * initialization. */
+  private String tableName;
+  /** Configuration this store was initialized with. */
+  private Configuration conf;
+  /** User name passed as owner when building directory statuses and
+   * translating items to metadata. */
+  private String username;
+
+  /** Backoff policy consulted when batched writes leave unprocessed items;
+   * set up by {@code setMaxRetries}. */
+  private RetryPolicy dataAccessRetryPolicy;
+  /** S3Guard instrumentation. Within the visible code it is only assigned by
+   * {@link #initialize(FileSystem)}, not by the configuration-only
+   * initializer. */
+  private S3AInstrumentation.S3GuardInstrumentation instrumentation;
+
+  /**
+   * Build a DynamoDB document client through the client factory class named
+   * in the configuration.
+   * @param conf the file system configuration
+   * @param s3Region region of the associated S3 bucket (if any).
+   * @return DynamoDB instance.
+   * @throws IOException I/O error.
+   */
+  private static DynamoDB createDynamoDB(Configuration conf, String s3Region)
+      throws IOException {
+    Preconditions.checkNotNull(conf);
+    final Class<? extends DynamoDBClientFactory> factoryClass = conf.getClass(
+        S3GUARD_DDB_CLIENT_FACTORY_IMPL,
+        S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT,
+        DynamoDBClientFactory.class);
+    LOG.debug("Creating DynamoDB client {} with S3 region {}",
+        factoryClass, s3Region);
+    // the factory is instantiated reflectively and handed the configuration
+    final DynamoDBClientFactory factory =
+        ReflectionUtils.newInstance(factoryClass, conf);
+    return new DynamoDB(factory.createDynamoDBClient(s3Region));
+  }
+
+  /**
+   * Initialize the store from an S3A filesystem: pick the DynamoDB region
+   * (configured value first, bucket location otherwise), create the client,
+   * resolve the table name and set up the table.
+   * @param fs filesystem, which must be an {@code S3AFileSystem}
+   * @throws IOException I/O error.
+   * @throws IllegalArgumentException if {@code fs} is not an S3A filesystem
+   */
+  @Override
+  public void initialize(FileSystem fs) throws IOException {
+    Preconditions.checkArgument(fs instanceof S3AFileSystem,
+        "DynamoDBMetadataStore only supports S3A filesystem.");
+    final S3AFileSystem s3a = (S3AFileSystem) fs;
+    instrumentation = s3a.getInstrumentation().getS3GuardInstrumentation();
+    final String bucketName = s3a.getBucket();
+
+    final String configuredRegion =
+        s3a.getConf().getTrimmed(S3GUARD_DDB_REGION_KEY);
+    if (StringUtils.isEmpty(configuredRegion)) {
+      // nothing configured: co-locate the table with the bucket
+      region = s3a.getBucketLocation();
+      LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region);
+    } else {
+      region = configuredRegion;
+      LOG.debug("Overriding S3 region with configured DynamoDB region: {}",
+          region);
+    }
+
+    username = s3a.getUsername();
+    conf = s3a.getConf();
+    dynamoDB = createDynamoDB(conf, region);
+
+    // the table is named after the bucket unless the config says otherwise
+    tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucketName);
+    setMaxRetries(conf);
+
+    initTable();
+
+    instrumentation.initialized();
+  }
+
+  /**
+   * Performs one-time initialization of the metadata store via configuration.
+   *
+   * This initialization depends on the configuration object to get AWS
+   * credentials, DynamoDBFactory implementation class, DynamoDB endpoints,
+   * DynamoDB table names etc. After initialization, this metadata store does
+   * not explicitly relate to any S3 bucket, which may be nonexistent.
+   *
+   * This is used to operate the metadata store directly beyond the scope of the
+   * S3AFileSystem integration, e.g. command line tools.
+   * Generally, callers should use {@link #initialize(FileSystem)}
+   * with an initialized {@code S3AFileSystem} instance.
+   *
+   * Without a filesystem to act as a reference point, the configuration itself
+   * must declare the table name and region in the
+   * {@link Constants#S3GUARD_DDB_TABLE_NAME_KEY} and
+   * {@link Constants#S3GUARD_DDB_REGION_KEY} respectively.
+   *
+   * @param config configuration declaring the table name and region
+   * @see #initialize(FileSystem)
+   * @throws IOException if there is an error
+   * @throws IllegalArgumentException if the configuration is incomplete
+   */
+  @Override
+  public void initialize(Configuration config) throws IOException {
+    conf = config;
+
+    // there is no bucket to fall back on: the table name must be configured
+    tableName = config.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(tableName),
+        "No DynamoDB table name configured");
+
+    // likewise the region cannot be inferred from a bucket location
+    region = config.getTrimmed(S3GUARD_DDB_REGION_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(region),
+        "No DynamoDB region configured");
+
+    dynamoDB = createDynamoDB(config, region);
+    username = UserGroupInformation.getCurrentUser().getShortUserName();
+    setMaxRetries(config);
+
+    initTable();
+  }
+
+  /**
+   * Set retry policy. This is driven by the value of
+   * {@link Constants#S3GUARD_DDB_MAX_RETRIES} with an exponential backoff
+   * between each attempt of {@link #MIN_RETRY_SLEEP_MSEC} milliseconds.
+   * @param config
+   */
+  private void setMaxRetries(Configuration config) {
+    int maxRetries = config.getInt(S3GUARD_DDB_MAX_RETRIES,
+        S3GUARD_DDB_MAX_RETRIES_DEFAULT);
+    dataAccessRetryPolicy = RetryPolicies
+        .exponentialBackoffRetry(maxRetries, MIN_RETRY_SLEEP_MSEC,
+            TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Record the path as deleted: delegates to {@code innerDelete} with the
+   * tombstone flag set, so a tombstone entry is written for the path.
+   * @param path path to delete
+   * @throws IOException I/O error.
+   */
+  @Override
+  public void delete(Path path) throws IOException {
+    innerDelete(path, true);
+  }
+
+  /**
+   * Drop the entry for the path: delegates to {@code innerDelete} with the
+   * tombstone flag cleared, so the item is removed and no tombstone is left.
+   * @param path path whose metadata should be forgotten
+   * @throws IOException I/O error.
+   */
+  @Override
+  public void forgetMetadata(Path path) throws IOException {
+    innerDelete(path, false);
+  }
+
+  /**
+   * Inner delete option, action based on the {@code tombstone} flag.
+   * No tombstone: delete the entry. Tombstone: create a tombstone entry.
+   * There is no check as to whether the entry exists in the table first.
+   * @param path path to delete
+   * @param tombstone flag to create a tombstone marker
+   * @throws IOException I/O error.
+   */
+  private void innerDelete(Path path, boolean tombstone)
+      throws IOException {
+    path = checkPath(path);
+    LOG.debug("Deleting from table {} in region {}: {}",
+        tableName, region, path);
+
+    // deleting nonexistent item consumes 1 write capacity; skip it
+    if (path.isRoot()) {
+      LOG.debug("Skip deleting root directory as it does not exist in table");
+      return;
+    }
+
+    try {
+      if (tombstone) {
+        Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
+            PathMetadata.tombstone(path));
+        table.putItem(item);
+      } else {
+        table.deleteItem(pathToKey(path));
+      }
+    } catch (AmazonClientException e) {
+      throw translateException("delete", path, e);
+    }
+  }
+
+  /**
+   * Write a tombstone for every path produced by a
+   * {@code DescendantsIterator} rooted at the given path. Nothing is done if
+   * the path has no entry or its entry is already a tombstone.
+   * @param path root of the subtree to delete
+   * @throws IOException I/O error.
+   */
+  @Override
+  public void deleteSubtree(Path path) throws IOException {
+    path = checkPath(path);
+    LOG.debug("Deleting subtree from table {} in region {}: {}",
+        tableName, region, path);
+
+    final PathMetadata meta = get(path);
+    if (meta == null || meta.isDeleted()) {
+      LOG.debug("Subtree path {} does not exist; this will be a no-op", path);
+      return;
+    }
+
+    final DescendantsIterator descendants =
+        new DescendantsIterator(this, meta);
+    while (descendants.hasNext()) {
+      innerDelete(descendants.next().getPath(), true);
+    }
+  }
+
+  /**
+   * Fetch a single item by primary key using a strictly consistent read.
+   * @param key primary key of the item
+   * @return whatever {@code Table.getItem} returns for the key
+   */
+  private Item getConsistentItem(PrimaryKey key) {
+    return table.getItem(
+        new GetItemSpec()
+            .withPrimaryKey(key)
+            .withConsistentRead(true));
+  }
+
+  /**
+   * Look up the metadata of a path without asking for the empty-directory
+   * flag; equivalent to {@code get(path, false)}.
+   * @param path path to look up
+   * @return the metadata returned by {@link #get(Path, boolean)}
+   * @throws IOException I/O error.
+   */
+  @Override
+  public PathMetadata get(Path path) throws IOException {
+    return get(path, false);
+  }
+
+  /**
+   * Look up the metadata of a path. The root is synthesized as a directory
+   * rather than read from the table; any other path is fetched with a
+   * consistent read. When {@code wantEmptyDirectoryFlag} is set and the
+   * result is a directory, its non-deleted children are queried to fill in
+   * the empty-directory state.
+   * @param path path to look up
+   * @param wantEmptyDirectoryFlag whether to compute the empty-directory flag
+   * @return the metadata produced for the path
+   * @throws IOException I/O error.
+   */
+  @Override
+  public PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
+      throws IOException {
+    path = checkPath(path);
+    LOG.debug("Get from table {} in region {}: {}", tableName, region, path);
+
+    try {
+      final PathMetadata meta;
+      if (!path.isRoot()) {
+        final Item item = getConsistentItem(pathToKey(path));
+        meta = itemToPathMetadata(item, username);
+        LOG.debug("Get from table {} in region {} returning for {}: {}",
+            tableName, region, path, meta);
+      } else {
+        // Root does not persist in the table
+        meta = new PathMetadata(makeDirStatus(username, path));
+      }
+
+      if (meta == null || !wantEmptyDirectoryFlag) {
+        return meta;
+      }
+
+      // for directory, we query its direct children to determine isEmpty bit
+      if (meta.getFileStatus().isDirectory()) {
+        final QuerySpec childQuery = new QuerySpec()
+            .withHashKey(pathToParentKeyAttribute(path))
+            .withConsistentRead(true)
+            .withFilterExpression(IS_DELETED + " = :false")
+            .withValueMap(deleteTrackingValueMap);
+        final ItemCollection<QueryOutcome> children = table.query(childQuery);
+        // When this class has support for authoritative
+        // (fully-cached) directory listings, we may also be able to answer
+        // TRUE here.  Until then, we don't know if we have full listing or
+        // not, thus the UNKNOWN here:
+        final Tristate emptiness = children.iterator().hasNext()
+            ? Tristate.FALSE
+            : Tristate.UNKNOWN;
+        meta.setIsEmptyDirectory(emptiness);
+      }
+      return meta;
+    } catch (AmazonClientException e) {
+      throw translateException("get", path, e);
+    }
+  }
+
+  /**
+   * Make a FileStatus object for a directory at given path.  The FileStatus
+   * only contains what S3A needs, and omits mod time since S3A uses its own
+   * implementation which returns current system time.
+   *
+   * Note the parameter order: this overload takes (owner, path), while the
+   * static overload of the same name takes (path, owner) and does set a
+   * modification time.
+   * @param owner  username of owner
+   * @param path   path to dir
+   * @return new FileStatus
+   */
+  private FileStatus makeDirStatus(String owner, Path path) {
+    // directory flag true; all numeric fields except the third argument
+    // (1) are zero; the two null arguments leave those fields unset
+    return new FileStatus(0, true, 1, 0, 0, 0, null,
+            owner, null, path);
+  }
+
+  /**
+   * List the entries stored directly under a path, using a strictly
+   * consistent query on the parent key.
+   * @param path directory to list
+   * @return the listing, or null when no children were found and the path
+   * itself has no metadata
+   * @throws IOException I/O error.
+   */
+  @Override
+  public DirListingMetadata listChildren(Path path) throws IOException {
+    path = checkPath(path);
+    LOG.debug("Listing table {} in region {}: {}", tableName, region, path);
+
+    try {
+      // find the children in the table
+      final QuerySpec childQuery = new QuerySpec()
+          .withHashKey(pathToParentKeyAttribute(path))
+          .withConsistentRead(true);
+
+      final List<PathMetadata> children = new ArrayList<>();
+      for (Item item : table.query(childQuery)) {
+        children.add(itemToPathMetadata(item, username));
+      }
+      LOG.trace("Listing table {} in region {} for {} returning {}",
+          tableName, region, path, children);
+
+      if (children.isEmpty() && get(path) == null) {
+        return null;
+      }
+      return new DirListingMetadata(path, children, false);
+    } catch (AmazonClientException e) {
+      // failure, including the path not being present
+      throw translateException("listChildren", path, e);
+    }
+  }
+
+  /**
+   * Build the list of entries to write for a set of paths: the paths
+   * themselves plus an auto-created directory entry for every ancestor
+   * (excluding the root) that is not already in the set being built.
+   *
+   * Root entries in the input are skipped, because the root is not persisted
+   * in the table; the remaining entries are still processed.
+   * @param pathsToCreate metadata of the paths being created; no element may
+   * be null
+   * @return the supplied (non-root) entries together with their generated
+   * ancestors
+   * @throws IllegalArgumentException if an element is null
+   */
+  Collection<PathMetadata> completeAncestry(
+      Collection<PathMetadata> pathsToCreate) {
+    // Key on path to allow fast lookup
+    Map<Path, PathMetadata> ancestry = new HashMap<>();
+
+    for (PathMetadata meta : pathsToCreate) {
+      Preconditions.checkArgument(meta != null);
+      Path path = meta.getFileStatus().getPath();
+      if (path.isRoot()) {
+        // skip only this entry; leaving the loop here would silently drop
+        // every entry that follows a root path in the collection
+        continue;
+      }
+      ancestry.put(path, meta);
+      Path parent = path.getParent();
+      // walk upwards until the root or an ancestor that is already covered
+      while (!parent.isRoot() && !ancestry.containsKey(parent)) {
+        LOG.debug("auto-create ancestor path {} for child path {}",
+            parent, path);
+        final FileStatus status = makeDirStatus(parent, username);
+        ancestry.put(parent, new PathMetadata(status, Tristate.FALSE, false));
+        parent = parent.getParent();
+      }
+    }
+    return ancestry.values();
+  }
+
+  /**
+   * Record a move as one batched write: the created paths (with their
+   * ancestry filled in) are put, and a tombstone is put for every deleted
+   * path.
+   * @param pathsToDelete source paths to mark as deleted; may be null
+   * @param pathsToCreate metadata of the destination paths; may be null
+   * @throws IOException I/O error.
+   */
+  @Override
+  public void move(Collection<Path> pathsToDelete,
+      Collection<PathMetadata> pathsToCreate) throws IOException {
+    if (pathsToDelete == null && pathsToCreate == null) {
+      return;
+    }
+
+    final int deleteCount = pathsToDelete == null ? 0 : pathsToDelete.size();
+    final int createCount = pathsToCreate == null ? 0 : pathsToCreate.size();
+    LOG.debug("Moving paths of table {} in region {}: {} paths to delete and {}"
+        + " paths to create", tableName, region,
+        deleteCount,
+        createCount);
+    LOG.trace("move: pathsToDelete = {}, pathsToCreate = {}", pathsToDelete,
+        pathsToCreate);
+
+    // This store assumes that whenever a path exists, all of its ancestors
+    // exist in the table too. To keep that invariant, ancestor directories
+    // that were not explicitly listed are added to the items to create.
+    final Collection<PathMetadata> newItems = new ArrayList<>();
+    if (pathsToCreate != null) {
+      newItems.addAll(completeAncestry(pathsToCreate));
+    }
+    if (pathsToDelete != null) {
+      for (Path deleted : pathsToDelete) {
+        newItems.add(PathMetadata.tombstone(deleted));
+      }
+    }
+
+    try {
+      processBatchWriteRequest(null, pathMetadataToItem(newItems));
+    } catch (AmazonClientException e) {
+      throw translateException("move", (String) null, e);
+    }
+  }
+
+  /**
+   * Helper method to issue a batch write request to DynamoDB.
+   *
+   * The deletes are sent first, then the puts, in batches of at most
+   * {@code S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT} operations; a batch may mix
+   * the tail of the deletes with the head of the puts. Items that DynamoDB
+   * reports as unprocessed are resubmitted with backoff until none remain.
+   *
+   * Callers of this method should catch the {@link AmazonClientException} and
+   * translate it for better error report and easier debugging.
+   * @param keysToDelete primary keys to be deleted; can be null
+   * @param itemsToPut new items to be put; can be null
+   * @throws IOException if {@code retryBackoff} gives up while resubmitting
+   * unprocessed items
+   */
+  private void processBatchWriteRequest(PrimaryKey[] keysToDelete,
+      Item[] itemsToPut) throws IOException {
+    final int totalToDelete = (keysToDelete == null ? 0 : keysToDelete.length);
+    final int totalToPut = (itemsToPut == null ? 0 : itemsToPut.length);
+    // count = operations handed out so far, deletes first and then puts
+    int count = 0;
+    while (count < totalToDelete + totalToPut) {
+      final TableWriteItems writeItems = new TableWriteItems(tableName);
+      int numToDelete = 0;
+      if (keysToDelete != null
+          && count < totalToDelete) {
+        numToDelete = Math.min(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT,
+            totalToDelete - count);
+        writeItems.withPrimaryKeysToDelete(
+            Arrays.copyOfRange(keysToDelete, count, count + numToDelete));
+        count += numToDelete;
+      }
+
+      // fill whatever room is left in this batch with puts
+      if (numToDelete < S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT
+          && itemsToPut != null
+          && count < totalToDelete + totalToPut) {
+        final int numToPut = Math.min(
+            S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT - numToDelete,
+            totalToDelete + totalToPut - count);
+        // count includes the deletes, so subtract them to index the puts
+        final int index = count - totalToDelete;
+        writeItems.withItemsToPut(
+            Arrays.copyOfRange(itemsToPut, index, index + numToPut));
+        count += numToPut;
+      }
+
+      BatchWriteItemOutcome res = dynamoDB.batchWriteItem(writeItems);
+      // Check for unprocessed keys in case of exceeding provisioned throughput
+      Map<String, List<WriteRequest>> unprocessed = res.getUnprocessedItems();
+      // the retry counter starts afresh for every batch
+      int retryCount = 0;
+      while (unprocessed.size() > 0) {
+        retryBackoff(retryCount++);
+        res = dynamoDB.batchWriteItemUnprocessed(unprocessed);
+        unprocessed = res.getUnprocessedItems();
+      }
+    }
+  }
+
+  /**
+   * Put the current thread to sleep to implement exponential backoff
+   * depending on retryCount.  If max retries are exceeded, throws an
+   * exception instead.
+   * @param retryCount number of retries so far
+   * @throws IOException when max retryCount is exceeded.
+   */
+  private void retryBackoff(int retryCount) throws IOException {
+    try {
+      // Our RetryPolicy ignores everything but retryCount here.
+      RetryPolicy.RetryAction action = dataAccessRetryPolicy.shouldRetry(null,
+          retryCount, 0, true);
+      if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+        throw new IOException(
+            String.format("Max retries exceeded (%d) for DynamoDB",
+                retryCount));
+      } else {
+        LOG.debug("Sleeping {} msec before next retry", action.delayMillis);
+        Thread.sleep(action.delayMillis);
+      }
+    } catch (Exception e) {
+      throw new IOException("Unexpected exception", e);
+    }
+  }
+
+  /**
+   * Save a single entry by delegating to the collection variant.
+   *
+   * For a deeply nested path the full ancestry is generated and written in
+   * the same batch write request, which keeps the invariant that if a path
+   * exists, all its ancestors also exist in the table.
+   * @param meta metadata to save
+   * @throws IOException I/O error.
+   */
+  @Override
+  public void put(PathMetadata meta) throws IOException {
+    LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
+
+    final Collection<PathMetadata> single = Arrays.asList(meta);
+    put(single);
+  }
+
+  /**
+   * Save a batch of entries, together with any ancestor directories that
+   * {@code completeAncestry} generates for them, in batched write requests.
+   * @param metas metadata entries to save
+   * @throws IOException I/O error, including translated DynamoDB client
+   * failures.
+   */
+  @Override
+  public void put(Collection<PathMetadata> metas) throws IOException {
+    LOG.debug("Saving batch to table {} in region {}", tableName, region);
+
+    try {
+      processBatchWriteRequest(null,
+          pathMetadataToItem(completeAncestry(metas)));
+    } catch (AmazonClientException e) {
+      // translate, as the other callers of processBatchWriteRequest do
+      throw translateException("put", (String) null, e);
+    }
+  }
+
+  /**
+   * Collect the entries needed to store {@code meta}: the entry itself
+   * (unless it is the root, which is not persisted) followed by a new
+   * directory entry for each ancestor missing from the table. The walk up
+   * the tree stops at the first ancestor found in the table.
+   * @param meta metadata of the path to store
+   * @return the entries to put
+   * @throws IOException I/O error.
+   */
+  private Collection<PathMetadata> fullPathsToPut(PathMetadata meta)
+      throws IOException {
+    checkPathMetadata(meta);
+    final Collection<PathMetadata> metasToPut = new ArrayList<>();
+    final Path target = meta.getFileStatus().getPath();
+    if (!target.isRoot()) {
+      metasToPut.add(meta);
+    }
+
+    for (Path ancestor = target.getParent();
+         ancestor != null && !ancestor.isRoot();
+         ancestor = ancestor.getParent()) {
+      if (itemExists(getConsistentItem(pathToKey(ancestor)))) {
+        // optimization: ancestors of an existing entry are not checked
+        break;
+      }
+      final FileStatus dirStatus = makeDirStatus(ancestor, username);
+      metasToPut.add(new PathMetadata(dirStatus, Tristate.FALSE, false));
+    }
+    return metasToPut;
+  }
+
+  private boolean itemExists(Item item) {
+    if (item == null) {
+      return false;
+    }
+    if (item.hasAttribute(IS_DELETED) &&
+        item.getBoolean(IS_DELETED)) {
+      return false;
+    }
+    return true;
+  }
+
+  /** Create a directory FileStatus using current system time as mod time. */
+  static FileStatus makeDirStatus(Path f, String owner) {
+    return  new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+        null, owner, owner, f);
+  }
+
+  @Override
+  public void put(DirListingMetadata meta) throws IOException {
+    LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
+
+    // Entry for the directory itself, stamped with the current time.
+    final FileStatus dirStatus = makeDirStatus(meta.getPath(), username);
+    final PathMetadata dirEntry =
+        new PathMetadata(dirStatus, meta.isEmpty(), false);
+
+    // Start with the directory and whichever ancestors are missing ...
+    final Collection<PathMetadata> batch = fullPathsToPut(dirEntry);
+    // ... then append every child from the listing.
+    batch.addAll(meta.getListing());
+
+    try {
+      processBatchWriteRequest(null, pathMetadataToItem(batch));
+    } catch (AmazonClientException e) {
+      throw translateException("put", (String) null, e);
+    }
+  }
+
+  @Override
+  public synchronized void close() {
+    if (instrumentation != null) {
+      instrumentation.storeClosed();
+    }
+    if (dynamoDB == null) {
+      // no client to shut down (e.g. close() has already cleared it)
+      return;
+    }
+    LOG.debug("Shutting down {}", this);
+    dynamoDB.shutdown();
+    // drop the reference so a repeated close() skips the shutdown
+    dynamoDB = null;
+  }
+
+  @Override
+  public void destroy() throws IOException {
+    if (table == null) {
+      LOG.info("In destroy(): no table to delete");
+      return;
+    }
+    LOG.info("Deleting DynamoDB table {} in region {}", tableName, region);
+    Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB");
+    try {
+      // Deletion is requested and then waited for, so a normal return means
+      // the wait completed.
+      table.delete();
+      table.waitForDelete();
+    } catch (ResourceNotFoundException rnfe) {
+      // Treated as success: there is nothing left to delete.
+      LOG.info("ResourceNotFoundException while deleting DynamoDB table {} in "
+              + "region {}.  This may indicate that the table does not exist, "
+              + "or has been deleted by another concurrent thread or process.",
+          tableName, region);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted",
+          tableName, ie);
+      // Chain the original interrupt as the cause, as waitForTableActive()
+      // does, so the stack trace of the interruption is not lost.
+      throw (IOException) new InterruptedIOException("Table " + tableName
+          + " in region " + region + " has not been deleted").initCause(ie);
+    } catch (AmazonClientException e) {
+      throw translateException("destroy", (String) null, e);
+    }
+  }
+
+  private ItemCollection<ScanOutcome> expiredFiles(long modTime) {
+    String filterExpression = "mod_time < :mod_time";
+    String projectionExpression = "parent,child";
+    ValueMap map = new ValueMap().withLong(":mod_time", modTime);
+    return table.scan(filterExpression, projectionExpression, null, map);
+  }
+
+  @Override
+  public void prune(long modTime) throws IOException {
+    int itemCount = 0;
+    try {
+      final int pauseMsec = conf.getInt(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY,
+          S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT);
+      final Collection<Path> pending =
+          new ArrayList<>(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
+      for (Item item : expiredFiles(modTime)) {
+        final PathMetadata expired = PathMetadataDynamoDBTranslation
+            .itemToPathMetadata(item, username);
+        pending.add(expired.getFileStatus().getPath());
+        itemCount++;
+        if (pending.size() != S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT) {
+          continue;
+        }
+        // A full batch: pause for the configured delay, then delete it.
+        Thread.sleep(pauseMsec);
+        processBatchWriteRequest(pathToKey(pending), null);
+        pending.clear();
+      }
+      // Flush whatever is left over from the last, partial batch.
+      if (!pending.isEmpty()) {
+        Thread.sleep(pauseMsec);
+        processBatchWriteRequest(pathToKey(pending), null);
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new InterruptedIOException("Pruning was interrupted");
+    }
+    LOG.info("Finished pruning {} items in batches of {}", itemCount,
+        S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
+  }
+
+  @Override
+  public String toString() {
+    // Rendered as SimpleClassName{region=..., tableName=...}
+    final StringBuilder sb = new StringBuilder(getClass().getSimpleName());
+    sb.append('{');
+    sb.append("region=").append(region);
+    sb.append(", tableName=").append(tableName);
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Create a table if it does not exist and wait for it to become active.
+   *
+   * If a table with the intended name already exists, then it uses that table
+   * after checking its version marker.
+   * Otherwise, it will automatically create the table if the config
+   * {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is
+   * enabled. The DynamoDB table creation API is asynchronous.  This method
+   * waits for the table to become active after sending the creation request,
+   * so overall, this method is synchronous, and the table is guaranteed to
+   * exist after this method returns successfully.
+   *
+   * @throws IOException if table does not exist and auto-creation is disabled;
+   * or table is being deleted, or any other I/O exception occurred.
+   */
+  @VisibleForTesting
+  void initTable() throws IOException {
+    table = dynamoDB.getTable(tableName);
+    // Outer try: translates any AmazonClientException, including one raised
+    // while creating the table in the inner catch block.
+    try {
+      // Inner try: a ResourceNotFoundException here means the table is
+      // missing, which is handled by (optionally) creating it.
+      try {
+        LOG.debug("Binding to table {}", tableName);
+        final String status = table.describe().getTableStatus();
+        switch (status) {
+        case "CREATING":
+        case "UPDATING":
+          LOG.debug("Table {} in region {} is being created/updated. This may"
+                  + " indicate that the table is being operated by another "
+                  + "concurrent thread or process. Waiting for active...",
+              tableName, region);
+          waitForTableActive(table);
+          break;
+        case "DELETING":
+          throw new FileNotFoundException("DynamoDB table "
+              + "'" + tableName + "' is being "
+              + "deleted in region " + region);
+        case "ACTIVE":
+          break;
+        default:
+          throw new IOException("Unknown DynamoDB table status " + status
+              + ": tableName='" + tableName + "', region=" + region);
+        }
+
+        // The table exists and is usable: make sure its version marker is
+        // present and matches before binding to it.
+        final Item versionMarker = getVersionMarkerItem();
+        verifyVersionCompatibility(tableName, versionMarker);
+        Long created = extractCreationTimeFromMarker(versionMarker);
+        LOG.debug("Using existing DynamoDB table {} in region {} created {}",
+            tableName, region, (created != null) ? new Date(created) : null);
+      } catch (ResourceNotFoundException rnfe) {
+        // Auto-creation is opt-in: the default used here is false.
+        if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) {
+          final ProvisionedThroughput capacity = new ProvisionedThroughput(
+              conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
+                  S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT),
+              conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
+                  S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT));
+
+          createTable(capacity);
+        } else {
+          throw new FileNotFoundException("DynamoDB table "
+              + "'" + tableName + "' does not "
+              + "exist in region " + region + "; auto-creation is turned off");
+        }
+      }
+
+    } catch (AmazonClientException e) {
+      throw translateException("initTable", (String) null, e);
+    }
+  }
+
+  /**
+   * Get the version marker item in the existing DynamoDB table.
+   *
+   * As the version marker item may be created by another concurrent thread or
+   * process, the lookup is retried, sleeping between attempts as directed by
+   * the data access retry policy, until the item is found or the policy
+   * says to stop.
+   *
+   * @return the version marker item, or null if it was still absent when the
+   * retry policy gave up
+   * @throws InterruptedIOException if the thread was interrupted while
+   * sleeping between attempts; the interrupt flag is restored
+   * @throws IOException if consulting the retry policy failed
+   */
+  private Item getVersionMarkerItem() throws IOException {
+    final PrimaryKey versionMarkerKey =
+        createVersionMarkerPrimaryKey(VERSION_MARKER);
+    int retryCount = 0;
+    Item versionMarker = table.getItem(versionMarkerKey);
+    while (versionMarker == null) {
+      final RetryPolicy.RetryAction action;
+      try {
+        action = dataAccessRetryPolicy.shouldRetry(null, retryCount, 0, true);
+      } catch (Exception e) {
+        throw new IOException("initTable: Unexpected exception", e);
+      }
+      if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+        // Out of retries: return null and let the caller decide what a
+        // missing marker means.
+        break;
+      }
+      LOG.debug("Sleeping {} ms before next retry", action.delayMillis);
+      try {
+        Thread.sleep(action.delayMillis);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag rather than hiding the interruption
+        // behind a generic "unexpected exception".
+        Thread.currentThread().interrupt();
+        throw (IOException) new InterruptedIOException(
+            "initTable: interrupted while waiting for the version marker")
+            .initCause(e);
+      }
+      retryCount++;
+      versionMarker = table.getItem(versionMarkerKey);
+    }
+    return versionMarker;
+  }
+
+  /**
+   * Check that the version recorded in a table's marker item is the one this
+   * S3Guard client expects.
+   * @param tableName name of the table (used in log and error messages)
+   * @param versionMarker marker item read from the table; null is treated as
+   * "no marker" and rejected
+   * @throws IOException if the marker is missing or its version differs from
+   * {@code VERSION}
+   */
+  @VisibleForTesting
+  static void verifyVersionCompatibility(String tableName,
+      Item versionMarker) throws IOException {
+    if (versionMarker == null) {
+      LOG.warn("Table {} contains no version marker", tableName);
+      throw new IOException(E_NO_VERSION_MARKER + " Table: " + tableName);
+    }
+
+    final int actual = extractVersionFromMarker(versionMarker);
+    if (actual != VERSION) {
+      // There is no support for upgrading between versions, so any
+      // difference is treated as an incompatible change.
+      throw new IOException(E_INCOMPATIBLE_VERSION
+          + " Table " + tableName
+          + " Expected version " + VERSION + " actual " + actual);
+    }
+  }
+
+  /**
+   * Wait for table being active.
+   * @param t table to block on.
+   * @throws IOException IO problems
+   * @throws InterruptedIOException if the wait was interrupted
+   */
+  private void waitForTableActive(Table t) throws IOException {
+    try {
+      t.waitForActive();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for table {} in region {} active",
+          tableName, region, e);
+      Thread.currentThread().interrupt();
+      throw (IOException) new InterruptedIOException("DynamoDB table '"
+          + tableName + "' is not active yet in region " + region).initCause(e);
+    }
+  }
+
+  /**
+   * Create a table, wait for it to become active, then add the version
+   * marker.
+   * @param capacity capacity to provision
+   * @throws IOException on any failure.
+   * @throws InterruptedIOException if the wait was interrupted
+   */
+  private void createTable(ProvisionedThroughput capacity) throws IOException {
+    try {
+      LOG.info("Creating non-existent DynamoDB table {} in region {}",
+          tableName, region);
+      table = dynamoDB.createTable(new CreateTableRequest()
+          .withTableName(tableName)
+          .withKeySchema(keySchema())
+          .withAttributeDefinitions(attributeDefinitions())
+          .withProvisionedThroughput(capacity));
+      LOG.debug("Awaiting table becoming active");
+    } catch (ResourceInUseException e) {
+      LOG.warn("ResourceInUseException while creating DynamoDB table {} "
+              + "in region {}.  This may indicate that the table was "
+              + "created by another concurrent thread or process.",
+          tableName, region);
+    }
+    waitForTableActive(table);
+    final Item marker = createVersionMarker(VERSION_MARKER, VERSION,
+        System.currentTimeMillis());
+    putItem(marker);
+  }
+
+  /**
+   * PUT a single item to the table.
+   * @param item item to put
+   * @return the outcome.
+   */
+  PutItemOutcome putItem(Item item) {
+    LOG.debug("Putting item {}", item);
+    return table.putItem(item);
+  }
+
+  /**
+   * Update the table's provisioned throughput and log the values reported
+   * back by the update call.
+   * @param readCapacity read capacity units to request
+   * @param writeCapacity write capacity units to request
+   * @throws IOException translated from any AmazonClientException raised by
+   * the update
+   */
+  void provisionTable(Long readCapacity, Long writeCapacity)
+      throws IOException {
+    final ProvisionedThroughput requested = new ProvisionedThroughput()
+        .withReadCapacityUnits(readCapacity)
+        .withWriteCapacityUnits(writeCapacity);
+    try {
+      final ProvisionedThroughputDescription reported =
+          table.updateTable(requested).getProvisionedThroughput();
+      LOG.info("Provision table {} in region {}: readCapacityUnits={}, "
+              + "writeCapacityUnits={}",
+          tableName, region, reported.getReadCapacityUnits(),
+          reported.getWriteCapacityUnits());
+    } catch (AmazonClientException e) {
+      throw translateException("provisionTable", (String) null, e);
+    }
+  }
+
+  /**
+   * @return the table this store is currently bound to; destroy() treats a
+   * null value as "no table to delete".
+   */
+  Table getTable() {
+    return table;
+  }
+
+  /** @return the region value this store uses in its log and error messages. */
+  String getRegion() {
+    return region;
+  }
+
+  /** @return the DynamoDB client handle; null once close() has run. */
+  @VisibleForTesting
+  DynamoDB getDynamoDB() {
+    return dynamoDB;
+  }
+
+  /**
+   * Validates a path object; it must be absolute, and contain a host
+   * (bucket) component.
+   */
+  private Path checkPath(Path path) {
+    Preconditions.checkNotNull(path);
+    Preconditions.checkArgument(path.isAbsolute(), "Path %s is not absolute",
+        path);
+    URI uri = path.toUri();
+    Preconditions.checkNotNull(uri.getScheme(), "Path %s missing scheme", path);
+    Preconditions.checkArgument(uri.getScheme().equals(Constants.FS_S3A),
+        "Path %s scheme must be %s", path, Constants.FS_S3A);
+    Preconditions.checkArgument(!StringUtils.isEmpty(uri.getHost()), "Path %s" +
+        " is missing bucket.", path);
+    return path;
+  }
+
+  /**
+   * Validates a path meta-data object.
+   */
+  private static void checkPathMetadata(PathMetadata meta) {
+    Preconditions.checkNotNull(meta);
+    Preconditions.checkNotNull(meta.getFileStatus());
+    Preconditions.checkNotNull(meta.getFileStatus().getPath());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
new file mode 100644
index 0000000..1ef8b0d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Tristate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * This is a local, in-memory, implementation of MetadataStore.
+ * This is <i>not</i> a coherent cache across processes.  It is only
+ * locally-coherent.
+ *
+ * The purpose of this is for unit and integration testing.
+ * It could also be used to accelerate local-only operations where only one
+ * process is operating on a given object store, or multiple processes are
+ * accessing a read-only storage bucket.
+ *
+ * This MetadataStore does not enforce filesystem rules such as disallowing
+ * non-recursive removal of non-empty directories.  It is assumed the caller
+ * already has to perform these sorts of checks.
+ */
+public class LocalMetadataStore implements MetadataStore {
+
+  /** Logger named after this class rather than the MetadataStore interface. */
+  public static final Logger LOG =
+      LoggerFactory.getLogger(LocalMetadataStore.class);
+  // TODO HADOOP-13649: use time instead of capacity for eviction.
+  /** Record limit used when {@link #CONF_MAX_RECORDS} is not set. */
+  public static final int DEFAULT_MAX_RECORDS = 128;
+
+  /**
+   * Configuration key for the maximum number of records.
+   */
+  public static final String CONF_MAX_RECORDS =
+      "fs.metadatastore.local.max_records";
+
+  /** Contains directories and files. */
+  private LruHashMap<Path, PathMetadata> fileHash;
+
+  /** Contains directory listings. */
+  private LruHashMap<Path, DirListingMetadata> dirHash;
+
+  /** Filesystem passed to {@link #initialize(FileSystem)}. */
+  private FileSystem fs;
+  /** Null iff this FS does not have an associated URI host. */
+  private String uriHost;
+
+  @Override
+  public void initialize(FileSystem fileSystem) throws IOException {
+    Preconditions.checkNotNull(fileSystem);
+    fs = fileSystem;
+    final String host = fs.getUri().getHost();
+    // An empty host is recorded as null, i.e. "no host".
+    uriHost = "".equals(host) ? null : host;
+
+    initialize(fs.getConf());
+  }
+
+  @Override
+  public void initialize(Configuration conf) throws IOException {
+    Preconditions.checkNotNull(conf);
+    // Configured limit, with a floor of 4 records.
+    final int maxRecords =
+        Math.max(4, conf.getInt(CONF_MAX_RECORDS, DEFAULT_MAX_RECORDS));
+    // Start w/ less than max capacity.  Space / time trade off.
+    fileHash = new LruHashMap<>(maxRecords / 2, maxRecords);
+    dirHash = new LruHashMap<>(maxRecords / 4, maxRecords);
+  }
+
+  @Override
+  public String toString() {
+    // uriHost is the first and only field, so it takes no leading separator.
+    final StringBuilder sb = new StringBuilder(
+        "LocalMetadataStore{");
+    sb.append("uriHost='").append(uriHost).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+
+  @Override
+  public void delete(Path p) throws IOException {
+    // non-recursive, leaving a tombstone behind
+    doDelete(p, false, true);
+  }
+
+  @Override
+  public void forgetMetadata(Path p) throws IOException {
+    // non-recursive, and the entry is removed outright: no tombstone
+    doDelete(p, false, false);
+  }
+
+  @Override
+  public void deleteSubtree(Path path) throws IOException {
+    // recursive: also covers every entry below the path; tombstones are kept
+    doDelete(path, true, true);
+  }
+
+  private synchronized void doDelete(Path p, boolean recursive, boolean
+      tombstone) {
+
+    Path path = standardize(p);
+
+    // Delete entry from file cache, then from cached parent directory, if any
+
+    deleteHashEntries(path, tombstone);
+
+    if (recursive) {
+      // Remove all entries that have this dir as path prefix.
+      deleteHashByAncestor(path, dirHash, tombstone);
+      deleteHashByAncestor(path, fileHash, tombstone);
+    }
+  }
+
+  @Override
+  public synchronized PathMetadata get(Path p) throws IOException {
+    // plain lookup: the empty-directory flag is not computed
+    return get(p, false);
+  }
+
+  @Override
+  public PathMetadata get(Path p, boolean wantEmptyDirectoryFlag)
+      throws IOException {
+    final Path path = standardize(p);
+    synchronized (this) {
+      // the lookup also marks the entry as most recently used
+      final PathMetadata found = fileHash.mruGet(path);
+
+      if (found != null && wantEmptyDirectoryFlag
+          && found.getFileStatus().isDirectory()) {
+        // standardize() hands back its argument, so path and p are the same
+        found.setIsEmptyDirectory(isEmptyDirectory(path));
+      }
+
+      final String printable = (found == null) ? "null" : found.prettyPrint();
+      LOG.debug("get({}) -> {}", path, printable);
+      return found;
+    }
+  }
+
+  /**
+   * Determine if directory is empty.
+   * Call with lock held.
+   * @param p a Path, already filtered through standardize()
+   * @return TRUE / FALSE if known empty / not-empty, UNKNOWN otherwise.
+   */
+  private Tristate isEmptyDirectory(Path p) {
+    DirListingMetadata dirMeta = dirHash.get(p);
+    return dirMeta.withoutTombstones().isEmpty();
+  }
+
+  @Override
+  public synchronized DirListingMetadata listChildren(Path p) throws
+      IOException {
+    final Path path = standardize(p);
+    // the lookup also marks the listing as most recently used
+    final DirListingMetadata cached = dirHash.mruGet(path);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("listChildren({}) -> {}", path,
+          cached == null ? "null" : cached.prettyPrint());
+    }
+    if (cached == null) {
+      return null;
+    }
+    // Hand out a copy so callers can mutate it without affecting our state
+    return new DirListingMetadata(cached);
+  }
+
+  @Override
+  public void move(Collection<Path> pathsToDelete,
+      Collection<PathMetadata> pathsToCreate) throws IOException {
+
+    Preconditions.checkNotNull(pathsToDelete, "pathsToDelete is null");
+    Preconditions.checkNotNull(pathsToCreate, "pathsToCreate is null");
+    Preconditions.checkArgument(pathsToDelete.size() == pathsToCreate.size(),
+        "Must supply same number of paths to delete/create.");
+
+    // The whole move runs under this object's monitor.  delete(), put() and
+    // listChildren() below take the same monitor again, which is fine
+    // because Java monitors are reentrant.
+    synchronized (this) {
+
+      // 1. Delete pathsToDelete
+      for (Path meta : pathsToDelete) {
+        LOG.debug("move: deleting metadata {}", meta);
+        delete(meta);
+      }
+
+      // 2. Create new destination path metadata
+      for (PathMetadata meta : pathsToCreate) {
+        LOG.debug("move: adding metadata {}", meta);
+        put(meta);
+      }
+
+      // 3. We now know full contents of all dirs in destination subtree
+      // NOTE(review): as written this step looks like it cannot change the
+      // stored state: directory entries are skipped by the check below, and
+      // listChildren() returns a copy of the cached listing, so
+      // setAuthoritative(true) is applied to that copy only.  Confirm the
+      // intended behaviour before relying on listings being authoritative
+      // after a move.
+      for (PathMetadata meta : pathsToCreate) {
+        FileStatus status = meta.getFileStatus();
+        if (status == null || status.isDirectory()) {
+          continue;
+        }
+        DirListingMetadata dir = listChildren(status.getPath());
+        if (dir != null) {  // could be evicted already
+          dir.setAuthoritative(true);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void put(PathMetadata meta) throws IOException {
+
+    Preconditions.checkNotNull(meta);
+    final FileStatus status = meta.getFileStatus();
+    final Path path = standardize(status.getPath());
+    synchronized (this) {
+
+      /* Record the entry itself. */
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("put {} -> {}", path, meta.prettyPrint());
+      }
+      fileHash.put(path, meta);
+
+      /* Directory case:
+       * Make sure dirHash has an entry too, so that a subsequent
+       * listStatus(path) at least sees the directory.
+       *
+       * With a boolean "isNew" argument we could tell an existing directory
+       * the client discovered via getFileStatus() from a newly-created one,
+       * and mark the latter as authoritative (fully-cached), saving round
+       * trips to the underlying store for subsequent listStatus().  Without
+       * it, a new listing is always created as non-authoritative.
+       */
+      if (status.isDirectory() && dirHash.mruGet(path) == null) {
+        dirHash.put(path, new DirListingMetadata(path,
+            DirListingMetadata.EMPTY_DIR, false));
+      }
+
+      /* Update the cached listing of the parent directory. */
+      final Path parentPath = path.getParent();
+      if (parentPath == null) {
+        return;
+      }
+      DirListingMetadata parent = dirHash.mruGet(parentPath);
+      if (parent == null) {
+        /* Start tracking the parent's listing.  It is not authoritative,
+         * since there may be other items in it we don't know about. */
+        parent = new DirListingMetadata(parentPath,
+            DirListingMetadata.EMPTY_DIR, false);
+        dirHash.put(parentPath, parent);
+      }
+      parent.put(status);
+    }
+  }
+
+  @Override
+  public synchronized void put(DirListingMetadata meta) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("put dirMeta {}", meta.prettyPrint());
+    }
+    // Stored as-is, replacing any listing already cached for the path.
+    final Path key = standardize(meta.getPath());
+    dirHash.put(key, meta);
+  }
+
+  /**
+   * Save a batch of entries by applying {@link #put(PathMetadata)} to each
+   * one in iteration order, all under this object's lock.
+   * @param metas entries to save
+   * @throws IOException if saving any entry fails
+   */
+  @Override
+  public synchronized void put(Collection<PathMetadata> metas) throws
+      IOException {
+    for (PathMetadata meta : metas) {
+      put(meta);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Intentionally empty: this method releases nothing.
+  }
+
+  /**
+   * Discard everything held by this store: both the per-path entries and the
+   * directory listings.  Safe to call before initialization, when neither
+   * map exists yet.  Synchronized like the other methods that touch the maps.
+   */
+  @Override
+  public synchronized void destroy() throws IOException {
+    // Clear both maps; clearing only the listings would leave every path
+    // entry still retrievable through get() after the store was destroyed.
+    if (fileHash != null) {
+      fileHash.clear();
+    }
+    if (dirHash != null) {
+      dirHash.clear();
+    }
+  }
+
+  @Override
+  public synchronized void prune(long modTime) throws IOException {
+    // Pass 1: drop expired entries from the file map.
+    for (Iterator<Map.Entry<Path, PathMetadata>> files =
+         fileHash.entrySet().iterator(); files.hasNext();) {
+      if (expired(files.next().getValue().getFileStatus(), modTime)) {
+        files.remove();
+      }
+    }
+
+    // Pass 2: rebuild every listing that contains an expired child.
+    for (Map.Entry<Path, DirListingMetadata> entry : dirHash.entrySet()) {
+      final Path dirPath = entry.getKey();
+      final Collection<PathMetadata> before = entry.getValue().getListing();
+      final Collection<PathMetadata> kept = new LinkedList<>();
+      for (PathMetadata child : before) {
+        if (!expired(child.getFileStatus(), modTime)) {
+          kept.add(child);
+        }
+      }
+      if (kept.size() == before.size()) {
+        // nothing expired in this directory
+        continue;
+      }
+      // The key is already present, so this replaces the value in place.
+      // The new listing and the parent's listing are both marked
+      // non-authoritative.
+      dirHash.put(dirPath, new DirListingMetadata(dirPath, kept, false));
+      if (dirPath.isRoot()) {
+        continue;
+      }
+      final DirListingMetadata parent = dirHash.get(dirPath.getParent());
+      if (parent != null) {
+        parent.setAuthoritative(false);
+      }
+    }
+  }
+
+  /**
+   * Decide whether an entry is old enough to prune.
+   * @param status status of the entry
+   * @param expiry cut-off modification time
+   * @return true for a non-directory whose modification time is strictly
+   * below the cut-off
+   */
+  private boolean expired(FileStatus status, long expiry) {
+    // Note: S3 doesn't track modification time on directories, so for
+    // consistency with the DynamoDB implementation directories never expire.
+    if (status.isDirectory()) {
+      return false;
+    }
+    return status.getModificationTime() < expiry;
+  }
+
+  @VisibleForTesting
+  static <T> void deleteHashByAncestor(Path ancestor, Map<Path, T> hash,
+                                       boolean tombstone) {
+    for (Iterator<Map.Entry<Path, T>> it = hash.entrySet().iterator();
+         it.hasNext();) {
+      Map.Entry<Path, T> entry = it.next();
+      Path f = entry.getKey();
+      T meta = entry.getValue();
+      if (isAncestorOf(ancestor, f)) {
+        if (tombstone) {
+          if (meta instanceof PathMetadata) {
+            entry.setValue((T) PathMetadata.tombstone(f));
+          } else if (meta instanceof DirListingMetadata) {
+            it.remove();
+          } else {
+            throw new IllegalStateException("Unknown type in hash");
+          }
+        } else {
+          it.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * @return true iff 'ancestor' is ancestor dir in path 'f'.
+   * All paths here are absolute.  Dir does not count as its own ancestor.
+   */
+  private static boolean isAncestorOf(Path ancestor, Path f) {
+    String aStr = ancestor.toString();
+    if (!ancestor.isRoot()) {
+      aStr += "/";
+    }
+    String fStr = f.toString();
+    return (fStr.startsWith(aStr));
+  }
+
+  /**
+   * Update fileHash and dirHash to reflect deletion of file 'f'.  Call with
+   * lock held.
+   */
+  private void deleteHashEntries(Path path, boolean tombstone) {
+
+    // Remove target file/dir
+    LOG.debug("delete file entry for {}", path);
+    if (tombstone) {
+      fileHash.put(path, PathMetadata.tombstone(path));
+    } else {
+      fileHash.remove(path);
+    }
+
+    // Update this and parent dir listing, if any
+
+    /* If this path is a dir, remove its listing */
+    LOG.debug("removing listing of {}", path);
+
+    dirHash.remove(path);
+
+    /* Remove this path from parent's dir listing */
+    Path parent = path.getParent();
+    if (parent != null) {
+      DirListingMetadata dir = dirHash.get(parent);
+      if (dir != null) {
+        LOG.debug("removing parent's entry for {} ", path);
+        if (tombstone) {
+          dir.markDeleted(path);
+        } else {
+          dir.remove(path);
+        }
+      }
+    }
+  }
+
+  /**
+   * Return a "standardized" version of a path so we always have a consistent
+   * hash value.  Also asserts the path is absolute, and contains host
+   * component.
+   * @param p input Path
+   * @return standardized version of Path, suitable for hash key
+   */
+  private Path standardize(Path p) {
+    Preconditions.checkArgument(p.isAbsolute(), "Path must be absolute");
+    URI uri = p.toUri();
+    if (uriHost != null) {
+      Preconditions.checkArgument(!isEmpty(uri.getHost()));
+    }
+    return p;
+  }
+
+  /**
+   * @param s string to test
+   * @return true if the string is null or has no characters
+   */
+  private static boolean isEmpty(String s) {
+    if (s == null) {
+      return true;
+    }
+    return s.length() == 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java
new file mode 100644
index 0000000..e355095
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java
@@ -0,0 +1,50 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A LinkedHashMap with an upper bound on its size.  Once an insertion
+ * pushes the size past the bound, the eldest entry is dropped.  Iteration
+ * order is insertion order, so an entry only becomes "recent" again when it
+ * is re-inserted, which is what {@link #mruGet(Object)} does.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class LruHashMap<K, V> extends LinkedHashMap<K, V> {
+  /** Largest number of entries kept after an insertion. */
+  private final int maxSize;
+
+  /**
+   * @param initialCapacity initial capacity handed to LinkedHashMap
+   * @param maxSize maximum number of entries to retain
+   */
+  public LruHashMap(int initialCapacity, int maxSize) {
+    super(initialCapacity);
+    this.maxSize = maxSize;
+  }
+
+  /** Evict the eldest entry whenever the map has grown past its bound. */
+  @Override
+  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+    final boolean overCapacity = size() > maxSize;
+    return overCapacity;
+  }
+
+  /**
+   * get() plus side-effect of making the element Most Recently Used.
+   * The entry is taken out and, when it held a non-null value, inserted
+   * again so that it moves to the young end of the eviction order.
+   * @param key lookup key
+   * @return the value, or null if there was none
+   */
+  public V mruGet(K key) {
+    final V removed = remove(key);
+    if (removed == null) {
+      return null;
+    }
+    put(key, removed);
+    return removed;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
new file mode 100644
index 0000000..dd8077b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * {@code MetadataStore} defines the set of operations that any metadata store
+ * implementation must provide.  Note that all {@link Path} objects provided
+ * to methods must be absolute, not relative paths.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface MetadataStore extends Closeable {
+
+  /**
+   * Performs one-time initialization of the metadata store.
+   *
+   * @param fs {@code FileSystem} associated with the MetadataStore
+   * @throws IOException if there is an error
+   */
+  void initialize(FileSystem fs) throws IOException;
+
+  /**
+   * Performs one-time initialization of the metadata store via configuration.
+   * @see #initialize(FileSystem)
+   * @param conf Configuration.
+   * @throws IOException if there is an error
+   */
+  void initialize(Configuration conf) throws IOException;
+
+  /**
+   * Deletes exactly one path, leaving a tombstone to prevent lingering,
+   * inconsistent copies of it from being listed.
+   *
+   * @param path the path to delete
+   * @throws IOException if there is an error
+   */
+  void delete(Path path) throws IOException;
+
+  /**
+   * Removes the record of exactly one path.  Does not leave a tombstone (see
+   * {@link MetadataStore#delete(Path)}).  It is currently intended for testing
+   * only, and a need to use it as part of normal FileSystem usage is not
+   * anticipated.
+   *
+   * @param path the path whose record is to be removed
+   * @throws IOException if there is an error
+   */
+  @VisibleForTesting
+  void forgetMetadata(Path path) throws IOException;
+
+  /**
+   * Deletes the entire sub-tree rooted at the given path, leaving tombstones
+   * to prevent lingering, inconsistent copies of it from being listed.
+   *
+   * In addition to affecting future calls to {@link #get(Path)},
+   * implementations must also update any stored {@code DirListingMetadata}
+   * objects which track the parent of this file.
+   *
+   * @param path the root of the sub-tree to delete
+   * @throws IOException if there is an error
+   */
+  void deleteSubtree(Path path) throws IOException;
+
+  /**
+   * Gets metadata for a path.
+   *
+   * @param path the path to get
+   * @return metadata for {@code path}, {@code null} if not found
+   * @throws IOException if there is an error
+   */
+  PathMetadata get(Path path) throws IOException;
+
+  /**
+   * Gets metadata for a path.  Alternate method that includes a hint
+   * whether or not the MetadataStore should do work to compute the value for
+   * {@link PathMetadata#isEmptyDirectory()}.  Since determining emptiness
+   * may be an expensive operation, this can save wasted work.
+   *
+   * @param path the path to get
+   * @param wantEmptyDirectoryFlag Set to true to give a hint to the
+   *   MetadataStore that it should try to compute the empty directory flag.
+   * @return metadata for {@code path}, {@code null} if not found
+   * @throws IOException if there is an error
+   */
+  PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
+      throws IOException;
+
+  /**
+   * Lists metadata for all direct children of a path.
+   *
+   * @param path the path to list
+   * @return metadata for all direct children of {@code path} which are being
+   *     tracked by the MetadataStore, or {@code null} if the path was not found
+   *     in the MetadataStore.
+   * @throws IOException if there is an error
+   */
+  DirListingMetadata listChildren(Path path) throws IOException;
+
+  /**
+   * Record the effects of a {@link FileSystem#rename(Path, Path)} in the
+   * MetadataStore.  Clients provide explicit enumeration of the affected
+   * paths (recursively), before and after the rename.
+   *
+   * This operation is not atomic, unless specific implementations claim
+   * otherwise.
+   *
+   * On the need to provide an enumeration of directory trees instead of just
+   * source and destination paths:
+   * Since a MetadataStore does not have to track all metadata for the
+   * underlying storage system, and a new MetadataStore may be created on an
+   * existing underlying filesystem, this move() may be the first time the
+   * MetadataStore sees the affected paths.  Therefore, simply providing src
+   * and destination paths may not be enough to record the deletions (under
+   * src path) and creations (at destination) that are happening during the
+   * rename().
+   *
+   * @param pathsToDelete Collection of all paths that were removed from the
+   *                      source directory tree of the move.
+   * @param pathsToCreate Collection of all PathMetadata for the new paths
+   *                      that were created at the destination of the
+   *                      rename().
+   * @throws IOException if there is an error
+   */
+  void move(Collection<Path> pathsToDelete,
+      Collection<PathMetadata> pathsToCreate) throws IOException;
+
+  /**
+   * Saves metadata for exactly one path.
+   *
+   * Implementations may pre-create all the path's ancestors automatically.
+   * Implementations must update any {@code DirListingMetadata} objects which
+   * track the immediate parent of this file.
+   *
+   * @param meta the metadata to save
+   * @throws IOException if there is an error
+   */
+  void put(PathMetadata meta) throws IOException;
+
+  /**
+   * Saves metadata for any number of paths.
+   *
+   * Semantics are otherwise the same as single-path puts.
+   *
+   * @param metas the metadata to save
+   * @throws IOException if there is an error
+   */
+  void put(Collection<PathMetadata> metas) throws IOException;
+
+  /**
+   * Save directory listing metadata. Callers may save a partial directory
+   * listing for a given path, or may store a complete and authoritative copy
+   * of the directory listing.  {@code MetadataStore} implementations may
+   * subsequently keep track of all modifications to the directory contents at
+   * this path, and return authoritative results from subsequent calls to
+   * {@link #listChildren(Path)}. See {@link DirListingMetadata}.
+   *
+   * Any authoritative results returned are only authoritative for the scope
+   * of the {@code MetadataStore}:  A per-process {@code MetadataStore}, for
+   * example, would only show results visible to that process, potentially
+   * missing metadata updates (create, delete) made to the same path by
+   * another process.
+   *
+   * @param meta Directory listing metadata.
+   * @throws IOException if there is an error
+   */
+  void put(DirListingMetadata meta) throws IOException;
+
+  /**
+   * Destroy all resources associated with the metadata store.
+   *
+   * The destroyed resources can be DynamoDB tables, MySQL databases/tables, or
+   * HDFS directories. Any operations after calling this method may possibly
+   * fail.
+   *
+   * This operation is idempotent.
+   *
+   * @throws IOException if there is an error
+   */
+  void destroy() throws IOException;
+
+  /**
+   * Clear any metadata older than a specified time from the repository.
+   * Implementations MUST clear file metadata, and MAY clear directory metadata
+   * (s3a itself does not track modification time for directories).
+   * Implementations may also choose to throw UnsupportedOperationException
+   * instead. Note that modification times should be in UTC, as returned by
+   * System.currentTimeMillis at the time of modification.
+   *
+   * @param modTime Oldest modification time to allow
+   * @throws IOException if there is an error
+   * @throws UnsupportedOperationException if not implemented
+   */
+  void prune(long modTime) throws IOException, UnsupportedOperationException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java
new file mode 100644
index 0000000..378d109
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreListFilesIterator.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+/**
+ * {@code MetadataStoreListFilesIterator} is a {@link RemoteIterator} that
+ * is similar to {@code DescendantsIterator} but does not return directories
+ * that have (or may have) children, and will also provide access to the set of
+ * tombstones to allow recently deleted S3 objects to be filtered out from a
+ * corresponding request.  In other words, it returns tombstones and the same
+ * set of objects that should exist in S3: empty directories, and files, and not
+ * other directories whose existence is inferred therefrom.
+ *
+ * For example, assume the consistent store contains metadata representing this
+ * file system structure:
+ *
+ * <pre>
+ * /dir1
+ * |-- dir2
+ * |   |-- file1
+ * |   `-- file2
+ * `-- dir3
+ *     |-- dir4
+ *     |   `-- file3
+ *     |-- dir5
+ *     |   `-- file4
+ *     `-- dir6
+ * </pre>
+ *
+ * Consider this code sample:
+ * <pre>
+ * final PathMetadata dir1 = ms.get(new Path("/dir1"));
+ * for (MetadataStoreListFilesIterator files =
+ *     new MetadataStoreListFilesIterator(ms, dir1, true); files.hasNext(); ) {
+ *   final FileStatus status = files.next();
+ *   System.out.printf("%s %s%n", status.isDirectory() ? 'D' : 'F',
+ *       status.getPath());
+ * }
+ * </pre>
+ *
+ * The output is:
+ * <pre>
+ * F /dir1/dir2/file1
+ * F /dir1/dir2/file2
+ * F /dir1/dir3/dir4/file3
+ * F /dir1/dir3/dir5/file4
+ * D /dir1/dir3/dir6
+ * </pre>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class MetadataStoreListFilesIterator implements
+    RemoteIterator<FileStatus> {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      MetadataStoreListFilesIterator.class);
+
+  /** Whether an authoritative, childless listing marks a leaf directory. */
+  private final boolean allowAuthoritative;
+  /** Store queried for directory listings during the walk. */
+  private final MetadataStore metadataStore;
+  /** Tombstoned paths gathered from every listing that was visited. */
+  private final Set<Path> tombstones = new HashSet<>();
+  /** Iterator over the collected results; assigned by prefetch(). */
+  private Iterator<FileStatus> leafNodesIterator = null;
+
+  /**
+   * Walks the tree below {@code meta} up front and keeps the results, so
+   * that {@link #hasNext()} and {@link #next()} only read collected entries.
+   * @param ms store to query; must not be null
+   * @param meta entry to start from; {@code null} gives an empty iterator
+   * @param allowAuthoritative whether a directory whose listing is
+   *     authoritative and has no live children is returned as a result
+   * @throws IOException if querying the store fails
+   */
+  public MetadataStoreListFilesIterator(MetadataStore ms, PathMetadata meta,
+      boolean allowAuthoritative) throws IOException {
+    this.metadataStore = Preconditions.checkNotNull(ms);
+    this.allowAuthoritative = allowAuthoritative;
+    prefetch(meta);
+  }
+
+  /**
+   * Breadth-first walk from {@code meta} which fills in the leaf list and
+   * the tombstone set.
+   * @param meta entry to start from, possibly null
+   * @throws IOException if querying the store fails
+   */
+  private void prefetch(PathMetadata meta) throws IOException {
+    final Queue<PathMetadata> pending = new LinkedList<>();
+    final Collection<FileStatus> leaves = new ArrayList<>();
+
+    if (meta != null) {
+      final Path start = meta.getFileStatus().getPath();
+      if (!start.isRoot()) {
+        pending.add(meta);
+      } else {
+        // The root entry itself is never queued: the walk is seeded with
+        // its live children instead.
+        DirListingMetadata rootListing = metadataStore.listChildren(start);
+        if (rootListing != null) {
+          tombstones.addAll(rootListing.listTombstones());
+          pending.addAll(rootListing.withoutTombstones().getListing());
+        }
+      }
+    }
+
+    while (!pending.isEmpty()) {
+      final FileStatus status = pending.poll().getFileStatus();
+      if (status.isFile()) {
+        // A file is always a leaf.
+        leaves.add(status);
+      } else if (status.isDirectory()) {
+        visitDirectory(status, pending, leaves);
+      }
+      // Anything else could only be a symlink, which S3A does not support.
+    }
+    leafNodesIterator = leaves.iterator();
+  }
+
+  /**
+   * Handles one directory of the walk: records its tombstones, then either
+   * queues its live children or, when it has none, decides whether the
+   * directory itself is a result.
+   * @param dir status of the directory being visited
+   * @param pending work queue that receives the live children
+   * @param leaves result list that receives confirmed leaf directories
+   * @throws IOException if querying the store fails
+   */
+  private void visitDirectory(FileStatus dir, Queue<PathMetadata> pending,
+      Collection<FileStatus> leaves) throws IOException {
+    final DirListingMetadata listing =
+        metadataStore.listChildren(dir.getPath());
+    if (listing == null) {
+      return;
+    }
+    tombstones.addAll(listing.listTombstones());
+    final Collection<PathMetadata> live =
+        listing.withoutTombstones().getListing();
+    if (!live.isEmpty()) {
+      // Has children that are not deleted: descend, do not report it.
+      pending.addAll(live);
+    } else if (allowAuthoritative && listing.isAuthoritative()) {
+      leaves.add(dir);
+    }
+    // A directory that only *might* be empty is left out, because emptiness
+    // cannot be confirmed without incurring other costs.  Callers can still
+    // discover such directories through S3's fake directories, subject to
+    // the same consistency semantics as before.
+  }
+
+  /** @return true while collected entries remain. */
+  @Override
+  public boolean hasNext() {
+    return leafNodesIterator.hasNext();
+  }
+
+  /** @return the next collected entry. */
+  @Override
+  public FileStatus next() {
+    return leafNodesIterator.next();
+  }
+
+  /**
+   * @return the set of tombstoned paths gathered during the walk; this is
+   * the internal set itself, not a copy.
+   */
+  public Set<Path> listTombstones() {
+    return tombstones;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
new file mode 100644
index 0000000..08ae89e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A no-op implementation of MetadataStore.  Clients that use this
+ * implementation should behave the same as they would without any
+ * MetadataStore.
+ */
+public class NullMetadataStore implements MetadataStore {
+
+  // ---- lifecycle: nothing is ever allocated, so nothing to set up or free
+
+  @Override
+  public void initialize(FileSystem fs) throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void initialize(Configuration conf) throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void close() throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void destroy() throws IOException {
+    // no-op
+  }
+
+  // ---- lookups: every query reports "not found"
+
+  /** @return always {@code null}. */
+  @Override
+  public PathMetadata get(Path path) throws IOException {
+    return null;
+  }
+
+  /** @return always {@code null}; the flag is ignored. */
+  @Override
+  public PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
+      throws IOException {
+    return null;
+  }
+
+  /** @return always {@code null}. */
+  @Override
+  public DirListingMetadata listChildren(Path path) throws IOException {
+    return null;
+  }
+
+  // ---- updates: everything handed in is discarded
+
+  @Override
+  public void put(PathMetadata meta) throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void put(Collection<PathMetadata> meta) throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void put(DirListingMetadata meta) throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void move(Collection<Path> pathsToDelete,
+      Collection<PathMetadata> pathsToCreate) throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void delete(Path path) throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void forgetMetadata(Path path) throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void deleteSubtree(Path path) throws IOException {
+    // no-op
+  }
+
+  @Override
+  public void prune(long modTime) {
+    // no-op
+  }
+
+  @Override
+  public String toString() {
+    return "NullMetadataStore";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
new file mode 100644
index 0000000..2a0219e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Tristate;
+
+/**
+ * {@code PathMetadata} models path metadata stored in the
+ * {@link MetadataStore}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class PathMetadata {
+
+  private final FileStatus fileStatus;
+  private Tristate isEmptyDirectory;
+  private boolean isDeleted;
+
+  /**
+   * Build a deletion marker for a path, stamped with the current time.
+   * @param path path to tombstone
+   * @return the entry.
+   */
+  public static PathMetadata tombstone(Path path) {
+    final FileStatus marker =
+        new FileStatus(0, false, 0, 0, System.currentTimeMillis(), path);
+    return new PathMetadata(marker, Tristate.UNKNOWN, true);
+  }
+
+  /**
+   * Wraps a {@code FileStatus} as a live entry of unknown emptiness.
+   * @param fileStatus file status containing an absolute path.
+   */
+  public PathMetadata(FileStatus fileStatus) {
+    this(fileStatus, Tristate.UNKNOWN);
+  }
+
+  /**
+   * Wraps a {@code FileStatus} as a live (not deleted) entry.
+   * @param fileStatus file status containing an absolute path.
+   * @param isEmptyDir what is known about the directory being empty.
+   */
+  public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir) {
+    this(fileStatus, isEmptyDir, false);
+  }
+
+  /**
+   * Full constructor; the other constructors delegate here.
+   * @param fileStatus file status; it and its path must be non-null and the
+   *     path must be absolute.
+   * @param isEmptyDir what is known about the directory being empty.
+   * @param isDeleted true if this entry is a tombstone.
+   */
+  public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir, boolean
+      isDeleted) {
+    Preconditions.checkNotNull(fileStatus, "fileStatus must be non-null");
+    final Path path = fileStatus.getPath();
+    Preconditions.checkNotNull(path, "fileStatus path must be non-null");
+    Preconditions.checkArgument(path.isAbsolute(), "path must be absolute");
+    this.isDeleted = isDeleted;
+    this.isEmptyDirectory = isEmptyDir;
+    this.fileStatus = fileStatus;
+  }
+
+  /**
+   * @return {@code FileStatus} contained in this {@code PathMetadata}.
+   */
+  public final FileStatus getFileStatus() {
+    return fileStatus;
+  }
+
+  /**
+   * Query if a directory is empty.
+   * @return Tristate.TRUE if this is known to be an empty directory,
+   * Tristate.FALSE if known to not be empty, and Tristate.UNKNOWN if the
+   * MetadataStore does not have enough information to determine either way.
+   */
+  public Tristate isEmptyDirectory() {
+    return isEmptyDirectory;
+  }
+
+  void setIsEmptyDirectory(Tristate isEmptyDirectory) {
+    this.isEmptyDirectory = isEmptyDirectory;
+  }
+
+  /**
+   * @return true if this entry is a tombstone.
+   */
+  public boolean isDeleted() {
+    return isDeleted;
+  }
+
+  void setIsDeleted(boolean isDeleted) {
+    this.isDeleted = isDeleted;
+  }
+
+  /**
+   * Equality is decided by the wrapped {@code FileStatus} alone; the
+   * emptiness and deletion flags do not take part.
+   */
+  @Override
+  public boolean equals(Object o) {
+    return o instanceof PathMetadata
+        && fileStatus.equals(((PathMetadata) o).fileStatus);
+  }
+
+  @Override
+  public int hashCode() {
+    return fileStatus.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder("PathMetadata{")
+        .append("fileStatus=").append(fileStatus)
+        .append("; isEmptyDirectory=").append(isEmptyDirectory)
+        .append("; isDeleted=").append(isDeleted)
+        .append('}')
+        .toString();
+  }
+
+  /**
+   * Log contents to supplied StringBuilder in a pretty fashion: a
+   * fixed-width summary (type, path, length, emptiness, deleted flag)
+   * followed by the file status itself.
+   * @param sb target StringBuilder
+   */
+  public void prettyPrint(StringBuilder sb) {
+    final String kind = fileStatus.isDirectory() ? "dir" : "file";
+    final String summary = String.format("%-5s %-20s %-7d %-8s %-6s",
+        kind, fileStatus.getPath().toString(), fileStatus.getLen(),
+        isEmptyDirectory.name(), isDeleted);
+    sb.append(summary).append(fileStatus);
+  }
+
+  /**
+   * @return the text produced by {@link #prettyPrint(StringBuilder)}.
+   */
+  public String prettyPrint() {
+    final StringBuilder out = new StringBuilder();
+    prettyPrint(out);
+    return out.toString();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org