You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by fa...@apache.org on 2016/12/09 23:30:41 UTC
[2/2] hadoop git commit: HADOOP-13449 S3Guard: Implement
DynamoDBMetadataStore. Contributed by Mingliang Liu
HADOOP-13449 S3Guard: Implement DynamoDBMetadataStore. Contributed by Mingliang Liu
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d354cd18
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d354cd18
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d354cd18
Branch: refs/heads/HADOOP-13345
Commit: d354cd182f9ff87660cbb61bb2a2448c8f30fd04
Parents: 881de1f
Author: Aaron Fabbri <fa...@apache.org>
Authored: Thu Dec 8 19:26:06 2016 -0800
Committer: Aaron Fabbri <fa...@apache.org>
Committed: Thu Dec 8 19:26:06 2016 -0800
----------------------------------------------------------------------
.../src/main/resources/core-default.xml | 42 ++
hadoop-project/pom.xml | 18 +
hadoop-tools/hadoop-aws/pom.xml | 32 ++
.../hadoop/fs/s3a/DefaultS3ClientFactory.java | 16 +-
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 3 +-
.../apache/hadoop/fs/s3a/S3ClientFactory.java | 4 +-
.../fs/s3a/s3guard/DescendantsIterator.java | 137 +++++
.../fs/s3a/s3guard/DirListingMetadata.java | 6 +-
.../fs/s3a/s3guard/DynamoDBClientFactory.java | 101 ++++
.../fs/s3a/s3guard/DynamoDBMetadataStore.java | 545 +++++++++++++++++++
.../fs/s3a/s3guard/LocalMetadataStore.java | 7 +
.../hadoop/fs/s3a/s3guard/MetadataStore.java | 13 +
.../fs/s3a/s3guard/NullMetadataStore.java | 5 +
.../PathMetadataDynamoDBTranslation.java | 209 +++++++
.../apache/hadoop/fs/s3a/s3guard/S3Guard.java | 54 ++
.../hadoop/fs/s3a/AbstractS3AMockTest.java | 10 +-
.../hadoop/fs/s3a/ITestS3AEmptyDirectory.java | 78 +++
.../fs/s3a/s3guard/MetadataStoreTestBase.java | 136 +++--
.../s3a/s3guard/TestDynamoDBMetadataStore.java | 325 +++++++++++
.../fs/s3a/s3guard/TestLocalMetadataStore.java | 24 +
.../TestPathMetadataDynamoDBTranslation.java | 219 ++++++++
21 files changed, 1923 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 37b8f6d..7683e7d 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1241,6 +1241,48 @@
</property>
<property>
+ <name>fs.s3a.s3guard.ddb.endpoint</name>
+ <value></value>
+ <description>
+ AWS DynamoDB endpoint to connect to. An up-to-date list of endpoints is
+ provided in the "Regions and Endpoints" section of the AWS documentation.
+ Without this property, the AWS SDK will look up a regional endpoint
+ automatically according to the S3 region.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.s3guard.ddb.table</name>
+ <value></value>
+ <description>
+ The DynamoDB table name to operate on. Without this property, the respective
+ S3 bucket name will be used.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.s3guard.ddb.table.capacity.read</name>
+ <value>500</value>
+ <description>
+ Provisioned throughput requirements for read operations in terms of capacity
+ units for the DynamoDB table. This config value will only be used when
+ creating a new DynamoDB table; for existing tables, the read capacity can
+ later be increased or decreased manually as needed.
+ See the DynamoDB documentation for more information.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.s3guard.ddb.table.capacity.write</name>
+ <value>100</value>
+ <description>
+ Provisioned throughput requirements for write operations in terms of
+ capacity units for the DynamoDB table. Refer to related config
+ fs.s3a.s3guard.ddb.table.capacity.read before usage.
+ </description>
+</property>
+
+ <property>
<name>fs.AbstractFileSystem.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3A</value>
<description>The implementation class of the S3A AbstractFileSystem.</description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 1ae60ed..d8cff61 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -756,6 +756,16 @@
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-dynamodb</artifactId>
+ <version>${aws-java-sdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>DynamoDBLocal</artifactId>
+ <version>1.11.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
@@ -1627,4 +1637,12 @@
</build>
</profile>
</profiles>
+
+ <repositories>
+ <repository>
+ <id>dynamodblocal</id>
+ <name>AWS DynamoDB Local Release Repository</name>
+ <url>http://dynamodb-local.s3-website-us-west-2.amazonaws.com/release</url>
+ </repository>
+ </repositories>
</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 1407661..b7b945d 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -310,6 +310,18 @@
<outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile>
</configuration>
</execution>
+ <execution>
+ <id>copy</id>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <includeScope>test</includeScope>
+ <includeTypes>so,dll,dylib</includeTypes>
+ <outputDirectory>${project.build.directory}/native-libs</outputDirectory>
+ </configuration>
+ </execution>
</executions>
</plugin>
</plugins>
@@ -354,6 +366,26 @@
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-dynamodb</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>DynamoDBLocal</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<scope>test</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
index a43a746..c411fdd 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -50,12 +50,22 @@ public class DefaultS3ClientFactory extends Configured implements
Configuration conf = getConf();
AWSCredentialsProvider credentials =
createAWSCredentialProviderSet(name, conf, uri);
- ClientConfiguration awsConf = new ClientConfiguration();
+ final ClientConfiguration awsConf = createAwsConf(getConf());
+ AmazonS3 s3 = newAmazonS3Client(credentials, awsConf);
+ return createAmazonS3Client(s3, conf, credentials, awsConf);
+ }
+
+ /**
+ * Create a new {@link ClientConfiguration}.
+ * @param conf The Hadoop configuration
+ * @return new AWS client configuration
+ */
+ public static ClientConfiguration createAwsConf(Configuration conf) {
+ final ClientConfiguration awsConf = new ClientConfiguration();
initConnectionSettings(conf, awsConf);
initProxySupport(conf, awsConf);
initUserAgent(conf, awsConf);
- AmazonS3 s3 = newAmazonS3Client(credentials, awsConf);
- return createAmazonS3Client(s3, conf, credentials, awsConf);
+ return awsConf;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index cb08e57..bcc841c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -362,8 +362,7 @@ public class S3AFileSystem extends FileSystem {
* Returns the S3 client used by this filesystem.
* @return AmazonS3Client
*/
- @VisibleForTesting
- AmazonS3 getAmazonS3Client() {
+ public AmazonS3 getAmazonS3Client() {
return s3;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 5169840..387eb43 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -27,11 +27,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
- * Factory for creation of S3 client instances to be used by {@link S3Store}.
+ * Factory for creation of {@link AmazonS3} client instances.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-interface S3ClientFactory {
+public interface S3ClientFactory {
/**
* Creates a new {@link AmazonS3} client. This method accepts the S3A file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
new file mode 100644
index 0000000..afd3266
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DescendantsIterator.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+/**
+ * {@code DescendantsIterator} is a {@link RemoteIterator} that implements
+ * pre-ordering breadth-first traversal (BFS) of a path and all of its
+ * descendants recursively. After visiting each path, that path's direct
+ * children are discovered by calling {@link MetadataStore#listChildren(Path)}.
+ * Each iteration returns the next path from the queue, and if that path is a
+ * directory, also pushes its direct children onto the queue to be visited
+ * later. A directory for which {@code listChildren} returns {@code null}
+ * (no listing is known) is still returned, but contributes no children.
+ *
+ * For example, assume the consistent store contains metadata representing this
+ * file system structure:
+ *
+ * <pre>
+ * {@code
+ * /dir1
+ * |-- dir2
+ * |   |-- file1
+ * |   `-- file2
+ * `-- dir3
+ *     |-- dir4
+ *     |   `-- file3
+ *     |-- dir5
+ *     |   `-- file4
+ *     `-- dir6
+ * }
+ * </pre>
+ *
+ * Consider this code sample, where {@code ms} is the {@link MetadataStore}:
+ * <pre>
+ * {@code
+ * final PathMetadata dir1 = ms.get(new Path("/dir1"));
+ * for (DescendantsIterator descendants = new DescendantsIterator(ms, dir1);
+ *     descendants.hasNext(); ) {
+ *   final FileStatus status = descendants.next().getFileStatus();
+ *   System.out.printf("%s %s%n", status.isDirectory() ? 'D' : 'F',
+ *       status.getPath());
+ * }
+ * }
+ * </pre>
+ *
+ * Assuming each listing returns children in name order, the output is:
+ * <pre>
+ * {@code
+ * D /dir1
+ * D /dir1/dir2
+ * D /dir1/dir3
+ * F /dir1/dir2/file1
+ * F /dir1/dir2/file2
+ * D /dir1/dir3/dir4
+ * D /dir1/dir3/dir5
+ * D /dir1/dir3/dir6
+ * F /dir1/dir3/dir4/file3
+ * F /dir1/dir3/dir5/file4
+ * }
+ * </pre>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+class DescendantsIterator implements RemoteIterator<PathMetadata> {
+
+  /** Store used to discover the children of each visited directory. */
+  private final MetadataStore metadataStore;
+  /** Paths discovered but not yet returned, in visiting order. */
+  private final Queue<PathMetadata> queue = new LinkedList<>();
+
+  /**
+   * Creates a new {@code DescendantsIterator}.
+   *
+   * @param ms the associated {@link MetadataStore}
+   * @param meta base path for descendants iteration, which will be the first
+   *     path returned during iteration; if it is the root, the root itself is
+   *     not returned and iteration starts with the root's direct children
+   * @throws IOException if the metadata store fails to list the root
+   */
+  DescendantsIterator(MetadataStore ms, PathMetadata meta)
+      throws IOException {
+    Preconditions.checkNotNull(ms);
+    Preconditions.checkNotNull(meta);
+    this.metadataStore = ms;
+
+    final Path path = meta.getFileStatus().getPath();
+    if (path.isRoot()) {
+      // root is not returned itself; seed the queue with its children
+      final DirListingMetadata rootListing = ms.listChildren(path);
+      if (rootListing != null) {
+        queue.addAll(rootListing.getListing());
+      }
+    } else {
+      queue.add(meta);
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws IOException {
+    return !queue.isEmpty();
+  }
+
+  /**
+   * Returns the next path in breadth-first order. If that path is a
+   * directory, its direct children are queued to be visited later.
+   *
+   * @return metadata of the next path
+   * @throws NoSuchElementException if the traversal is exhausted
+   * @throws IOException if the metadata store fails to list children
+   */
+  @Override
+  public PathMetadata next() throws IOException {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more descendants.");
+    }
+    final PathMetadata next = queue.poll();
+    if (next.getFileStatus().isDirectory()) {
+      final Path path = next.getFileStatus().getPath();
+      // listChildren may return null when no listing is known (the
+      // constructor guards the root case the same way); such a directory
+      // simply contributes no children instead of failing with an NPE.
+      final DirListingMetadata children = metadataStore.listChildren(path);
+      if (children != null) {
+        queue.addAll(children.getListing());
+      }
+    }
+    return next;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
index c25ad3a..4a9df55 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
@@ -200,13 +200,15 @@ public class DirListingMetadata {
Preconditions.checkNotNull(childUri.getHost(), "Expected non-null URI " +
"host");
Preconditions.checkArgument(
- childUri.getHost().equals(parentUri.getHost()));
+ childUri.getHost().equals(parentUri.getHost()),
+ "childUri '" + childUri + "' and parentUri '" + parentUri
+ + "' should have the same host");
Preconditions.checkNotNull(childUri.getScheme());
}
Preconditions.checkArgument(!childPath.isRoot(),
"childPath cannot be the root path");
Preconditions.checkArgument(childPath.getParent().equals(path),
- "childPath must be a child of path");
+ "childPath '" + childPath + "' must be a child of path '" + path + "'");
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
new file mode 100644
index 0000000..a06197f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBClientFactory.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.s3.model.Region;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
+
+/**
+ * Interface to create a DynamoDB client.
+ *
+ * Implementations are {@link Configurable}: the Hadoop configuration must be
+ * set before {@link #createDynamoDBClient(URI, String)} is called.
+ */
+interface DynamoDBClientFactory extends Configurable {
+  Logger LOG = LoggerFactory.getLogger(DynamoDBClientFactory.class);
+
+  /**
+   * To create a DynamoDB client with the same region as the s3 bucket.
+   *
+   * @param fsUri FileSystem URI after any login details have been stripped
+   * @param s3Region the s3 region of the bucket; must be accepted by the S3
+   *     {@code Region.fromValue} parser
+   * @return a new DynamoDB client
+   * @throws IOException if any IO error happens
+   * @throws IllegalArgumentException if the region or the configured
+   *     endpoint is invalid
+   */
+  AmazonDynamoDBClient createDynamoDBClient(URI fsUri, String s3Region)
+      throws IOException;
+
+  /**
+   * The default implementation for creating an AmazonDynamoDBClient.
+   *
+   * Credentials and client settings come from the same configuration helpers
+   * used for the S3 client; the endpoint may be overridden with
+   * {@link S3Guard#S3GUARD_DDB_ENDPOINT_KEY}.
+   */
+  class DefaultDynamoDBClientFactory extends Configured
+      implements DynamoDBClientFactory {
+    @Override
+    public AmazonDynamoDBClient createDynamoDBClient(URI fsUri, String s3Region)
+        throws IOException {
+      final Configuration conf = getConf();
+      // An explicit check rather than an assert: asserts are normally
+      // disabled, which would turn a missing configuration into a later NPE.
+      if (conf == null) {
+        throw new IllegalStateException(
+            "Should have been configured before usage");
+      }
+      Region region;
+      try {
+        region = Region.fromValue(s3Region);
+      } catch (IllegalArgumentException e) {
+        final String msg = "Region '" + s3Region +
+            "' is invalid; should use the same region as S3 bucket";
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg, e);
+      }
+      LOG.info("Creating DynamoDBClient for fsUri {} in region {}",
+          fsUri, region);
+
+      final AWSCredentialsProvider credentials =
+          createAWSCredentialProviderSet(fsUri, conf, fsUri);
+      final ClientConfiguration awsConf =
+          DefaultS3ClientFactory.createAwsConf(conf);
+      AmazonDynamoDBClient ddb = new AmazonDynamoDBClient(credentials, awsConf);
+
+      ddb.withRegion(region.toAWSRegion());
+      // An explicitly configured endpoint replaces the region-derived one.
+      // Read (and trim) it once and use that same value below.
+      final String endPoint = conf.getTrimmed(S3Guard.S3GUARD_DDB_ENDPOINT_KEY);
+      if (StringUtils.isNotEmpty(endPoint)) {
+        try {
+          ddb.withEndpoint(endPoint);
+        } catch (IllegalArgumentException e) {
+          final String msg = "Incorrect DynamoDB endpoint: " + endPoint;
+          LOG.error(msg, e);
+          throw new IllegalArgumentException(msg, e);
+        }
+      }
+      return ddb;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
new file mode 100644
index 0000000..07ee542
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
+import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.ItemCollection;
+import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
+import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
+import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
+import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
+import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+
+import com.amazonaws.services.dynamodbv2.model.WriteRequest;
+import com.amazonaws.services.s3.AmazonS3;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
+
+/**
+ * DynamoDBMetadataStore is a {@link MetadataStore} that persists
+ * file system metadata to DynamoDB.
+ *
+ * The current implementation uses a schema consisting of a single table. The
+ * name of the table can be configured by config key
+ * {@link S3Guard#S3GUARD_DDB_TABLE_NAME_KEY}.
+ * By default, it matches the name of the S3 bucket. Each item in the table
+ * represents a single directory or file. Its path is split into separate table
+ * attributes:
+ * <ul>
+ * <li> parent (absolute path of the parent). </li>
+ * <li> child (path of that specific child, relative to parent). </li>
+ * <li> optional boolean attribute tracking whether the path is a directory.
+ * Absence or a false value indicates the path is a file. </li>
+ * <li> optional long attribute revealing modification time of file.
+ * This attribute is meaningful only to file items.</li>
+ * <li> optional long attribute revealing file length.
+ * This attribute is meaningful only to file items.</li>
+ * <li> optional long attribute revealing block size of the file.
+ * This attribute is meaningful only to file items.</li>
+ * </ul>
+ *
+ * The DynamoDB partition key is the parent, and the range key is the child.
+ *
+ * Root is a special case. It has no parent, so it cannot be split into
+ * separate parent and child attributes. To avoid confusion in the DynamoDB
+ * table, we simply do not persist root and instead treat it as a special case
+ * path that always exists.
+ *
+ * For example, assume the consistent store contains metadata representing this
+ * file system structure:
+ *
+ * <pre>
+ * /dir1
+ * |-- dir2
+ * | |-- file1
+ * | `-- file2
+ * `-- dir3
+ * |-- dir4
+ * | `-- file3
+ * |-- dir5
+ * | `-- file4
+ * `-- dir6
+ * </pre>
+ *
+ * This is persisted to a single DynamoDB table as:
+ *
+ * <pre>
+ * ==================================================================
+ * | parent | child | is_dir | mod_time | len | ... |
+ * ==================================================================
+ * | / | dir1 | true | | | |
+ * | /dir1 | dir2 | true | | | |
+ * | /dir1 | dir3 | true | | | |
+ * | /dir1/dir2 | file1 | | 100 | 111 | |
+ * | /dir1/dir2 | file2 | | 200 | 222 | |
+ * | /dir1/dir3 | dir4 | true | | | |
+ * | /dir1/dir3 | dir5 | true | | | |
+ * | /dir1/dir3/dir4 | file3 | | 300 | 333 | |
+ * | /dir1/dir3/dir5 | file4 | | 400 | 444 | |
+ * | /dir1/dir3 | dir6 | true | | | |
+ * ==================================================================
+ * </pre>
+ *
+ * This choice of schema is efficient for read access patterns.
+ * {@link #get(Path)} can be served from a single item lookup.
+ * {@link #listChildren(Path)} can be served from a query against all rows
+ * matching the parent (the partition key) and the returned list is guaranteed
+ * to be sorted by child (the range key). Tracking whether or not a path is a
+ * directory helps prevent unnecessary queries during traversal of an entire
+ * sub-tree.
+ *
+ * Some mutating operations, notably {@link #deleteSubtree(Path)} and
+ * {@link #move(Collection, Collection)}, are less efficient with this schema.
+ * They require mutating multiple items in the DynamoDB table.
+ *
+ * All DynamoDB access is performed within the same AWS region as the S3 bucket
+ * that hosts the S3A instance. During initialization, it checks the location
+ * of the S3 bucket and creates a DynamoDB client connected to the same region.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DynamoDBMetadataStore implements MetadataStore {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ DynamoDBMetadataStore.class);
+
+ private DynamoDB dynamoDB;
+ private String region;
+ private Table table;
+ private String tableName;
+ private S3AFileSystem s3afs;
+ private String username;
+
+  /**
+   * Initializes the store from an initialized S3A file system.
+   *
+   * Looks up the region of the file system's bucket and creates a DynamoDB
+   * client in that region via the configured {@link DynamoDBClientFactory}.
+   * The table name defaults to the bucket name when
+   * {@link S3Guard#S3GUARD_DDB_TABLE_NAME_KEY} is unset or blank. The table is
+   * created unless {@link S3Guard#S3GUARD_DDB_TABLE_CREATE_KEY} is false; in
+   * that case the existing table of that name is looked up instead.
+   *
+   * @param fs the file system; must be an {@link S3AFileSystem}
+   * @throws IOException if the bucket location cannot be determined
+   * @throws IllegalArgumentException if {@code fs} is not an S3A file system
+   */
+  @Override
+  public void initialize(FileSystem fs) throws IOException {
+    Preconditions.checkArgument(fs instanceof S3AFileSystem,
+        "DynamoDBMetadataStore only supports S3A filesystem.");
+    s3afs = (S3AFileSystem) fs;
+    final String bucket = s3afs.getUri().getAuthority();
+    try {
+      region = s3afs.getAmazonS3Client().getBucketLocation(bucket);
+    } catch (AmazonClientException e) {
+      throw new IOException("Can not find location for bucket " + bucket, e);
+    }
+
+    username = s3afs.getUsername();
+
+    final Configuration conf = s3afs.getConf();
+    Class<? extends DynamoDBClientFactory> cls = conf.getClass(
+        S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL,
+        S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT,
+        DynamoDBClientFactory.class);
+    AmazonDynamoDBClient dynamoDBClient = ReflectionUtils.newInstance(cls, conf)
+        .createDynamoDBClient(s3afs.getUri(), region);
+    dynamoDB = new DynamoDB(dynamoDBClient);
+
+    // use the bucket as the DynamoDB table name if not specified in config;
+    // a blank value counts as "not specified"
+    final String configuredName =
+        conf.getTrimmed(S3Guard.S3GUARD_DDB_TABLE_NAME_KEY, "");
+    tableName = configuredName.isEmpty() ? bucket : configuredName;
+
+    // create the table unless it's explicitly told not to do so; otherwise
+    // look up the existing table so that {@code table} is always assigned
+    // before delete()/get() use it
+    if (conf.getBoolean(S3Guard.S3GUARD_DDB_TABLE_CREATE_KEY, true)) {
+      createTable();
+    } else {
+      table = dynamoDB.getTable(tableName);
+    }
+  }
+
+  /**
+   * Performs one-time initialization of the metadata store via configuration.
+   *
+   * This initialization depends on the configuration object to get the
+   * default file system (which must be an S3AFileSystem), AWS credentials,
+   * the S3ClientFactory implementation class, the DynamoDBClientFactory
+   * implementation class, DynamoDB endpoints, the metadata table name etc.
+   * Unlike {@link #initialize(FileSystem)}, the table name must be configured
+   * explicitly via {@link S3Guard#S3GUARD_DDB_TABLE_NAME_KEY}; it is also used
+   * as the bucket name when looking up the region. Generally you should use
+   * {@link #initialize(FileSystem)} instead given an initialized S3 file
+   * system.
+   *
+   * @param conf the Hadoop configuration
+   * @see #initialize(FileSystem)
+   * @throws IOException if there is an error
+   * @throws IllegalArgumentException if the default file system is not S3A
+   *     or the table name is blank
+   */
+  void initialize(Configuration conf) throws IOException {
+    final FileSystem defaultFs = FileSystem.get(conf);
+    Preconditions.checkArgument(defaultFs instanceof S3AFileSystem,
+        "DynamoDBMetadataStore only supports S3A filesystem.");
+    s3afs = (S3AFileSystem) defaultFs;
+    // get() builds file statuses with this owner; initialize(FileSystem) sets
+    // it too, so leaving it null here would produce owner-less statuses
+    username = s3afs.getUsername();
+
+    // the table name is mandatory here (no fallback to the bucket name)
+    tableName = conf.getTrimmed(S3Guard.S3GUARD_DDB_TABLE_NAME_KEY);
+    Preconditions.checkNotNull(tableName, "No DynamoDB table name configured!");
+    Preconditions.checkArgument(!tableName.isEmpty(),
+        "No DynamoDB table name configured!");
+
+    final Class<? extends S3ClientFactory> clsS3 = conf.getClass(
+        Constants.S3_CLIENT_FACTORY_IMPL,
+        Constants.DEFAULT_S3_CLIENT_FACTORY_IMPL,
+        S3ClientFactory.class);
+    final S3ClientFactory factory = ReflectionUtils.newInstance(clsS3, conf);
+    AmazonS3 s3 = factory.createS3Client(s3afs.getUri(), s3afs.getUri());
+    try {
+      // the table name doubles as the bucket whose region is used
+      region = s3.getBucketLocation(tableName);
+    } catch (AmazonClientException e) {
+      throw new IOException("Can not find location for bucket " + tableName, e);
+    }
+
+    Class<? extends DynamoDBClientFactory> clsDdb = conf.getClass(
+        S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL,
+        S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT,
+        DynamoDBClientFactory.class);
+    AmazonDynamoDBClient dynamoDBClient =
+        ReflectionUtils.newInstance(clsDdb, conf)
+            .createDynamoDBClient(s3afs.getUri(), region);
+    dynamoDB = new DynamoDB(dynamoDBClient);
+
+    createTable();
+  }
+
+  /**
+   * Deletes the single item stored for {@code path}; descendants are left
+   * untouched.
+   *
+   * The root is never stored in the table, so deleting it is a no-op: issuing
+   * the request anyway would spend write capacity on an item that cannot
+   * exist.
+   *
+   * @param path the path whose item is removed
+   * @throws IOException if the DynamoDB delete fails
+   */
+  @Override
+  public void delete(Path path) throws IOException {
+    final Path target = checkPath(path);
+    LOG.debug("Deleting from table {} in region {}: {}",
+        tableName, region, target);
+
+    if (!target.isRoot()) {
+      try {
+        table.deleteItem(pathToKey(target));
+      } catch (AmazonClientException e) {
+        throw translateException("delete", target, e);
+      }
+    } else {
+      LOG.debug("Skip deleting root directory as it does not exist in table");
+    }
+  }
+
+  /**
+   * Deletes {@code path} and every descendant known to this store, one item
+   * at a time, in the breadth-first order produced by
+   * {@link DescendantsIterator}. Does nothing if {@link #get(Path)} finds no
+   * entry for {@code path}.
+   *
+   * @param path root of the subtree to remove
+   * @throws IOException if a lookup, listing or delete fails
+   */
+  @Override
+  public void deleteSubtree(Path path) throws IOException {
+    final Path top = checkPath(path);
+    LOG.debug("Deleting subtree from table {} in region {}: {}",
+        tableName, region, top);
+
+    final PathMetadata topMeta = get(top);
+    if (topMeta == null) {
+      LOG.debug("Subtree path {} does not exist; this will be a no-op", top);
+      return;
+    }
+
+    final DescendantsIterator descendants =
+        new DescendantsIterator(this, topMeta);
+    while (descendants.hasNext()) {
+      final Path victim = descendants.next().getFileStatus().getPath();
+      delete(victim);
+    }
+  }
+
+  /**
+   * Returns the metadata stored for {@code path}.
+   *
+   * The root directory is never persisted, so a directory status is built for
+   * it on the fly. Whenever the result is a directory, a strongly consistent
+   * query for at most one child decides its empty-directory flag.
+   *
+   * @param path absolute path to look up
+   * @return the metadata, or {@code null} if no usable item exists
+   * @throws IOException if DynamoDB reports an error
+   */
+  @Override
+  public PathMetadata get(Path path) throws IOException {
+    final Path qualified = checkPath(path);
+    LOG.debug("Get from table {} in region {}: {}", tableName, region,
+        qualified);
+
+    try {
+      final PathMetadata result;
+      if (qualified.isRoot()) {
+        // Root does not persist in the table
+        result = new PathMetadata(
+            new S3AFileStatus(true, qualified, username));
+      } else {
+        result = readItemMetadata(qualified);
+      }
+      if (result != null) {
+        updateEmptyDirectoryFlag(qualified,
+            (S3AFileStatus) result.getFileStatus());
+      }
+      return result;
+    } catch (AmazonClientException e) {
+      throw translateException("get", qualified, e);
+    }
+  }
+
+  /** Reads the item for a non-root path using a strongly consistent read. */
+  private PathMetadata readItemMetadata(Path qualified) throws IOException {
+    final Item item = table.getItem(new GetItemSpec()
+        .withPrimaryKey(pathToKey(qualified))
+        .withConsistentRead(true));
+    final PathMetadata meta =
+        itemToPathMetadata(s3afs.getUri(), item, username);
+    LOG.debug("Get from table {} in region {} returning for {}: {}",
+        tableName, region, qualified, meta);
+    return meta;
+  }
+
+  /**
+   * For a directory status, sets the empty flag according to whether a
+   * single-result query on the parent key finds any child.
+   */
+  private void updateEmptyDirectoryFlag(Path dir, S3AFileStatus status) {
+    if (!status.isDirectory()) {
+      return;
+    }
+    final ItemCollection<QueryOutcome> children = table.query(new QuerySpec()
+        .withHashKey(pathToParentKeyAttribute(dir))
+        .withConsistentRead(true)
+        .withMaxResultSize(1)); // limit 1
+    status.setIsEmptyDirectory(!children.iterator().hasNext());
+  }
+
+  /**
+   * Lists the direct children of {@code path} with a strongly consistent
+   * query on the parent key.
+   *
+   * @param path absolute directory path
+   * @return a non-authoritative listing, or {@code null} when the query finds
+   *     no children and {@link #get(Path)} finds nothing for {@code path}
+   * @throws IOException if DynamoDB reports an error
+   */
+  @Override
+  public DirListingMetadata listChildren(Path path) throws IOException {
+    final Path qualified = checkPath(path);
+    LOG.debug("Listing table {} in region {}: {}", tableName, region,
+        qualified);
+
+    try {
+      final ItemCollection<QueryOutcome> rows = table.query(new QuerySpec()
+          .withHashKey(pathToParentKeyAttribute(qualified))
+          .withConsistentRead(true)); // strictly consistent read
+
+      final List<PathMetadata> children = new ArrayList<>();
+      for (Item row : rows) {
+        children.add(itemToPathMetadata(s3afs.getUri(), row, username));
+      }
+      LOG.trace("Listing table {} in region {} for {} returning {}",
+          tableName, region, qualified, children);
+
+      if (children.isEmpty() && get(qualified) == null) {
+        return null;
+      }
+      return new DirListingMetadata(qualified, children, false);
+    } catch (AmazonClientException e) {
+      throw translateException("listChildren", qualified, e);
+    }
+  }
+
+  /**
+   * Maximum number of put and delete requests DynamoDB accepts in a single
+   * BatchWriteItem call (25).
+   */
+  private static final int BATCH_WRITE_REQUEST_LIMIT = 25;
+
+  /**
+   * Applies a move as batched deletes and puts.
+   *
+   * DynamoDB rejects a BatchWriteItem request holding more than 25 requests,
+   * so the work is split into chunks of at most that size. Deletes are
+   * submitted before puts. Within each chunk, anything DynamoDB reports as
+   * unprocessed (for example when provisioned throughput is exceeded) is
+   * resubmitted until nothing is left. No request is sent when both
+   * collections are empty.
+   *
+   * @param pathsToDelete paths to remove; {@code null} is treated as empty
+   * @param pathsToCreate entries to write; {@code null} is treated as empty
+   * @throws IOException if DynamoDB reports an error
+   */
+  @Override
+  public void move(Collection<Path> pathsToDelete,
+      Collection<PathMetadata> pathsToCreate) throws IOException {
+    final List<Path> deletes = pathsToDelete == null
+        ? new ArrayList<Path>()
+        : new ArrayList<Path>(pathsToDelete);
+    final List<PathMetadata> puts = pathsToCreate == null
+        ? new ArrayList<PathMetadata>()
+        : new ArrayList<PathMetadata>(pathsToCreate);
+
+    int deleteIndex = 0;
+    int putIndex = 0;
+    try {
+      while (deleteIndex < deletes.size() || putIndex < puts.size()) {
+        final TableWriteItems writeItems = new TableWriteItems(tableName);
+        int room = BATCH_WRITE_REQUEST_LIMIT;
+        if (deleteIndex < deletes.size()) {
+          final int end = Math.min(deletes.size(), deleteIndex + room);
+          writeItems.withPrimaryKeysToDelete(
+              pathToKey(deletes.subList(deleteIndex, end)));
+          room -= end - deleteIndex;
+          deleteIndex = end;
+        }
+        // fill any space left in this batch with puts
+        if (room > 0 && putIndex < puts.size()) {
+          final int end = Math.min(puts.size(), putIndex + room);
+          writeItems.withItemsToPut(
+              pathMetadataToItem(puts.subList(putIndex, end)));
+          putIndex = end;
+        }
+        writeBatch(writeItems);
+      }
+    } catch (AmazonClientException e) {
+      throw translateException("move", (String) null, e);
+    }
+  }
+
+  /**
+   * Submits one batch, then keeps resubmitting whatever DynamoDB reports as
+   * unprocessed until nothing is left.
+   */
+  private void writeBatch(TableWriteItems writeItems) {
+    BatchWriteItemOutcome res = dynamoDB.batchWriteItem(writeItems);
+    Map<String, List<WriteRequest>> unprocessed = res.getUnprocessedItems();
+    while (unprocessed != null && !unprocessed.isEmpty()) {
+      res = dynamoDB.batchWriteItemUnprocessed(unprocessed);
+      unprocessed = res.getUnprocessedItems();
+    }
+  }
+
+  /**
+   * Stores {@code meta}, then stores a directory entry for each ancestor that
+   * has no item in the table yet.
+   *
+   * As an optimization, the upward walk stops at the first ancestor that
+   * already exists. It also stops at the root, which is never persisted.
+   *
+   * @param meta entry to store; must carry a file status with a path
+   * @throws IOException if DynamoDB reports an error
+   */
+  @Override
+  public void put(PathMetadata meta) throws IOException {
+    checkPathMetadata(meta);
+    LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
+    innerPut(meta);
+
+    Path path = meta.getFileStatus().getPath().getParent();
+    while (path != null && !path.isRoot()) {
+      final Item item;
+      try {
+        item = table.getItem(new GetItemSpec()
+            .withPrimaryKey(pathToKey(path))
+            .withConsistentRead(true)); // strictly consistent read
+      } catch (AmazonClientException e) {
+        // translate like every other table call instead of leaking the SDK
+        // exception
+        throw translateException("put", path, e);
+      }
+      if (item != null) {
+        break;
+      }
+      innerPut(new PathMetadata(new S3AFileStatus(false, path, username)));
+      path = path.getParent();
+    }
+  }
+
+  /**
+   * Writes one item for {@code meta}. The root path is skipped because it is
+   * never persisted.
+   */
+  private void innerPut(PathMetadata meta) throws IOException {
+    final Path target = meta.getFileStatus().getPath();
+    if (!target.isRoot()) {
+      try {
+        table.putItem(pathMetadataToItem(meta));
+      } catch (AmazonClientException e) {
+        throw translateException("put", target, e);
+      }
+    } else {
+      LOG.debug("Root path / is not persisted");
+    }
+  }
+
+  /**
+   * Stores the listed directory itself and then every entry of its listing.
+   *
+   * The directory is written explicitly so that an empty listing is not
+   * lost. With no children to store, nothing would otherwise be written, and
+   * a later {@link #listChildren(Path)} would return {@code null} for it.
+   * Each {@link #put(PathMetadata)} call also fills in missing ancestors.
+   *
+   * @param meta the directory listing to store
+   * @throws IOException if DynamoDB reports an error
+   */
+  @Override
+  public void put(DirListingMetadata meta) throws IOException {
+    LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
+
+    // innerPut() skips the root, so this is a no-op when listing "/"
+    put(new PathMetadata(new S3AFileStatus(false, meta.getPath(), username)));
+    for (PathMetadata pathMetadata : meta.getListing()) {
+      put(pathMetadata);
+    }
+  }
+
+  /**
+   * Shuts down the DynamoDB client and clears the reference, so later calls
+   * do nothing.
+   */
+  @Override
+  public void close() {
+    if (dynamoDB == null) {
+      return;
+    }
+    LOG.info("Shutting down {}", this);
+    dynamoDB.shutdown();
+    dynamoDB = null;
+  }
+
+  /**
+   * Deletes the DynamoDB table and waits until the deletion completes.
+   *
+   * A table that no longer exists is logged and tolerated, so repeated calls
+   * are harmless. If no table handle was ever obtained (for example because
+   * initialization failed before the table was created), nothing is done.
+   *
+   * @throws IOException if the wait is interrupted (the interrupt flag is
+   *     restored and the InterruptedException is kept as the cause) or if
+   *     DynamoDB reports another error
+   */
+  @Override
+  public void destroy() throws IOException {
+    if (table == null) {
+      LOG.debug("No DynamoDB table handle for table {}; nothing to destroy",
+          tableName);
+      return;
+    }
+    LOG.info("Deleting DynamoDB table {} in region {}", tableName, region);
+    try {
+      table.delete();
+      table.waitForDelete();
+    } catch (ResourceNotFoundException rnfe) {
+      LOG.info("ResourceNotFoundException while deleting DynamoDB table {} in "
+          + "region {}. This may indicate that the table does not exist, "
+          + "or has been deleted by another concurrent thread or process.",
+          tableName, region);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted",
+          tableName, ie);
+      throw new IOException("Table " + tableName + " in region " + region
+          + " has not been deleted", ie);
+    } catch (AmazonClientException e) {
+      throw translateException("destroy", (String) null, e);
+    }
+  }
+
+  /** Describes this store by class name, region and table name. */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(getClass().getSimpleName());
+    sb.append('{')
+        .append("region=").append(region)
+        .append(", tableName=").append(tableName)
+        .append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Creates the table if needed and waits for it to become active.
+   *
+   * A create request is sent with the read/write capacity taken from the
+   * filesystem configuration. If a table with the intended name already
+   * exists, the resulting {@link ResourceInUseException} is logged and the
+   * existing table is used instead. DynamoDB table creation is asynchronous,
+   * so this method then waits for the table to become active. As a result
+   * the table is guaranteed to exist when this method returns successfully.
+   *
+   * @throws IOException if the wait is interrupted (the interrupt flag is
+   *     restored and the InterruptedException is kept as the cause) or if
+   *     DynamoDB reports an error
+   */
+  void createTable() throws IOException {
+    final Configuration conf = s3afs.getConf();
+    final ProvisionedThroughput capacity = new ProvisionedThroughput(
+        conf.getLong(S3Guard.S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
+            S3Guard.S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT),
+        conf.getLong(S3Guard.S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
+            S3Guard.S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT));
+
+    try {
+      try {
+        LOG.info("Creating DynamoDB table {} in region {}", tableName, region);
+        table = dynamoDB.createTable(new CreateTableRequest()
+            .withTableName(tableName)
+            .withKeySchema(keySchema())
+            .withAttributeDefinitions(attributeDefinitions())
+            .withProvisionedThroughput(capacity));
+      } catch (ResourceInUseException e) {
+        LOG.info("ResourceInUseException while creating DynamoDB table {} in "
+            + "region {}. This may indicate that the table was created by "
+            + "another concurrent thread or process.",
+            tableName, region);
+        table = dynamoDB.getTable(tableName);
+      }
+      table.waitForActive();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for DynamoDB table {} active",
+          tableName, e);
+      Thread.currentThread().interrupt();
+      throw new IOException("DynamoDB table '" + tableName + "' is not active "
+          + "yet in region " + region, e);
+    } catch (AmazonClientException e) {
+      // also covers failures of the create request itself
+      throw translateException("createTable", (String) null, e);
+    }
+  }
+
+  /**
+   * Requests new provisioned read/write capacity for the table and logs the
+   * throughput description returned by the update call.
+   *
+   * @param readCapacity read capacity units to request
+   * @param writeCapacity write capacity units to request
+   * @throws IOException if DynamoDB reports an error
+   */
+  void provisionTable(Long readCapacity, Long writeCapacity)
+      throws IOException {
+    final ProvisionedThroughput requested = new ProvisionedThroughput()
+        .withReadCapacityUnits(readCapacity)
+        .withWriteCapacityUnits(writeCapacity);
+    final ProvisionedThroughputDescription granted;
+    try {
+      granted = table.updateTable(requested).getProvisionedThroughput();
+    } catch (AmazonClientException e) {
+      throw translateException("provisionTable", (String) null, e);
+    }
+    LOG.info("Provision table {} in region {}: readCapacityUnits={}, "
+        + "writeCapacityUnits={}",
+        tableName, region, granted.getReadCapacityUnits(),
+        granted.getWriteCapacityUnits());
+  }
+
+  /** Returns the table handle assigned by {@link #createTable()}. */
+  Table getTable() {
+    return this.table;
+  }
+
+  /** Returns the region resolved from the bucket location at initialization. */
+  String getRegion() {
+    return this.region;
+  }
+
+  /**
+   * Returns the DynamoDB document client, or {@code null} after
+   * {@link #close()}.
+   */
+  @VisibleForTesting
+  DynamoDB getDynamoDB() {
+    return this.dynamoDB;
+  }
+
+  /**
+   * Checks that {@code path} is non-null and absolute, then qualifies it
+   * against the S3A filesystem URI.
+   *
+   * @param path path to validate
+   * @return the qualified path
+   */
+  private Path checkPath(Path path) {
+    final Path p = Preconditions.checkNotNull(path);
+    Preconditions.checkArgument(p.isAbsolute(),
+        "Path '" + p + "' is not absolute!");
+    return p.makeQualified(s3afs.getUri(), null);
+  }
+
+  /**
+   * Throws {@link NullPointerException} if {@code meta}, its file status, or
+   * that status's path is null.
+   */
+  private static void checkPathMetadata(PathMetadata meta) {
+    Preconditions.checkNotNull(meta);
+    Preconditions.checkNotNull(
+        Preconditions.checkNotNull(meta.getFileStatus()).getPath());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
index 63c931d..bf1fdd7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -249,6 +249,13 @@ public class LocalMetadataStore implements MetadataStore {
}
+  /**
+   * Clears {@code dirHash} if it has been set; otherwise does nothing.
+   * NOTE(review): only {@code dirHash} is cleared here; confirm whether any
+   * other cached state of this store should also be dropped.
+   */
+  @Override
+  public void destroy() throws IOException {
+    if (dirHash == null) {
+      return;
+    }
+    dirHash.clear();
+  }
+
@VisibleForTesting
static <T> void clearHashByAncestor(Path ancestor, Map<Path, T> hash) {
for (Iterator<Map.Entry<Path, T>> it = hash.entrySet().iterator();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
index f261fc9..6d3c440 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
@@ -143,4 +143,17 @@ public interface MetadataStore extends Closeable {
* @throws IOException if there is an error
*/
void put(DirListingMetadata meta) throws IOException;
+
+ /**
+ * Destroys all resources associated with the metadata store.
+ *
+ * Depending on the implementation, the destroyed resources may be, for
+ * example, DynamoDB tables, MySQL databases/tables, or HDFS directories.
+ * Operations invoked after this method returns may fail.
+ *
+ * Implementations must make this operation idempotent: calling it again
+ * after the resources are gone must not fail.
+ *
+ * @throws IOException if there is an error
+ */
+ void destroy() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
index b3c9648..8041870 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
@@ -76,4 +76,9 @@ public class NullMetadataStore implements MetadataStore {
public void put(DirListingMetadata meta) throws IOException {
return;
}
+
+  /** Intentionally a no-op for this implementation. */
+  @Override
+  public void destroy() throws IOException {
+    // nothing to release
+  }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
new file mode 100644
index 0000000..b3e23eb
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.KeyAttribute;
+import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
+import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+
+/**
+ * Defines methods for translating between domain model objects and their
+ * representations in the DynamoDB schema.
+ *
+ * Each non-root path is stored as one item. The item is keyed by the parent
+ * directory path (HASH key {@code "parent"}) and the final name component
+ * (RANGE key {@code "child"}), with scheme and authority stripped. The root
+ * path has no parent and is therefore not mapped to any key.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+final class PathMetadataDynamoDBTranslation {
+
+  /** The HASH key name of each item. */
+  @VisibleForTesting
+  static final String PARENT = "parent";
+  /** The RANGE key name of each item. */
+  @VisibleForTesting
+  static final String CHILD = "child";
+  /** Boolean attribute written as {@code true} for directory items. */
+  @VisibleForTesting
+  static final String IS_DIR = "is_dir";
+  /** Long attribute holding a file's modification time. */
+  @VisibleForTesting
+  static final String MOD_TIME = "mod_time";
+  /** Long attribute holding a file's length. */
+  @VisibleForTesting
+  static final String FILE_LENGTH = "file_length";
+  /** Long attribute holding a file's block size. */
+  @VisibleForTesting
+  static final String BLOCK_SIZE = "block_size";
+
+  /**
+   * Returns the key schema for the DynamoDB table.
+   *
+   * @return DynamoDB key schema: {@link #PARENT} as HASH, {@link #CHILD} as
+   *     RANGE
+   */
+  static Collection<KeySchemaElement> keySchema() {
+    return Arrays.asList(
+        new KeySchemaElement(PARENT, KeyType.HASH),
+        new KeySchemaElement(CHILD, KeyType.RANGE));
+  }
+
+  /**
+   * Returns the attribute definitions for the DynamoDB table.
+   *
+   * @return DynamoDB attribute definitions; both key attributes are strings
+   */
+  static Collection<AttributeDefinition> attributeDefinitions() {
+    return Arrays.asList(
+        new AttributeDefinition(PARENT, ScalarAttributeType.S),
+        new AttributeDefinition(CHILD, ScalarAttributeType.S));
+  }
+
+  /**
+   * Converts a DynamoDB item to a {@link PathMetadata}.
+   *
+   * For files, length, modification time and block size are read from the
+   * item, and any missing attribute defaults to 0.
+   *
+   * @param s3aUri filesystem URI used to qualify the reconstructed path
+   * @param item DynamoDB item to convert; may be null
+   * @param username user name passed to the {@link S3AFileStatus}
+   *     constructors
+   * @return {@code item} converted to a {@link PathMetadata}, or {@code null}
+   *     if {@code item} is null or its keys do not form an absolute path
+   *     without scheme and authority
+   */
+  static PathMetadata itemToPathMetadata(URI s3aUri, Item item, String username)
+      throws IOException {
+    if (item == null) {
+      return null;
+    }
+
+    Path path = new Path(item.getString(PARENT), item.getString(CHILD));
+    if (!path.isAbsoluteAndSchemeAuthorityNull()) {
+      return null;
+    }
+
+    path = path.makeQualified(s3aUri, null);
+    boolean isDir = item.hasAttribute(IS_DIR) && item.getBoolean(IS_DIR);
+    final FileStatus fileStatus;
+    if (isDir) {
+      fileStatus = new S3AFileStatus(true, path, username);
+    } else {
+      long len = item.hasAttribute(FILE_LENGTH) ? item.getLong(FILE_LENGTH) : 0;
+      long modTime = item.hasAttribute(MOD_TIME) ? item.getLong(MOD_TIME) : 0;
+      long block = item.hasAttribute(BLOCK_SIZE) ? item.getLong(BLOCK_SIZE) : 0;
+      fileStatus = new S3AFileStatus(len, modTime, path, block, username);
+    }
+
+    return new PathMetadata(fileStatus);
+  }
+
+  /**
+   * Converts a {@link PathMetadata} to a DynamoDB item.
+   *
+   * Directories get only {@link #IS_DIR}. Files get length, modification time
+   * and block size.
+   *
+   * @param meta {@link PathMetadata} to convert; its file status must be an
+   *     {@link S3AFileStatus}
+   * @return {@code meta} converted to DynamoDB item
+   * @throws IllegalArgumentException if the file status is not an
+   *     {@link S3AFileStatus}
+   */
+  static Item pathMetadataToItem(PathMetadata meta) {
+    Preconditions.checkNotNull(meta);
+    // an explicit check rather than `assert`: assertions are usually disabled
+    // in production, which would turn a bad input into a bare ClassCastException
+    Preconditions.checkArgument(meta.getFileStatus() instanceof S3AFileStatus,
+        "Expected an S3AFileStatus but got %s", meta.getFileStatus());
+    final S3AFileStatus status = (S3AFileStatus) meta.getFileStatus();
+    final Item item = new Item().withPrimaryKey(pathToKey(status.getPath()));
+    if (status.isDirectory()) {
+      item.withBoolean(IS_DIR, true);
+    } else {
+      item.withLong(FILE_LENGTH, status.getLen())
+          .withLong(MOD_TIME, status.getModificationTime())
+          .withLong(BLOCK_SIZE, status.getBlockSize());
+    }
+
+    return item;
+  }
+
+  /**
+   * Converts a collection {@link PathMetadata} to a collection DynamoDB items.
+   *
+   * @param metas metadata to convert; must not be null
+   * @return the converted items, in iteration order of {@code metas}
+   * @see #pathMetadataToItem(PathMetadata)
+   */
+  static Collection<Item> pathMetadataToItem(Collection<PathMetadata> metas) {
+    Preconditions.checkNotNull(metas);
+    final List<Item> items = new ArrayList<>(metas.size());
+    for (PathMetadata meta : metas) {
+      items.add(pathMetadataToItem(meta));
+    }
+    return items;
+  }
+
+  /**
+   * Converts a {@link Path} to a DynamoDB equality condition on that path as
+   * parent, suitable for querying all direct children of the path.
+   *
+   * @param path the path; can not be null
+   * @return DynamoDB equality condition on {@code path} as parent
+   */
+  static KeyAttribute pathToParentKeyAttribute(Path path) {
+    // use the stripped result; previously the return value was discarded
+    final Path stripped = removeSchemeAndAuthority(path);
+    return new KeyAttribute(PARENT, stripped.toUri().getPath());
+  }
+
+  /**
+   * Converts a {@link Path} to a DynamoDB key, suitable for getting the item
+   * matching the path.
+   *
+   * @param path the path; can not be null or the root
+   * @return DynamoDB key for item matching {@code path}
+   * @throws IllegalArgumentException if {@code path} is the root
+   */
+  static PrimaryKey pathToKey(Path path) {
+    path = removeSchemeAndAuthority(path);
+    Preconditions.checkArgument(!path.isRoot(),
+        "Root path is not mapped to any PrimaryKey");
+    return new PrimaryKey(PARENT, path.getParent().toUri().getPath(),
+        CHILD, path.getName());
+  }
+
+  /**
+   * Converts a collection of {@link Path} to a collection of DynamoDB keys.
+   *
+   * @param paths paths to convert; must not be null
+   * @return one key per path, in iteration order of {@code paths}
+   * @see #pathToKey(Path)
+   */
+  static PrimaryKey[] pathToKey(Collection<Path> paths) {
+    Preconditions.checkNotNull(paths);
+    final PrimaryKey[] keys = new PrimaryKey[paths.size()];
+    int i = 0;
+    for (Path p : paths) {
+      keys[i++] = pathToKey(p);
+    }
+    return keys;
+  }
+
+  /** Returns {@code path} with scheme and authority removed; rejects null. */
+  private static Path removeSchemeAndAuthority(Path path) {
+    Preconditions.checkNotNull(path);
+    return Path.getPathWithoutSchemeAndAuthority(path);
+  }
+
+  /**
+   * There is no need to instantiate this class.
+   */
+  private PathMetadataDynamoDBTranslation() {
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index 904a1c3..d8469de 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -19,6 +19,9 @@
package org.apache.hadoop.fs.s3a.s3guard;
import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -45,6 +48,57 @@ final public class S3Guard {
public static final String S3_METADATA_STORE_IMPL =
"fs.s3a.metadatastore.impl";
+ /**
+ * Configuration key naming the {@link DynamoDBClientFactory} implementation
+ * used to create the DynamoDB client.
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ static final String S3GUARD_DDB_CLIENT_FACTORY_IMPL =
+ "fs.s3a.s3guard.ddb.client.factory.impl";
+
+ /** Default DynamoDB client factory class. */
+ static final Class<? extends DynamoDBClientFactory>
+ S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT =
+ DynamoDBClientFactory.DefaultDynamoDBClientFactory.class;
+
+ /**
+ * The endpoint of the DynamoDB service.
+ *
+ * This config has no default value. If the user does not set it, the AWS
+ * SDK determines the endpoint automatically from the region.
+ */
+ @InterfaceStability.Unstable
+ static final String S3GUARD_DDB_ENDPOINT_KEY =
+ "fs.s3a.s3guard.ddb.endpoint";
+
+ /**
+ * The DynamoDB table name to use.
+ *
+ * This config has no default value. If the user does not set this, the
+ * S3Guard implementation will use the respective S3 bucket name.
+ *
+ * NOTE(review): DynamoDBMetadataStore#initialize currently fails with
+ * "No DynamoDB table name configured!" when this is unset, and it passes
+ * this value to getBucketLocation as the bucket name. Confirm the intended
+ * bucket-name fallback.
+ */
+ @InterfaceStability.Unstable
+ static final String S3GUARD_DDB_TABLE_NAME_KEY =
+ "fs.s3a.s3guard.ddb.table";
+
+ /**
+ * Whether to create the table.
+ *
+ * This is for internal usage and users should not set this one directly.
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ static final String S3GUARD_DDB_TABLE_CREATE_KEY =
+ "fs.s3a.s3guard.ddb.table.create";
+
+ /** Read capacity units requested when the table is created. */
+ @InterfaceStability.Unstable
+ static final String S3GUARD_DDB_TABLE_CAPACITY_READ_KEY =
+ "fs.s3a.s3guard.ddb.table.capacity.read";
+
+ /** Default for {@link #S3GUARD_DDB_TABLE_CAPACITY_READ_KEY}. */
+ static final long S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT = 500;
+
+ /** Write capacity units requested when the table is created. */
+ @InterfaceStability.Unstable
+ static final String S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY =
+ "fs.s3a.s3guard.ddb.table.capacity.write";
+
+ /** Default for {@link #S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY}. */
+ static final long S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT = 100;
+
// Utility class. All static functions.
private S3Guard() { }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
index 6734947..398d671 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a;
import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
@@ -26,6 +27,8 @@ import com.amazonaws.services.s3.AmazonS3;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
import org.junit.After;
import org.junit.Before;
@@ -33,7 +36,8 @@ import org.junit.Rule;
import org.junit.rules.ExpectedException;
/**
- * Abstract base class for S3A unit tests using a mock S3 client.
+ * Abstract base class for S3A unit tests using a mock S3 client and a null
+ * metadata store.
*/
public abstract class AbstractS3AMockTest {
@@ -55,6 +59,10 @@ public abstract class AbstractS3AMockTest {
Configuration conf = new Configuration();
conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
S3ClientFactory.class);
+ // We explicitly disable MetadataStore even if it's configured. For unit
+ // test we don't issue request to AWS DynamoDB service.
+ conf.setClass(S3_METADATA_STORE_IMPL, NullMetadataStore.class,
+ MetadataStore.class);
fs = new S3AFileSystem();
URI uri = URI.create(FS_S3A + "://" + BUCKET);
fs.initialize(uri, conf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java
new file mode 100644
index 0000000..66367c6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Integration tests checking that the empty-directory flag of an
+ * {@link S3AFileStatus} follows a directory as it gains and loses children.
+ */
+public class ITestS3AEmptyDirectory extends AbstractS3ATestBase {
+
+  @Test
+  public void testDirectoryBecomesEmpty() throws Exception {
+    final S3AFileSystem fs = getFileSystem();
+    final Path dir = path("testEmptyDir");
+    final Path child = path("testEmptyDir/dir2");
+
+    // Start with a directory holding a single subdirectory.
+    mkdirs(child);
+    assertFalse("Dir status should not be empty",
+        getS3AFileStatus(fs, dir).isEmptyDirectory());
+
+    // Deleting the only child must leave the parent empty.
+    assertDeleted(child, false);
+    assertTrue("Dir status should be empty",
+        getS3AFileStatus(fs, dir).isEmptyDirectory());
+  }
+
+  @Test
+  public void testDirectoryBecomesNonEmpty() throws Exception {
+    final S3AFileSystem fs = getFileSystem();
+    final Path dir = path("testEmptyDir");
+
+    // Start with a freshly created, empty directory.
+    mkdirs(dir);
+    assertTrue("Dir status should be empty",
+        getS3AFileStatus(fs, dir).isEmptyDirectory());
+
+    // Adding a file must make it non-empty.
+    ContractTestUtils.touch(fs, path("testEmptyDir/file1"));
+    assertFalse("Dir status should not be empty",
+        getS3AFileStatus(fs, dir).isEmptyDirectory());
+  }
+
+  /** Fetches the status of {@code p} and asserts it is an S3AFileStatus. */
+  private S3AFileStatus getS3AFileStatus(S3AFileSystem fs, Path p)
+      throws IOException {
+    final FileStatus status = fs.getFileStatus(p);
+    assertTrue(status instanceof S3AFileStatus);
+    return (S3AFileStatus) status;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
index fb46357..478cfb3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.s3a.s3guard;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+
+import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -29,8 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -50,11 +50,11 @@ public abstract class MetadataStoreTestBase extends Assert {
LoggerFactory.getLogger(MetadataStoreTestBase.class);
/** Some dummy values for sanity-checking FileStatus contents. */
- protected static final long BLOCK_SIZE = 32 * 1024 * 1024;
- protected static final int REPLICATION = 1;
- private static final FsPermission PERMISSION = new FsPermission((short)0755);
- private static final String OWNER = "bob";
- private static final String GROUP = "uncles";
+ static final long BLOCK_SIZE = 32 * 1024 * 1024;
+ static final int REPLICATION = 1;
+ static final FsPermission PERMISSION = new FsPermission((short)0755);
+ static final String OWNER = "bob";
+ static final String GROUP = "uncles";
private final long accessTime = System.currentTimeMillis();
private final long modTime = accessTime - 5000;
@@ -62,7 +62,7 @@ public abstract class MetadataStoreTestBase extends Assert {
* Each test should override this.
* @return Contract which specifies the MetadataStore under test plus config.
*/
- public abstract AbstractMSContract createContract();
+ public abstract AbstractMSContract createContract() throws IOException;
/**
* Tests assume that implementations will return recently set results. If
@@ -94,11 +94,52 @@ public abstract class MetadataStoreTestBase extends Assert {
public void tearDown() throws Exception {
LOG.debug("== Tear down. ==");
if (ms != null) {
- ms.close();
+ ms.destroy();
ms = null;
}
}
+  /**
+   * Checks that a {@link DescendantsIterator} started at the root returns
+   * every path that was put into the store. The full set is compared only
+   * when {@link #allowMissing()} is false.
+   */
+  @Test
+  public void testDescendantsIterator() throws IOException {
+    final String[] tree = new String[] {
+        "/dir1",
+        "/dir1/dir2",
+        "/dir1/dir3",
+        "/dir1/dir2/file1",
+        "/dir1/dir2/file2",
+        "/dir1/dir3/dir4",
+        "/dir1/dir3/dir5",
+        "/dir1/dir3/dir4/file3",
+        "/dir1/dir3/dir5/file4",
+        "/dir1/dir3/dir6"
+    };
+    // Populate the store: names containing "file" become 100-byte files,
+    // everything else becomes a directory.
+    for (String pathStr : tree) {
+      final boolean isFile = pathStr.contains("file");
+      final FileStatus status = isFile
+          ? basicFileStatus(strToPath(pathStr), 100, false)
+          : basicFileStatus(strToPath(pathStr), 0, true);
+      ms.put(new PathMetadata(status));
+    }
+
+    final Set<String> actual = new HashSet<>();
+    final DescendantsIterator it =
+        new DescendantsIterator(ms, new PathMetadata(makeDirStatus("/")));
+    while (it.hasNext()) {
+      final Path p = it.next().getFileStatus().getPath();
+      actual.add(Path.getPathWithoutSchemeAndAuthority(p).toString());
+    }
+    LOG.info("We got {} by iterating DescendantsIterator", actual);
+
+    if (!allowMissing()) {
+      assertEquals(Sets.newHashSet(tree), actual);
+    }
+  }
+
@Test
public void testPutNew() throws Exception {
/* create three dirs /da1, /da2, /da3 */
@@ -110,13 +151,13 @@ public abstract class MetadataStoreTestBase extends Assert {
*/
ms.put(new PathMetadata(makeFileStatus("/da1/db1/fc1", 100)));
- assertEmptyDirs("/da1", "/da2", "/da3");
+ assertEmptyDirs("/da2", "/da3");
assertDirectorySize("/da1/db1", 1);
/* Check contents of dir status. */
PathMetadata dirMeta = ms.get(strToPath("/da1"));
if (!allowMissing() || dirMeta != null) {
- verifyDirStatus(dirMeta);
+ verifyDirStatus(dirMeta.getFileStatus());
}
/* This already exists, and should silently replace it. */
@@ -136,8 +177,7 @@ public abstract class MetadataStoreTestBase extends Assert {
PathMetadata meta = ms.get(strToPath("/da1/db1/fc2"));
if (!allowMissing() || meta != null) {
assertNotNull("Get file after put new.", meta);
- assertEquals("Cached file size correct.", 200,
- meta.getFileStatus().getLen());
+ verifyFileStatus(meta.getFileStatus(), 200);
}
}
@@ -149,13 +189,13 @@ public abstract class MetadataStoreTestBase extends Assert {
ms.put(new PathMetadata(makeDirStatus(dirPath)));
PathMetadata meta = ms.get(strToPath(filePath));
if (!allowMissing() || meta != null) {
- verifyBasicFileStatus(meta);
+ verifyFileStatus(meta.getFileStatus(), 100);
}
ms.put(new PathMetadata(basicFileStatus(strToPath(filePath), 9999, false)));
meta = ms.get(strToPath(filePath));
if (!allowMissing() || meta != null) {
- assertEquals("Updated size", 9999, meta.getFileStatus().getLen());
+ verifyFileStatus(meta.getFileStatus(), 9999);
}
}
@@ -194,7 +234,7 @@ public abstract class MetadataStoreTestBase extends Assert {
@Test
public void testDeleteSubtreeHostPath() throws Exception {
- deleteSubtreeHelper("s3a://test-bucket-name");
+ // Use the contract file system's URI as the path prefix instead of a
+ // hard-coded bucket name, so the paths carry the same scheme and host as
+ // the ones produced by strToPath().
+ deleteSubtreeHelper(contract.getFileSystem().getUri().toString());
}
private void deleteSubtreeHelper(String pathPrefix) throws Exception {
@@ -273,7 +313,7 @@ public abstract class MetadataStoreTestBase extends Assert {
PathMetadata meta = ms.get(strToPath(filePath));
if (!allowMissing() || meta != null) {
assertNotNull("Get found file", meta);
- verifyBasicFileStatus(meta);
+ verifyFileStatus(meta.getFileStatus(), 100);
}
meta = ms.get(strToPath(dirPath));
@@ -389,7 +429,7 @@ public abstract class MetadataStoreTestBase extends Assert {
meta = ms.get(strToPath("/b1/file1"));
if (!allowMissing() || meta != null) {
assertNotNull("dest file not null", meta);
- verifyBasicFileStatus(meta);
+ verifyFileStatus(meta.getFileStatus(), 100);
}
dirMeta = ms.listChildren(strToPath("/b1"));
@@ -455,7 +495,7 @@ public abstract class MetadataStoreTestBase extends Assert {
for (String ps : pathStrs) {
b.add(strToPath(ps));
}
- assertTrue("Same set of files", a.equals(b));
+ assertEquals("Same set of files", b, a);
}
private void putListStatusFiles(String dirPath, boolean authoritative,
@@ -508,21 +548,13 @@ public abstract class MetadataStoreTestBase extends Assert {
assertNotNull(pathStr + " should be cached.", meta);
}
- // Convenience to add scheme if missing
+ /**
+ * Convenience to create a fully qualified Path from string.
+ *
+ * @param p an absolute path string such as "/dir1/file1"; absoluteness is
+ * only checked by a Java assert, i.e. only when assertions are enabled
+ * @return the path qualified with the contract file system's URI
+ */
private Path strToPath(String p) {
- Path path = new Path(p);
- URI uri = path.toUri();
- if (uri.getScheme() == null) {
- String fsScheme = contract.getFileSystem().getScheme();
- try {
- return new Path(new URI(fsScheme, uri.getHost(), uri.getPath(),
- uri.getFragment()));
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException("FileStatus path invalid with " +
- "scheme " + fsScheme + " added", e);
- }
- }
- return path;
+ final Path path = new Path(p);
+ assert path.isAbsolute();
+ // No working directory is passed: the path is expected to be absolute.
+ return path.makeQualified(contract.getFileSystem().getUri(), null);
}
private void assertEmptyDirectory(String pathStr) throws IOException {
@@ -535,40 +567,42 @@ public abstract class MetadataStoreTestBase extends Assert {
}
}
- private FileStatus basicFileStatus(Path path, int size, boolean isDir) {
+ /**
+ * Create a FileStatus for the path using the test's replication, block size,
+ * modification/access times, permission, owner and group. Package-private
+ * and declared to throw IOException so that a store-specific test subclass
+ * can override it.
+ */
+ FileStatus basicFileStatus(Path path, int size, boolean isDir)
+ throws IOException {
return new FileStatus(size, isDir, REPLICATION, BLOCK_SIZE, modTime,
accessTime, PERMISSION, OWNER, GROUP, path);
}
- private FileStatus makeFileStatus(String pathStr, int size) {
+ /**
+ * Create a file (non-directory) status of the given size for a path string,
+ * delegating to basicFileStatus() so that subclass overrides apply.
+ */
+ private FileStatus makeFileStatus(String pathStr, int size)
+ throws IOException {
return basicFileStatus(strToPath(pathStr), size, false);
}
- private void verifyBasicFileStatus(PathMetadata meta) {
- FileStatus status = meta.getFileStatus();
+ /**
+ * Verify a file status: it is not a directory, and its modification time,
+ * length and block size are as expected. Subclass may verify additional
+ * fields.
+ *
+ * @param status the file status to verify
+ * @param size the expected file length
+ */
+ void verifyFileStatus(FileStatus status, long size) {
assertFalse("Not a dir", status.isDirectory());
- assertEquals("Replication value", REPLICATION, status.getReplication());
- assertEquals("Access time", accessTime, status.getAccessTime());
assertEquals("Mod time", modTime, status.getModificationTime());
+ assertEquals("File size", size, status.getLen());
assertEquals("Block size", BLOCK_SIZE, status.getBlockSize());
- assertEquals("Owner", OWNER, status.getOwner());
- assertEquals("Group", GROUP, status.getGroup());
- assertEquals("Permission", PERMISSION, status.getPermission());
}
- private FileStatus makeDirStatus(String pathStr) {
+ /**
+ * Create a zero-length directory status for a path string, delegating to
+ * basicFileStatus() so that subclass overrides apply.
+ */
+ private FileStatus makeDirStatus(String pathStr) throws IOException {
return basicFileStatus(strToPath(pathStr), 0, true);
}
- private void verifyDirStatus(PathMetadata meta) {
- FileStatus status = meta.getFileStatus();
+ /**
+ * Verify the directory file status: it is a directory of zero length.
+ * Subclass may verify additional fields.
+ *
+ * @param status the directory status to verify
+ */
+ void verifyDirStatus(FileStatus status) {
assertTrue("Is a dir", status.isDirectory());
assertEquals("zero length", 0, status.getLen());
- assertEquals("Replication value", REPLICATION, status.getReplication());
- assertEquals("Access time", accessTime, status.getAccessTime());
- assertEquals("Mod time", modTime, status.getModificationTime());
- assertEquals("Owner", OWNER, status.getOwner());
- assertEquals("Group", GROUP, status.getGroup());
- assertEquals("Permission", PERMISSION, status.getPermission());
}
+
+  /** @return the access time given to every FileStatus built by this test. */
+  long getAccessTime() {
+    return accessTime;
+  }
+
+  /** @return the modification time given to every FileStatus built here. */
+  long getModTime() {
+    return modTime;
+  }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
new file mode 100644
index 0000000..daeb9ac
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.local.main.ServerRunner;
+import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import com.amazonaws.services.dynamodbv2.model.TableDescription;
+
+import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.MockS3ClientFactory;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.keySchema;
+
+/**
+ * Test that {@link DynamoDBMetadataStore} implements {@link MetadataStore}.
+ *
+ * In this unit test, we create an in-memory DynamoDBLocal server instance for
+ * all unit test cases. You won't be charged bills for DynamoDB requests when
+ * you run this test. An {@link S3AFileSystem} object is created and shared for
+ * initializing {@link DynamoDBMetadataStore} objects. There are no real S3
+ * request issued as the underlying AWS S3Client is mocked.
+ *
+ * According to the base class, every test case will have independent contract
+ * to create a new {@link DynamoDBMetadataStore} instance and initializes it.
+ * A table will be created for each test by the test contract, and will be
+ * destroyed after the test case finishes.
+ */
+public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestDynamoDBMetadataStore.class);
+ private static final String BUCKET = "TestDynamoDBMetadataStore";
+
+ /** The DynamoDBLocal dynamoDBLocalServer instance for testing. */
+ private static DynamoDBProxyServer dynamoDBLocalServer;
+ private static String ddbEndpoint;
+ /** The DynamoDB instance that can issue requests directly to server. */
+ private static DynamoDB dynamoDB;
+
+ @Rule
+ public final Timeout timeout = new Timeout(60 * 1000);
+
+ /**
+ * Sets up the in-memory DynamoDBLocal server and initializes s3 file system.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ GenericTestUtils.setLogLevel(DynamoDBMetadataStore.LOG, Level.ALL);
+ // sqlite4java library should have been copied to target/native-libs
+ System.setProperty("sqlite4java.library.path", "target/native-libs");
+
+ // Set up the in-memory local DynamoDB instance for all test cases
+ final String port = String.valueOf(ServerSocketUtil.getPort(0, 100));
+ dynamoDBLocalServer = ServerRunner.createServerFromCommandLineArgs(
+ new String[] {"-inMemory", "-port", port});
+ dynamoDBLocalServer.start();
+ ddbEndpoint = "http://localhost:" + port;
+ LOG.info("DynamoDBLocal for test was started at {}", ddbEndpoint);
+
+ try {
+ dynamoDB = new DynamoDBMSContract().getMetadataStore().getDynamoDB();
+ } catch (AmazonServiceException e) {
+ final String msg = "Cannot initialize a DynamoDBMetadataStore instance "
+ + "against the local DynamoDB server. Perhaps the DynamoDBLocal "
+ + "server is not configured correctly. ";
+ LOG.error(msg, e);
+ // fail fast if the DynamoDBLocal server can not work
+ fail(msg + e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ if (dynamoDB != null) {
+ dynamoDB.shutdown();
+ }
+ if (dynamoDBLocalServer != null) {
+ LOG.info("Shutting down the in-memory local DynamoDB server");
+ try {
+ dynamoDBLocalServer.stop();
+ } catch (Exception e) {
+ final String msg = "Got exception to stop the DynamoDBLocal server. ";
+ LOG.error(msg, e);
+ fail(msg + e.getLocalizedMessage());
+ }
+ }
+ }
+
+ /**
+ * Each contract has its own S3AFileSystem and DynamoDBMetadataStore objects.
+ */
+ private static class DynamoDBMSContract extends AbstractMSContract {
+ private final S3AFileSystem s3afs;
+ private final DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
+
+ DynamoDBMSContract() throws IOException {
+ final Configuration conf = new Configuration();
+ // using mocked S3 clients
+ conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
+ S3ClientFactory.class);
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+ URI.create(Constants.FS_S3A + "://" + BUCKET).toString());
+ // setting config for creating a DynamoDBClient against local server
+ conf.set(Constants.ACCESS_KEY, "dummy-access-key");
+ conf.set(Constants.SECRET_KEY, "dummy-secret-key");
+ conf.set(S3Guard.S3GUARD_DDB_ENDPOINT_KEY, ddbEndpoint);
+
+ // always create new file system object for a test contract
+ s3afs = (S3AFileSystem) FileSystem.newInstance(conf);
+ ms.initialize(s3afs);
+ }
+
+ @Override
+ public S3AFileSystem getFileSystem() {
+ return s3afs;
+ }
+
+ @Override
+ public DynamoDBMetadataStore getMetadataStore() {
+ return ms;
+ }
+ }
+
+ @Override
+ public DynamoDBMSContract createContract() throws IOException {
+ return new DynamoDBMSContract();
+ }
+
+ @Override
+ FileStatus basicFileStatus(Path path, int size, boolean isDir)
+ throws IOException {
+ String owner = UserGroupInformation.getCurrentUser().getShortUserName();
+ return isDir
+ ? new S3AFileStatus(false, path, owner)
+ : new S3AFileStatus(size, getModTime(), path, BLOCK_SIZE, owner);
+ }
+
+ /**
+ * This tests that after initialize() using an S3AFileSystem object, the
+ * instance should have been initialized successfully, and tables are ACTIVE.
+ */
+ @Test
+ public void testInitialize() throws IOException {
+ final String tableName = "testInitializeWithFileSystem";
+ final S3AFileSystem s3afs = createContract().getFileSystem();
+ final Configuration conf = s3afs.getConf();
+ conf.set(S3Guard.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+ try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
+ ddbms.initialize(s3afs);
+ verifyTableInitialized(tableName);
+ assertNotNull(ddbms.getTable());
+ assertEquals(tableName, ddbms.getTable().getTableName());
+ assertEquals("DynamoDB table should be in the same region as S3 bucket",
+ s3afs.getAmazonS3Client().getBucketLocation(tableName),
+ ddbms.getRegion());
+ }
+ }
+
+ /**
+ * This tests that after initialize() using a Configuration object, the
+ * instance should have been initialized successfully, and tables are ACTIVE.
+ */
+ @Test
+ public void testInitializeWithConfiguration() throws IOException {
+ final String tableName = "testInitializeWithConfiguration";
+ final Configuration conf = createContract().getFileSystem().getConf();
+ conf.set(S3Guard.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+ try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
+ ddbms.initialize(conf);
+ verifyTableInitialized(tableName);
+ assertNotNull(ddbms.getTable());
+ assertEquals(tableName, ddbms.getTable().getTableName());
+ assertEquals("Unexpected key schema found!",
+ keySchema(),
+ ddbms.getTable().describe().getKeySchema());
+ }
+ }
+
+ @Test
+ public void testCreateExistingTable() throws IOException {
+ final DynamoDBMetadataStore ddbms = createContract().getMetadataStore();
+ verifyTableInitialized(BUCKET);
+ // create existing table
+ ddbms.createTable();
+ verifyTableInitialized(BUCKET);
+ }
+
+ /**
+ * Test cases about root directory as it is not in the DynamoDB table.
+ */
+ @Test
+ public void testRootDirectory() throws IOException {
+ final DynamoDBMetadataStore ddbms = createContract().getMetadataStore();
+ verifyRootDirectory(ddbms.get(new Path("/")), true);
+
+ ddbms.put(new PathMetadata(new S3AFileStatus(true,
+ new Path("/foo"),
+ UserGroupInformation.getCurrentUser().getShortUserName())));
+ verifyRootDirectory(ddbms.get(new Path("/")), false);
+ }
+
+ private void verifyRootDirectory(PathMetadata rootMeta, boolean isEmpty) {
+ assertNotNull(rootMeta);
+ final S3AFileStatus status = (S3AFileStatus) rootMeta.getFileStatus();
+ assertNotNull(status);
+ assertTrue(status.isDirectory());
+ assertEquals(isEmpty, status.isEmptyDirectory());
+ }
+
+ @Test
+ public void testProvisionTable() throws IOException {
+ final DynamoDBMetadataStore ddbms = createContract().getMetadataStore();
+ final ProvisionedThroughputDescription oldProvision =
+ dynamoDB.getTable(BUCKET).describe().getProvisionedThroughput();
+ ddbms.provisionTable(oldProvision.getReadCapacityUnits() * 2,
+ oldProvision.getWriteCapacityUnits() * 2);
+ final ProvisionedThroughputDescription newProvision =
+ dynamoDB.getTable(BUCKET).describe().getProvisionedThroughput();
+ LOG.info("Old provision = {}, new provision = {}",
+ oldProvision, newProvision);
+ assertEquals(oldProvision.getReadCapacityUnits() * 2,
+ newProvision.getReadCapacityUnits().longValue());
+ assertEquals(oldProvision.getWriteCapacityUnits() * 2,
+ newProvision.getWriteCapacityUnits().longValue());
+ }
+
+ @Test
+ public void testDeleteTable() throws IOException {
+ final String tableName = "testDeleteTable";
+ final S3AFileSystem s3afs = createContract().getFileSystem();
+ final Configuration conf = s3afs.getConf();
+ conf.set(S3Guard.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+ try (DynamoDBMetadataStore ddbms = new DynamoDBMetadataStore()) {
+ ddbms.initialize(s3afs);
+ // we can list the empty table
+ ddbms.listChildren(new Path("/"));
+
+ ddbms.destroy();
+ verifyTableNotExist(tableName);
+
+ // delete table once more; be ResourceNotFoundException swallowed silently
+ ddbms.destroy();
+ verifyTableNotExist(tableName);
+
+ try {
+ // we can no longer list the destroyed table
+ ddbms.listChildren(new Path("/"));
+ fail("Should have failed after the table is destroyed!");
+ } catch (IOException ignored) {
+ }
+ }
+ }
+
+ /**
+ * This validates the table is created and ACTIVE in DynamoDB.
+ *
+ * This should not rely on the {@link DynamoDBMetadataStore} implementation.
+ */
+ private static void verifyTableInitialized(String tableName) {
+ final Table table = dynamoDB.getTable(tableName);
+ final TableDescription td = table.describe();
+ assertEquals(tableName, td.getTableName());
+ assertEquals("ACTIVE", td.getTableStatus());
+ }
+
+ /**
+ * This validates the table is not found in DynamoDB.
+ *
+ * This should not rely on the {@link DynamoDBMetadataStore} implementation.
+ */
+ private static void verifyTableNotExist(String tableName) {
+ final Table table = dynamoDB.getTable(tableName);
+ try {
+ table.describe();
+ fail("Expecting ResourceNotFoundException for table '" + tableName + "'");
+ } catch (ResourceNotFoundException ignored) {
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d354cd18/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
index 12aa9c6..68e9842 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.s3a.s3guard;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
@@ -100,4 +101,27 @@ public class TestLocalMetadataStore extends MetadataStoreTestBase {
map.clear();
}
+  /**
+   * Checks the base file attributes, then the fields that the local store
+   * keeps exactly as they were put.
+   */
+  @Override
+  protected void verifyFileStatus(FileStatus status, long size) {
+    super.verifyFileStatus(status, size);
+    verifyPreservedFields(status);
+  }
+
+  /**
+   * Checks the base directory attributes and the modification time, then the
+   * fields that the local store keeps exactly as they were put.
+   */
+  @Override
+  protected void verifyDirStatus(FileStatus status) {
+    super.verifyDirStatus(status);
+    assertEquals("Mod time", getModTime(), status.getModificationTime());
+    verifyPreservedFields(status);
+  }
+
+  /**
+   * Asserts replication, access time, owner, group and permission (in that
+   * order) match the values the base test used to build the status.
+   */
+  private void verifyPreservedFields(FileStatus status) {
+    assertEquals("Replication value", REPLICATION, status.getReplication());
+    assertEquals("Access time", getAccessTime(), status.getAccessTime());
+    assertEquals("Owner", OWNER, status.getOwner());
+    assertEquals("Group", GROUP, status.getGroup());
+    assertEquals("Permission", PERMISSION, status.getPermission());
+  }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org