You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/12/30 01:38:08 UTC
[24/49] hadoop git commit: HADOOP-15113. NPE in S3A getFileStatus:
null instrumentation on using closed instance. Contributed by Steve Loughran.
HADOOP-15113. NPE in S3A getFileStatus: null instrumentation on using closed instance.
Contributed by Steve Loughran.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ef450df4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ef450df4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ef450df4
Branch: refs/heads/YARN-6592
Commit: ef450df443f1dea1c52082cf281f25db7141972f
Parents: d2d8f4a
Author: Steve Loughran <st...@apache.org>
Authored: Thu Dec 21 14:15:53 2017 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Dec 21 14:15:53 2017 +0000
----------------------------------------------------------------------
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 53 ++++++++---
.../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 3 +
.../apache/hadoop/fs/s3a/ITestS3AClosedFS.java | 92 ++++++++++++++++++++
3 files changed, 135 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef450df4/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 9431f17..f461c9e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -187,6 +187,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
private long readAhead;
private S3AInputPolicy inputPolicy;
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private volatile boolean isClosed = false;
private MetadataStore metadataStore;
private boolean allowAuthoritative;
@@ -678,7 +679,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
*/
public FSDataInputStream open(Path f, int bufferSize)
throws IOException {
-
+ checkNotClosed();
LOG.debug("Opening '{}' for reading; input policy = {}", f, inputPolicy);
final FileStatus fileStatus = getFileStatus(f);
if (fileStatus.isDirectory()) {
@@ -722,6 +723,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
+ checkNotClosed();
final Path path = qualify(f);
String key = pathToKey(path);
FileStatus status = null;
@@ -871,7 +873,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
Path dst = qualify(dest);
LOG.debug("Rename path {} to {}", src, dst);
- incrementStatistic(INVOCATION_RENAME);
+ entryPoint(INVOCATION_RENAME);
String srcKey = pathToKey(src);
String dstKey = pathToKey(dst);
@@ -1098,6 +1100,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
}
/**
+ * Entry point to an operation.
+ * Increments the statistic; verifies the FS is active.
+ * @param operation The operation to increment
+ * @throws IOException if the filesystem is closed.
+ */
+ protected void entryPoint(Statistic operation) throws IOException {
+ checkNotClosed();
+ incrementStatistic(operation);
+ }
+
+ /**
* Increment a statistic by 1.
* @param statistic The operation to increment
*/
@@ -1660,6 +1673,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
@Retries.RetryTranslated
public boolean delete(Path f, boolean recursive) throws IOException {
try {
+ checkNotClosed();
return innerDelete(innerGetFileStatus(f, true), recursive);
} catch (FileNotFoundException e) {
LOG.debug("Couldn't delete {} - does not exist", f);
@@ -1838,7 +1852,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
Path path = qualify(f);
String key = pathToKey(path);
LOG.debug("List status for path: {}", path);
- incrementStatistic(INVOCATION_LIST_STATUS);
+ entryPoint(INVOCATION_LIST_STATUS);
List<FileStatus> result;
final FileStatus fileStatus = getFileStatus(path);
@@ -1981,7 +1995,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
throws IOException, FileAlreadyExistsException, AmazonClientException {
Path f = qualify(p);
LOG.debug("Making directory: {}", f);
- incrementStatistic(INVOCATION_MKDIRS);
+ entryPoint(INVOCATION_MKDIRS);
FileStatus fileStatus;
List<Path> metadataStoreDirs = null;
if (hasMetadataStore()) {
@@ -2058,7 +2072,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
@Retries.RetryTranslated
S3AFileStatus innerGetFileStatus(final Path f,
boolean needEmptyDirectoryFlag) throws IOException {
- incrementStatistic(INVOCATION_GET_FILE_STATUS);
+ entryPoint(INVOCATION_GET_FILE_STATUS);
final Path path = qualify(f);
String key = pathToKey(path);
LOG.debug("Getting path status for {} ({})", path, key);
@@ -2319,7 +2333,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
Path src, Path dst)
throws IOException, FileAlreadyExistsException, AmazonClientException {
- incrementStatistic(INVOCATION_COPY_FROM_LOCAL_FILE);
+ entryPoint(INVOCATION_COPY_FROM_LOCAL_FILE);
LOG.debug("Copying local file from {} to {}", src, dst);
// Since we have a local file, we don't need to stream into a temporary file
@@ -2418,6 +2432,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
// already closed
return;
}
+ isClosed = true;
+ LOG.debug("Filesystem {} is closed", uri);
try {
super.close();
} finally {
@@ -2435,6 +2451,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
}
/**
+ * Verify that the filesystem has not been closed. Non blocking; this gives
+ * the last state of the volatile {@link #isClosed} field.
+ * @throws IOException if the filesystem is closed.
+ */
+ private void checkNotClosed() throws IOException {
+ if (isClosed) {
+ throw new IOException(uri + ": " + E_FS_CLOSED);
+ }
+ }
+
+ /**
* Override getCanonicalServiceName because we don't support token in S3A.
*/
@Override
@@ -2860,7 +2887,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
*/
@Override
public FileStatus[] globStatus(Path pathPattern) throws IOException {
- incrementStatistic(INVOCATION_GLOB_STATUS);
+ entryPoint(INVOCATION_GLOB_STATUS);
return super.globStatus(pathPattern);
}
@@ -2871,7 +2898,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
@Override
public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
throws IOException {
- incrementStatistic(INVOCATION_GLOB_STATUS);
+ entryPoint(INVOCATION_GLOB_STATUS);
return super.globStatus(pathPattern, filter);
}
@@ -2881,7 +2908,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
*/
@Override
public boolean exists(Path f) throws IOException {
- incrementStatistic(INVOCATION_EXISTS);
+ entryPoint(INVOCATION_EXISTS);
return super.exists(f);
}
@@ -2892,7 +2919,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
@Override
@SuppressWarnings("deprecation")
public boolean isDirectory(Path f) throws IOException {
- incrementStatistic(INVOCATION_IS_DIRECTORY);
+ entryPoint(INVOCATION_IS_DIRECTORY);
return super.isDirectory(f);
}
@@ -2903,7 +2930,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
@Override
@SuppressWarnings("deprecation")
public boolean isFile(Path f) throws IOException {
- incrementStatistic(INVOCATION_IS_FILE);
+ entryPoint(INVOCATION_IS_FILE);
return super.isFile(f);
}
@@ -2948,7 +2975,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
private RemoteIterator<LocatedFileStatus> innerListFiles(Path f, boolean
recursive, Listing.FileStatusAcceptor acceptor) throws IOException {
- incrementStatistic(INVOCATION_LIST_FILES);
+ entryPoint(INVOCATION_LIST_FILES);
Path path = qualify(f);
LOG.debug("listFiles({}, {})", path, recursive);
try {
@@ -3033,7 +3060,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
final PathFilter filter)
throws FileNotFoundException, IOException {
- incrementStatistic(INVOCATION_LIST_LOCATED_STATUS);
+ entryPoint(INVOCATION_LIST_LOCATED_STATUS);
Path path = qualify(f);
LOG.debug("listLocatedStatus({}, {}", path, filter);
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef450df4/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 2457217..6d66739 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -89,6 +89,9 @@ public final class S3AUtils {
"is abstract and therefore cannot be created";
static final String ENDPOINT_KEY = "Endpoint";
+ /** Filesystem is closed; kept here to keep the errors close. */
+ static final String E_FS_CLOSED = "FileSystem is closed!";
+
/**
* Core property for provider path. Duplicated here for consistent
* code across Hadoop version: {@value}.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef450df4/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
new file mode 100644
index 0000000..6e81452
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.E_FS_CLOSED;
+
+/**
+ * Tests of the S3A FileSystem which is closed; just make sure
+ * that basic file operations fail meaningfully.
+ */
+public class ITestS3AClosedFS extends AbstractS3ATestBase {
+
+ private Path root = new Path("/");
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ root = getFileSystem().makeQualified(new Path("/"));
+ getFileSystem().close();
+ }
+
+ @Override
+ public void teardown() {
+ // no op, as the FS is closed
+ }
+
+ @Test
+ public void testClosedGetFileStatus() throws Exception {
+ intercept(IOException.class, E_FS_CLOSED,
+ () -> getFileSystem().getFileStatus(root));
+ }
+
+ @Test
+ public void testClosedListStatus() throws Exception {
+ intercept(IOException.class, E_FS_CLOSED,
+ () -> getFileSystem().listStatus(root));
+ }
+
+ @Test
+ public void testClosedListFile() throws Exception {
+ intercept(IOException.class, E_FS_CLOSED,
+ () -> getFileSystem().listFiles(root, false));
+ }
+
+ @Test
+ public void testClosedListLocatedStatus() throws Exception {
+ intercept(IOException.class, E_FS_CLOSED,
+ () -> getFileSystem().listLocatedStatus(root));
+ }
+
+ @Test
+ public void testClosedCreate() throws Exception {
+ intercept(IOException.class, E_FS_CLOSED,
+ () -> getFileSystem().create(path("to-create")).close());
+ }
+
+ @Test
+ public void testClosedDelete() throws Exception {
+ intercept(IOException.class, E_FS_CLOSED,
+ () -> getFileSystem().delete(path("to-delete"), false));
+ }
+
+ @Test
+ public void testClosedOpen() throws Exception {
+ intercept(IOException.class, E_FS_CLOSED,
+ () -> getFileSystem().open(path("to-open")));
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org