You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/12/30 01:38:08 UTC

[24/49] hadoop git commit: HADOOP-15113. NPE in S3A getFileStatus: null instrumentation on using closed instance. Contributed by Steve Loughran.

HADOOP-15113. NPE in S3A getFileStatus: null instrumentation on using closed instance.
Contributed by Steve Loughran.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ef450df4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ef450df4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ef450df4

Branch: refs/heads/YARN-6592
Commit: ef450df443f1dea1c52082cf281f25db7141972f
Parents: d2d8f4a
Author: Steve Loughran <st...@apache.org>
Authored: Thu Dec 21 14:15:53 2017 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Thu Dec 21 14:15:53 2017 +0000

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 53 ++++++++---
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java |  3 +
 .../apache/hadoop/fs/s3a/ITestS3AClosedFS.java  | 92 ++++++++++++++++++++
 3 files changed, 135 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef450df4/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 9431f17..f461c9e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -187,6 +187,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   private long readAhead;
   private S3AInputPolicy inputPolicy;
   private final AtomicBoolean closed = new AtomicBoolean(false);
+  private volatile boolean isClosed = false;
   private MetadataStore metadataStore;
   private boolean allowAuthoritative;
 
@@ -678,7 +679,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
    */
   public FSDataInputStream open(Path f, int bufferSize)
       throws IOException {
-
+    checkNotClosed();
     LOG.debug("Opening '{}' for reading; input policy = {}", f, inputPolicy);
     final FileStatus fileStatus = getFileStatus(f);
     if (fileStatus.isDirectory()) {
@@ -722,6 +723,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   public FSDataOutputStream create(Path f, FsPermission permission,
       boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
+    checkNotClosed();
     final Path path = qualify(f);
     String key = pathToKey(path);
     FileStatus status = null;
@@ -871,7 +873,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
     Path dst = qualify(dest);
 
     LOG.debug("Rename path {} to {}", src, dst);
-    incrementStatistic(INVOCATION_RENAME);
+    entryPoint(INVOCATION_RENAME);
 
     String srcKey = pathToKey(src);
     String dstKey = pathToKey(dst);
@@ -1098,6 +1100,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   }
 
   /**
+   * Entry point to an operation.
+   * Increments the statistic; verifies the FS is active.
+   * @param operation The operation to increment
+   * @throws IOException if the
+   */
+  protected void entryPoint(Statistic operation) throws IOException {
+    checkNotClosed();
+    incrementStatistic(operation);
+  }
+
+  /**
    * Increment a statistic by 1.
    * @param statistic The operation to increment
    */
@@ -1660,6 +1673,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   @Retries.RetryTranslated
   public boolean delete(Path f, boolean recursive) throws IOException {
     try {
+      checkNotClosed();
       return innerDelete(innerGetFileStatus(f, true), recursive);
     } catch (FileNotFoundException e) {
       LOG.debug("Couldn't delete {} - does not exist", f);
@@ -1838,7 +1852,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
     Path path = qualify(f);
     String key = pathToKey(path);
     LOG.debug("List status for path: {}", path);
-    incrementStatistic(INVOCATION_LIST_STATUS);
+    entryPoint(INVOCATION_LIST_STATUS);
 
     List<FileStatus> result;
     final FileStatus fileStatus =  getFileStatus(path);
@@ -1981,7 +1995,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
       throws IOException, FileAlreadyExistsException, AmazonClientException {
     Path f = qualify(p);
     LOG.debug("Making directory: {}", f);
-    incrementStatistic(INVOCATION_MKDIRS);
+    entryPoint(INVOCATION_MKDIRS);
     FileStatus fileStatus;
     List<Path> metadataStoreDirs = null;
     if (hasMetadataStore()) {
@@ -2058,7 +2072,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   @Retries.RetryTranslated
   S3AFileStatus innerGetFileStatus(final Path f,
       boolean needEmptyDirectoryFlag) throws IOException {
-    incrementStatistic(INVOCATION_GET_FILE_STATUS);
+    entryPoint(INVOCATION_GET_FILE_STATUS);
     final Path path = qualify(f);
     String key = pathToKey(path);
     LOG.debug("Getting path status for {}  ({})", path, key);
@@ -2319,7 +2333,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
       Path src, Path dst)
       throws IOException, FileAlreadyExistsException, AmazonClientException {
-    incrementStatistic(INVOCATION_COPY_FROM_LOCAL_FILE);
+    entryPoint(INVOCATION_COPY_FROM_LOCAL_FILE);
     LOG.debug("Copying local file from {} to {}", src, dst);
 
     // Since we have a local file, we don't need to stream into a temporary file
@@ -2418,6 +2432,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
       // already closed
       return;
     }
+    isClosed = true;
+    LOG.debug("Filesystem {} is closed", uri);
     try {
       super.close();
     } finally {
@@ -2435,6 +2451,17 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   }
 
   /**
+   * Verify that the input stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   * @throws IOException if the connection is closed.
+   */
+  private void checkNotClosed() throws IOException {
+    if (isClosed) {
+      throw new IOException(uri + ": " + E_FS_CLOSED);
+    }
+  }
+
+  /**
    * Override getCanonicalServiceName because we don't support token in S3A.
    */
   @Override
@@ -2860,7 +2887,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
    */
   @Override
   public FileStatus[] globStatus(Path pathPattern) throws IOException {
-    incrementStatistic(INVOCATION_GLOB_STATUS);
+    entryPoint(INVOCATION_GLOB_STATUS);
     return super.globStatus(pathPattern);
   }
 
@@ -2871,7 +2898,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   @Override
   public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
       throws IOException {
-    incrementStatistic(INVOCATION_GLOB_STATUS);
+    entryPoint(INVOCATION_GLOB_STATUS);
     return super.globStatus(pathPattern, filter);
   }
 
@@ -2881,7 +2908,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
    */
   @Override
   public boolean exists(Path f) throws IOException {
-    incrementStatistic(INVOCATION_EXISTS);
+    entryPoint(INVOCATION_EXISTS);
     return super.exists(f);
   }
 
@@ -2892,7 +2919,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   @Override
   @SuppressWarnings("deprecation")
   public boolean isDirectory(Path f) throws IOException {
-    incrementStatistic(INVOCATION_IS_DIRECTORY);
+    entryPoint(INVOCATION_IS_DIRECTORY);
     return super.isDirectory(f);
   }
 
@@ -2903,7 +2930,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   @Override
   @SuppressWarnings("deprecation")
   public boolean isFile(Path f) throws IOException {
-    incrementStatistic(INVOCATION_IS_FILE);
+    entryPoint(INVOCATION_IS_FILE);
     return super.isFile(f);
   }
 
@@ -2948,7 +2975,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
 
   private RemoteIterator<LocatedFileStatus> innerListFiles(Path f, boolean
       recursive, Listing.FileStatusAcceptor acceptor) throws IOException {
-    incrementStatistic(INVOCATION_LIST_FILES);
+    entryPoint(INVOCATION_LIST_FILES);
     Path path = qualify(f);
     LOG.debug("listFiles({}, {})", path, recursive);
     try {
@@ -3033,7 +3060,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
   public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
       final PathFilter filter)
       throws FileNotFoundException, IOException {
-    incrementStatistic(INVOCATION_LIST_LOCATED_STATUS);
+    entryPoint(INVOCATION_LIST_LOCATED_STATUS);
     Path path = qualify(f);
     LOG.debug("listLocatedStatus({}, {}", path, filter);
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef450df4/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 2457217..6d66739 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -89,6 +89,9 @@ public final class S3AUtils {
       "is abstract and therefore cannot be created";
   static final String ENDPOINT_KEY = "Endpoint";
 
+  /** Filesystem is closed; kept here to keep the errors close. */
+  static final String E_FS_CLOSED = "FileSystem is closed!";
+
   /**
    * Core property for provider path. Duplicated here for consistent
    * code across Hadoop version: {@value}.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ef450df4/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
new file mode 100644
index 0000000..6e81452
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.E_FS_CLOSED;
+
+/**
+ * Tests of the S3A FileSystem which is closed; just make sure
+ * that that basic file Ops fail meaningfully.
+ */
+public class ITestS3AClosedFS extends AbstractS3ATestBase {
+
+  private Path root = new Path("/");
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    root = getFileSystem().makeQualified(new Path("/"));
+    getFileSystem().close();
+  }
+
+  @Override
+  public void teardown()  {
+    // no op, as the FS is closed
+  }
+
+  @Test
+  public void testClosedGetFileStatus() throws Exception {
+    intercept(IOException.class, E_FS_CLOSED,
+        () -> getFileSystem().getFileStatus(root));
+  }
+
+  @Test
+  public void testClosedListStatus() throws Exception {
+    intercept(IOException.class, E_FS_CLOSED,
+        () -> getFileSystem().listStatus(root));
+  }
+
+  @Test
+  public void testClosedListFile() throws Exception {
+    intercept(IOException.class, E_FS_CLOSED,
+        () -> getFileSystem().listFiles(root, false));
+  }
+
+  @Test
+  public void testClosedListLocatedStatus() throws Exception {
+    intercept(IOException.class, E_FS_CLOSED,
+        () -> getFileSystem().listLocatedStatus(root));
+  }
+
+  @Test
+  public void testClosedCreate() throws Exception {
+    intercept(IOException.class, E_FS_CLOSED,
+        () -> getFileSystem().create(path("to-create")).close());
+  }
+
+  @Test
+  public void testClosedDelete() throws Exception {
+    intercept(IOException.class, E_FS_CLOSED,
+        () ->  getFileSystem().delete(path("to-delete"), false));
+  }
+
+  @Test
+  public void testClosedOpen() throws Exception {
+    intercept(IOException.class, E_FS_CLOSED,
+        () ->  getFileSystem().open(path("to-open")));
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org