You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2019/10/01 17:11:43 UTC
[hadoop] branch trunk updated: HADOOP-16458.
LocatedFileStatusFetcher.getFileStatuses failing intermittently with S3
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1921e94 HADOOP-16458. LocatedFileStatusFetcher.getFileStatuses failing intermittently with S3
1921e94 is described below
commit 1921e94292f0820985a0cfbf8922a2a1a67fe921
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Tue Oct 1 18:10:29 2019 +0100
HADOOP-16458. LocatedFileStatusFetcher.getFileStatuses failing intermittently with S3
Contributed by Steve Loughran.
Includes
-S3A glob scans don't bother trying to resolve symlinks
-stack traces don't get lost in getFileStatuses() when exceptions are wrapped
-debug-level logging of Globber activity
-Contains HADOOP-13373. Add S3A implementation of FSMainOperationsBaseTest.
-ITestRestrictedReadAccess tests incomplete read access to files.
This adds a builder API for constructing globbers which other stores can use
so that they too can skip symlink resolution when not needed.
Change-Id: I23bcdb2783d6bd77cf168fdc165b1b4b334d91c7
---
.../main/java/org/apache/hadoop/fs/FileSystem.java | 7 +-
.../main/java/org/apache/hadoop/fs/Globber.java | 208 +++++-
.../org/apache/hadoop/test/LambdaTestUtils.java | 3 +
.../org/apache/hadoop/mapred/FileInputFormat.java | 5 +-
.../hadoop/mapred/InvalidInputException.java | 4 +
.../hadoop/mapred/LocatedFileStatusFetcher.java | 66 +-
.../mapreduce/lib/input/FileInputFormat.java | 6 +-
.../mapreduce/lib/input/InvalidInputException.java | 4 +
.../java/org/apache/hadoop/fs/s3a/Invoker.java | 3 +-
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 21 +-
.../fs/s3a/ITestLocatedFileStatusFetcher.java | 40 ++
.../hadoop/fs/s3a/ITestS3AFSMainOperations.java | 65 ++
.../fs/s3a/auth/ITestRestrictedReadAccess.java | 707 +++++++++++++++++++++
13 files changed, 1089 insertions(+), 50 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 4e9f172..2376c05 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -2064,7 +2064,12 @@ public abstract class FileSystem extends Configured
* @throws IOException IO failure
*/
public FileStatus[] globStatus(Path pathPattern) throws IOException {
- return new Globber(this, pathPattern, DEFAULT_FILTER).glob();
+ return Globber.createGlobber(this)
+ .withPathPattern(pathPattern)
+ .withPathFiltern(DEFAULT_FILTER)
+ .withResolveSymlinks(true)
+ .build()
+ .glob();
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java
index b241a94..f301f22 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java
@@ -25,15 +25,24 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.util.DurationInfo;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Implementation of {@link FileSystem#globStatus(Path, PathFilter)}.
+ * This has historically been package-private; it has been opened
+ * up for object stores within the {@code hadoop-*} codebase ONLY.
+ * It could be expanded for external store implementations in future.
+ */
@InterfaceAudience.Private
@InterfaceStability.Unstable
-class Globber {
+public class Globber {
public static final Logger LOG =
LoggerFactory.getLogger(Globber.class.getName());
@@ -42,21 +51,62 @@ class Globber {
private final Path pathPattern;
private final PathFilter filter;
private final Tracer tracer;
-
- public Globber(FileSystem fs, Path pathPattern, PathFilter filter) {
+ private final boolean resolveSymlinks;
+ // Package-private FileSystem constructor; symlinks are always resolved.
+ Globber(FileSystem fs, Path pathPattern, PathFilter filter) {
+ this.fs = fs;
+ this.fc = null;
+ this.pathPattern = pathPattern;
+ this.filter = filter;
+ this.tracer = FsTracer.get(fs.getConf());
+ this.resolveSymlinks = true;
+ }
+ // Package-private FileContext constructor; symlinks are always resolved.
+ Globber(FileContext fc, Path pathPattern, PathFilter filter) {
+ this.fs = null;
+ this.fc = fc;
+ this.pathPattern = pathPattern;
+ this.filter = filter;
+ this.tracer = fc.getTracer();
+ this.resolveSymlinks = true;
+ }
+
+ /**
+ * Filesystem constructor for use by {@link GlobBuilder}.
+ * @param fs filesystem
+ * @param pathPattern path pattern
+ * @param filter optional filter
+ * @param resolveSymlinks whether symlinks should be resolved.
+ */
+ private Globber(FileSystem fs, Path pathPattern, PathFilter filter,
+ boolean resolveSymlinks) {
this.fs = fs;
this.fc = null;
this.pathPattern = pathPattern;
this.filter = filter;
+ this.resolveSymlinks = resolveSymlinks;
this.tracer = FsTracer.get(fs.getConf());
+ LOG.debug("Created Globber for path={}, symlinks={}",
+ pathPattern, resolveSymlinks);
}
- public Globber(FileContext fc, Path pathPattern, PathFilter filter) {
+ /**
+ * File Context constructor for use by {@link GlobBuilder}.
+ * @param fc file context
+ * @param pathPattern path pattern
+ * @param filter optional filter
+ * @param resolveSymlinks whether symlinks should be resolved.
+ */
+ private Globber(FileContext fc, Path pathPattern, PathFilter filter,
+ boolean resolveSymlinks) {
this.fs = null;
this.fc = fc;
this.pathPattern = pathPattern;
this.filter = filter;
+ this.resolveSymlinks = resolveSymlinks;
this.tracer = fc.getTracer();
+ LOG.debug("Created Globber path={}, symlinks={}",
+ pathPattern, resolveSymlinks);
}
private FileStatus getFileStatus(Path path) throws IOException {
@@ -67,6 +117,7 @@ class Globber {
return fc.getFileStatus(path);
}
} catch (FileNotFoundException e) {
+ LOG.debug("getFileStatus({}) failed; returning null", path, e);
return null;
}
}
@@ -79,6 +130,7 @@ class Globber {
return fc.util().listStatus(path);
}
} catch (FileNotFoundException e) {
+ LOG.debug("listStatus({}) failed; returning empty array", path, e);
return new FileStatus[0];
}
}
@@ -107,7 +159,7 @@ class Globber {
*/
private static List<String> getPathComponents(String path)
throws IOException {
- ArrayList<String> ret = new ArrayList<String>();
+ ArrayList<String> ret = new ArrayList<>();
for (String component : path.split(Path.SEPARATOR)) {
if (!component.isEmpty()) {
ret.add(component);
@@ -145,7 +197,8 @@ class Globber {
public FileStatus[] glob() throws IOException {
TraceScope scope = tracer.newScope("Globber#glob");
scope.addKVAnnotation("pattern", pathPattern.toUri().getPath());
- try {
+ try (DurationInfo ignored = new DurationInfo(LOG, false,
+ "glob %s", pathPattern)) {
return doGlob();
} finally {
scope.close();
@@ -164,10 +217,11 @@ class Globber {
String pathPatternString = pathPattern.toUri().getPath();
List<String> flattenedPatterns = GlobExpander.expand(pathPatternString);
+ LOG.debug("Filesystem glob {}", pathPatternString);
// Now loop over all flattened patterns. In every case, we'll be trying to
// match them to entries in the filesystem.
ArrayList<FileStatus> results =
- new ArrayList<FileStatus>(flattenedPatterns.size());
+ new ArrayList<>(flattenedPatterns.size());
boolean sawWildcard = false;
for (String flatPattern : flattenedPatterns) {
// Get the absolute path for this flattened pattern. We couldn't do
@@ -175,13 +229,14 @@ class Globber {
// path you go down influences how the path must be made absolute.
Path absPattern = fixRelativePart(new Path(
flatPattern.isEmpty() ? Path.CUR_DIR : flatPattern));
+ LOG.debug("Pattern: {}", absPattern);
// Now we break the flattened, absolute pattern into path components.
// For example, /a/*/c would be broken into the list [a, *, c]
List<String> components =
getPathComponents(absPattern.toUri().getPath());
// Starting out at the root of the filesystem, we try to match
// filesystem entries against pattern components.
- ArrayList<FileStatus> candidates = new ArrayList<FileStatus>(1);
+ ArrayList<FileStatus> candidates = new ArrayList<>(1);
// To get the "real" FileStatus of root, we'd have to do an expensive
// RPC to the NameNode. So we create a placeholder FileStatus which has
// the correct path, but defaults for the rest of the information.
@@ -206,12 +261,13 @@ class Globber {
for (int componentIdx = 0; componentIdx < components.size();
componentIdx++) {
ArrayList<FileStatus> newCandidates =
- new ArrayList<FileStatus>(candidates.size());
+ new ArrayList<>(candidates.size());
GlobFilter globFilter = new GlobFilter(components.get(componentIdx));
String component = unescapePathComponent(components.get(componentIdx));
if (globFilter.hasPattern()) {
sawWildcard = true;
}
+ LOG.debug("Component {}, patterned={}", component, sawWildcard);
if (candidates.isEmpty() && sawWildcard) {
// Optimization: if there are no more candidates left, stop examining
// the path components. We can only do this if we've already seen
@@ -245,19 +301,31 @@ class Globber {
// incorrectly conclude that /a/b was a file and should not match
// /a/*/*. So we use getFileStatus of the path we just listed to
// disambiguate.
- Path path = candidate.getPath();
- FileStatus status = getFileStatus(path);
- if (status == null) {
- // null means the file was not found
- LOG.warn("File/directory {} not found:"
- + " it may have been deleted."
- + " If this is an object store, this can be a sign of"
- + " eventual consistency problems.",
- path);
- continue;
- }
- if (!status.isDirectory()) {
- continue;
+ if (resolveSymlinks) {
+ LOG.debug("listStatus found one entry; disambiguating {}",
+ children[0]);
+ Path path = candidate.getPath();
+ FileStatus status = getFileStatus(path);
+ if (status == null) {
+ // null means the file was not found
+ LOG.warn("File/directory {} not found:"
+ + " it may have been deleted."
+ + " If this is an object store, this can be a sign of"
+ + " eventual consistency problems.",
+ path);
+ continue;
+ }
+ if (!status.isDirectory()) {
+ LOG.debug("Resolved entry is a file; skipping: {}", status);
+ continue;
+ }
+ } else {
+ // there's no symlinks in this store, so no need to issue
+ // another call, just see if the result is a directory or a file
+ if (children[0].getPath().equals(candidate.getPath())) {
+ // the listing status is of a file
+ continue;
+ }
}
}
for (FileStatus child : children) {
@@ -312,6 +380,8 @@ class Globber {
*/
if ((!sawWildcard) && results.isEmpty() &&
(flattenedPatterns.size() <= 1)) {
+ LOG.debug("No matches found and there was no wildcard in the path {}",
+ pathPattern);
return null;
}
/*
@@ -324,4 +394,98 @@ class Globber {
Arrays.sort(ret);
return ret;
}
+
+ /**
+ * Create a builder for a Globber, bound to the given filesystem.
+ * @param filesystem filesystem
+ * @return the builder to finish configuring.
+ */
+ public static GlobBuilder createGlobber(FileSystem filesystem) {
+ return new GlobBuilder(filesystem);
+ }
+
+ /**
+ * Create a builder for a Globber, bound to the given file
+ * context.
+ * @param fileContext file context.
+ * @return the builder to finish configuring.
+ */
+ public static GlobBuilder createGlobber(FileContext fileContext) {
+ return new GlobBuilder(fileContext);
+ }
+
+ /**
+ * Builder for Globber instances; symlink resolution defaults to true.
+ */
+ @InterfaceAudience.Private
+ public static class GlobBuilder {
+
+ private final FileSystem fs;
+
+ private final FileContext fc;
+
+ private Path pathPattern;
+
+ private PathFilter filter;
+
+ private boolean resolveSymlinks = true;
+
+ /**
+ * Construct, bound to a file context.
+ * @param fc file context; must not be null.
+ */
+ public GlobBuilder(final FileContext fc) {
+ this.fs = null;
+ this.fc = checkNotNull(fc);
+ }
+
+ /**
+ * Construct, bound to a filesystem.
+ * @param fs file system; must not be null.
+ */
+ public GlobBuilder(final FileSystem fs) {
+ this.fs = checkNotNull(fs);
+ this.fc = null;
+ }
+
+ /**
+ * Set the path pattern.
+ * @param pattern pattern to use.
+ * @return the builder
+ */
+ public GlobBuilder withPathPattern(Path pattern) {
+ pathPattern = pattern;
+ return this;
+ }
+
+ /**
+ * Set the path filter. NOTE(review): name looks like a typo of withPathFilter.
+ * @param pathFilter filter
+ * @return the builder
+ */
+ public GlobBuilder withPathFiltern(PathFilter pathFilter) {
+ filter = pathFilter;
+ return this;
+ }
+
+ /**
+ * Set the symlink resolution policy.
+ * @param resolve true to resolve symlinks (the default).
+ * @return the builder
+ */
+ public GlobBuilder withResolveSymlinks(boolean resolve) {
+ resolveSymlinks = resolve;
+ return this;
+ }
+
+ /**
+ * Build the Globber; the path pattern is not validated here.
+ * @return a new instance.
+ */
+ public Globber build() {
+ return fs != null
+ ? new Globber(fs, pathPattern, filter, resolveSymlinks)
+ : new Globber(fc, pathPattern, filter, resolveSymlinks);
+ }
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
index c1b6cc4..db36154 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
@@ -575,6 +575,9 @@ public final class LambdaTestUtils {
if (o == null) {
return NULL_RESULT;
} else {
+ if (o instanceof String) {
+ return '"' + (String)o + '"';
+ }
try {
return o.toString();
} catch (Exception e) {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
index afdc0ca..b3e2b4a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -250,7 +251,9 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
job, dirs, recursive, inputFilter, false);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
- throw new IOException("Interrupted while getting file statuses");
+ throw (IOException)
+ new InterruptedIOException("Interrupted while getting file statuses")
+ .initCause(e);
}
result = Iterables.toArray(locatedFiles, FileStatus.class);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InvalidInputException.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InvalidInputException.java
index e1bb36b..faf1a38 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InvalidInputException.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InvalidInputException.java
@@ -38,10 +38,14 @@ public class InvalidInputException extends IOException {
/**
* Create the exception with the given list.
+ * If the (non-null) list is non-empty, its first element becomes the cause.
* @param probs the list of problems to report. this list is not copied.
*/
public InvalidInputException(List<IOException> probs) {
problems = probs;
+ if (!probs.isEmpty()) {
+ initCause(probs.get(0));
+ }
}
/**
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
index 3869c49..a248f14 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
@@ -46,15 +46,23 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.util.concurrent.HadoopExecutors;
/**
* Utility class to fetch block locations for specified Input paths using a
* configured number of threads.
+ * The thread count is determined from the value of
+ * "mapreduce.input.fileinputformat.list-status.num-threads" in the
+ * configuration.
*/
@Private
public class LocatedFileStatusFetcher {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(LocatedFileStatusFetcher.class.getName());
private final Path[] inputDirs;
private final PathFilter inputFilter;
private final Configuration conf;
@@ -64,7 +72,7 @@ public class LocatedFileStatusFetcher {
private final ExecutorService rawExec;
private final ListeningExecutorService exec;
private final BlockingQueue<List<FileStatus>> resultQueue;
- private final List<IOException> invalidInputErrors = new LinkedList<IOException>();
+ private final List<IOException> invalidInputErrors = new LinkedList<>();
private final ProcessInitialInputPathCallback processInitialInputPathCallback =
new ProcessInitialInputPathCallback();
@@ -79,25 +87,30 @@ public class LocatedFileStatusFetcher {
private volatile Throwable unknownError;
/**
+ * Instantiate.
+ * The newApi switch is only used to configure what exception is raised
+ * on failure of {@link #getFileStatuses()}, it does not change the algorithm.
* @param conf configuration for the job
* @param dirs the initial list of paths
- * @param recursive whether to traverse the patchs recursively
+ * @param recursive whether to traverse the paths recursively
* @param inputFilter inputFilter to apply to the resulting paths
* @param newApi whether using the mapred or mapreduce API
* @throws InterruptedException
* @throws IOException
*/
public LocatedFileStatusFetcher(Configuration conf, Path[] dirs,
- boolean recursive, PathFilter inputFilter, boolean newApi) throws InterruptedException,
- IOException {
+ boolean recursive, PathFilter inputFilter, boolean newApi)
+ throws InterruptedException, IOException {
int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
+ LOG.debug("Instantiated LocatedFileStatusFetcher with {} threads",
+ numThreads);
rawExec = HadoopExecutors.newFixedThreadPool(
numThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("GetFileInfo #%d").build());
exec = MoreExecutors.listeningDecorator(rawExec);
- resultQueue = new LinkedBlockingQueue<List<FileStatus>>();
+ resultQueue = new LinkedBlockingQueue<>();
this.conf = conf;
this.inputDirs = dirs;
this.recursive = recursive;
@@ -106,10 +119,13 @@ public class LocatedFileStatusFetcher {
}
/**
- * Start executing and return FileStatuses based on the parameters specified
+ * Start executing and return FileStatuses based on the parameters specified.
* @return fetched file statuses
- * @throws InterruptedException
- * @throws IOException
+ * @throws InterruptedException interruption waiting for results.
+ * @throws IOException IO failure or other error.
+ * @throws InvalidInputException on an invalid input and the old API
+ * @throws org.apache.hadoop.mapreduce.lib.input.InvalidInputException on an
+ * invalid input and the new API.
*/
public Iterable<FileStatus> getFileStatuses() throws InterruptedException,
IOException {
@@ -117,6 +133,7 @@ public class LocatedFileStatusFetcher {
// rest being scheduled does not lead to a termination.
runningTasks.incrementAndGet();
for (Path p : inputDirs) {
+ LOG.debug("Queuing scan of directory {}", p);
runningTasks.incrementAndGet();
ListenableFuture<ProcessInitialInputPathCallable.Result> future = exec
.submit(new ProcessInitialInputPathCallable(p, conf, inputFilter));
@@ -128,14 +145,20 @@ public class LocatedFileStatusFetcher {
lock.lock();
try {
+ LOG.debug("Waiting scan completion");
while (runningTasks.get() != 0 && unknownError == null) {
condition.await();
}
} finally {
lock.unlock();
}
+ // either the scan completed or an error was raised.
+ // in the case of an error shutting down the executor will interrupt all
+ // active threads, which can add noise to the logs.
+ LOG.debug("Scan complete: shutting down");
this.exec.shutdownNow();
if (this.unknownError != null) {
+ LOG.debug("Scan failed", this.unknownError);
if (this.unknownError instanceof Error) {
throw (Error) this.unknownError;
} else if (this.unknownError instanceof RuntimeException) {
@@ -148,7 +171,11 @@ public class LocatedFileStatusFetcher {
throw new IOException(this.unknownError);
}
}
- if (this.invalidInputErrors.size() != 0) {
+ if (!this.invalidInputErrors.isEmpty()) {
+ LOG.debug("Invalid Input Errors raised");
+ for (IOException error : invalidInputErrors) {
+ LOG.debug("Error", error);
+ }
if (this.newApi) {
throw new org.apache.hadoop.mapreduce.lib.input.InvalidInputException(
invalidInputErrors);
@@ -161,7 +188,7 @@ public class LocatedFileStatusFetcher {
/**
* Collect misconfigured Input errors. Errors while actually reading file info
- * are reported immediately
+ * are reported immediately.
*/
private void registerInvalidInputError(List<IOException> errors) {
synchronized (this) {
@@ -171,9 +198,10 @@ public class LocatedFileStatusFetcher {
/**
* Register fatal errors - example an IOException while accessing a file or a
- * full exection queue
+ * full execution queue.
*/
private void registerError(Throwable t) {
+ LOG.debug("Error", t);
lock.lock();
try {
if (unknownError == null) {
@@ -221,7 +249,7 @@ public class LocatedFileStatusFetcher {
public Result call() throws Exception {
Result result = new Result();
result.fs = fs;
-
+ LOG.debug("ProcessInputDirCallable {}", fileStatus);
if (fileStatus.isDirectory()) {
RemoteIterator<LocatedFileStatus> iter = fs
.listLocatedStatus(fileStatus.getPath());
@@ -242,8 +270,8 @@ public class LocatedFileStatusFetcher {
}
private static class Result {
- private List<FileStatus> locatedFileStatuses = new LinkedList<FileStatus>();
- private List<FileStatus> dirsNeedingRecursiveCalls = new LinkedList<FileStatus>();
+ private List<FileStatus> locatedFileStatuses = new LinkedList<>();
+ private List<FileStatus> dirsNeedingRecursiveCalls = new LinkedList<>();
private FileSystem fs;
}
}
@@ -259,11 +287,12 @@ public class LocatedFileStatusFetcher {
@Override
public void onSuccess(ProcessInputDirCallable.Result result) {
try {
- if (result.locatedFileStatuses.size() != 0) {
+ if (!result.locatedFileStatuses.isEmpty()) {
resultQueue.add(result.locatedFileStatuses);
}
- if (result.dirsNeedingRecursiveCalls.size() != 0) {
+ if (!result.dirsNeedingRecursiveCalls.isEmpty()) {
for (FileStatus fileStatus : result.dirsNeedingRecursiveCalls) {
+ LOG.debug("Queueing directory scan {}", fileStatus.getPath());
runningTasks.incrementAndGet();
ListenableFuture<ProcessInputDirCallable.Result> future = exec
.submit(new ProcessInputDirCallable(result.fs, fileStatus,
@@ -285,7 +314,7 @@ public class LocatedFileStatusFetcher {
}
}
-
+
/**
* Processes an initial Input Path pattern through the globber and PathFilter
* to generate a list of files which need further processing.
@@ -309,6 +338,7 @@ public class LocatedFileStatusFetcher {
Result result = new Result();
FileSystem fs = path.getFileSystem(conf);
result.fs = fs;
+ LOG.debug("ProcessInitialInputPathCallable path {}", path);
FileStatus[] matches = fs.globStatus(path, inputFilter);
if (matches == null) {
result.addError(new IOException("Input path does not exist: " + path));
@@ -337,7 +367,7 @@ public class LocatedFileStatusFetcher {
/**
* The callback handler to handle results generated by
- * {@link ProcessInitialInputPathCallable}
+ * {@link ProcessInitialInputPathCallable}.
*
*/
private class ProcessInitialInputPathCallback implements
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
index e2658ca..22efe14 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.lib.input;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
@@ -283,7 +284,10 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
job.getConfiguration(), dirs, recursive, inputFilter, true);
locatedFiles = locatedFileStatusFetcher.getFileStatuses();
} catch (InterruptedException e) {
- throw new IOException("Interrupted while getting file statuses");
+ throw (IOException)
+ new InterruptedIOException(
+ "Interrupted while getting file statuses")
+ .initCause(e);
}
result = Lists.newArrayList(locatedFiles);
}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java
index 61e1484..1113bec 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java
@@ -37,10 +37,14 @@ public class InvalidInputException extends IOException {
/**
* Create the exception with the given list.
+ * If the (non-null) list is non-empty, its first element becomes the cause.
* @param probs the list of problems to report. this list is not copied.
*/
public InvalidInputException(List<IOException> probs) {
problems = probs;
+ if (!probs.isEmpty()) {
+ initCause(probs.get(0));
+ }
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
index a59ffa9..bbb9faa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.util.DurationInfo;
/**
* Class to provide lambda expression invocation of AWS operations.
@@ -105,7 +106,7 @@ public class Invoker {
@Retries.OnceTranslated
public static <T> T once(String action, String path, Operation<T> operation)
throws IOException {
- try {
+ try (DurationInfo ignored = new DurationInfo(LOG, false, "%s", action)) {
return operation.execute();
} catch (AmazonClientException e) {
throw S3AUtils.translateException(action, path, e);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 159505b..1a1d9b7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Globber;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
@@ -2472,7 +2473,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* @param newDir the current working directory.
*/
public void setWorkingDirectory(Path newDir) {
- workingDir = newDir;
+ workingDir = makeQualified(newDir);
}
/**
@@ -3669,19 +3670,27 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
@Override
public FileStatus[] globStatus(Path pathPattern) throws IOException {
- entryPoint(INVOCATION_GLOB_STATUS);
- return super.globStatus(pathPattern);
+ return globStatus(pathPattern, ACCEPT_ALL);
}
/**
- * Override superclass so as to add statistic collection.
+ * Override superclass so as to disable symlink resolution and so avoid
+ * some calls to the FS which may have problems when the store is being
+ * inconsistent.
* {@inheritDoc}
*/
@Override
- public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
+ public FileStatus[] globStatus(
+ final Path pathPattern,
+ final PathFilter filter)
throws IOException {
entryPoint(INVOCATION_GLOB_STATUS);
- return super.globStatus(pathPattern, filter);
+ return Globber.createGlobber(this)
+ .withPathPattern(pathPattern)
+ .withPathFiltern(filter)
+ .withResolveSymlinks(false)
+ .build()
+ .glob();
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestLocatedFileStatusFetcher.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestLocatedFileStatusFetcher.java
new file mode 100644
index 0000000..bd6bf2f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestLocatedFileStatusFetcher.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test what the LocatedFileStatusFetcher can do.
+ * This is related to HADOOP-16458.
+ * There are basic tests in ITestS3AFSMainOperations; this
+ * is to see if we can create better corner cases.
+ */
+public class ITestLocatedFileStatusFetcher extends AbstractS3ATestBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ITestLocatedFileStatusFetcher.class);
+
+ @Test
+ public void testGlobScan() throws Throwable {
+ // NOTE(review): placeholder with no assertions yet; it always passes.
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
new file mode 100644
index 0000000..511aa0f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSMainOperationsBaseTest;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath;
+
+/**
+ * S3A Test suite for the FSMainOperationsBaseTest tests.
+ * <p>
+ * Tests which cannot work against S3 are overridden with both
+ * {@code @Test} and {@code @Ignore}: JUnit 4 only honours an
+ * {@code @Ignore} on an overriding method when that method is
+ * itself annotated as a test.
+ */
+public class ITestS3AFSMainOperations extends FSMainOperationsBaseTest {
+
+  /**
+   * Create the suite, with the test root path built by
+   * {@code createTestPath()}.
+   */
+  public ITestS3AFSMainOperations() {
+    super(createTestPath(
+        new Path("/ITestS3AFSMainOperations")).toUri().toString());
+  }
+
+  /**
+   * Create the filesystem under test through the S3A contract.
+   * @return the contract's test filesystem.
+   * @throws Exception failure to initialize the contract or filesystem.
+   */
+  @Override
+  protected FileSystem createFileSystem() throws Exception {
+    S3AContract contract = new S3AContract(new Configuration());
+    contract.init();
+    return contract.getTestFileSystem();
+  }
+
+  @Override
+  @Test
+  @Ignore("Permissions not supported")
+  public void testListStatusThrowsExceptionForUnreadableDir() {
+  }
+
+  @Override
+  @Test
+  @Ignore("Permissions not supported")
+  public void testGlobStatusThrowsExceptionForUnreadableDir() {
+  }
+
+  @Override
+  @Test
+  @Ignore("local FS path setup broken")
+  public void testCopyToLocalWithUseRawLocalFileSystemOption()
+      throws Exception {
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java
new file mode 100644
index 0000000..a741cd6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java
@@ -0,0 +1,707 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.AccessDeniedException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
+import org.apache.hadoop.mapred.LocatedFileStatusFetcher;
+import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN;
+import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE;
+import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.s3a.auth.RoleModel.Effects;
+import static org.apache.hadoop.fs.s3a.auth.RoleModel.Statement;
+import static org.apache.hadoop.fs.s3a.auth.RoleModel.directory;
+import static org.apache.hadoop.fs.s3a.auth.RoleModel.statement;
+import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*;
+import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.bindRolePolicyStatements;
+import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.newAssumedRoleConfig;
+import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
+import static org.apache.hadoop.test.GenericTestUtils.failif;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * This test creates a client with no read access to the underlying
+ * filesystem and then tries to perform various read operations on it.
+ * S3Guard in non-auth mode always goes to the FS, so we parameterize the
+ * test for S3Guard + Auth to see how failures move around.
+ * <ol>
+ *   <li>Tests only run if an assumed role is provided.</li>
+ *   <li>And the s3guard tests use the local metastore if
+ *   there was not one already.</li>
+ * </ol>
+ * The tests are all bundled into one big test case.
+ * From a purist unit test perspective, this is utterly wrong as it goes
+ * against the
+ * <i>"Each test case tests exactly one thing"</i>
+ * philosophy of JUnit.
+ * <p>
+ * However, it significantly reduces setup costs on the parameterized
+ * test runs, as it means that the filesystems and directories only need
+ * to be created and destroyed once per parameterized suite, rather than
+ * once per individual test.
+ * <p>
+ * All the test probes have informative messages so when a test failure
+ * does occur, its cause should be discoverable. Its main weaknesses are
+ * therefore:
+ * <ol>
+ *   <li>A failure of an assertion blocks all subsequent assertions from
+ *   being checked.</li>
+ *   <li>Maintenance is potentially harder.</li>
+ * </ol>
+ * To simplify maintenance, the operations tested are broken up into
+ * their own methods, with fields used to share the restricted role and
+ * created paths.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+@RunWith(Parameterized.class)
+public class ITestRestrictedReadAccess extends AbstractS3ATestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestRestrictedReadAccess.class);
+
+  /** Filter to select everything. */
+  private static final PathFilter EVERYTHING = t -> true;
+
+  /** Filter to select .txt files. */
+  private static final PathFilter TEXT_FILE =
+      path -> path.toUri().toString().endsWith(".txt");
+
+  /** The same path filter used in FileInputFormat. */
+  private static final PathFilter HIDDEN_FILE_FILTER =
+      (p) -> {
+        String n = p.getName();
+        return !n.startsWith("_") && !n.startsWith(".");
+      };
+
+  /**
+   * Text found in LocatedFileStatusFetcher exception when the glob
+   * returned "null".
+   */
+  private static final String DOES_NOT_EXIST = "does not exist";
+
+  /**
+   * Text found in LocatedFileStatusFetcher exception when
+   * the glob returned an empty list.
+   */
+  private static final String MATCHES_0_FILES = "matches 0 files";
+
+  /**
+   * Text used in files.
+   */
+  public static final byte[] HELLO = "hello".getBytes(Charset.forName("UTF-8"));
+
+  /**
+   * Wildcard scan to find *.txt in the no-read directory.
+   * When a scan/glob is done with S3Guard in auth mode, the scan will
+   * succeed but the file open will fail for any non-empty file.
+   * In non-auth mode, the read restrictions will fail the actual scan.
+   */
+  private Path noReadWildcard;
+
+  /**
+   * Parameterization: test name, is S3Guard enabled, is auth mode on.
+   */
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {"raw", false, false},
+        {"nonauth", true, false},
+        {"auth", true, true}
+    });
+  }
+
+  private final String name;
+
+  private final boolean s3guard;
+
+  private final boolean authMode;
+
+  private Path basePath;
+
+  private Path noReadDir;
+
+  private Path emptyDir;
+
+  private Path emptyFile;
+
+  private Path subDir;
+
+  private Path subdirFile;
+
+  private Path subDir2;
+
+  private Path subdir2File1;
+
+  private Path subdir2File2;
+
+  private Configuration roleConfig;
+
+  /**
+   * FS of the restricted role (no GETs under noReadDir); closed in teardown.
+   */
+  private S3AFileSystem readonlyFS;
+
+  /**
+   * Test suite setup.
+   * @param name name for logs/paths.
+   * @param s3guard is S3Guard enabled?
+   * @param authMode is S3Guard in auth mode?
+   */
+  public ITestRestrictedReadAccess(
+      final String name,
+      final boolean s3guard,
+      final boolean authMode) {
+    this.name = name;
+    this.s3guard = s3guard;
+    this.authMode = authMode;
+  }
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    String bucketName = getTestBucketName(conf);
+    removeBucketOverrides(bucketName, conf,
+        S3_METADATA_STORE_IMPL,
+        METADATASTORE_AUTHORITATIVE);
+    conf.setClass(Constants.S3_METADATA_STORE_IMPL,
+        s3guard ?
+            LocalMetadataStore.class
+            : NullMetadataStore.class,
+        MetadataStore.class);
+    conf.setBoolean(METADATASTORE_AUTHORITATIVE, authMode);
+    disableFilesystemCaching(conf);
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    assumeRoleTests();
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    S3AUtils.closeAll(LOG, readonlyFS);
+    super.teardown();
+  }
+
+  private void assumeRoleTests() {
+    assume("No ARN for role tests", !getAssumedRoleARN().isEmpty());
+  }
+
+  private String getAssumedRoleARN() {
+    return getContract().getConf().getTrimmed(ASSUMED_ROLE_ARN, "");
+  }
+
+  /**
+   * Create the assumed role configuration.
+   * @return a config bound to the ARN of the assumed role
+   */
+  public Configuration createAssumedRoleConfig() {
+    return createAssumedRoleConfig(getAssumedRoleARN());
+  }
+
+  /**
+   * Create a config for an assumed role; it also disables FS caching.
+   * @param roleARN ARN of role
+   * @return the new configuration
+   */
+  private Configuration createAssumedRoleConfig(String roleARN) {
+    return newAssumedRoleConfig(getContract().getConf(), roleARN);
+  }
+
+  /**
+   * This is a single test case which invokes the individual test cases
+   * in sequence.
+   */
+  @Test
+  public void testNoReadAccess() throws Throwable {
+    describe("Test failure handling if the client doesn't"
+        + " have read access under a path");
+    initNoReadAccess();
+
+    // now move up the API Chain, from the calls made by globStatus,
+    // to globStatus itself, and then to LocatedFileStatusFetcher,
+    // which invokes globStatus
+
+    checkBasicFileOperations();
+    checkGlobOperations();
+    checkSingleThreadedLocatedFileStatus();
+    checkLocatedFileStatusFourThreads();
+    checkLocatedFileStatusScanFile();
+    checkLocatedFileStatusNonexistentPath();
+    checkDeleteOperations();
+  }
+
+  /**
+   * Initialize the directory tree and the role filesystem.
+   */
+  public void initNoReadAccess() throws Throwable {
+    describe("Setting up filesystem");
+
+    S3AFileSystem realFS = getFileSystem();
+
+    // avoiding the parameterization to steer clear of accidentally creating
+    // patterns
+    basePath = path("testNoReadAccess-" + name);
+
+    // define the paths and create them.
+    describe("Creating test directories and files");
+
+    // this is the directory to which the restricted role has no read
+    // access.
+    noReadDir = new Path(basePath, "noReadDir");
+    // wildcard scan to find *.txt
+    noReadWildcard = new Path(noReadDir, "*/*.txt");
+
+    // an empty directory under the noReadDir
+    emptyDir = new Path(noReadDir, "emptyDir");
+    realFS.mkdirs(emptyDir);
+
+    // an empty file under the noReadDir
+    emptyFile = new Path(noReadDir, "emptyFile.txt");
+    touch(realFS, emptyFile);
+
+    // a subdirectory
+    subDir = new Path(noReadDir, "subDir");
+
+    // and a file in that subdirectory
+    subdirFile = new Path(subDir, "subdirFile.txt");
+    createFile(realFS, subdirFile, true, HELLO);
+    subDir2 = new Path(noReadDir, "subDir2");
+    subdir2File1 = new Path(subDir2, "subdir2File1.txt");
+    subdir2File2 = new Path(subDir2, "subdir2File2.docx");
+    createFile(realFS, subdir2File1, true, HELLO);
+    createFile(realFS, subdir2File2, true, HELLO);
+
+    // create a role filesystem which does not have read access under a path
+    // it still has write access, which can be explored in the final
+    // step to delete files and directories.
+    roleConfig = createAssumedRoleConfig();
+    bindRolePolicyStatements(roleConfig,
+        STATEMENT_S3GUARD_CLIENT,
+        STATEMENT_ALLOW_SSE_KMS_RW,
+        statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS),
+        new Statement(Effects.Deny)
+            .addActions(S3_ALL_GET)
+            .addResources(directory(noReadDir)));
+    readonlyFS = (S3AFileSystem) basePath.getFileSystem(roleConfig);
+  }
+
+  /**
+   * Validate basic IO operations.
+   */
+  public void checkBasicFileOperations() throws Throwable {
+
+    // this is a LIST call; there's no marker.
+    // so the sequence is
+    //   - HEAD path -> FNFE
+    //   - HEAD path + / -> FNFE
+    //   - LIST path -> list results
+    // Because the client has list access, this succeeds
+    readonlyFS.listStatus(basePath);
+
+    // this is HEAD + "/" on S3; get on S3Guard auth
+    readonlyFS.listStatus(emptyDir);
+
+    // a recursive list of the no-read-directory works because
+    // there is no directory marker, it becomes a LIST call.
+    lsR(readonlyFS, noReadDir, true);
+
+    // similarly, a getFileStatus ends up being a list and generating
+    // a file status marker.
+    readonlyFS.getFileStatus(noReadDir);
+
+    // empty dir checks work!
+    readonlyFS.getFileStatus(emptyDir);
+
+    // now look at a file; the outcome depends on the mode.
+    if (authMode) {
+      // auth mode doesn't check S3, so no failure
+      readonlyFS.getFileStatus(subdirFile);
+    } else {
+      accessDenied(() ->
+          readonlyFS.getFileStatus(subdirFile));
+    }
+
+    // irrespective of mode, the attempt to read the data will fail.
+    // the only variable is where the failure occurs
+    accessDenied(() ->
+        ContractTestUtils.readUTF8(readonlyFS, subdirFile, HELLO.length));
+
+    // the empty file is interesting
+    if (!authMode) {
+      // non-auth mode, it fails at some point in the opening process.
+      // due to a HEAD being called on the object
+      accessDenied(() ->
+          ContractTestUtils.readUTF8(readonlyFS, emptyFile, 0));
+    } else {
+      // auth mode doesn't check the store.
+      // Furthermore, because it knows the file length is zero,
+      // it returns -1 without even opening the file.
+      // This means that permissions on the file do not get checked.
+      // See: HADOOP-16464.
+      try (FSDataInputStream is = readonlyFS.open(emptyFile)) {
+        Assertions.assertThat(is.read())
+            .describedAs("read of empty file")
+            .isEqualTo(-1);
+      }
+      readonlyFS.getFileStatus(subdirFile);
+    }
+  }
+
+  /**
+   * Explore Glob's recursive scan.
+   */
+  public void checkGlobOperations() throws Throwable {
+
+    describe("Glob Status operations");
+    // baseline: the real filesystem on a file in a subdir
+    globFS(getFileSystem(), subdirFile, null, false, 1);
+    // a file fails if not in auth mode
+    globFS(readonlyFS, subdirFile, null, !authMode, 1);
+    // empty directories don't fail.
+    assertStatusPathEquals(emptyDir,
+        globFS(readonlyFS, emptyDir, null, false, 1));
+
+    FileStatus[] st = globFS(readonlyFS,
+        noReadWildcard,
+        null, false, 2);
+    Assertions.assertThat(st)
+        .extracting(FileStatus::getPath)
+        .containsExactlyInAnyOrder(subdirFile, subdir2File1);
+
+    // there is precisely one .docx file (subdir2File2.docx)
+    globFS(readonlyFS,
+        new Path(noReadDir, "*/*.docx"),
+        null, false, 1);
+
+    // there are no .doc files.
+    globFS(readonlyFS,
+        new Path(noReadDir, "*/*.doc"),
+        null, false, 0);
+    // an unfiltered glob of the directory itself finds just that entry
+    globFS(readonlyFS, noReadDir, null, false, 1);
+    // and a filter without any wildcarded pattern only finds
+    // the role dir itself.
+    FileStatus[] st2 = globFS(readonlyFS, noReadDir,
+        EVERYTHING, false, 1);
+    Assertions.assertThat(st2)
+        .extracting(FileStatus::getPath)
+        .containsExactly(noReadDir);
+  }
+
+  /**
+   * Run a single-threaded located file status fetcher against the tree.
+   */
+  public void checkSingleThreadedLocatedFileStatus() throws Throwable {
+
+    describe("LocatedFileStatusFetcher operations");
+    // use the same filter as FileInputFormat; single thread.
+    roleConfig.setInt(LIST_STATUS_NUM_THREADS, 1);
+    LocatedFileStatusFetcher fetcher =
+        new LocatedFileStatusFetcher(
+            roleConfig,
+            new Path[]{basePath},
+            true,
+            HIDDEN_FILE_FILTER,
+            true);
+    Assertions.assertThat(fetcher.getFileStatuses())
+        .describedAs("result of located scan")
+        .flatExtracting(FileStatus::getPath)
+        .containsExactlyInAnyOrder(
+            emptyFile,
+            subdirFile,
+            subdir2File1,
+            subdir2File2);
+
+  }
+
+  /**
+   * Run a four-threaded located file status fetcher with a wildcard path.
+   */
+  public void checkLocatedFileStatusFourThreads() throws Throwable {
+
+    // four threads and a wildcard pattern matching .txt files.
+    int threads = 4;
+    describe("LocatedFileStatusFetcher with %d", threads);
+    roleConfig.setInt(LIST_STATUS_NUM_THREADS, threads);
+    LocatedFileStatusFetcher fetcher2 =
+        new LocatedFileStatusFetcher(
+            roleConfig,
+            new Path[]{noReadWildcard},
+            true,
+            EVERYTHING,
+            true);
+    Assertions.assertThat(fetcher2.getFileStatuses())
+        .describedAs("result of located scan")
+        .isNotNull()
+        .flatExtracting(FileStatus::getPath)
+        .containsExactlyInAnyOrder(subdirFile, subdir2File1);
+  }
+
+  /**
+   * Run a located file status fetcher with a single file as the scan path.
+   */
+  public void checkLocatedFileStatusScanFile() throws Throwable {
+    // pass in a file as the base of the scan.
+    describe("LocatedFileStatusFetcher with file %s", subdirFile);
+    roleConfig.setInt(LIST_STATUS_NUM_THREADS, 16);
+    try {
+      Iterable<FileStatus> fetched = new LocatedFileStatusFetcher(
+          roleConfig,
+          new Path[]{subdirFile},
+          true,
+          TEXT_FILE,
+          true).getFileStatuses();
+      // when not in auth mode, the HEAD request MUST have failed.
+      failif(!authMode, "LocatedFileStatusFetcher(" + subdirFile + ")"
+          + " should have failed");
+      // and in auth mode, the file MUST have been found.
+      Assertions.assertThat(fetched)
+          .describedAs("result of located scan")
+          .isNotNull()
+          .flatExtracting(FileStatus::getPath)
+          .containsExactly(subdirFile);
+    } catch (AccessDeniedException e) {
+      // we require the HEAD request to fail with access denied in non-auth
+      // mode, but not in auth mode.
+      failif(authMode, "LocatedFileStatusFetcher(" + subdirFile + ")", e);
+    }
+  }
+
+  /**
+   * Explore what happens with a path that does not exist.
+   */
+  public void checkLocatedFileStatusNonexistentPath() throws Throwable {
+    // scan a path that doesn't exist
+    Path nonexistent = new Path(noReadDir, "nonexistent");
+    InvalidInputException ex = intercept(InvalidInputException.class,
+        DOES_NOT_EXIST,
+        () -> new LocatedFileStatusFetcher(
+            roleConfig,
+            new Path[]{nonexistent},
+            true,
+            EVERYTHING,
+            true)
+            .getFileStatuses());
+    // validate nested exception
+    assertExceptionContains(DOES_NOT_EXIST, ex.getCause());
+
+    // a file which exists but which doesn't match the pattern
+    // is downgraded to not existing.
+    intercept(InvalidInputException.class,
+        DOES_NOT_EXIST,
+        () -> new LocatedFileStatusFetcher(
+            roleConfig,
+            new Path[]{noReadDir},
+            true,
+            TEXT_FILE,
+            true)
+            .getFileStatuses());
+
+    // a pattern under a nonexistent path is considered to not be a match.
+    ex = intercept(
+        InvalidInputException.class,
+        MATCHES_0_FILES,
+        () -> new LocatedFileStatusFetcher(
+            roleConfig,
+            new Path[]{new Path(nonexistent, "*.txt")},
+            true,
+            TEXT_FILE,
+            true)
+            .getFileStatuses());
+    // validate nested exception
+    assertExceptionContains(MATCHES_0_FILES, ex.getCause());
+  }
+
+  /**
+   * Do some cleanup to see what happens with delete calls.
+   * Cleanup happens in test teardown anyway; doing it here
+   * just makes use of the delete calls to see how delete failures
+   * change with permissions and S3Guard settings.
+   */
+  public void checkDeleteOperations() throws Throwable {
+    describe("Testing delete operations");
+
+    if (!authMode) {
+      // unguarded or non-auth S3Guard to fail on HEAD + /
+      accessDenied(() -> readonlyFS.delete(emptyDir, true));
+      // to fail on HEAD
+      accessDenied(() -> readonlyFS.delete(emptyFile, true));
+    } else {
+      // auth mode checks the metadata store and then issues the DELETE
+      readonlyFS.delete(emptyDir, true);
+      readonlyFS.delete(emptyFile, true);
+    }
+
+    // this will succeed for both as there is no subdir marker.
+    readonlyFS.delete(subDir, true);
+    // after which it is not there
+    fileNotFound(() -> readonlyFS.getFileStatus(subDir));
+    // and nor is its child.
+    fileNotFound(() -> readonlyFS.getFileStatus(subdirFile));
+
+    // now delete the base path
+    readonlyFS.delete(basePath, true);
+    // and expect an FNFE when probing the base path itself
+    fileNotFound(() -> readonlyFS.getFileStatus(basePath));
+  }
+
+  /**
+   * Require an operation to fail with a FileNotFoundException.
+   * @param eval closure to evaluate.
+   * @param <T> type of callable
+   * @return the exception.
+   * @throws Exception any other exception
+   */
+  protected <T> FileNotFoundException fileNotFound(final Callable<T> eval)
+      throws Exception {
+    return intercept(FileNotFoundException.class, eval);
+  }
+
+  /**
+   * Require an operation to fail with an AccessDeniedException.
+   * @param eval closure to evaluate.
+   * @param <T> type of callable
+   * @return the exception.
+   * @throws Exception any other exception
+   */
+  protected <T> AccessDeniedException accessDenied(final Callable<T> eval)
+      throws Exception {
+    return intercept(AccessDeniedException.class, eval);
+  }
+
+  /**
+   * Assert that a status array has exactly one element and its
+   * value is as expected.
+   * @param expected expected path
+   * @param statuses list of statuses
+   */
+  protected void assertStatusPathEquals(final Path expected,
+      final FileStatus[] statuses) {
+    Assertions.assertThat(statuses)
+        .describedAs("List of status entries")
+        .isNotNull()
+        .hasSize(1);
+    Assertions.assertThat(statuses[0].getPath())
+        .describedAs("Status entry %s", statuses[0])
+        .isEqualTo(expected);
+  }
+
+  /**
+   * Glob under a path with expected outcomes.
+   * @param fs filesystem to use
+   * @param path path (which can include patterns)
+   * @param filter optional filter; if null, globStatus(path) is invoked
+   * @param expectAuthFailure is auth failure expected?
+   * @param expectedCount expected count of results; -1 means null response
+   * @return the result of a successful glob or null if an expected auth
+   * failure was caught.
+   * @throws IOException failure.
+   */
+  protected FileStatus[] globFS(
+      final S3AFileSystem fs,
+      final Path path,
+      final PathFilter filter,
+      final boolean expectAuthFailure,
+      final int expectedCount)
+      throws IOException {
+    LOG.info("Glob {}", path);
+    S3ATestUtils.MetricDiff getMetric = new S3ATestUtils.MetricDiff(fs,
+        Statistic.OBJECT_METADATA_REQUESTS);
+    S3ATestUtils.MetricDiff listMetric = new S3ATestUtils.MetricDiff(fs,
+        Statistic.OBJECT_LIST_REQUESTS);
+    FileStatus[] st;
+    try {
+      st = filter == null
+          ? fs.globStatus(path)
+          : fs.globStatus(path, filter);
+      LOG.info("Metrics:\n {},\n {}", getMetric, listMetric);
+      if (expectAuthFailure) {
+        // should have failed here
+        String resultStr;
+        if (st == null) {
+          resultStr = "A null array";
+        } else {
+          resultStr = StringUtils.join(st, ",");
+        }
+        fail(String.format("globStatus(%s) should have raised"
+            + " an exception, but returned %s", path, resultStr));
+      }
+    } catch (AccessDeniedException e) {
+      LOG.info("Metrics:\n {},\n {}", getMetric, listMetric);
+      failif(!expectAuthFailure, "Access denied in glob of " + path,
+          e);
+      return null;
+    }
+    if (expectedCount < 0) {
+      Assertions.assertThat(st)
+          .describedAs("Glob of %s", path)
+          .isNull();
+    } else {
+      Assertions.assertThat(st)
+          .describedAs("Glob of %s", path)
+          .isNotNull()
+          .hasSize(expectedCount);
+    }
+    return st;
+  }
+
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org