You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/09/01 20:01:50 UTC
[62/74] [abbrv] hadoop git commit: HADOOP-13345 S3Guard: Improved
Consistency for S3A. Contributed by: Chris Nauroth, Aaron Fabbri,
Mingliang Liu, Lei (Eddy) Xu, Sean Mackrory, Steve Loughran and others.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
new file mode 100644
index 0000000..6cff533
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.AmazonS3;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
+
+/**
+ * Test S3Guard list consistency feature by injecting delayed listObjects()
+ * visibility via {@link InconsistentAmazonS3Client}.
+ *
+ * Tests here generally:
+ * 1. Use the inconsistency injection mentioned above.
+ * 2. Only run when S3Guard is enabled.
+ */
+public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
+
+  /**
+   * Build the test contract with the inconsistency-injecting S3 client
+   * factory wired in, so listObjects() exhibits delayed visibility for
+   * keys containing the delay substring.
+   */
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
+        S3ClientFactory.class);
+    // Other configs would break test assumptions: delay every matching key,
+    // with probability 1, for the default delay interval.
+    conf.set(FAIL_INJECT_INCONSISTENCY_KEY, DEFAULT_DELAY_KEY_SUBSTRING);
+    conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
+    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, DEFAULT_DELAY_KEY_MSEC);
+    return new S3AContract(conf);
+  }
+
+  /**
+   * Helper function for other test cases: does a single rename operation and
+   * validates the aftermath.
+   * @param mkdirs Directories to create
+   * @param srcdirs Source paths for rename operation
+   * @param dstdirs Destination paths for rename operation
+   * @param yesdirs Files that must exist post-rename (e.g. srcdirs children)
+   * @param nodirs Files that must not exist post-rename (e.g. dstdirs children)
+   * @throws Exception on failure
+   */
+  private void doTestRenameSequence(Path[] mkdirs, Path[] srcdirs,
+      Path[] dstdirs, Path[] yesdirs, Path[] nodirs) throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    // Only meaningful when a MetadataStore backs the filesystem.
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    if (mkdirs != null) {
+      for (Path mkdir : mkdirs) {
+        assertTrue(fs.mkdirs(mkdir));
+      }
+      // ensure the freshly created dirs are fully visible before renaming,
+      // so only the renames themselves are subject to injected delays.
+      clearInconsistency(fs);
+    }
+
+    // assertEquals reports both values on failure, unlike assertTrue on ==
+    assertEquals("srcdirs and dstdirs must have equal length",
+        srcdirs.length, dstdirs.length);
+    for (int i = 0; i < srcdirs.length; i++) {
+      assertTrue("Rename returned false: " + srcdirs[i] + " -> " + dstdirs[i],
+          fs.rename(srcdirs[i], dstdirs[i]));
+    }
+
+    for (Path yesdir : yesdirs) {
+      assertTrue("Path was supposed to exist: " + yesdir, fs.exists(yesdir));
+    }
+    for (Path nodir : nodirs) {
+      assertFalse("Path is not supposed to exist: " + nodir, fs.exists(nodir));
+    }
+  }
+
+  /**
+   * Tests that after renaming a directory, the original directory and its
+   * contents are indeed missing and the corresponding new paths are visible.
+   * @throws Exception on failure
+   */
+  @Test
+  public void testConsistentListAfterRename() throws Exception {
+    // the second file carries the delay substring, so a raw S3 listing
+    // would still show it under d1 after the rename; S3Guard must not.
+    Path[] mkdirs = {
+      path("d1/f"),
+      path("d1/f" + DEFAULT_DELAY_KEY_SUBSTRING)
+    };
+    Path[] srcdirs = {path("d1")};
+    Path[] dstdirs = {path("d2")};
+    Path[] yesdirs = {path("d2"), path("d2/f"),
+        path("d2/f" + DEFAULT_DELAY_KEY_SUBSTRING)};
+    Path[] nodirs = {path("d1"), path("d1/f"),
+        path("d1/f" + DEFAULT_DELAY_KEY_SUBSTRING)};
+    doTestRenameSequence(mkdirs, srcdirs, dstdirs, yesdirs, nodirs);
+    // clean up both trees so a rerun starts from scratch
+    getFileSystem().delete(path("d1"), true);
+    getFileSystem().delete(path("d2"), true);
+  }
+
+  /**
+   * Tests a circular sequence of renames to verify that overwriting recently
+   * deleted files and reading recently created files from rename operations
+   * works as expected.
+   * @throws Exception on failure
+   */
+  @Test
+  public void testRollingRenames() throws Exception {
+    Path[] dir0 = {path("rolling/1")};
+    Path[] dir1 = {path("rolling/2")};
+    Path[] dir2 = {path("rolling/3")};
+    // These sets have to be in reverse order compared to the movement
+    Path[] setA = {dir1[0], dir0[0]};
+    Path[] setB = {dir2[0], dir1[0]};
+    Path[] setC = {dir0[0], dir2[0]};
+
+    // checkstyle: space after `for`
+    for (int i = 0; i < 2; i++) {
+      // only the first pass creates the directories; later passes reuse
+      // the paths left by the previous rotation
+      Path[] firstSet = i == 0 ? setA : null;
+      doTestRenameSequence(firstSet, setA, setB, setB, dir0);
+      doTestRenameSequence(null, setB, setC, setC, dir1);
+      doTestRenameSequence(null, setC, setA, setA, dir2);
+    }
+
+    S3AFileSystem fs = getFileSystem();
+    assertFalse("Renaming deleted file should have failed",
+        fs.rename(dir2[0], dir1[0]));
+    assertTrue("Renaming over existing file should have succeeded",
+        fs.rename(dir1[0], dir0[0]));
+  }
+
+  /**
+   * Tests that deleted files immediately stop manifesting in list operations
+   * even when the effect in S3 is delayed.
+   * @throws Exception on failure
+   */
+  @Test
+  public void testConsistentListAfterDelete() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    // test will fail if NullMetadataStore (the default) is configured: skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+    // in listObjects() results via InconsistentS3Client
+    Path inconsistentPath =
+        path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
+
+    Path[] testDirs = {path("a/b/dir1"),
+        path("a/b/dir2"),
+        inconsistentPath};
+
+    for (Path path : testDirs) {
+      assertTrue(fs.mkdirs(path));
+    }
+    // make the new dirs fully visible first, so the delete below is the
+    // operation whose visibility is delayed
+    clearInconsistency(fs);
+    for (Path path : testDirs) {
+      assertTrue(fs.delete(path, false));
+    }
+
+    FileStatus[] paths = fs.listStatus(path("a/b/"));
+    List<Path> list = new ArrayList<>();
+    for (FileStatus fileState : paths) {
+      list.add(fileState.getPath());
+    }
+    assertFalse(list.contains(path("a/b/dir1")));
+    assertFalse(list.contains(path("a/b/dir2")));
+    // This should fail without S3Guard, and succeed with it.
+    assertFalse(list.contains(inconsistentPath));
+  }
+
+  /**
+   * Tests that rename immediately after files in the source directory are
+   * deleted results in exactly the correct set of destination files and none
+   * of the source files.
+   * @throws Exception on failure
+   */
+  @Test
+  public void testConsistentRenameAfterDelete() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    // test will fail if NullMetadataStore (the default) is configured: skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+    // in listObjects() results via InconsistentS3Client
+    Path inconsistentPath =
+        path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
+
+    Path[] testDirs = {path("a/b/dir1"),
+        path("a/b/dir2"),
+        inconsistentPath};
+
+    for (Path path : testDirs) {
+      assertTrue(fs.mkdirs(path));
+    }
+    clearInconsistency(fs);
+    assertTrue(fs.delete(testDirs[1], false));
+    assertTrue(fs.delete(testDirs[2], false));
+
+    // the rename outcome was previously ignored; fail fast if it is false
+    assertTrue("rename(a, a3) returned false",
+        fs.rename(path("a"), path("a3")));
+    FileStatus[] paths = fs.listStatus(path("a3/b"));
+    List<Path> list = new ArrayList<>();
+    for (FileStatus fileState : paths) {
+      list.add(fileState.getPath());
+    }
+    assertTrue(list.contains(path("a3/b/dir1")));
+    assertFalse(list.contains(path("a3/b/dir2")));
+    // This should fail without S3Guard, and succeed with it.
+    assertFalse(list.contains(path("a3/b/dir3-" +
+        DEFAULT_DELAY_KEY_SUBSTRING)));
+
+    try {
+      // result intentionally discarded: listing the renamed-away source
+      // must throw before anything is returned
+      fs.listFilesAndEmptyDirectories(path("a"), true);
+      fail("Recently renamed dir should not be visible");
+    } catch(FileNotFoundException e) {
+      // expected
+    }
+  }
+
+  /**
+   * Tests that directories created via mkdirs() are immediately visible in
+   * listStatus(), even while the injected client delays them in raw
+   * listObjects() results.
+   * @throws Exception on failure
+   */
+  @Test
+  public void testConsistentListStatusAfterPut() throws Exception {
+
+    S3AFileSystem fs = getFileSystem();
+
+    // This test will fail if NullMetadataStore (the default) is configured:
+    // skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+    // in listObjects() results via InconsistentS3Client
+    Path inconsistentPath =
+        path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
+
+    Path[] testDirs = {path("a/b/dir1"),
+        path("a/b/dir2"),
+        inconsistentPath};
+
+    for (Path path : testDirs) {
+      assertTrue(fs.mkdirs(path));
+    }
+
+    FileStatus[] paths = fs.listStatus(path("a/b/"));
+    List<Path> list = new ArrayList<>();
+    for (FileStatus fileState : paths) {
+      list.add(fileState.getPath());
+    }
+    assertTrue(list.contains(path("a/b/dir1")));
+    assertTrue(list.contains(path("a/b/dir2")));
+    // This should fail without S3Guard, and succeed with it.
+    assertTrue(list.contains(inconsistentPath));
+  }
+
+  /**
+   * Similar to {@link #testConsistentListStatusAfterPut()}, this tests that the
+   * FS listLocatedStatus() call will return consistent list.
+   */
+  @Test
+  public void testConsistentListLocatedStatusAfterPut() throws Exception {
+    final S3AFileSystem fs = getFileSystem();
+    // This test will fail if NullMetadataStore (the default) is configured:
+    // skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+    String rootDir = "doTestConsistentListLocatedStatusAfterPut";
+    fs.mkdirs(path(rootDir));
+
+    // sweep combinations of visible/delayed counts, including the
+    // empty-directory and no-delay edge cases
+    final int[] numOfPaths = {0, 1, 5};
+    for (int normalPathNum : numOfPaths) {
+      for (int delayedPathNum : new int[] {0, 2}) {
+        LOG.info("Testing with normalPathNum={}, delayedPathNum={}",
+            normalPathNum, delayedPathNum);
+        doTestConsistentListLocatedStatusAfterPut(fs, rootDir, normalPathNum,
+            delayedPathNum);
+      }
+    }
+  }
+
+  /**
+   * Helper method to implement the tests of consistent listLocatedStatus().
+   * @param fs The S3 file system from contract
+   * @param rootDir base directory (relative to the test path) to work under
+   * @param normalPathNum number of paths listed directly from S3 without delay
+   * @param delayedPathNum number of paths listed with delayed visibility
+   * @throws Exception on failure
+   */
+  private void doTestConsistentListLocatedStatusAfterPut(S3AFileSystem fs,
+      String rootDir, int normalPathNum, int delayedPathNum) throws Exception {
+    final List<Path> testDirs = new ArrayList<>(normalPathNum + delayedPathNum);
+    int index = 0;
+    for (; index < normalPathNum; index++) {
+      testDirs.add(path(rootDir + "/dir-" +
+          index));
+    }
+    for (; index < normalPathNum + delayedPathNum; index++) {
+      // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+      // in listObjects() results via InconsistentS3Client
+      testDirs.add(path(rootDir + "/dir-" + index +
+          DEFAULT_DELAY_KEY_SUBSTRING));
+    }
+
+    for (Path path : testDirs) {
+      // delete the old test path (if any) so that when we call mkdirs() later,
+      // the to-be-delayed directories will be tracked via putObject() request.
+      fs.delete(path, true);
+      assertTrue(fs.mkdirs(path));
+    }
+
+    // this should return the union data from S3 and MetadataStore
+    final RemoteIterator<LocatedFileStatus> statusIterator =
+        fs.listLocatedStatus(path(rootDir + "/"));
+    List<Path> list = new ArrayList<>();
+    // idiomatic while loop instead of a for statement with no init/update
+    while (statusIterator.hasNext()) {
+      list.add(statusIterator.next().getPath());
+    }
+
+    // This should fail without S3Guard, and succeed with it because part of the
+    // children under test path are delaying visibility
+    for (Path path : testDirs) {
+      assertTrue("listLocatedStatus should list " + path, list.contains(path));
+    }
+  }
+
+  /**
+   * Tests that the S3AFS listFiles() call will return consistent file list,
+   * sweeping subdirectory, normal-file and delayed-file counts, in both
+   * recursive and non-recursive modes.
+   */
+  @Test
+  public void testConsistentListFiles() throws Exception {
+    final S3AFileSystem fs = getFileSystem();
+    // This test will fail if NullMetadataStore (the default) is configured:
+    // skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+
+    final int[] numOfPaths = {0, 2};
+    for (int dirNum : numOfPaths) {
+      for (int normalFile : numOfPaths) {
+        for (int delayedFile : new int[] {0, 1}) {
+          for (boolean recursive : new boolean[] {true, false}) {
+            doTestListFiles(fs, dirNum, normalFile, delayedFile, recursive);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper method to implement the tests of consistent listFiles().
+   *
+   * The file structure has dirNum subdirectories, and each directory (including
+   * the test base directory itself) has normalFileNum normal files and
+   * delayedFileNum delayed files.
+   *
+   * @param fs The S3 file system from contract
+   * @param dirNum number of subdirectories
+   * @param normalFileNum number of files per directory without listing delay
+   * @param delayedFileNum number of files per directory with listing delay
+   * @param recursive listFiles recursively if true
+   * @throws Exception if any unexpected error
+   */
+  private void doTestListFiles(S3AFileSystem fs, int dirNum, int normalFileNum,
+      int delayedFileNum, boolean recursive) throws Exception {
+    describe("Testing dirNum=%d, normalFile=%d, delayedFile=%d, "
+        + "recursive=%s", dirNum, normalFileNum, delayedFileNum, recursive);
+    final Path baseTestDir = path("doTestListFiles-" + dirNum + "-"
+        + normalFileNum + "-" + delayedFileNum + "-" + recursive);
+    // delete the old test path (if any) so that when we call mkdirs() later,
+    // the to-be-delayed subdirectories will be tracked via putObject() request.
+    fs.delete(baseTestDir, true);
+
+    // make subdirectories (if any)
+    final List<Path> testDirs = new ArrayList<>(dirNum + 1);
+    assertTrue(fs.mkdirs(baseTestDir));
+    testDirs.add(baseTestDir);
+    for (int i = 0; i < dirNum; i++) {
+      final Path subdir = path(baseTestDir + "/dir-" + i);
+      assertTrue(fs.mkdirs(subdir));
+      testDirs.add(subdir);
+    }
+
+    final Collection<String> fileNames
+        = new ArrayList<>(normalFileNum + delayedFileNum);
+    int index = 0;
+    for (; index < normalFileNum; index++) {
+      fileNames.add("file-" + index);
+    }
+    for (; index < normalFileNum + delayedFileNum; index++) {
+      // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+      // in listObjects() results via InconsistentS3Client
+      fileNames.add("file-" + index + "-" + DEFAULT_DELAY_KEY_SUBSTRING);
+    }
+
+    int filesAndEmptyDirectories = 0;
+
+    // create files under each test directory
+    for (Path dir : testDirs) {
+      for (String fileName : fileNames) {
+        writeTextFile(fs, new Path(dir, fileName), "I, " + fileName, false);
+        filesAndEmptyDirectories++;
+      }
+    }
+
+    // this should return the union data from S3 and MetadataStore
+    final RemoteIterator<LocatedFileStatus> statusIterator
+        = fs.listFiles(baseTestDir, recursive);
+    final Collection<Path> listedFiles = new HashSet<>();
+    // idiomatic while loop instead of a for statement with no init/update
+    while (statusIterator.hasNext()) {
+      final FileStatus status = statusIterator.next();
+      assertTrue("FileStatus " + status + " is not a file!", status.isFile());
+      listedFiles.add(status.getPath());
+    }
+    LOG.info("S3AFileSystem::listFiles('{}', {}) -> {}",
+        baseTestDir, recursive, listedFiles);
+
+    // This should fail without S3Guard, and succeed with it because part of the
+    // files to list are delaying visibility
+    if (!recursive) {
+      // in this case only the top level files are listed
+      assertEquals("Unexpected number of files returned by listFiles() call",
+          normalFileNum + delayedFileNum, listedFiles.size());
+      verifyFileIsListed(listedFiles, baseTestDir, fileNames);
+    } else {
+      assertEquals("Unexpected number of files returned by listFiles() call",
+          filesAndEmptyDirectories,
+          listedFiles.size());
+      for (Path dir : testDirs) {
+        verifyFileIsListed(listedFiles, dir, fileNames);
+      }
+    }
+  }
+
+  /** Assert that every expected file name under currentDir was listed. */
+  private static void verifyFileIsListed(Collection<Path> listedFiles,
+      Path currentDir, Collection<String> fileNames) {
+    for (String name : fileNames) {
+      final Path expectedFile = new Path(currentDir, name);
+      assertTrue(expectedFile + " should have been listed",
+          listedFiles.contains(expectedFile));
+    }
+  }
+
+  /**
+   * Verify that a file committed by renaming it out of a task-attempt
+   * directory (under a delayed-visibility path) appears exactly once
+   * in the destination directory.
+   * @throws Throwable on failure
+   */
+  @Test
+  public void testCommitByRenameOperations() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    Assume.assumeTrue(fs.hasMetadataStore());
+    Path work = path("test-commit-by-rename-" + DEFAULT_DELAY_KEY_SUBSTRING);
+    Path task00 = new Path(work, "task00");
+    fs.mkdirs(task00);
+    String name = "part-00";
+    try (FSDataOutputStream out =
+             fs.create(new Path(task00, name), false)) {
+      out.writeChars("hello");
+    }
+    for (FileStatus stat : fs.listStatus(task00)) {
+      // the rename outcome was previously ignored; assert it succeeds
+      assertTrue("rename of " + stat.getPath() + " into " + work + " failed",
+          fs.rename(stat.getPath(), work));
+    }
+    List<FileStatus> files = new ArrayList<>(2);
+    for (FileStatus stat : fs.listStatus(work)) {
+      if (stat.isFile()) {
+        files.add(stat);
+      }
+    }
+    assertFalse("renamed file " + name + " not found in " + work,
+        files.isEmpty());
+    assertEquals("more files found than expected in " + work
+        + " " + ls(work), 1, files.size());
+    FileStatus status = files.get(0);
+    assertEquals("Wrong filename in " + status,
+        name, status.getPath().getName());
+  }
+
+  /**
+   * Tests that the injected listing inconsistency does not resurrect
+   * objects or prefixes after a recursive delete, for both delimited
+   * (non-recursive) and undelimited (recursive) raw listObjects() calls.
+   * @throws Throwable on failure
+   */
+  @Test
+  public void testInconsistentS3ClientDeletes() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    // the root itself carries the delay substring so all child operations
+    // go through the injected inconsistency path
+    Path root = path("testInconsistentClient" + DEFAULT_DELAY_KEY_SUBSTRING);
+    for (int i = 0; i < 3; i++) {
+      fs.mkdirs(new Path(root, "dir" + i));
+      touch(fs, new Path(root, "file" + i));
+      for (int j = 0; j < 3; j++) {
+        touch(fs, new Path(new Path(root, "dir" + i), "file" + i + "-" + j));
+      }
+    }
+    clearInconsistency(fs);
+
+    AmazonS3 client = fs.getAmazonS3Client();
+    String key = fs.pathToKey(root) + "/";
+
+    // snapshot raw listings before the delete, in both modes
+    ObjectListing preDeleteDelimited = client.listObjects(
+        fs.createListObjectsRequest(key, "/"));
+    ObjectListing preDeleteUndelimited = client.listObjects(
+        fs.createListObjectsRequest(key, null));
+
+    fs.delete(root, true);
+
+    ObjectListing postDeleteDelimited = client.listObjects(
+        fs.createListObjectsRequest(key, "/"));
+    ObjectListing postDeleteUndelimited = client.listObjects(
+        fs.createListObjectsRequest(key, null));
+
+    assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
+        "in a non-recursive listing",
+        preDeleteDelimited.getObjectSummaries().size(),
+        postDeleteDelimited.getObjectSummaries().size()
+    );
+    assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " +
+        "in a non-recursive listing",
+        preDeleteDelimited.getCommonPrefixes().size(),
+        postDeleteDelimited.getCommonPrefixes().size()
+    );
+    assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
+        "in a recursive listing",
+        preDeleteUndelimited.getObjectSummaries().size(),
+        postDeleteUndelimited.getObjectSummaries().size()
+    );
+    assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " +
+        "in a recursive listing",
+        preDeleteUndelimited.getCommonPrefixes().size(),
+        postDeleteUndelimited.getCommonPrefixes().size()
+    );
+  }
+
+  /** Reset any pending injected inconsistency on the FS's S3 client. */
+  private static void clearInconsistency(S3AFileSystem fs) throws Exception {
+    InconsistentAmazonS3Client.castFrom(fs.getAmazonS3Client())
+        .clearInconsistency();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
new file mode 100644
index 0000000..a63b696
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardWriteBack.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+
+/**
+ * Test cases that validate S3Guard's behavior for writing things like
+ * directory listings back to the MetadataStore.
+ */
+public class ITestS3GuardWriteBack extends AbstractS3ATestBase {
+
+  /**
+   * In listStatus(), when S3Guard is enabled, the full listing for a
+   * directory is "written back" to the MetadataStore before the listing is
+   * returned. Currently this "write back" behavior occurs when
+   * fs.s3a.metadatastore.authoritative is true. This test validates this
+   * behavior.
+   * @throws Exception on failure
+   */
+  @Test
+  public void testListStatusWriteBack() throws Exception {
+    Assume.assumeTrue(getFileSystem().hasMetadataStore());
+
+    Path directory = path("ListStatusWriteBack");
+
+    // "raw" S3AFileSystem without S3Guard
+    S3AFileSystem noS3Guard = createTestFS(directory.toUri(), true, false);
+
+    // Another with S3Guard and write-back disabled
+    S3AFileSystem noWriteBack = createTestFS(directory.toUri(), false, false);
+
+    // Another S3Guard and write-back enabled
+    S3AFileSystem yesWriteBack = createTestFS(directory.toUri(), false, true);
+
+    // delete the existing directory (in case of last test failure)
+    noS3Guard.delete(directory, true);
+    // Create a directory on S3 only
+    noS3Guard.mkdirs(new Path(directory, "OnS3"));
+    // Create a directory on both S3 and metadata store
+    Path p = new Path(directory, "OnS3AndMS");
+    assertPathDoesntExist(noWriteBack, p);
+    noWriteBack.mkdirs(p);
+
+    FileStatus[] fsResults;
+    DirListingMetadata mdResults;
+
+    // FS should return both even though S3Guard is not writing back to MS
+    fsResults = noWriteBack.listStatus(directory);
+    assertEquals("Filesystem enabled S3Guard without write back should have "
+        + "both /OnS3 and /OnS3AndMS: " + Arrays.toString(fsResults),
+        2, fsResults.length);
+
+    // Metadata store without write-back should still only contain /OnS3AndMS,
+    // because newly discovered /OnS3 is not written back to metadata store
+    mdResults = noWriteBack.getMetadataStore().listChildren(directory);
+    assertEquals("Metadata store without write back should still only know "
+        + "about /OnS3AndMS, but it has: " + mdResults,
+        1, mdResults.numEntries());
+
+    // FS should return both (and will write it back)
+    fsResults = yesWriteBack.listStatus(directory);
+    assertEquals("Filesystem enabled S3Guard with write back should have "
+        + " both /OnS3 and /OnS3AndMS: " + Arrays.toString(fsResults),
+        2, fsResults.length);
+
+    // Metadata store with write-back should contain both because the newly
+    // discovered /OnS3 should have been written back to metadata store
+    mdResults = yesWriteBack.getMetadataStore().listChildren(directory);
+    assertEquals("Unexpected number of results from metadata store. "
+        + "Should have /OnS3 and /OnS3AndMS: " + mdResults,
+        2, mdResults.numEntries());
+
+    // If we don't clean this up, the next test run will fail because it will
+    // have recorded /OnS3 being deleted even after it's written to noS3Guard.
+    getFileSystem().getMetadataStore().forgetMetadata(
+        new Path(directory, "OnS3"));
+  }
+
+  /**
+   * Create a separate S3AFileSystem instance for testing.
+   * @param fsURI filesystem URI to open
+   * @param disableS3Guard if true, force the NullMetadataStore (raw S3)
+   * @param authoritativeMeta whether the metadata store is authoritative,
+   *     which enables listing write-back; ignored when S3Guard is disabled
+   * @return a new, uncached S3AFileSystem instance
+   * @throws IOException on failure to instantiate the filesystem
+   */
+  private S3AFileSystem createTestFS(URI fsURI, boolean disableS3Guard,
+      boolean authoritativeMeta) throws IOException {
+    Configuration conf;
+
+    // Create a FileSystem that is S3-backed only
+    conf = createConfiguration();
+    // bypass the FS cache so the three instances stay independent
+    S3ATestUtils.disableFilesystemCaching(conf);
+    if (disableS3Guard) {
+      conf.set(Constants.S3_METADATA_STORE_IMPL,
+          Constants.S3GUARD_METASTORE_NULL);
+    } else {
+      S3ATestUtils.maybeEnableS3Guard(conf);
+      conf.setBoolean(Constants.METADATASTORE_AUTHORITATIVE, authoritativeMeta);
+    }
+    FileSystem fs = FileSystem.get(fsURI, conf);
+    return asS3AFS(fs);
+  }
+
+  /** Downcast to S3AFileSystem, failing the test on any other FS type. */
+  private static S3AFileSystem asS3AFS(FileSystem fs) {
+    assertTrue("Not a S3AFileSystem: " + fs, fs instanceof S3AFileSystem);
+    return (S3AFileSystem)fs;
+  }
+
+  /**
+   * Assert that a path has no entry: getFileStatus must throw
+   * FileNotFoundException.
+   * @param fs filesystem to probe
+   * @param p path which must not exist
+   * @throws IOException on any other IO failure
+   */
+  private static void assertPathDoesntExist(FileSystem fs, Path p)
+      throws IOException {
+    try {
+      // result discarded: only the thrown/absent FNFE matters
+      fs.getFileStatus(p);
+    } catch (FileNotFoundException e) {
+      // expected
+      return;
+    }
+    fail("Path should not exist: " + p);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
index 9e0a5e4..4e25380 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.*;
import java.net.URI;
import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.Region;
/**
* An {@link S3ClientFactory} that returns Mockito mocks of the {@link AmazonS3}
@@ -35,6 +36,8 @@ public class MockS3ClientFactory implements S3ClientFactory {
String bucket = name.getHost();
AmazonS3 s3 = mock(AmazonS3.class);
when(s3.doesBucketExist(bucket)).thenReturn(true);
+ when(s3.getBucketLocation(anyString()))
+ .thenReturn(Region.US_West.toString());
return s3;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index acbe610..2c4f009 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -135,6 +135,18 @@ public interface S3ATestConstants {
String TEST_STS_ENDPOINT = "test.fs.s3a.sts.endpoint";
/**
+ * Various S3Guard tests.
+ */
+ String TEST_S3GUARD_PREFIX = "fs.s3a.s3guard.test";
+ String TEST_S3GUARD_ENABLED = TEST_S3GUARD_PREFIX + ".enabled";
+ String TEST_S3GUARD_AUTHORITATIVE = TEST_S3GUARD_PREFIX + ".authoritative";
+ String TEST_S3GUARD_IMPLEMENTATION = TEST_S3GUARD_PREFIX + ".implementation";
+ String TEST_S3GUARD_IMPLEMENTATION_LOCAL = "local";
+ String TEST_S3GUARD_IMPLEMENTATION_DYNAMO = "dynamo";
+ String TEST_S3GUARD_IMPLEMENTATION_DYNAMODBLOCAL = "dynamodblocal";
+ String TEST_S3GUARD_IMPLEMENTATION_NONE = "none";
+
+ /**
* Timeout in Milliseconds for standard tests: {@value}.
*/
int S3A_TEST_TIMEOUT = 10 * 60 * 1000;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 9528967..8dbf90a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -22,7 +22,14 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.s3a.s3guard.DynamoDBClientFactory;
+import org.apache.hadoop.fs.s3a.s3guard.DynamoDBLocalClientFactory;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+
+import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.internal.AssumptionViolatedException;
@@ -31,11 +38,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.List;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
import static org.junit.Assert.*;
/**
@@ -52,6 +61,15 @@ public final class S3ATestUtils {
public static final String UNSET_PROPERTY = "unset";
/**
+ * Get S3A FS name.
+ * @param conf configuration.
+ * @return S3A fs name.
+ */
+ public static String getFsName(Configuration conf) {
+ return conf.getTrimmed(TEST_FS_S3A_NAME, "");
+ }
+
+ /**
* Create the test filesystem.
*
* If the test.fs.s3a.name property is not set, this will
@@ -97,6 +115,8 @@ public final class S3ATestUtils {
throw new AssumptionViolatedException(
"No test filesystem in " + TEST_FS_S3A_NAME);
}
+ // patch in S3Guard options
+ maybeEnableS3Guard(conf);
S3AFileSystem fs1 = new S3AFileSystem();
//enable purging in tests
if (purge) {
@@ -137,6 +157,8 @@ public final class S3ATestUtils {
throw new AssumptionViolatedException("No test filesystem in "
+ TEST_FS_S3A_NAME);
}
+ // patch in S3Guard options
+ maybeEnableS3Guard(conf);
FileContext fc = FileContext.getFileContext(testURI, conf);
return fc;
}
@@ -301,13 +323,96 @@ public final class S3ATestUtils {
* @return a path
*/
public static Path createTestPath(Path defVal) {
- String testUniqueForkId = System.getProperty(
- S3ATestConstants.TEST_UNIQUE_FORK_ID);
+ String testUniqueForkId =
+ System.getProperty(S3ATestConstants.TEST_UNIQUE_FORK_ID);
return testUniqueForkId == null ? defVal :
new Path("/" + testUniqueForkId, "test");
}
/**
+ * Test assumption that S3Guard is/is not enabled.
+ * @param shouldBeEnabled should S3Guard be enabled?
+ * @param originalConf configuration to check
+ * @throws URISyntaxException if the test filesystem URI is invalid
+ */
+ public static void assumeS3GuardState(boolean shouldBeEnabled,
+ Configuration originalConf) throws URISyntaxException {
+ boolean isEnabled = getTestPropertyBool(originalConf, TEST_S3GUARD_ENABLED,
+ originalConf.getBoolean(TEST_S3GUARD_ENABLED, false));
+ Assume.assumeThat("Unexpected S3Guard test state:"
+ + " shouldBeEnabled=" + shouldBeEnabled
+ + " and isEnabled=" + isEnabled,
+ shouldBeEnabled, Is.is(isEnabled));
+
+ final String fsname = originalConf.getTrimmed(TEST_FS_S3A_NAME);
+ Assume.assumeNotNull(fsname);
+ final String bucket = new URI(fsname).getHost();
+ final Configuration conf = propagateBucketOptions(originalConf, bucket);
+ boolean usingNullImpl = S3GUARD_METASTORE_NULL.equals(
+ conf.getTrimmed(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_NULL));
+ Assume.assumeThat("Unexpected S3Guard test state:"
+ + " shouldBeEnabled=" + shouldBeEnabled
+ + " but usingNullImpl=" + usingNullImpl,
+ shouldBeEnabled, Is.is(!usingNullImpl));
+ }
+
+ /**
+ * Conditionally set the S3Guard options from test properties.
+ * @param conf configuration
+ */
+ public static void maybeEnableS3Guard(Configuration conf) {
+ if (getTestPropertyBool(conf, TEST_S3GUARD_ENABLED,
+ conf.getBoolean(TEST_S3GUARD_ENABLED, false))) {
+ // S3Guard is enabled.
+ boolean authoritative = getTestPropertyBool(conf,
+ TEST_S3GUARD_AUTHORITATIVE,
+ conf.getBoolean(TEST_S3GUARD_AUTHORITATIVE, true));
+ String impl = getTestProperty(conf, TEST_S3GUARD_IMPLEMENTATION,
+ conf.get(TEST_S3GUARD_IMPLEMENTATION,
+ TEST_S3GUARD_IMPLEMENTATION_LOCAL));
+ String implClass = "";
+ switch (impl) {
+ case TEST_S3GUARD_IMPLEMENTATION_LOCAL:
+ implClass = S3GUARD_METASTORE_LOCAL;
+ break;
+ case TEST_S3GUARD_IMPLEMENTATION_DYNAMODBLOCAL:
+ conf.setClass(S3Guard.S3GUARD_DDB_CLIENT_FACTORY_IMPL,
+ DynamoDBLocalClientFactory.class, DynamoDBClientFactory.class);
+ case TEST_S3GUARD_IMPLEMENTATION_DYNAMO:
+ implClass = S3GUARD_METASTORE_DYNAMO;
+ break;
+ case TEST_S3GUARD_IMPLEMENTATION_NONE:
+ implClass = S3GUARD_METASTORE_NULL;
+ break;
+ default:
+ fail("Unknown s3guard back end: \"" + impl + "\"");
+ }
+ LOG.debug("Enabling S3Guard, authoritative={}, implementation={}",
+ authoritative, implClass);
+ conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
+ conf.set(S3_METADATA_STORE_IMPL, implClass);
+ conf.setBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, true);
+ }
+ }
+
+ /**
+ * Is there a MetadataStore configured for s3a with authoritative enabled?
+ * @param conf Configuration to test.
+ * @return true iff there is a MetadataStore configured, and it is
+ * configured to allow authoritative results. This can result in reducing
+ * round trips to S3 service for cached results, which may affect FS/FC
+ * statistics.
+ */
+ public static boolean isMetadataStoreAuthoritative(Configuration conf) {
+ if (conf == null) {
+ return Constants.DEFAULT_METADATASTORE_AUTHORITATIVE;
+ }
+ return conf.getBoolean(
+ Constants.METADATASTORE_AUTHORITATIVE,
+ Constants.DEFAULT_METADATASTORE_AUTHORITATIVE);
+ }
+
+ /**
* Reset all metrics in a list.
* @param metrics metrics to reset
*/
@@ -504,6 +609,94 @@ public final class S3ATestUtils {
}
/**
+ * Verify the core size, block size and timestamp values of a file.
+ * @param status status entry to check
+ * @param size file size
+ * @param blockSize block size
+ * @param modTime modified time
+ */
+ public static void verifyFileStatus(FileStatus status, long size,
+ long blockSize, long modTime) {
+ verifyFileStatus(status, size, 0, modTime, 0, blockSize, null, null, null);
+ }
+
+ /**
+ * Verify the status entry of a file matches that expected.
+ * @param status status entry to check
+ * @param size file size
+ * @param replication replication factor (may be 0)
+ * @param modTime modified time
+ * @param accessTime access time (may be 0)
+ * @param blockSize block size
+ * @param owner owner (may be null)
+ * @param group user group (may be null)
+ * @param permission permission (may be null)
+ */
+ public static void verifyFileStatus(FileStatus status,
+ long size,
+ int replication,
+ long modTime,
+ long accessTime,
+ long blockSize,
+ String owner,
+ String group,
+ FsPermission permission) {
+ String details = status.toString();
+ assertFalse("Not a dir: " + details, status.isDirectory());
+ assertEquals("Mod time: " + details, modTime, status.getModificationTime());
+ assertEquals("File size: " + details, size, status.getLen());
+ assertEquals("Block size: " + details, blockSize, status.getBlockSize());
+ if (replication > 0) {
+ assertEquals("Replication value: " + details, replication,
+ status.getReplication());
+ }
+ if (accessTime != 0) {
+ assertEquals("Access time: " + details, accessTime,
+ status.getAccessTime());
+ }
+ if (owner != null) {
+ assertEquals("Owner: " + details, owner, status.getOwner());
+ }
+ if (group != null) {
+ assertEquals("Group: " + details, group, status.getGroup());
+ }
+ if (permission != null) {
+ assertEquals("Permission: " + details, permission,
+ status.getPermission());
+ }
+ }
+
+ /**
+ * Verify the status entry of a directory matches that expected.
+ * @param status status entry to check
+ * @param replication replication factor
+ * @param modTime modified time
+ * @param accessTime access time
+ * @param owner owner
+ * @param group user group
+ * @param permission permission.
+ */
+ public static void verifyDirStatus(FileStatus status,
+ int replication,
+ long modTime,
+ long accessTime,
+ String owner,
+ String group,
+ FsPermission permission) {
+ String details = status.toString();
+ assertTrue("Is a dir: " + details, status.isDirectory());
+ assertEquals("zero length: " + details, 0, status.getLen());
+
+ assertEquals("Mod time: " + details, modTime, status.getModificationTime());
+ assertEquals("Replication value: " + details, replication,
+ status.getReplication());
+ assertEquals("Access time: " + details, accessTime, status.getAccessTime());
+ assertEquals("Owner: " + details, owner, status.getOwner());
+ assertEquals("Group: " + details, group, status.getGroup());
+ assertEquals("Permission: " + details, permission, status.getPermission());
+ }
+
+ /**
* Set a bucket specific property to a particular value.
* If the generic key passed in has an {@code fs.s3a. prefix},
* that's stripped off, so that when the bucket properties are propagated
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
new file mode 100644
index 0000000..e647327
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestListing.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;
+import static org.apache.hadoop.fs.s3a.Listing.ProvidedFileStatusIterator;
+
+/**
+ * Unit tests for the S3A listing classes; keeps the small test cases under control.
+ */
+public class TestListing extends AbstractS3AMockTest {
+
+ private static class MockRemoteIterator<FileStatus> implements
+ RemoteIterator<FileStatus> {
+ private Iterator<FileStatus> iterator;
+
+ MockRemoteIterator(Collection<FileStatus> source) {
+ iterator = source.iterator();
+ }
+
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ public FileStatus next() {
+ return iterator.next();
+ }
+ }
+
+ private FileStatus blankFileStatus(Path path) {
+ return new FileStatus(0, true, 0, 0, 0, path);
+ }
+
+ @Test
+ public void testTombstoneReconcilingIterator() throws Exception {
+ Path parent = new Path("/parent");
+ Path liveChild = new Path(parent, "/liveChild");
+ Path deletedChild = new Path(parent, "/deletedChild");
+ Path[] allFiles = {parent, liveChild, deletedChild};
+ Path[] liveFiles = {parent, liveChild};
+
+ Listing listing = new Listing(fs);
+ Collection<FileStatus> statuses = new ArrayList<>();
+ statuses.add(blankFileStatus(parent));
+ statuses.add(blankFileStatus(liveChild));
+ statuses.add(blankFileStatus(deletedChild));
+
+ Set<Path> tombstones = new HashSet<>();
+ tombstones.add(deletedChild);
+
+ RemoteIterator<FileStatus> sourceIterator = new MockRemoteIterator(
+ statuses);
+ RemoteIterator<LocatedFileStatus> locatedIterator =
+ listing.createLocatedFileStatusIterator(sourceIterator);
+ RemoteIterator<LocatedFileStatus> reconcilingIterator =
+ listing.createTombstoneReconcilingIterator(locatedIterator, tombstones);
+
+ Set<Path> expectedPaths = new HashSet<>();
+ expectedPaths.add(parent);
+ expectedPaths.add(liveChild);
+
+ Set<Path> actualPaths = new HashSet<>();
+ while (reconcilingIterator.hasNext()) {
+ actualPaths.add(reconcilingIterator.next().getPath());
+ }
+ Assert.assertTrue(actualPaths.equals(expectedPaths));
+ }
+
+ @Test
+ public void testProvidedFileStatusIteratorEnd() throws Exception {
+ FileStatus[] statuses = {
+ new FileStatus(100, false, 1, 8192, 0, new Path("s3a://blah/blah"))
+ };
+ ProvidedFileStatusIterator it = new ProvidedFileStatusIterator(statuses,
+ ACCEPT_ALL, new Listing.AcceptAllButS3nDirs());
+
+ Assert.assertTrue("hasNext() should return true first time", it.hasNext());
+ Assert.assertNotNull("first element should not be null", it.next());
+ Assert.assertFalse("hasNext() should now be false", it.hasNext());
+ try {
+ it.next();
+ Assert.fail("next() should have thrown exception");
+ } catch (NoSuchElementException e) {
+ // Correct behavior. Any other exceptions are propagated as failure.
+ return;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index e1aef75..e493818 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -39,7 +39,9 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
@After
public void tearDown() throws Exception {
- fc.delete(fileContextTestHelper.getTestRootPath(fc, "test"), true);
+ if (fc != null) {
+ fc.delete(fileContextTestHelper.getTestRootPath(fc, "test"), true);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextURI.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextURI.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextURI.java
index fff1fcb..725646c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextURI.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextURI.java
@@ -16,19 +16,29 @@ package org.apache.hadoop.fs.s3a.fileContext;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContextURIBase;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestFileSystem;
+
/**
* S3a implementation of FileContextURIBase.
*/
public class ITestS3AFileContextURI extends FileContextURIBase {
+ private Configuration conf;
+ private boolean hasMetadataStore;
+
@Before
public void setUp() throws IOException, Exception {
- Configuration conf = new Configuration();
+ conf = new Configuration();
+ try(S3AFileSystem s3aFS = createTestFileSystem(conf)) {
+ hasMetadataStore = s3aFS.hasMetadataStore();
+ }
fc1 = S3ATestUtils.createTestFileContext(conf);
fc2 = S3ATestUtils.createTestFileContext(conf); //different object, same FS
super.setUp();
@@ -41,4 +51,11 @@ public class ITestS3AFileContextURI extends FileContextURIBase {
// (the statistics tested with this method are not relevant for an S3FS)
}
+ @Test
+ @Override
+ public void testModificationTime() throws IOException {
+ // skip modtime tests as there may be some inconsistency during creation
+ assume("modification time tests are skipped", !hasMetadataStore);
+ super.testModificationTime();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractMSContract.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractMSContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractMSContract.java
new file mode 100644
index 0000000..921d4a6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractMSContract.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.IOException;
+
+/**
+ * Test specification for MetadataStore contract tests. Supplies configuration
+ * and MetadataStore instance.
+ */
+public abstract class AbstractMSContract {
+
+ public abstract FileSystem getFileSystem() throws IOException;
+ public abstract MetadataStore getMetadataStore() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
new file mode 100644
index 0000000..ceacdf3
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
+
+/**
+ * Common functionality for S3GuardTool test cases.
+ */
+public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
+
+ protected static final String OWNER = "hdfs";
+
+ private MetadataStore ms;
+
+ protected static void expectResult(int expected,
+ String message,
+ S3GuardTool tool,
+ String... args) throws Exception {
+ assertEquals(message, expected, tool.run(args));
+ }
+
+ protected static void expectSuccess(
+ String message,
+ S3GuardTool tool,
+ String... args) throws Exception {
+ assertEquals(message, SUCCESS, tool.run(args));
+ }
+
+ protected MetadataStore getMetadataStore() {
+ return ms;
+ }
+
+ protected abstract MetadataStore newMetadataStore();
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ S3ATestUtils.assumeS3GuardState(true, getConfiguration());
+ ms = newMetadataStore();
+ ms.initialize(getFileSystem());
+ }
+
+ @Override
+ public void teardown() throws Exception {
+ super.teardown();
+ IOUtils.cleanupWithLogger(LOG, ms);
+ }
+
+ protected void mkdirs(Path path, boolean onS3, boolean onMetadataStore)
+ throws IOException {
+ if (onS3) {
+ getFileSystem().mkdirs(path);
+ }
+ if (onMetadataStore) {
+ S3AFileStatus status = new S3AFileStatus(true, path, OWNER);
+ ms.put(new PathMetadata(status));
+ }
+ }
+
+ protected static void putFile(MetadataStore ms, S3AFileStatus f)
+ throws IOException {
+ assertNotNull(f);
+ ms.put(new PathMetadata(f));
+ Path parent = f.getPath().getParent();
+ while (parent != null) {
+ S3AFileStatus dir = new S3AFileStatus(false, parent, f.getOwner());
+ ms.put(new PathMetadata(dir));
+ parent = parent.getParent();
+ }
+ }
+
+ /**
+ * Create file either on S3 or in metadata store.
+ * @param path the file path.
+ * @param onS3 set to true to create the file on S3.
+ * @param onMetadataStore set to true to create the file on the
+ * metadata store.
+ * @throws IOException IO problem
+ */
+ protected void createFile(Path path, boolean onS3, boolean onMetadataStore)
+ throws IOException {
+ if (onS3) {
+ ContractTestUtils.touch(getFileSystem(), path);
+ }
+
+ if (onMetadataStore) {
+ S3AFileStatus status = new S3AFileStatus(100L, System.currentTimeMillis(),
+ getFileSystem().qualify(path), 512L, "hdfs");
+ putFile(ms, status);
+ }
+ }
+
+ private void testPruneCommand(Configuration cmdConf, String...args)
+ throws Exception {
+ Path parent = path("prune-cli");
+ try {
+ getFileSystem().mkdirs(parent);
+
+ S3GuardTool.Prune cmd = new S3GuardTool.Prune(cmdConf);
+ cmd.setMetadataStore(ms);
+
+ createFile(new Path(parent, "stale"), true, true);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(2));
+ createFile(new Path(parent, "fresh"), true, true);
+
+ assertEquals(2, ms.listChildren(parent).getListing().size());
+ expectSuccess("Prune command did not exit successfully - see output", cmd,
+ args);
+ assertEquals(1, ms.listChildren(parent).getListing().size());
+ } finally {
+ getFileSystem().delete(parent, true);
+ ms.prune(Long.MAX_VALUE);
+ }
+ }
+
+ @Test
+ public void testPruneCommandCLI() throws Exception {
+ String testPath = path("testPruneCommandCLI").toString();
+ testPruneCommand(getFileSystem().getConf(),
+ "prune", "-seconds", "1", testPath);
+ }
+
+ @Test
+ public void testPruneCommandConf() throws Exception {
+ getConfiguration().setLong(Constants.S3GUARD_CLI_PRUNE_AGE,
+ TimeUnit.SECONDS.toMillis(1));
+ String testPath = path("testPruneCommandConf").toString();
+ testPruneCommand(getConfiguration(), "prune", testPath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBLocalClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBLocalClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBLocalClientFactory.java
new file mode 100644
index 0000000..0291acd
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBLocalClientFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.File;
+import java.io.IOException;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.dynamodbv2.local.main.ServerRunner;
+import com.amazonaws.services.dynamodbv2.local.server.DynamoDBProxyServer;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.s3a.DefaultS3ClientFactory;
+import org.apache.hadoop.net.ServerSocketUtil;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
+import static org.apache.hadoop.fs.s3a.s3guard.DynamoDBClientFactory.DefaultDynamoDBClientFactory.getRegion;
+
+/**
+ * A DynamoDBClientFactory implementation that creates AmazonDynamoDB clients
+ * against an in-memory DynamoDBLocal server instance.
+ *
+ * You won't be charged bills for issuing any DynamoDB requests. However, the
+ * DynamoDBLocal is considered a simulator of the DynamoDB web service, so it
+ * may be stale or different. For example, the throttling is not yet supported
+ * in DynamoDBLocal. This is for testing purpose only.
+ *
+ * To use this for creating DynamoDB client in tests:
+ * <ol>
+ * <li>
+ * As all DynamoDBClientFactory implementations, this should be configured.
+ * </li>
+ * <li>
+ * The singleton DynamoDBLocal server instance is started automatically when
+ * creating the AmazonDynamoDB client for the first time. It still merits to
+ * launch the server before all the tests and fail fast if error happens.
+ * </li>
+ * <li>
+ * The server can be stopped explicitly, which is not actually needed in
+ * tests as JVM termination will do that.
+ * </li>
+ * </ol>
+ *
+ * @see DefaultDynamoDBClientFactory
+ */
+public class DynamoDBLocalClientFactory extends Configured
+ implements DynamoDBClientFactory {
+
+ /** The singleton in-memory DynamoDBLocal server instance for testing. */
+ private static DynamoDBProxyServer dynamoDBLocalServer;
+ private static String ddbEndpoint;
+
+ private static final String SYSPROP_SQLITE_LIB = "sqlite4java.library.path";
+
+ @Override
+ public AmazonDynamoDB createDynamoDBClient(String defaultRegion)
+ throws IOException {
+ startSingletonServer();
+
+ final Configuration conf = getConf();
+ final AWSCredentialsProvider credentials =
+ createAWSCredentialProviderSet(null, conf);
+ final ClientConfiguration awsConf =
+ DefaultS3ClientFactory.createAwsConf(conf);
+ // fail fast in case of service errors
+ awsConf.setMaxErrorRetry(3);
+
+ final String region = getRegion(conf, defaultRegion);
+ LOG.info("Creating DynamoDBLocal client using endpoint {} in region {}",
+ ddbEndpoint, region);
+
+ return AmazonDynamoDBClientBuilder.standard()
+ .withCredentials(credentials)
+ .withClientConfiguration(awsConf)
+ .withEndpointConfiguration(
+ new AwsClientBuilder.EndpointConfiguration(ddbEndpoint, region))
+ .build();
+ }
+
+ /**
+ * Start a singleton in-memory DynamoDBLocal server if not started yet.
+ * @throws IOException if any error occurs
+ */
+ public synchronized static void startSingletonServer() throws IOException {
+ if (dynamoDBLocalServer != null) {
+ return;
+ }
+
+ // Set this property if it has not been set elsewhere
+ if (StringUtils.isEmpty(System.getProperty(SYSPROP_SQLITE_LIB))) {
+ String projectBuildDir = System.getProperty("project.build.directory");
+ if (StringUtils.isEmpty(projectBuildDir)) {
+ projectBuildDir = "target";
+ }
+ // sqlite4java lib should have been copied to $projectBuildDir/native-libs
+ System.setProperty(SYSPROP_SQLITE_LIB,
+ projectBuildDir + File.separator + "native-libs");
+ LOG.info("Setting {} -> {}",
+ SYSPROP_SQLITE_LIB, System.getProperty(SYSPROP_SQLITE_LIB));
+ }
+
+ try {
+ // Start an in-memory local DynamoDB instance
+ final String port = String.valueOf(ServerSocketUtil.getPort(0, 100));
+ ddbEndpoint = "http://localhost:" + port;
+ dynamoDBLocalServer = ServerRunner.createServerFromCommandLineArgs(
+ new String[]{"-inMemory", "-port", port});
+ dynamoDBLocalServer.start();
+ LOG.info("DynamoDBLocal singleton server was started at {}", ddbEndpoint);
+ } catch (Exception t) {
+ String msg = "Error starting DynamoDBLocal server at " + ddbEndpoint
+ + " " + t;
+ LOG.error(msg, t);
+ throw new IOException(msg, t);
+ }
+ }
+
+ /**
+ * Stop the in-memory DynamoDBLocal server if it is started.
+ * @throws IOException if any error occurs
+ */
+ public synchronized static void stopSingletonServer() throws IOException {
+ if (dynamoDBLocalServer != null) {
+ LOG.info("Shutting down the in-memory DynamoDBLocal server");
+ try {
+ dynamoDBLocalServer.stop();
+ } catch (Throwable t) {
+ String msg = "Error stopping DynamoDBLocal server at " + ddbEndpoint;
+ LOG.error(msg, t);
+ throw new IOException(msg, t);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
new file mode 100644
index 0000000..c6838a0
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardConcurrentOps.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.Constants;
+
+import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
+
+/**
+ * Tests concurrent operations on S3Guard.
+ */
+public class ITestS3GuardConcurrentOps extends AbstractS3ATestBase {
+
+ @Rule
+ public final Timeout timeout = new Timeout(5 * 60 * 1000);
+
+ private void failIfTableExists(DynamoDB db, String tableName) {
+ boolean tableExists = true;
+ try {
+ Table table = db.getTable(tableName);
+ table.describe();
+ } catch (ResourceNotFoundException e) {
+ tableExists = false;
+ }
+ if (tableExists) {
+ fail("Table already exists: " + tableName);
+ }
+ }
+
+ private void deleteTable(DynamoDB db, String tableName) throws
+ InterruptedException {
+ try {
+ Table table = db.getTable(tableName);
+ table.waitForActive();
+ table.delete();
+ table.waitForDelete();
+ } catch (ResourceNotFoundException e) {
+ LOG.warn("Failed to delete {}, as it was not found", tableName, e);
+ }
+ }
+
+ @Test
+ public void testConcurrentTableCreations() throws Exception {
+ final Configuration conf = getConfiguration();
+ Assume.assumeTrue("Test only applies when DynamoDB is used for S3Guard",
+ conf.get(Constants.S3_METADATA_STORE_IMPL).equals(
+ Constants.S3GUARD_METASTORE_DYNAMO));
+
+ DynamoDBMetadataStore ms = new DynamoDBMetadataStore();
+ ms.initialize(getFileSystem());
+ DynamoDB db = ms.getDynamoDB();
+
+ String tableName = "testConcurrentTableCreations" + new Random().nextInt();
+ conf.setBoolean(Constants.S3GUARD_DDB_TABLE_CREATE_KEY, true);
+ conf.set(Constants.S3GUARD_DDB_TABLE_NAME_KEY, tableName);
+
+ String region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
+ if (StringUtils.isEmpty(region)) {
+ // no region set, so pick it up from the test bucket
+ conf.set(S3GUARD_DDB_REGION_KEY, getFileSystem().getBucketLocation());
+ }
+ int concurrentOps = 16;
+ int iterations = 4;
+
+ failIfTableExists(db, tableName);
+
+ for (int i = 0; i < iterations; i++) {
+ ExecutorService executor = Executors.newFixedThreadPool(
+ concurrentOps, new ThreadFactory() {
+ private AtomicInteger count = new AtomicInteger(0);
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r,
+ "testConcurrentTableCreations" + count.getAndIncrement());
+ }
+ });
+ ((ThreadPoolExecutor) executor).prestartAllCoreThreads();
+ Future<Exception>[] futures = new Future[concurrentOps];
+ for (int f = 0; f < concurrentOps; f++) {
+ final int index = f;
+ futures[f] = executor.submit(new Callable<Exception>() {
+ @Override
+ public Exception call() throws Exception {
+
+ ContractTestUtils.NanoTimer timer =
+ new ContractTestUtils.NanoTimer();
+
+ Exception result = null;
+ try (DynamoDBMetadataStore store = new DynamoDBMetadataStore()) {
+ store.initialize(conf);
+ } catch (Exception e) {
+ LOG.error(e.getClass() + ": " + e.getMessage());
+ result = e;
+ }
+
+ timer.end("Parallel DynamoDB client creation %d", index);
+ LOG.info("Parallel DynamoDB client creation {} ran from {} to {}",
+ index, timer.getStartTime(), timer.getEndTime());
+ return result;
+ }
+ });
+ }
+ List<Exception> exceptions = new ArrayList<>(concurrentOps);
+ for (int f = 0; f < concurrentOps; f++) {
+ Exception outcome = futures[f].get();
+ if (outcome != null) {
+ exceptions.add(outcome);
+ }
+ }
+ deleteTable(db, tableName);
+ int exceptionsThrown = exceptions.size();
+ if (exceptionsThrown > 0) {
+ // at least one exception was thrown. Fail the test & nest the first
+ // exception caught
+ throw new AssertionError(exceptionsThrown + "/" + concurrentOps +
+ " threads threw exceptions while initializing on iteration " + i,
+ exceptions.get(0));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
new file mode 100644
index 0000000..c13dfc4
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolDynamoDB.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Callable;
+
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Table;
+import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Destroy;
+import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Init;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+/**
+ * Test S3Guard related CLI commands against DynamoDB: table init,
+ * destroy, and rejection of an invalid region.
+ */
+public class ITestS3GuardToolDynamoDB extends AbstractS3GuardToolTestBase {
+
+  @Override
+  protected MetadataStore newMetadataStore() {
+    return new DynamoDBMetadataStore();
+  }
+
+  /**
+   * Probe for the existence of a DynamoDB table.
+   * @param dynamoDB client to query with; must not be null
+   * @param tableName table to look up; must be non-empty
+   * @return true iff the table can be described
+   */
+  private static boolean exist(DynamoDB dynamoDB, String tableName) {
+    assertNotNull(dynamoDB);
+    assertNotNull(tableName);
+    assertFalse("empty table name", tableName.isEmpty());
+    try {
+      Table table = dynamoDB.getTable(tableName);
+      // describe() round-trips to the service; it is the existence probe
+      table.describe();
+    } catch (ResourceNotFoundException e) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * An invalid region must make "init" fail with an IOException rather
+   * than exit successfully (which could leave a stray table behind).
+   */
+  @Test
+  public void testInvalidRegion() throws Exception {
+    final String testTableName = "testInvalidRegion" + new Random().nextInt();
+    final String testRegion = "invalidRegion";
+    // Initialize MetadataStore
+    final Init initCmd = new Init(getFileSystem().getConf());
+    LambdaTestUtils.intercept(IOException.class,
+        new Callable<String>() {
+          @Override
+          public String call() throws Exception {
+            int res = initCmd.run(new String[]{
+                "init",
+                "-region", testRegion,
+                "-meta", "dynamodb://" + testTableName
+            });
+            // only reached if init unexpectedly succeeded; the returned
+            // string becomes intercept()'s failure message
+            return "Use of invalid region did not fail, returning " + res
+                + " - table may have been " +
+                "created and not cleaned up: " + testTableName;
+          }
+        });
+  }
+
+  /**
+   * Full lifecycle: init creates the table, destroy removes it, and a
+   * second destroy of the now-absent table still succeeds (idempotent).
+   */
+  @Test
+  public void testDynamoDBInitDestroyCycle() throws Exception {
+    String testTableName = "testDynamoDBInitDestroy" + new Random().nextInt();
+    String testS3Url = path(testTableName).toString();
+    S3AFileSystem fs = getFileSystem();
+    DynamoDB db = null;
+    try {
+      // Initialize MetadataStore
+      Init initCmd = new Init(fs.getConf());
+      expectSuccess("Init command did not exit successfully - see output",
+          initCmd,
+          "init", "-meta", "dynamodb://" + testTableName, testS3Url);
+      // Verify it exists
+      MetadataStore ms = getMetadataStore();
+      assertTrue("metadata store should be DynamoDBMetadataStore",
+          ms instanceof DynamoDBMetadataStore);
+      DynamoDBMetadataStore dynamoMs = (DynamoDBMetadataStore) ms;
+      db = dynamoMs.getDynamoDB();
+      assertTrue(String.format("%s does not exist", testTableName),
+          exist(db, testTableName));
+
+      // Destroy MetadataStore
+      Destroy destroyCmd = new Destroy(fs.getConf());
+
+      expectSuccess("Destroy command did not exit successfully - see output",
+          destroyCmd,
+          "destroy", "-meta", "dynamodb://" + testTableName, testS3Url);
+      // Verify it does not exist
+      assertFalse(String.format("%s still exists", testTableName),
+          exist(db, testTableName));
+
+      // delete again and expect success again
+      expectSuccess("Destroy command did not exit successfully - see output",
+          destroyCmd,
+          "destroy", "-meta", "dynamodb://" + testTableName, testS3Url);
+    } catch (ResourceNotFoundException e) {
+      throw new AssertionError(
+          String.format("DynamoDB table %s does not exist", testTableName),
+          e);
+    } finally {
+      // Best-effort cleanup. Only warn when we have no client and thus
+      // cannot even attempt deletion; previously this warned on every
+      // run, including fully successful ones.
+      if (db != null) {
+        Table table = db.getTable(testTableName);
+        if (table != null) {
+          try {
+            table.delete();
+            table.waitForDelete();
+          } catch (ResourceNotFoundException e) {
+            // already gone: expected on the success path
+          }
+        }
+      } else {
+        LOG.warn("Table may have not been cleaned up: " +
+            testTableName);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/621b43e2/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java
new file mode 100644
index 0000000..181cdfb
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/ITestS3GuardToolLocal.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.s3guard;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.Diff;
+
+import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
+
+/**
+ * Test S3Guard related CLI commands against a LocalMetadataStore.
+ */
+public class ITestS3GuardToolLocal extends AbstractS3GuardToolTestBase {
+
+  @Override
+  protected MetadataStore newMetadataStore() {
+    return new LocalMetadataStore();
+  }
+
+  /**
+   * Create a small directory tree in S3 and verify that "import" copies
+   * the file and directory entries into the metadata store.
+   */
+  @Test
+  public void testImportCommand() throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    MetadataStore ms = getMetadataStore();
+    Path parent = path("test-import");
+    fs.mkdirs(parent);
+    Path dir = new Path(parent, "a");
+    fs.mkdirs(dir);
+    Path emptyDir = new Path(parent, "emptyDir");
+    fs.mkdirs(emptyDir);
+    for (int i = 0; i < 10; i++) {
+      String child = String.format("file-%d", i);
+      try (FSDataOutputStream out = fs.create(new Path(dir, child))) {
+        out.write(1);
+      }
+    }
+
+    S3GuardTool.Import cmd = new S3GuardTool.Import(fs.getConf());
+    cmd.setStore(ms);
+
+    expectSuccess("Import command did not exit successfully - see output",
+        cmd,
+        "import", parent.toString());
+
+    DirListingMetadata children =
+        ms.listChildren(dir);
+    assertEquals("Unexpected number of paths imported", 10, children
+        .getListing().size());
+    assertEquals("Expected 2 items: empty directory and a parent directory", 2,
+        ms.listChildren(parent).getListing().size());
+    // TODO: assert children.isAuthoritative() once import marks the
+    // imported listing as authoritative.
+  }
+
+  /**
+   * Populate some paths only in S3 and some only in the metadata store,
+   * run "diff", and check its output reports exactly the expected
+   * divergence with no duplicated lines.
+   */
+  @Test
+  public void testDiffCommand() throws IOException {
+    S3AFileSystem fs = getFileSystem();
+    MetadataStore ms = getMetadataStore();
+    Set<Path> filesOnS3 = new HashSet<>(); // files on S3.
+    Set<Path> filesOnMS = new HashSet<>(); // files on metadata store.
+
+    Path testPath = path("test-diff");
+    mkdirs(testPath, true, true);
+
+    Path msOnlyPath = new Path(testPath, "ms_only");
+    mkdirs(msOnlyPath, false, true);
+    filesOnMS.add(msOnlyPath);
+    for (int i = 0; i < 5; i++) {
+      Path file = new Path(msOnlyPath, String.format("file-%d", i));
+      createFile(file, false, true);
+      filesOnMS.add(file);
+    }
+
+    Path s3OnlyPath = new Path(testPath, "s3_only");
+    mkdirs(s3OnlyPath, true, false);
+    filesOnS3.add(s3OnlyPath);
+    for (int i = 0; i < 5; i++) {
+      Path file = new Path(s3OnlyPath, String.format("file-%d", i));
+      createFile(file, true, false);
+      filesOnS3.add(file);
+    }
+
+    // capture the command's output for parsing below
+    ByteArrayOutputStream buf = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(buf);
+    Diff cmd = new Diff(fs.getConf());
+    cmd.setStore(ms);
+    assertEquals("Diff command did not exit successfully - see output", SUCCESS,
+        cmd.run(new String[]{"diff", "-meta", "local://metadata",
+            testPath.toString()}, out));
+    out.close();
+
+    Set<Path> actualOnS3 = new HashSet<>();
+    Set<Path> actualOnMS = new HashSet<>();
+    boolean duplicates = false;
+    try (BufferedReader reader =
+        new BufferedReader(new InputStreamReader(
+            new ByteArrayInputStream(buf.toByteArray())))) {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        // each output line is whitespace-separated with the prefix in
+        // field 0 and the path in field 3
+        String[] fields = line.split("\\s");
+        assertEquals("[" + line + "] does not have enough fields",
+            4, fields.length);
+        String where = fields[0];
+        Path path = new Path(fields[3]);
+        if (Diff.S3_PREFIX.equals(where)) {
+          duplicates = duplicates || actualOnS3.contains(path);
+          actualOnS3.add(path);
+        } else if (Diff.MS_PREFIX.equals(where)) {
+          duplicates = duplicates || actualOnMS.contains(path);
+          actualOnMS.add(path);
+        } else {
+          fail("Unknown prefix: " + where);
+        }
+      }
+    }
+    // Fix: read the captured bytes via buf.toString();
+    // PrintStream.toString() returns the default Object representation,
+    // not the stream contents, so the failure messages were useless.
+    String actualOut = buf.toString();
+    assertEquals("Mismatched metadata store outputs: " + actualOut,
+        filesOnMS, actualOnMS);
+    assertEquals("Mismatched s3 outputs: " + actualOut, filesOnS3, actualOnS3);
+    assertFalse("Diff contained duplicates", duplicates);
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org