Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/09/01 20:01:55 UTC
[67/74] [abbrv] hadoop git commit: HADOOP-13345 S3Guard: Improved
Consistency for S3A. Contributed by: Chris Nauroth, Aaron Fabbri,
Mingliang Liu, Lei (Eddy) Xu, Sean Mackrory, Steve Loughran and others.

HADOOP-13345 S3Guard: Improved Consistency for S3A.
Contributed by: Chris Nauroth, Aaron Fabbri, Mingliang Liu, Lei (Eddy) Xu,
Sean Mackrory, Steve Loughran and others.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/621b43e2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/621b43e2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/621b43e2
Branch: refs/heads/HDFS-7240
Commit: 621b43e254afaff708cd6fc4698b29628f6abc33
Parents: 7a96033
Author: Steve Loughran <st...@apache.org>
Authored: Fri Sep 1 14:13:41 2017 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Fri Sep 1 14:13:41 2017 +0100
----------------------------------------------------------------------
.../main/resources/assemblies/hadoop-tools.xml | 13 +
hadoop-common-project/hadoop-common/pom.xml | 5 +
.../apache/hadoop/fs/AbstractFileSystem.java | 8 +
.../java/org/apache/hadoop/fs/FileContext.java | 9 +
.../src/main/resources/core-default.xml | 108 ++
.../hadoop/fs/FileSystemContractBaseTest.java | 16 +-
.../fs/contract/AbstractContractRenameTest.java | 63 ++
.../org/apache/hadoop/test/LambdaTestUtils.java | 112 ++
hadoop-project/pom.xml | 19 +
.../hadoop-aws/dev-support/findbugs-exclude.xml | 6 +
hadoop-tools/hadoop-aws/pom.xml | 129 ++-
.../org/apache/hadoop/fs/s3a/Constants.java | 133 ++-
.../hadoop/fs/s3a/DefaultS3ClientFactory.java | 233 ++++
.../fs/s3a/InconsistentAmazonS3Client.java | 434 ++++++++
.../fs/s3a/InconsistentS3ClientFactory.java | 40 +
.../java/org/apache/hadoop/fs/s3a/Listing.java | 263 ++++-
.../hadoop/fs/s3a/S3ABlockOutputStream.java | 17 +-
.../org/apache/hadoop/fs/s3a/S3AFileStatus.java | 45 +-
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 572 ++++++++--
.../hadoop/fs/s3a/S3AInstrumentation.java | 81 +-
.../apache/hadoop/fs/s3a/S3AOutputStream.java | 14 +-
.../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 36 +-
.../apache/hadoop/fs/s3a/S3ClientFactory.java | 190 +---
.../org/apache/hadoop/fs/s3a/Statistic.java | 13 +-
.../java/org/apache/hadoop/fs/s3a/Tristate.java | 32 +
.../org/apache/hadoop/fs/s3a/UploadInfo.java | 43 +
.../fs/s3a/s3guard/DescendantsIterator.java | 142 +++
.../fs/s3a/s3guard/DirListingMetadata.java | 322 ++++++
.../fs/s3a/s3guard/DynamoDBClientFactory.java | 132 +++
.../fs/s3a/s3guard/DynamoDBMetadataStore.java | 1010 ++++++++++++++++++
.../fs/s3a/s3guard/LocalMetadataStore.java | 435 ++++++++
.../hadoop/fs/s3a/s3guard/LruHashMap.java | 50 +
.../hadoop/fs/s3a/s3guard/MetadataStore.java | 221 ++++
.../s3guard/MetadataStoreListFilesIterator.java | 169 +++
.../fs/s3a/s3guard/NullMetadataStore.java | 104 ++
.../hadoop/fs/s3a/s3guard/PathMetadata.java | 143 +++
.../PathMetadataDynamoDBTranslation.java | 304 ++++++
.../apache/hadoop/fs/s3a/s3guard/S3Guard.java | 463 ++++++++
.../hadoop/fs/s3a/s3guard/S3GuardTool.java | 924 ++++++++++++++++
.../hadoop/fs/s3a/s3guard/package-info.java | 30 +
.../hadoop/fs/s3native/S3xLoginHelper.java | 4 +
.../src/main/shellprofile.d/hadoop-s3guard.sh | 37 +
.../src/site/markdown/tools/hadoop-aws/index.md | 3 +-
.../site/markdown/tools/hadoop-aws/s3guard.md | 610 +++++++++++
.../site/markdown/tools/hadoop-aws/testing.md | 288 ++++-
.../fs/contract/s3a/ITestS3AContractCreate.java | 14 +
.../fs/contract/s3a/ITestS3AContractDelete.java | 14 +
.../fs/contract/s3a/ITestS3AContractDistCp.java | 7 +
.../s3a/ITestS3AContractGetFileStatus.java | 4 +
.../fs/contract/s3a/ITestS3AContractMkdir.java | 14 +
.../fs/contract/s3a/ITestS3AContractOpen.java | 14 +
.../fs/contract/s3a/ITestS3AContractRename.java | 13 +
.../contract/s3a/ITestS3AContractRootDir.java | 14 +
.../fs/contract/s3a/ITestS3AContractSeek.java | 14 +
.../hadoop/fs/s3a/AbstractS3AMockTest.java | 9 +-
.../hadoop/fs/s3a/AbstractS3ATestBase.java | 26 +-
.../fs/s3a/ITestS3AAWSCredentialsProvider.java | 4 +
.../hadoop/fs/s3a/ITestS3AConfiguration.java | 3 +-
.../fs/s3a/ITestS3ACopyFromLocalFile.java | 3 +-
.../hadoop/fs/s3a/ITestS3ACredentialsInURL.java | 13 +-
.../hadoop/fs/s3a/ITestS3ADelayedFNF.java | 62 ++
.../hadoop/fs/s3a/ITestS3AEmptyDirectory.java | 83 ++
.../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java | 319 +++---
.../fs/s3a/ITestS3AFileOperationCost.java | 40 +-
.../fs/s3a/ITestS3AFileSystemContract.java | 1 +
.../hadoop/fs/s3a/ITestS3AInconsistency.java | 100 ++
.../hadoop/fs/s3a/ITestS3AMiscOperations.java | 27 +
.../hadoop/fs/s3a/ITestS3GuardCreate.java | 61 ++
.../hadoop/fs/s3a/ITestS3GuardEmptyDirs.java | 85 ++
.../fs/s3a/ITestS3GuardListConsistency.java | 544 ++++++++++
.../hadoop/fs/s3a/ITestS3GuardWriteBack.java | 141 +++
.../hadoop/fs/s3a/MockS3ClientFactory.java | 3 +
.../apache/hadoop/fs/s3a/S3ATestConstants.java | 12 +
.../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 197 +++-
.../org/apache/hadoop/fs/s3a/TestListing.java | 118 ++
.../ITestS3AFileContextStatistics.java | 4 +-
.../s3a/fileContext/ITestS3AFileContextURI.java | 19 +-
.../fs/s3a/s3guard/AbstractMSContract.java | 33 +
.../s3guard/AbstractS3GuardToolTestBase.java | 161 +++
.../s3a/s3guard/DynamoDBLocalClientFactory.java | 157 +++
.../s3a/s3guard/ITestS3GuardConcurrentOps.java | 160 +++
.../s3a/s3guard/ITestS3GuardToolDynamoDB.java | 134 +++
.../fs/s3a/s3guard/ITestS3GuardToolLocal.java | 149 +++
.../fs/s3a/s3guard/MetadataStoreTestBase.java | 887 +++++++++++++++
.../fs/s3a/s3guard/TestDirListingMetadata.java | 303 ++++++
.../s3a/s3guard/TestDynamoDBMetadataStore.java | 594 ++++++++++
.../fs/s3a/s3guard/TestLocalMetadataStore.java | 140 +++
.../fs/s3a/s3guard/TestNullMetadataStore.java | 58 +
.../TestPathMetadataDynamoDBTranslation.java | 238 +++++
.../hadoop/fs/s3a/s3guard/TestS3Guard.java | 93 ++
.../AbstractITestS3AMetadataStoreScale.java | 250 +++++
.../fs/s3a/scale/AbstractSTestS3AHugeFiles.java | 13 +-
.../scale/ITestDynamoDBMetadataStoreScale.java | 48 +
.../s3a/scale/ITestLocalMetadataStoreScale.java | 37 +
.../fs/s3a/scale/ITestS3AConcurrentOps.java | 3 +-
.../fs/s3a/scale/ITestS3ACreatePerformance.java | 86 ++
.../s3a/scale/ITestS3ADirectoryPerformance.java | 5 +-
.../scale/ITestS3AInputStreamPerformance.java | 4 +-
.../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 2 +-
.../hadoop-aws/src/test/resources/core-site.xml | 26 +
.../src/test/resources/log4j.properties | 15 +-
101 files changed, 13065 insertions(+), 538 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
----------------------------------------------------------------------
diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
index bc9548b..0a4367d 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
@@ -174,6 +174,19 @@
<directory>../hadoop-sls/target/hadoop-sls-${project.version}/sls</directory>
<outputDirectory>/share/hadoop/${hadoop.component}/sls</outputDirectory>
</fileSet>
+ <fileSet>
+ <directory>../hadoop-aws/src/main/bin</directory>
+ <outputDirectory>/bin</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
+ <fileSet>
+ <directory>../hadoop-aws/src/main/shellprofile.d</directory>
+ <includes>
+ <include>*</include>
+ </includes>
+ <outputDirectory>/libexec/shellprofile.d</outputDirectory>
+ <fileMode>0755</fileMode>
+ </fileSet>
</fileSets>
<dependencySets>
<dependencySet>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index e9222bb..1d188ba 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -172,6 +172,11 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>compile</scope>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
index 9bea8f9..df14ee8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
@@ -132,6 +133,13 @@ public abstract class AbstractFileSystem {
CONSTRUCTOR_CACHE.put(theClass, meth);
}
result = meth.newInstance(uri, conf);
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else {
+ throw new RuntimeException(cause);
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
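
The catch block above exists because reflective construction never surfaces the constructor's exception directly: Constructor.newInstance() wraps it in an InvocationTargetException. A minimal standalone sketch of that behavior and the unwrap idiom (illustrative only, not part of the patch; FailingWidget is a made-up class):

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

public class UnwrapDemo {

  /** A made-up class whose constructor always fails. */
  static class FailingWidget {
    FailingWidget() {
      throw new IllegalStateException("constructor failed");
    }
  }

  public static void main(String[] args) throws Exception {
    Constructor<FailingWidget> ctor =
        FailingWidget.class.getDeclaredConstructor();
    try {
      ctor.newInstance();
    } catch (InvocationTargetException e) {
      // Reflection wraps whatever the constructor threw; unwrapping the
      // cause, as the patch above does, surfaces the original exception
      // instead of an opaque RuntimeException around the wrapper.
      Throwable cause = e.getCause();
      if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      throw new RuntimeException(cause);
    }
  }
}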
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
index fef968b..21733b3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java
@@ -331,6 +331,15 @@ public class FileContext {
return AbstractFileSystem.get(uri, conf);
}
});
+ } catch (RuntimeException ex) {
+ // RTEs can wrap other exceptions; if there is an IOException inner,
+ // throw it direct.
+ Throwable cause = ex.getCause();
+ if (cause instanceof IOException) {
+ throw (IOException) cause;
+ } else {
+ throw ex;
+ }
} catch (InterruptedException ex) {
LOG.error(ex.toString());
throw new IOException("Failed to get the AbstractFileSystem for path: "
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index cb061aa..9e2c553 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1309,12 +1309,120 @@
</property>
<property>
+ <name>fs.s3a.metadatastore.authoritative</name>
+ <value>false</value>
+ <description>
+ When true, allow MetadataStore implementations to act as source of
+ truth for getting file status and directory listings. Even if this
+ is set to true, MetadataStore implementations may choose not to
+ return authoritative results. If the configured MetadataStore does
+ not support being authoritative, this setting will have no effect.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.metadatastore.impl</name>
+ <value>org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore</value>
+ <description>
+ Fully-qualified name of the class that implements the MetadataStore
+ to be used by s3a. The default class, NullMetadataStore, has no
+ effect: s3a will continue to treat the backing S3 service as the one
+ and only source of truth for file and directory metadata.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.s3guard.cli.prune.age</name>
+ <value>86400000</value>
+ <description>
+ Default age (in milliseconds) after which to prune metadata from the
+ metadatastore when the prune command is run. Can be overridden on the
+ command-line.
+ </description>
+</property>
+
+
+<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
<description>The implementation class of the S3A Filesystem</description>
</property>
<property>
+ <name>fs.s3a.s3guard.ddb.region</name>
+ <value></value>
+ <description>
+ AWS DynamoDB region to connect to. An up-to-date list is
+ provided in the AWS Documentation: regions and endpoints. Without this
+ property, S3Guard will operate on a table in the associated S3 bucket's
+ region.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.s3guard.ddb.table</name>
+ <value></value>
+ <description>
+ The DynamoDB table name to operate on. Without this property, the
+ respective S3 bucket name will be used.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.s3guard.ddb.table.create</name>
+ <value>false</value>
+ <description>
+ If true, the S3A client will create the table if it does not already exist.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.s3guard.ddb.table.capacity.read</name>
+ <value>500</value>
+ <description>
+ Provisioned throughput requirements for read operations in terms of capacity
+ units for the DynamoDB table. This config value will only be used when
+ creating a new DynamoDB table, though later you can manually provision by
+ increasing or decreasing read capacity as needed for existing tables.
+ See the DynamoDB documentation for more information.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.s3guard.ddb.table.capacity.write</name>
+ <value>100</value>
+ <description>
+ Provisioned throughput requirements for write operations in terms of
+ capacity units for the DynamoDB table. Refer to related config
+ fs.s3a.s3guard.ddb.table.capacity.read before usage.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.s3guard.ddb.max.retries</name>
+ <value>9</value>
+ <description>
+ Max retries on batched DynamoDB operations before giving up and
+ throwing an IOException. Each retry is delayed with an exponential
+ backoff timer which starts at 100 milliseconds and approximately
+ doubles each time. The minimum wait before throwing an exception is
+ sum(100, 200, 400, 800, ..., 100*2^(N-1)) == 100 * ((2^N) - 1).
+ So N = 9 yields at least 51.1 seconds (51,100 milliseconds) of
+ blocking before throwing an IOException.
+ </description>
+</property>
+
+<property>
+ <name>fs.s3a.s3guard.ddb.background.sleep</name>
+ <value>25</value>
+ <description>
+ Length (in milliseconds) of pause between each batch of deletes when
+ pruning metadata. Prevents prune operations (typically low-priority
+ background operations) from overly interfering with other I/O
+ operations.
+ </description>
+</property>
+
+<property>
<name>fs.AbstractFileSystem.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3A</value>
<description>The implementation class of the S3A AbstractFileSystem.</description>
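
To sanity-check the retry arithmetic in the fs.s3a.s3guard.ddb.max.retries description above, a small standalone sketch (not part of the patch) that sums the exponential backoff delays:

public class BackoffSum {
  public static void main(String[] args) {
    int retries = 9;       // fs.s3a.s3guard.ddb.max.retries
    long baseMsec = 100;   // initial backoff delay
    long total = 0;
    for (int i = 0; i < retries; i++) {
      total += baseMsec << i;  // 100, 200, 400, ... i.e. 100 * 2^i
    }
    // Prints 51100: 100 * (2^9 - 1) ms, matching the description.
    System.out.println(total);
  }
}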
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java
index 92e2135..9d8cd64 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java
@@ -748,13 +748,27 @@ public abstract class FileSystemContractBaseTest {
/**
* This is a sanity check to make sure that any filesystem's handling of
- * renames doesn't cause any regressions
+ * renaming empty dirs doesn't cause any regressions.
+ */
+ public void testRenameEmptyToDirWithSamePrefixAllowed() throws Throwable {
+ assumeTrue(renameSupported());
+ Path parentdir = path("testRenameEmptyToDirWithSamePrefixAllowed");
+ fs.mkdirs(parentdir);
+ Path dest = path("testRenameEmptyToDirWithSamePrefixAllowedDest");
+ rename(parentdir, dest, true, false, true);
+ }
+
+ /**
+ * This is a sanity check to make sure that any filesystem's handling of
+ * renaming non-empty dirs doesn't cause any regressions.
*/
@Test
public void testRenameToDirWithSamePrefixAllowed() throws Throwable {
assumeTrue(renameSupported());
final Path parentdir = path("testRenameToDirWithSamePrefixAllowed");
fs.mkdirs(parentdir);
+ // Before renaming, we create one file under the source parent directory
+ createFile(new Path(parentdir, "mychild"));
final Path dest = path("testRenameToDirWithSamePrefixAllowedDest");
rename(parentdir, dest, true, false, true);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java
index b0dcb93..b6d0a49 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRenameTest.java
@@ -222,4 +222,67 @@ public abstract class AbstractContractRenameTest extends
assertPathDoesNotExist("not deleted",
new Path(srcDir, "source.txt"));
}
+
+ /**
+ * Test that after renaming, the nested subdirectory is moved along with all
+ * its ancestors.
+ */
+ @Test
+ public void testRenamePopulatesDirectoryAncestors() throws IOException {
+ final FileSystem fs = getFileSystem();
+ final Path src = path("testRenamePopulatesDirectoryAncestors/source");
+ fs.mkdirs(src);
+ final String nestedDir = "/dir1/dir2/dir3/dir4";
+ fs.mkdirs(path(src + nestedDir));
+
+ Path dst = path("testRenamePopulatesDirectoryAncestorsNew");
+
+ fs.rename(src, dst);
+ validateAncestorsMoved(src, dst, nestedDir);
+ }
+
+ /**
+ * Test that after renaming, the nested file is moved along with all its
+ * ancestors. It is similar to {@link #testRenamePopulatesDirectoryAncestors}.
+ */
+ @Test
+ public void testRenamePopulatesFileAncestors() throws IOException {
+ final FileSystem fs = getFileSystem();
+ final Path src = path("testRenamePopulatesFileAncestors/source");
+ fs.mkdirs(src);
+ final String nestedFile = "/dir1/dir2/dir3/file4";
+ byte[] srcDataset = dataset(256, 'a', 'z');
+ writeDataset(fs, path(src + nestedFile), srcDataset, srcDataset.length,
+ 1024, false);
+
+ Path dst = path("testRenamePopulatesFileAncestorsNew");
+
+ fs.rename(src, dst);
+ validateAncestorsMoved(src, dst, nestedFile);
+ }
+
+ /**
+ * Validate that the nested path and its ancestors have been moved.
+ *
+ * @param src the source root to move
+ * @param dst the destination root to move
+ * @param nestedPath the nested path to move
+ */
+ private void validateAncestorsMoved(Path src, Path dst, String nestedPath)
+ throws IOException {
+ assertIsDirectory(dst);
+ assertPathDoesNotExist("src path should not exist", path(src + nestedPath));
+ assertPathExists("dst path should exist", path(dst + nestedPath));
+
+ Path path = new Path(nestedPath).getParent();
+ while (path != null && !path.isRoot()) {
+ final Path parentSrc = path(src + path.toString());
+ assertPathDoesNotExist(parentSrc + " is not deleted", parentSrc);
+ final Path parentDst = path(dst + path.toString());
+ assertPathExists(parentDst + " should exist after rename", parentDst);
+ assertIsDirectory(parentDst);
+ path = path.getParent();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
index 1fa5c3f..00cfa44 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
@@ -249,6 +249,23 @@ public final class LambdaTestUtils {
}
/**
+ * Variant of {@link #eventually(int, Callable, Callable)} method for
+ * void lambda expressions.
+ * @param timeoutMillis timeout in milliseconds.
+ * Can be zero, in which case only one attempt is made before failing.
+ * @param eval expression to evaluate
+ * @param retry retry interval generator
+ * @throws Exception the last exception thrown before timeout was triggered
+ * @throws FailFastException if raised, without any retry attempt.
+ * @throws InterruptedException if interrupted during the sleep operation.
+ */
+ public static void eventually(int timeoutMillis,
+ VoidCallable eval,
+ Callable<Integer> retry) throws Exception {
+ eventually(timeoutMillis, new VoidCaller(eval), retry);
+ }
+
+ /**
* Simplified {@link #eventually(int, Callable, Callable)} method
* with a fixed interval.
* <p>
@@ -277,6 +294,25 @@ public final class LambdaTestUtils {
}
/**
+ * Variant of {@link #eventually(int, int, Callable)} method for
+ * void lambda expressions.
+ * @param timeoutMillis timeout in milliseconds.
+ * Can be zero, in which case only one attempt is made before failing.
+ * @param intervalMillis interval in milliseconds
+ * @param eval expression to evaluate
+ * @throws Exception the last exception thrown before timeout was triggered
+ * @throws FailFastException if raised, without any retry attempt.
+ * @throws InterruptedException if interrupted during the sleep operation.
+ */
+ public static void eventually(int timeoutMillis,
+ int intervalMillis,
+ VoidCallable eval) throws Exception {
+ eventually(timeoutMillis, eval,
+ new FixedRetryInterval(intervalMillis));
+ }
+
+ /**
* Intercept an exception; throw an {@code AssertionError} if one is not raised.
* The caught exception is rethrown if it is of the wrong class or
* does not contain the text defined in {@code contained}.
@@ -319,6 +355,32 @@ public final class LambdaTestUtils {
}
/**
+ * Variant of {@link #intercept(Class, Callable)} to simplify void
+ * invocations.
+ * @param clazz class of exception; the raised exception must be this class
+ * <i>or a subclass</i>.
+ * @param eval expression to eval
+ * @param <E> exception class
+ * @return the caught exception if it was of the expected type
+ * @throws Exception any other exception raised
+ * @throws AssertionError if the evaluation call didn't raise an exception.
+ */
+ public static <E extends Throwable> E intercept(
+ Class<E> clazz,
+ VoidCallable eval)
+ throws Exception {
+ try {
+ eval.call();
+ throw new AssertionError("Expected an exception");
+ } catch (Throwable e) {
+ if (clazz.isAssignableFrom(e.getClass())) {
+ return (E)e;
+ }
+ throw e;
+ }
+ }
+
+ /**
* Intercept an exception; throw an {@code AssertionError} if one is not raised.
* The caught exception is rethrown if it is of the wrong class or
* does not contain the text defined in {@code contained}.
@@ -359,6 +421,29 @@ public final class LambdaTestUtils {
}
/**
+ * Variant of {@link #intercept(Class, Callable)} to simplify void
+ * invocations.
+ * @param clazz class of exception; the raised exception must be this class
+ * <i>or a subclass</i>.
+ * @param contained string which must be in the {@code toString()} value
+ * of the exception
+ * @param eval expression to eval
+ * @param <E> exception class
+ * @return the caught exception if it was of the expected type
+ * @throws Exception any other exception raised
+ * @throws AssertionError if the evaluation call didn't raise an exception.
+ */
+ public static <E extends Throwable> E intercept(
+ Class<E> clazz,
+ String contained,
+ VoidCallable eval)
+ throws Exception {
+ E ex = intercept(clazz, eval);
+ GenericTestUtils.assertExceptionContains(contained, ex);
+ return ex;
+ }
+
+ /**
* Robust string converter for exception messages; if the {@code toString()}
* method throws an exception then that exception is caught and logged,
* then a simple string of the classname logged.
@@ -518,4 +603,31 @@ public final class LambdaTestUtils {
return new FailFastException(String.format(format, args));
}
}
+
+ /**
+ * A simple interface for lambdas, which returns nothing; this exists
+ * to simplify lambda tests on operations with no return value.
+ */
+ public interface VoidCallable {
+ void call() throws Exception;
+ }
+
+ /**
+ * Bridge class to make {@link VoidCallable} something to use in anything
+ * which takes an {@link Callable}.
+ */
+ public static class VoidCaller implements Callable<Void> {
+ private final VoidCallable callback;
+
+ public VoidCaller(VoidCallable callback) {
+ this.callback = callback;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ callback.call();
+ return null;
+ }
+ }
+
}
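
The new void-lambda variants let tests exercise operations with no return value without dummy return statements. A hedged usage sketch (the test class and paths are made up for illustration; only the eventually/intercept overloads come from the patch):

import java.io.FileNotFoundException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import static org.apache.hadoop.test.LambdaTestUtils.eventually;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

public class TestVoidLambdaUsage {

  @Test
  public void testVoidVariants() throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path path = new Path("/tmp/void-lambda-demo");
    fs.mkdirs(path);

    // Void-returning probe, retried every 500 ms for up to 30 s.
    eventually(30000, 500, () -> {
      fs.getFileStatus(path);
    });

    // A void call expected to fail with a FileNotFoundException whose
    // text mentions the missing child path.
    intercept(FileNotFoundException.class, "missing", () -> {
      fs.getFileStatus(new Path(path, "missing"));
    });
  }
}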
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 938ef05..9a52d76 100755
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -870,6 +870,17 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
<version>2.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@@ -1734,4 +1745,12 @@
</build>
</profile>
</profiles>
+
+ <repositories>
+ <repository>
+ <id>dynamodb-local-oregon</id>
+ <name>DynamoDB Local Release Repository</name>
+ <url>https://s3-us-west-2.amazonaws.com/dynamodb-local/release</url>
+ </repository>
+ </repositories>
</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
index ffb0a79..82ec16e 100644
--- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
@@ -26,4 +26,10 @@
<Match>
<Class name="org.apache.hadoop.fs.s3.INode" />
</Match>
+ <!-- The redundant null check makes the code clearer and future-proof. -->
+ <Match>
+ <Class name="org.apache.hadoop.fs.s3a.S3AFileSystem" />
+ <Method name="s3Exists" />
+ <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 91e94a6..bcb0e07 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -36,6 +36,7 @@
<downloadSources>true</downloadSources>
<hadoop.tmp.dir>${project.build.directory}/test</hadoop.tmp.dir>
+ <dynamodb.local.version>1.11.86</dynamodb.local.version>
<!-- are scale tests enabled ? -->
<fs.s3a.scale.test.enabled>unset</fs.s3a.scale.test.enabled>
<!-- Size in MB of huge files. -->
@@ -44,6 +45,11 @@
<fs.s3a.scale.test.huge.partitionsize>unset</fs.s3a.scale.test.huge.partitionsize>
<!-- Timeout in seconds for scale tests.-->
<fs.s3a.scale.test.timeout>3600</fs.s3a.scale.test.timeout>
+ <!-- are S3Guard tests enabled ? -->
+ <fs.s3a.s3guard.test.enabled>false</fs.s3a.s3guard.test.enabled>
+ <fs.s3a.s3guard.test.authoritative>false</fs.s3a.s3guard.test.authoritative>
+ <fs.s3a.s3guard.test.implementation>local</fs.s3a.s3guard.test.implementation>
+
</properties>
<profiles>
@@ -164,6 +170,11 @@
<fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
<fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize>
<fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
+ <!-- S3Guard -->
+ <fs.s3a.s3guard.test.enabled>${fs.s3a.s3guard.test.enabled}</fs.s3a.s3guard.test.enabled>
+ <fs.s3a.s3guard.test.authoritative>${fs.s3a.s3guard.test.authoritative}</fs.s3a.s3guard.test.authoritative>
+ <fs.s3a.s3guard.test.implementation>${fs.s3a.s3guard.test.implementation}</fs.s3a.s3guard.test.implementation>
+
</systemPropertyVariables>
<!-- Some tests cannot run in parallel. Tests that cover -->
<!-- access to the root directory must run in isolation -->
@@ -205,6 +216,10 @@
<fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
<fs.s3a.scale.test.huge.huge.partitionsize>${fs.s3a.scale.test.huge.partitionsize}</fs.s3a.scale.test.huge.huge.partitionsize>
<fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
+ <!-- S3Guard -->
+ <fs.s3a.s3guard.test.enabled>${fs.s3a.s3guard.test.enabled}</fs.s3a.s3guard.test.enabled>
+ <fs.s3a.s3guard.test.implementation>${fs.s3a.s3guard.test.implementation}</fs.s3a.s3guard.test.implementation>
+ <fs.s3a.s3guard.test.authoritative>${fs.s3a.s3guard.test.authoritative}</fs.s3a.s3guard.test.authoritative>
</systemPropertyVariables>
<!-- Do a sequential run for tests that cannot handle -->
<!-- parallel execution. -->
@@ -247,6 +262,10 @@
<fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
<fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
<fs.s3a.scale.test.timeout>${fs.s3a.scale.test.timeout}</fs.s3a.scale.test.timeout>
+ <!-- S3Guard -->
+ <fs.s3a.s3guard.test.enabled>${fs.s3a.s3guard.test.enabled}</fs.s3a.s3guard.test.enabled>
+ <fs.s3a.s3guard.test.implementation>${fs.s3a.s3guard.test.implementation}</fs.s3a.s3guard.test.implementation>
+ <fs.s3a.s3guard.test.authoritative>${fs.s3a.s3guard.test.authoritative}</fs.s3a.s3guard.test.authoritative>
</systemPropertyVariables>
<forkedProcessTimeoutInSeconds>${fs.s3a.scale.test.timeout}</forkedProcessTimeoutInSeconds>
</configuration>
@@ -269,6 +288,60 @@
<fs.s3a.scale.test.enabled>true</fs.s3a.scale.test.enabled>
</properties>
</profile>
+
+ <!-- Turn on S3Guard tests-->
+ <profile>
+ <id>s3guard</id>
+ <activation>
+ <property>
+ <name>s3guard</name>
+ </property>
+ </activation>
+ <properties>
+ <fs.s3a.s3guard.test.enabled>true</fs.s3a.s3guard.test.enabled>
+ </properties>
+ </profile>
+
+ <!-- Switch to DynamoDB for S3Guard. Has no effect unless S3Guard is enabled -->
+ <profile>
+ <id>dynamo</id>
+ <activation>
+ <property>
+ <name>dynamo</name>
+ </property>
+ </activation>
+ <properties>
+ <fs.s3a.s3guard.test.implementation>dynamo</fs.s3a.s3guard.test.implementation>
+ </properties>
+ </profile>
+
+ <!-- Switch to DynamoDBLocal for S3Guard. Has no effect unless S3Guard is enabled -->
+ <profile>
+ <id>dynamodblocal</id>
+ <activation>
+ <property>
+ <name>dynamodblocal</name>
+ </property>
+ </activation>
+ <properties>
+ <fs.s3a.s3guard.test.implementation>dynamodblocal</fs.s3a.s3guard.test.implementation>
+ </properties>
+ </profile>
+
+ <!-- Switch S3Guard from Authoritative=false to true.
+ Has no effect unless S3Guard is enabled -->
+ <profile>
+ <id>auth</id>
+ <activation>
+ <property>
+ <name>auth</name>
+ </property>
+ </activation>
+ <properties>
+ <fs.s3a.s3guard.test.authoritative>true</fs.s3a.s3guard.test.authoritative>
+ </properties>
+ </profile>
+
</profiles>
<build>
@@ -296,16 +369,48 @@
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
- <id>deplist</id>
+ <id>deplist1</id>
<phase>compile</phase>
<goals>
<goal>list</goal>
</goals>
<configuration>
- <!-- build a shellprofile -->
+ <!-- build a shellprofile for hadoop-aws optional tools -->
<outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile>
</configuration>
</execution>
+ <execution>
+ <id>copy</id>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <includeScope>test</includeScope>
+ <includeTypes>so,dll,dylib</includeTypes>
+ <outputDirectory>${project.build.directory}/native-libs</outputDirectory>
+ </configuration>
+ </execution>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>copy-dependencies</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ </configuration>
+ </execution>
+ <execution>
+ <id>deplist2</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>list</goal>
+ </goals>
+ <configuration>
+ <!-- referenced by the s3guard command -->
+ <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-builtin.txt</outputFile>
+ </configuration>
+ </execution>
</executions>
</plugin>
</plugins>
@@ -334,6 +439,26 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>DynamoDBLocal</artifactId>
+ <version>${dynamodb.local.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
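
For reference, assuming the activation properties declared above behave as written, these profiles are toggled from the Maven command line: -Ds3guard turns the S3Guard test setup on (local metadata store by default), -Ds3guard -Ddynamo points it at DynamoDB, -Ds3guard -Ddynamodblocal at the in-process DynamoDBLocal test dependency added above, and -Ds3guard -Dauth switches the authoritative flag to true.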
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 9e15b3f..1a464d0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -267,6 +267,11 @@ public final class Constants {
public static final String USER_AGENT_PREFIX = "fs.s3a.user.agent.prefix";
+ /** Whether or not to allow MetadataStore to be source of truth. */
+ public static final String METADATASTORE_AUTHORITATIVE =
+ "fs.s3a.metadatastore.authoritative";
+ public static final boolean DEFAULT_METADATASTORE_AUTHORITATIVE = false;
+
/** read ahead buffer size to prevent connection re-establishments. */
public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024;
@@ -312,7 +317,7 @@ public final class Constants {
@InterfaceStability.Unstable
public static final Class<? extends S3ClientFactory>
DEFAULT_S3_CLIENT_FACTORY_IMPL =
- S3ClientFactory.DefaultS3ClientFactory.class;
+ DefaultS3ClientFactory.class;
/**
* Maximum number of partitions in a multipart upload: {@value}.
@@ -320,4 +325,130 @@ public final class Constants {
@InterfaceAudience.Private
public static final int MAX_MULTIPART_COUNT = 10000;
+ /**
+ * Classname of the S3A-specific output committer factory. This
+ * is what must be declared when attempting to use it.
+ */
+ @InterfaceStability.Unstable
+ public static final String S3A_OUTPUT_COMMITTER_FACTORY =
+ "org.apache.hadoop.fs.s3a.commit.S3AOutputCommitterFactory";
+
+ /* Constants. */
+ public static final String S3_METADATA_STORE_IMPL =
+ "fs.s3a.metadatastore.impl";
+
+ /** Minimum period of time (in milliseconds) to keep metadata (may only be
+ * applied when a prune command is manually run).
+ */
+ @InterfaceStability.Unstable
+ public static final String S3GUARD_CLI_PRUNE_AGE =
+ "fs.s3a.s3guard.cli.prune.age";
+
+ /**
+ * The region of the DynamoDB service.
+ *
+ * This config has no default value. If the user does not set this,
+ * S3Guard will operate on a table in the associated S3 bucket's region.
+ */
+ @InterfaceStability.Unstable
+ public static final String S3GUARD_DDB_REGION_KEY =
+ "fs.s3a.s3guard.ddb.region";
+
+ /**
+ * The DynamoDB table name to use.
+ *
+ * This config has no default value. If the user does not set this, the
+ * S3Guard implementation will use the respective S3 bucket name.
+ */
+ @InterfaceStability.Unstable
+ public static final String S3GUARD_DDB_TABLE_NAME_KEY =
+ "fs.s3a.s3guard.ddb.table";
+
+ /**
+ * Whether to create the DynamoDB table if the table does not exist.
+ */
+ @InterfaceStability.Unstable
+ public static final String S3GUARD_DDB_TABLE_CREATE_KEY =
+ "fs.s3a.s3guard.ddb.table.create";
+
+ @InterfaceStability.Unstable
+ public static final String S3GUARD_DDB_TABLE_CAPACITY_READ_KEY =
+ "fs.s3a.s3guard.ddb.table.capacity.read";
+ public static final long S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT = 500;
+ @InterfaceStability.Unstable
+ public static final String S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY =
+ "fs.s3a.s3guard.ddb.table.capacity.write";
+ public static final long S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT = 100;
+
+ /**
+ * The maximum put or delete requests per BatchWriteItem request.
+ *
+ * Refer to Amazon API reference for this limit.
+ */
+ public static final int S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT = 25;
+
+ @InterfaceStability.Unstable
+ public static final String S3GUARD_DDB_MAX_RETRIES =
+ "fs.s3a.s3guard.ddb.max.retries";
+ /**
+ * Max retries on batched DynamoDB operations before giving up and
+ * throwing an IOException. Default is {@value}. See core-default.xml for
+ * more detail.
+ */
+ public static final int S3GUARD_DDB_MAX_RETRIES_DEFAULT = 9;
+
+ /**
+ * Period of time (in milliseconds) to sleep between batches of writes.
+ * Currently only applies to prune operations, as they are naturally a
+ * lower priority than other operations.
+ */
+ @InterfaceStability.Unstable
+ public static final String S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY =
+ "fs.s3a.s3guard.ddb.background.sleep";
+ public static final int S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT = 25;
+
+ /**
+ * V1 committer.
+ */
+ @InterfaceStability.Unstable
+ public static final String S3A_OUTPUT_COMMITTER_MRV1 =
+ "org.apache.hadoop.fs.s3a.commit.S3OutputCommitterMRv1";
+
+ /**
+ * The default "Null" metadata store: {@value}.
+ */
+ @InterfaceStability.Unstable
+ public static final String S3GUARD_METASTORE_NULL
+ = "org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore";
+
+ /**
+ * Use Local memory for the metadata: {@value}.
+ * This is not coherent across processes and must be used for testing only.
+ */
+ @InterfaceStability.Unstable
+ public static final String S3GUARD_METASTORE_LOCAL
+ = "org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore";
+
+ /**
+ * Use DynamoDB for the metadata: {@value}.
+ */
+ @InterfaceStability.Unstable
+ public static final String S3GUARD_METASTORE_DYNAMO
+ = "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore";
+
+ /**
+ * Inconsistency (visibility delay) injection settings.
+ */
+ @InterfaceStability.Unstable
+ public static final String FAIL_INJECT_INCONSISTENCY_KEY =
+ "fs.s3a.failinject.inconsistency.key.substring";
+
+ @InterfaceStability.Unstable
+ public static final String FAIL_INJECT_INCONSISTENCY_MSEC =
+ "fs.s3a.failinject.inconsistency.msec";
+
+ @InterfaceStability.Unstable
+ public static final String FAIL_INJECT_INCONSISTENCY_PROBABILITY =
+ "fs.s3a.failinject.inconsistency.probability";
+
}
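
Taken together, these constants are what client code would use to switch S3Guard on programmatically. A hedged sketch (the table name and region values are illustrative, not defaults):

import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.*;

public class S3GuardConfigExample {
  public static Configuration withDynamoMetastore() {
    Configuration conf = new Configuration();
    // Use the DynamoDB metadata store and create its table on demand.
    conf.set(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_DYNAMO);
    conf.setBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, true);
    // Optionally pin the table name and region rather than deriving
    // them from the bucket (both values here are illustrative).
    conf.set(S3GUARD_DDB_TABLE_NAME_KEY, "example-s3guard-table");
    conf.set(S3GUARD_DDB_REGION_KEY, "eu-west-1");
    return conf;
  }
}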
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
new file mode 100644
index 0000000..f33b25e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.VersionInfo;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
+import static org.apache.hadoop.fs.s3a.S3AUtils.intOption;
+
+/**
+ * The default factory implementation, which calls the AWS SDK to configure
+ * and create an {@link AmazonS3Client} that communicates with the S3 service.
+ */
+public class DefaultS3ClientFactory extends Configured implements
+ S3ClientFactory {
+
+ protected static final Logger LOG = S3AFileSystem.LOG;
+
+ @Override
+ public AmazonS3 createS3Client(URI name) throws IOException {
+ Configuration conf = getConf();
+ AWSCredentialsProvider credentials =
+ createAWSCredentialProviderSet(name, conf);
+ final ClientConfiguration awsConf = createAwsConf(getConf());
+ AmazonS3 s3 = newAmazonS3Client(credentials, awsConf);
+ return createAmazonS3Client(s3, conf, credentials, awsConf);
+ }
+
+ /**
+ * Create a new {@link ClientConfiguration}.
+ * @param conf The Hadoop configuration
+ * @return new AWS client configuration
+ */
+ public static ClientConfiguration createAwsConf(Configuration conf) {
+ final ClientConfiguration awsConf = new ClientConfiguration();
+ initConnectionSettings(conf, awsConf);
+ initProxySupport(conf, awsConf);
+ initUserAgent(conf, awsConf);
+ return awsConf;
+ }
+
+ /**
+ * Wrapper around constructor for {@link AmazonS3} client. Override this to
+ * provide an extended version of the client
+ * @param credentials credentials to use
+ * @param awsConf AWS configuration
+ * @return new AmazonS3 client
+ */
+ protected AmazonS3 newAmazonS3Client(
+ AWSCredentialsProvider credentials, ClientConfiguration awsConf) {
+ return new AmazonS3Client(credentials, awsConf);
+ }
+
+ /**
+ * Initializes all AWS SDK settings related to connection management.
+ *
+ * @param conf Hadoop configuration
+ * @param awsConf AWS SDK configuration
+ */
+ private static void initConnectionSettings(Configuration conf,
+ ClientConfiguration awsConf) {
+ awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
+ DEFAULT_MAXIMUM_CONNECTIONS, 1));
+ boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
+ DEFAULT_SECURE_CONNECTIONS);
+ awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
+ awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
+ DEFAULT_MAX_ERROR_RETRIES, 0));
+ awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
+ DEFAULT_ESTABLISH_TIMEOUT, 0));
+ awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
+ DEFAULT_SOCKET_TIMEOUT, 0));
+ int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
+ DEFAULT_SOCKET_SEND_BUFFER, 2048);
+ int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
+ DEFAULT_SOCKET_RECV_BUFFER, 2048);
+ awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
+ String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
+ if (!signerOverride.isEmpty()) {
+ LOG.debug("Signer override = {}", signerOverride);
+ awsConf.setSignerOverride(signerOverride);
+ }
+ }
+
+ /**
+ * Initializes AWS SDK proxy support if configured.
+ *
+ * @param conf Hadoop configuration
+ * @param awsConf AWS SDK configuration
+ * @throws IllegalArgumentException if misconfigured
+ */
+ private static void initProxySupport(Configuration conf,
+ ClientConfiguration awsConf) throws IllegalArgumentException {
+ String proxyHost = conf.getTrimmed(PROXY_HOST, "");
+ int proxyPort = conf.getInt(PROXY_PORT, -1);
+ if (!proxyHost.isEmpty()) {
+ awsConf.setProxyHost(proxyHost);
+ if (proxyPort >= 0) {
+ awsConf.setProxyPort(proxyPort);
+ } else {
+ if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
+ LOG.warn("Proxy host set without port. Using HTTPS default 443");
+ awsConf.setProxyPort(443);
+ } else {
+ LOG.warn("Proxy host set without port. Using HTTP default 80");
+ awsConf.setProxyPort(80);
+ }
+ }
+ String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
+ String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
+ if ((proxyUsername == null) != (proxyPassword == null)) {
+ String msg = "Proxy error: " + PROXY_USERNAME + " or " +
+ PROXY_PASSWORD + " set without the other.";
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ awsConf.setProxyUsername(proxyUsername);
+ awsConf.setProxyPassword(proxyPassword);
+ awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
+ awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
+ "domain {} as workstation {}", awsConf.getProxyHost(),
+ awsConf.getProxyPort(),
+ String.valueOf(awsConf.getProxyUsername()),
+ awsConf.getProxyPassword(), awsConf.getProxyDomain(),
+ awsConf.getProxyWorkstation());
+ }
+ } else if (proxyPort >= 0) {
+ String msg =
+ "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
+ /**
+ * Initializes the User-Agent header to send in HTTP requests to the S3
+ * back-end. We always include the Hadoop version number. The user also
+ * may set an optional custom prefix to put in front of the Hadoop version
+ * number. The AWS SDK internally appends its own information, which seems
+ * to include the AWS SDK version, OS and JVM version.
+ *
+ * @param conf Hadoop configuration
+ * @param awsConf AWS SDK configuration
+ */
+ private static void initUserAgent(Configuration conf,
+ ClientConfiguration awsConf) {
+ String userAgent = "Hadoop " + VersionInfo.getVersion();
+ String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
+ if (!userAgentPrefix.isEmpty()) {
+ userAgent = userAgentPrefix + ", " + userAgent;
+ }
+ LOG.debug("Using User-Agent: {}", userAgent);
+ awsConf.setUserAgentPrefix(userAgent);
+ }
+
+ /**
+ * Creates an {@link AmazonS3Client} from the established configuration.
+ *
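+ * @param s3 S3 client to configure and return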
+ * @param conf Hadoop configuration
+ * @param credentials AWS credentials
+ * @param awsConf AWS SDK configuration
+ * @return S3 client
+ * @throws IllegalArgumentException if misconfigured
+ */
+ private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf,
+ AWSCredentialsProvider credentials, ClientConfiguration awsConf)
+ throws IllegalArgumentException {
+ String endPoint = conf.getTrimmed(ENDPOINT, "");
+ if (!endPoint.isEmpty()) {
+ try {
+ s3.setEndpoint(endPoint);
+ } catch (IllegalArgumentException e) {
+ String msg = "Incorrect endpoint: " + e.getMessage();
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg, e);
+ }
+ }
+ enablePathStyleAccessIfRequired(s3, conf);
+ return s3;
+ }
+
+ /**
+ * Enables path-style access to S3 buckets if configured. By default, the
+ * behavior is to use virtual hosted-style access with URIs of the form
+ * http://bucketname.s3.amazonaws.com. Enabling path-style access and a
+ * region-specific endpoint switches the behavior to use URIs of the form
+ * http://s3-eu-west-1.amazonaws.com/bucketname.
+ *
+ * @param s3 S3 client
+ * @param conf Hadoop configuration
+ */
+ private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
+ Configuration conf) {
+ final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
+ if (pathStyleAccess) {
+ LOG.debug("Enabling path style access!");
+ s3.setS3ClientOptions(S3ClientOptions.builder()
+ .setPathStyleAccess(true)
+ .build());
+ }
+ }
+}
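
newAmazonS3Client() is the documented extension point for supplying a customised client; the InconsistentS3ClientFactory added elsewhere in this patch uses exactly this hook. A hypothetical subclass sketch (CustomS3ClientFactory is made up for illustration; S3A is assumed to select the factory via the pre-existing fs.s3a.s3.client.factory.impl option):

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;

import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;

/**
 * A made-up subclass showing the extension point: return a tuned or
 * wrapped client from newAmazonS3Client().
 */
public class CustomS3ClientFactory extends DefaultS3ClientFactory {
  @Override
  protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials,
      ClientConfiguration awsConf) {
    AmazonS3 client = super.newAmazonS3Client(credentials, awsConf);
    // Wrap or further configure the client here before returning it.
    return client;
  }
}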
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
new file mode 100644
index 0000000..5e9cb3f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper around {@link com.amazonaws.services.s3.AmazonS3} that injects
+ * inconsistency and/or errors. Used for testing S3Guard.
+ * Currently it only delays listing visibility; GET requests are unaffected.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InconsistentAmazonS3Client extends AmazonS3Client {
+
+ /**
+ * Keys containing this substring will be subject to delayed visibility.
+ */
+ public static final String DEFAULT_DELAY_KEY_SUBSTRING = "DELAY_LISTING_ME";
+
+ /**
+ * How long (in milliseconds) affected keys will be delayed from
+ * appearing in listings. This should probably be a config value.
+ */
+ public static final long DEFAULT_DELAY_KEY_MSEC = 5 * 1000;
+
+ public static final float DEFAULT_DELAY_KEY_PROBABILITY = 1.0f;
+
+ /** Special config value since we can't store empty strings in XML. */
+ public static final String MATCH_ALL_KEYS = "*";
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(InconsistentAmazonS3Client.class);
+
+ /** Empty string matches all keys. */
+ private String delayKeySubstring;
+
+ /** Probability to delay visibility of a matching key. */
+ private float delayKeyProbability;
+
+ /** Time in milliseconds to delay visibility of newly modified object. */
+ private long delayKeyMsec;
+
+ /**
+ * Composite of data we need to track about recently deleted objects:
+ * when it was deleted (same as with recently put objects) and the object
+ * summary (since we should keep returning it for some time after its
+ * deletion).
+ */
+ private static class Delete {
+ private Long time;
+ private S3ObjectSummary summary;
+
+ Delete(Long time, S3ObjectSummary summary) {
+ this.time = time;
+ this.summary = summary;
+ }
+
+ public Long time() {
+ return time;
+ }
+
+ public S3ObjectSummary summary() {
+ return summary;
+ }
+ }
+
+ /** Map of key to delay -> time it was deleted + object summary (object
+ * summary is null for prefixes). */
+ private Map<String, Delete> delayedDeletes = new HashMap<>();
+
+ /** Map of key to delay -> time it was created. */
+ private Map<String, Long> delayedPutKeys = new HashMap<>();
+
+ public InconsistentAmazonS3Client(AWSCredentialsProvider credentials,
+ ClientConfiguration clientConfiguration, Configuration conf) {
+ super(credentials, clientConfiguration);
+ setupConfig(conf);
+ }
+
+ protected void setupConfig(Configuration conf) {
+
+ delayKeySubstring = conf.get(FAIL_INJECT_INCONSISTENCY_KEY,
+ DEFAULT_DELAY_KEY_SUBSTRING);
+ // "" is a substring of all strings, use it to match all keys.
+ if (delayKeySubstring.equals(MATCH_ALL_KEYS)) {
+ delayKeySubstring = "";
+ }
+ delayKeyProbability = conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY,
+ DEFAULT_DELAY_KEY_PROBABILITY);
+ delayKeyMsec = conf.getLong(FAIL_INJECT_INCONSISTENCY_MSEC,
+ DEFAULT_DELAY_KEY_MSEC);
+ LOG.info("Enabled with {} msec delay, substring {}, probability {}",
+ delayKeyMsec, delayKeySubstring, delayKeyProbability);
+ }
+
+ /**
+ * Clear all outstanding inconsistent keys. After calling this function,
+ * listings should behave normally (no failure injection), until additional
+ * keys are matched for delay, e.g. via putObject(), deleteObject().
+ */
+ public void clearInconsistency() {
+ LOG.info("clearing all delayed puts / deletes");
+ delayedDeletes.clear();
+ delayedPutKeys.clear();
+ }
+
+ /**
+ * Convenience function for test code to cast from supertype.
+ * @param c supertype to cast from
+ * @return subtype, not null
+ * @throws Exception on error
+ */
+ public static InconsistentAmazonS3Client castFrom(AmazonS3 c) throws
+ Exception {
+ InconsistentAmazonS3Client ic = null;
+ if (c instanceof InconsistentAmazonS3Client) {
+ ic = (InconsistentAmazonS3Client) c;
+ }
+ Preconditions.checkNotNull(ic, "Not an instance of " +
+ "InconsistentAmazonS3Client");
+ return ic;
+ }
+
+ @Override
+ public DeleteObjectsResult deleteObjects(DeleteObjectsRequest
+ deleteObjectsRequest)
+ throws AmazonClientException, AmazonServiceException {
+ for (DeleteObjectsRequest.KeyVersion keyVersion :
+ deleteObjectsRequest.getKeys()) {
+ registerDeleteObject(keyVersion.getKey(), deleteObjectsRequest
+ .getBucketName());
+ }
+ return super.deleteObjects(deleteObjectsRequest);
+ }
+
+ @Override
+ public void deleteObject(DeleteObjectRequest deleteObjectRequest)
+ throws AmazonClientException, AmazonServiceException {
+ String key = deleteObjectRequest.getKey();
+ LOG.debug("key {}", key);
+ registerDeleteObject(key, deleteObjectRequest.getBucketName());
+ super.deleteObject(deleteObjectRequest);
+ }
+
+ /* We should only need to override this version of putObject() */
+ @Override
+ public PutObjectResult putObject(PutObjectRequest putObjectRequest)
+ throws AmazonClientException, AmazonServiceException {
+ LOG.debug("key {}", putObjectRequest.getKey());
+ registerPutObject(putObjectRequest);
+ return super.putObject(putObjectRequest);
+ }
+
+ /* We should only need to override this version of listObjects() */
+ @Override
+ public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
+ throws AmazonClientException, AmazonServiceException {
+ LOG.debug("prefix {}", listObjectsRequest.getPrefix());
+ ObjectListing listing = super.listObjects(listObjectsRequest);
+ listing = filterListObjects(listObjectsRequest, listing);
+ listing = restoreListObjects(listObjectsRequest, listing);
+ return listing;
+ }
+
+ private void addSummaryIfNotPresent(List<S3ObjectSummary> list,
+ S3ObjectSummary item) {
+ // S3ObjectSummary lacks a useful equals(), so compare by key instead.
+ String key = item.getKey();
+ for (S3ObjectSummary member : list) {
+ if (member.getKey().equals(key)) {
+ return;
+ }
+ }
+ list.add(item);
+ }
+
+ /**
+ * Add prefix of child to given list. The added prefix will be equal to
+ * ancestor plus one directory past ancestor, e.g.:
+ * if ancestor is "/a/b/c" and child is "/a/b/c/d/e/file" then "/a/b/c/d" is
+ * added to the list.
+ * @param prefixes list to add to
+ * @param ancestor path we are listing in
+ * @param child full path to get prefix from
+ */
+ private void addPrefixIfNotPresent(List<String> prefixes, String ancestor,
+ String child) {
+ Path prefixCandidate = new Path(child).getParent();
+ Path ancestorPath = new Path(ancestor);
+ Preconditions.checkArgument(child.startsWith(ancestor), "%s does not " +
+ "start with %s", child, ancestor);
+ while (!prefixCandidate.isRoot()) {
+ Path nextParent = prefixCandidate.getParent();
+ if (nextParent.equals(ancestorPath)) {
+ String prefix = prefixCandidate.toString();
+ if (!prefixes.contains(prefix)) {
+ prefixes.add(prefix);
+ }
+ return;
+ }
+ prefixCandidate = nextParent;
+ }
+ }
+
+ /**
+ * Checks whether the parent key is an ancestor of the child key.
+ * @param parent key that may be the parent.
+ * @param child key that may be the child.
+ * @param recursive if false, only return true for direct children. If
+ * true, any descendant will count.
+ * @return true if parent is an ancestor of child
+ */
+ private boolean isDescendant(String parent, String child, boolean recursive) {
+ if (recursive) {
+ if (!parent.endsWith("/")) {
+ parent = parent + "/";
+ }
+ return child.startsWith(parent);
+ } else {
+ Path actualParentPath = new Path(child).getParent();
+ Path expectedParentPath = new Path(parent);
+ return actualParentPath.equals(expectedParentPath);
+ }
+ }
+
+ /**
+ * Simulate eventual consistency of delete for this list operation: any
+ * recently deleted keys will be added back to the listing.
+ * @param request List request
+ * @param rawListing listing returned from underlying S3
+ * @return listing with recently-deleted items restored
+ */
+ private ObjectListing restoreListObjects(ListObjectsRequest request,
+ ObjectListing rawListing) {
+ List<S3ObjectSummary> outputList = rawListing.getObjectSummaries();
+ List<String> outputPrefixes = rawListing.getCommonPrefixes();
+ // recursive list has no delimiter, returns everything that matches a
+ // prefix.
+ boolean recursiveObjectList = !("/".equals(request.getDelimiter()));
+
+ // Go through all deleted keys
+ for (String key : new HashSet<>(delayedDeletes.keySet())) {
+ Delete delete = delayedDeletes.get(key);
+ if (isKeyDelayed(delete.time(), key)) {
+ if (isDescendant(request.getPrefix(), key, recursiveObjectList)) {
+ if (delete.summary() != null) {
+ addSummaryIfNotPresent(outputList, delete.summary());
+ }
+ }
+ // Non-recursive list has delimiter: will return rolled-up prefixes for
+ // all keys that are not direct children
+ if (!recursiveObjectList) {
+ if (isDescendant(request.getPrefix(), key, true)) {
+ addPrefixIfNotPresent(outputPrefixes, request.getPrefix(), key);
+ }
+ }
+ } else {
+ // Clean up any expired entries
+ delayedDeletes.remove(key);
+ }
+ }
+
+ return new CustomObjectListing(rawListing, outputList, outputPrefixes);
+ }
+
+ private ObjectListing filterListObjects(ListObjectsRequest request,
+ ObjectListing rawListing) {
+
+ // Filter object listing
+ List<S3ObjectSummary> outputList = new ArrayList<>();
+ for (S3ObjectSummary s : rawListing.getObjectSummaries()) {
+ String key = s.getKey();
+ if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
+ outputList.add(s);
+ }
+ }
+
+ // Filter prefixes (directories)
+ List<String> outputPrefixes = new ArrayList<>();
+ for (String key : rawListing.getCommonPrefixes()) {
+ if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
+ outputPrefixes.add(key);
+ }
+ }
+
+ return new CustomObjectListing(rawListing, outputList, outputPrefixes);
+ }
+
+ private boolean isKeyDelayed(Long enqueueTime, String key) {
+ if (enqueueTime == null) {
+ LOG.debug("no delay for key {}", key);
+ return false;
+ }
+ long currentTime = System.currentTimeMillis();
+ long deadline = enqueueTime + delayKeyMsec;
+ if (currentTime >= deadline) {
+ // delay has expired: drop any delete entry (a no-op for delayed put keys)
+ delayedDeletes.remove(key);
+ LOG.debug("no longer delaying {}", key);
+ return false;
+ } else {
+ LOG.info("delaying {}", key);
+ return true;
+ }
+ }
+
+ private void registerDeleteObject(String key, String bucket) {
+ if (shouldDelay(key)) {
+ // Record summary so we can add it back for some time post-deletion
+ S3ObjectSummary summary = null;
+ ObjectListing list = listObjects(bucket, key);
+ for (S3ObjectSummary result : list.getObjectSummaries()) {
+ if (result.getKey().equals(key)) {
+ summary = result;
+ break;
+ }
+ }
+ delayedDeletes.put(key, new Delete(System.currentTimeMillis(), summary));
+ }
+ }
+
+ private void registerPutObject(PutObjectRequest req) {
+ String key = req.getKey();
+ if (shouldDelay(key)) {
+ enqueueDelayedPut(key);
+ }
+ }
+
+ /**
+ * Should we delay listing visibility for this key?
+ * @param key key which is being put
+ * @return true if we should delay
+ */
+ private boolean shouldDelay(String key) {
+ boolean delay = key.contains(delayKeySubstring);
+ delay = delay && trueWithProbability(delayKeyProbability);
+ LOG.debug("{} -> {}", key, delay);
+ return delay;
+ }
+
+ private boolean trueWithProbability(float p) {
+ return Math.random() < p;
+ }
+
+ /**
+ * Record this key as something that should not become visible in
+ * listObject replies for a while, to simulate eventual list consistency.
+ * @param key key to delay visibility of
+ */
+ private void enqueueDelayedPut(String key) {
+ LOG.debug("delaying put of {}", key);
+ delayedPutKeys.put(key, System.currentTimeMillis());
+ }
+
+ /** Since ObjectListing's result lists cannot be replaced in place, we
+ * subclass it and override the accessors. */
+ private static class CustomObjectListing extends ObjectListing {
+
+ private final List<S3ObjectSummary> customListing;
+ private final List<String> customPrefixes;
+
+ CustomObjectListing(ObjectListing rawListing,
+ List<S3ObjectSummary> customListing,
+ List<String> customPrefixes) {
+ super();
+ this.customListing = customListing;
+ this.customPrefixes = customPrefixes;
+
+ this.setBucketName(rawListing.getBucketName());
+ this.setCommonPrefixes(rawListing.getCommonPrefixes());
+ this.setDelimiter(rawListing.getDelimiter());
+ this.setEncodingType(rawListing.getEncodingType());
+ this.setMarker(rawListing.getMarker());
+ this.setMaxKeys(rawListing.getMaxKeys());
+ this.setNextMarker(rawListing.getNextMarker());
+ this.setPrefix(rawListing.getPrefix());
+ this.setTruncated(rawListing.isTruncated());
+ }
+
+ @Override
+ public List<S3ObjectSummary> getObjectSummaries() {
+ return customListing;
+ }
+
+ @Override
+ public List<String> getCommonPrefixes() {
+ return customPrefixes;
+ }
+ }
+}
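For orientation, a minimal sketch of how test code might drive this client
once a filesystem has been created with the inconsistent factory below; the
fs.getAmazonS3Client() accessor is an assumption (a test-visible hook), not
something defined in this patch:

    // Sketch only: exercise fault injection from a test, under the
    // assumptions stated above.
    AmazonS3 s3 = fs.getAmazonS3Client();        // assumed test accessor
    InconsistentAmazonS3Client inconsistent =
        InconsistentAmazonS3Client.castFrom(s3); // fails fast otherwise
    // ... put/delete objects whose keys contain the delay substring ...
    inconsistent.clearInconsistency();           // listings behave normally again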
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
new file mode 100644
index 0000000..17d268b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * S3 Client factory used for testing with eventual consistency fault injection.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InconsistentS3ClientFactory extends DefaultS3ClientFactory {
+
+ @Override
+ protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials,
+ ClientConfiguration awsConf) {
+ LOG.warn("** FAILURE INJECTION ENABLED. Do not run in production! **");
+ return new InconsistentAmazonS3Client(credentials, awsConf, getConf());
+ }
+}
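A hedged sketch of enabling the factory in a test configuration; the
Constants.S3_CLIENT_FACTORY_IMPL switch and the exact fail-injection
constants are assumptions inferred from the names used in setupConfig()
above:

    Configuration conf = new Configuration();
    // Assumed constant: points S3A at the fault-injecting client factory.
    conf.setClass(Constants.S3_CLIENT_FACTORY_IMPL,
        InconsistentS3ClientFactory.class, S3ClientFactory.class);
    // Delay listing visibility of keys containing this substring ...
    conf.set(FAIL_INJECT_INCONSISTENCY_KEY, "DELAY_LISTING_ME");
    conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f); // ... always ...
    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, 5000);         // ... for 5 seconds.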
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 30d8e6f..8efa218 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -22,18 +22,25 @@ import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
+import java.util.Set;
import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
@@ -54,6 +61,43 @@ public class Listing {
}
/**
+ * Create a FileStatus iterator against a provided list of file status, with
+ * a given status filter.
+ *
+ * @param fileStatuses the provided list of file status. NO remote calls.
+ * @param filter file path filter on which paths to accept
+ * @param acceptor the file status acceptor
+ * @return the file status iterator
+ */
+ ProvidedFileStatusIterator createProvidedFileStatusIterator(
+ FileStatus[] fileStatuses,
+ PathFilter filter,
+ FileStatusAcceptor acceptor) {
+ return new ProvidedFileStatusIterator(fileStatuses, filter, acceptor);
+ }
+
+ /**
+ * Create a FileStatus iterator against a path, with a given list object
+ * request.
+ *
+ * @param listPath path of the listing
+ * @param request initial request to make
+ * @param filter the filter on which paths to accept
+ * @param acceptor the class/predicate to decide which entries to accept
+ * in the listing based on the full file status.
+ * @return the iterator
+ * @throws IOException IO Problems
+ */
+ FileStatusListingIterator createFileStatusListingIterator(
+ Path listPath,
+ ListObjectsRequest request,
+ PathFilter filter,
+ Listing.FileStatusAcceptor acceptor) throws IOException {
+ return createFileStatusListingIterator(listPath, request, filter, acceptor,
+ null);
+ }
+
+ /**
* Create a FileStatus iterator against a path, with a given
* list object request.
* @param listPath path of the listing
@@ -61,6 +105,8 @@ public class Listing {
* @param filter the filter on which paths to accept
* @param acceptor the class/predicate to decide which entries to accept
* in the listing based on the full file status.
+ * @param providedStatus the provided list of file status, which may contain
+ * items that are not listed from source.
* @return the iterator
* @throws IOException IO Problems
*/
@@ -68,11 +114,13 @@ public class Listing {
Path listPath,
ListObjectsRequest request,
PathFilter filter,
- Listing.FileStatusAcceptor acceptor) throws IOException {
+ Listing.FileStatusAcceptor acceptor,
+ RemoteIterator<FileStatus> providedStatus) throws IOException {
return new FileStatusListingIterator(
new ObjectListingIterator(listPath, request),
filter,
- acceptor);
+ acceptor,
+ providedStatus);
}
/**
@@ -80,12 +128,27 @@ public class Listing {
* @param statusIterator an iterator over the remote status entries
* @return a new remote iterator
*/
+ @VisibleForTesting
LocatedFileStatusIterator createLocatedFileStatusIterator(
RemoteIterator<FileStatus> statusIterator) {
return new LocatedFileStatusIterator(statusIterator);
}
/**
+ * Create a located status iterator that wraps another to filter out a set
+ * of recently deleted items.
+ * @param iterator an iterator over the remote located status entries.
+ * @param tombstones set of paths that are recently deleted and should be
+ * filtered.
+ * @return a new remote iterator.
+ */
+ @VisibleForTesting
+ TombstoneReconcilingIterator createTombstoneReconcilingIterator(
+ RemoteIterator<LocatedFileStatus> iterator, Set<Path> tombstones) {
+ return new TombstoneReconcilingIterator(iterator, tombstones);
+ }
+
+ /**
* Interface to implement by the logic deciding whether to accept a summary
* entry or path as a valid file or directory.
*/
@@ -108,6 +171,13 @@ public class Listing {
* should be generated.)
*/
boolean accept(Path keyPath, String commonPrefix);
+
+ /**
+ * Predicate to decide whether or not to accept a file status.
+ * @param status file status containing file path information
+ * @return true if the status is accepted else false
+ */
+ boolean accept(FileStatus status);
}
/**
@@ -115,9 +185,9 @@ public class Listing {
* value.
*
* If the status value is null, the iterator declares that it has no data.
- * This iterator is used to handle {@link listStatus()} calls where the path
- * handed in refers to a file, not a directory: this is the iterator
- * returned.
+ * This iterator is used to handle {@link S3AFileSystem#listStatus} calls
+ * where the path handed in refers to a file, not a directory: this is the
+ * iterator returned.
*/
static final class SingleStatusRemoteIterator
implements RemoteIterator<LocatedFileStatus> {
@@ -169,6 +239,47 @@ public class Listing {
}
/**
+ * This wraps up a provided non-null list of file status as a remote iterator.
+ *
+ * It first filters the provided list; later {@link #next} calls are served
+ * from the filtered list. This suffers from scalability issues if the
+ * provided list is too large.
+ *
+ * There is no remote data to fetch.
+ */
+ static class ProvidedFileStatusIterator
+ implements RemoteIterator<FileStatus> {
+ private final ArrayList<FileStatus> filteredStatusList;
+ private int index = 0;
+
+ ProvidedFileStatusIterator(FileStatus[] fileStatuses, PathFilter filter,
+ FileStatusAcceptor acceptor) {
+ Preconditions.checkArgument(fileStatuses != null, "Null status list!");
+
+ filteredStatusList = new ArrayList<>(fileStatuses.length);
+ for (FileStatus status : fileStatuses) {
+ if (filter.accept(status.getPath()) && acceptor.accept(status)) {
+ filteredStatusList.add(status);
+ }
+ }
+ filteredStatusList.trimToSize();
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return index < filteredStatusList.size();
+ }
+
+ @Override
+ public FileStatus next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return filteredStatusList.get(index++);
+ }
+ }
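As a usage sketch (not part of the patch): S3Guard code can expose an
in-memory array of statuses, e.g. one read from a MetadataStore, as a
RemoteIterator via the package-private createProvidedFileStatusIterator()
above; the listing instance is assumed to come from the owning
S3AFileSystem:

    FileStatus[] statuses = {
        new FileStatus(0, false, 1, 0, 0, new Path("s3a://bucket/dir/file"))
    };
    RemoteIterator<FileStatus> it = listing.createProvidedFileStatusIterator(
        statuses,
        path -> true,                          // accept every path
        new Listing.AcceptAllButS3nDirs());    // but drop S3N folder markers
    while (it.hasNext()) {
      LOG.info("{}", it.next());
    }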
+
+ /**
* Wraps up object listing into a remote iterator which will ask for more
* listing data if needed.
*
@@ -179,7 +290,7 @@ public class Listing {
* iterator can declare that there is more data available.
*
* The need to filter the results precludes the iterator from simply
- * declaring that if the {@link S3AFileSystem.ObjectListingIterator#hasNext()}
+ * declaring that if the {@link ObjectListingIterator#hasNext()}
* is true then there are more results. Instead the next batch of results must
* be retrieved and filtered.
*
@@ -208,20 +319,33 @@ public class Listing {
/** Iterator over the current set of results. */
private ListIterator<FileStatus> statusBatchIterator;
+ private final Set<FileStatus> providedStatus;
+ private Iterator<FileStatus> providedStatusIterator;
+
/**
* Create an iterator over file status entries.
* @param source the listing iterator from a listObjects call.
* @param filter the filter on which paths to accept
* @param acceptor the class/predicate to decide which entries to accept
* in the listing based on the full file status.
+ * @param providedStatus the provided list of file status, which may contain
+ * items that are not listed from source.
* @throws IOException IO Problems
*/
FileStatusListingIterator(ObjectListingIterator source,
PathFilter filter,
- FileStatusAcceptor acceptor) throws IOException {
+ FileStatusAcceptor acceptor,
+ RemoteIterator<FileStatus> providedStatus) throws IOException {
this.source = source;
this.filter = filter;
this.acceptor = acceptor;
+ this.providedStatus = new HashSet<>();
+ while (providedStatus != null && providedStatus.hasNext()) {
+ final FileStatus status = providedStatus.next();
+ if (filter.accept(status.getPath()) && acceptor.accept(status)) {
+ this.providedStatus.add(status);
+ }
+ }
// build the first set of results. This will not trigger any
// remote IO, assuming the source iterator is in its initial
// iteration
@@ -233,26 +357,53 @@ public class Listing {
* If there is data in the local filtered list, return true.
* Else: request more data until that condition is met, or there
* is no more remote listing data.
+ * Lastly, return true if the {@code providedStatusIterator}
+ * still has items.
* @return true if a call to {@link #next()} will succeed.
* @throws IOException
*/
@Override
public boolean hasNext() throws IOException {
- return statusBatchIterator.hasNext() || requestNextBatch();
+ return sourceHasNext() || providedStatusIterator.hasNext();
+ }
+
+ private boolean sourceHasNext() throws IOException {
+ if (statusBatchIterator.hasNext() || requestNextBatch()) {
+ return true;
+ } else {
+ // fall back to file statuses that exist only in the provided list
+ if (providedStatusIterator == null) {
+ LOG.debug("Start iterating the provided status.");
+ providedStatusIterator = providedStatus.iterator();
+ }
+ return false;
+ }
}
@Override
public FileStatus next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
+ final FileStatus status;
+ if (sourceHasNext()) {
+ status = statusBatchIterator.next();
+ // Remove the status from the provided list once S3 has listed it, so
+ // that duplicate items are not returned.
+ LOG.debug("Removing the status from provided file status {}", status);
+ providedStatus.remove(status);
+ } else {
+ if (providedStatusIterator.hasNext()) {
+ status = providedStatusIterator.next();
+ LOG.debug("Returning provided file status {}", status);
+ } else {
+ throw new NoSuchElementException();
+ }
}
- return statusBatchIterator.next();
+ return status;
}
/**
* Try to retrieve another batch.
* Note that for the initial batch,
- * {@link S3AFileSystem.ObjectListingIterator} does not generate a request;
+ * {@link ObjectListingIterator} does not generate a request;
* it simply returns the initial set.
*
* @return true if a new batch was created.
@@ -312,7 +463,7 @@ public class Listing {
for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = owner.keyToQualifiedPath(prefix);
if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
- FileStatus status = new S3AFileStatus(false, keyPath,
+ FileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
owner.getUsername());
LOG.debug("Adding directory: {}", status);
added++;
@@ -352,7 +503,7 @@ public class Listing {
* instance.
*
* 2. Second and later invocations will continue the ongoing listing,
- * calling {@link #continueListObjects(ObjectListing)} to request the next
+ * calling {@link S3AFileSystem#continueListObjects} to request the next
* batch of results.
*
* 3. The {@link #hasNext()} predicate returns true for the initial call,
@@ -504,6 +655,11 @@ public class Listing {
public boolean accept(Path keyPath, String prefix) {
return false;
}
+
+ @Override
+ public boolean accept(FileStatus status) {
+ return (status != null) && status.isFile();
+ }
}
/**
@@ -534,6 +690,80 @@ public class Listing {
}
/**
+ * Wraps another iterator and filters out files that appear in the provided
+ * set of tombstones. Will read ahead in the iterator when necessary to
+ * ensure that emptiness is detected early enough if only deleted objects
+ * remain in the source iterator.
+ */
+ static class TombstoneReconcilingIterator implements
+ RemoteIterator<LocatedFileStatus> {
+ private LocatedFileStatus next = null;
+ private final RemoteIterator<LocatedFileStatus> iterator;
+ private final Set<Path> tombstones;
+
+ /**
+ * @param iterator Source iterator to filter
+ * @param tombstones set of tombstone markers to filter out of results
+ */
+ TombstoneReconcilingIterator(RemoteIterator<LocatedFileStatus>
+ iterator, Set<Path> tombstones) {
+ this.iterator = iterator;
+ if (tombstones != null) {
+ this.tombstones = tombstones;
+ } else {
+ this.tombstones = Collections.emptySet();
+ }
+ }
+
+ private boolean fetch() throws IOException {
+ while (next == null && iterator.hasNext()) {
+ LocatedFileStatus candidate = iterator.next();
+ if (!tombstones.contains(candidate.getPath())) {
+ next = candidate;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public boolean hasNext() throws IOException {
+ if (next != null) {
+ return true;
+ }
+ return fetch();
+ }
+
+ public LocatedFileStatus next() throws IOException {
+ if (hasNext()) {
+ LocatedFileStatus result = next;
+ next = null;
+ fetch();
+ return result;
+ }
+ throw new NoSuchElementException();
+ }
+ }
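A short sketch of the read-ahead semantics, assuming raw is some existing
RemoteIterator<LocatedFileStatus>: a path placed in the tombstone set never
surfaces, and hasNext() stays accurate even when only deleted entries remain
in the source:

    Set<Path> tombstones = new HashSet<>();
    tombstones.add(new Path("s3a://bucket/dir/recently-deleted"));
    RemoteIterator<LocatedFileStatus> filtered =
        new TombstoneReconcilingIterator(raw, tombstones);
    while (filtered.hasNext()) {              // false once only tombstoned
      LocatedFileStatus st = filtered.next(); // entries remain in 'raw'
    }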
+
+ /**
+ * Accept all entries except those which map to S3N pseudo directory markers.
+ */
+ static class AcceptAllButS3nDirs implements FileStatusAcceptor {
+
+ public boolean accept(Path keyPath, S3ObjectSummary summary) {
+ return !summary.getKey().endsWith(S3N_FOLDER_SUFFIX);
+ }
+
+ public boolean accept(Path keyPath, String prefix) {
+ return !keyPath.toString().endsWith(S3N_FOLDER_SUFFIX);
+ }
+
+ public boolean accept(FileStatus status) {
+ return !status.getPath().toString().endsWith(S3N_FOLDER_SUFFIX);
+ }
+
+ }
+
+ /**
* Accept all entries except the base path and those which map to S3N
* pseudo directory markers.
*/
@@ -575,6 +805,11 @@ public class Listing {
public boolean accept(Path keyPath, String prefix) {
return !keyPath.equals(qualifiedPath);
}
+
+ @Override
+ public boolean accept(FileStatus status) {
+ return (status != null) && !status.getPath().equals(qualifiedPath);
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org