Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2016/10/10 21:04:05 UTC

[7/7] hadoop git commit: HADOOP-13447. Refactor S3AFileSystem to support introduction of separate metadata repository and tests. Contributed by Chris Nauroth.

HADOOP-13447. Refactor S3AFileSystem to support introduction of separate metadata repository and tests. Contributed by Chris Nauroth.

(cherry picked from commit d152557cf7f4d2288524c222fcbaf152bdc038b0)
(cherry picked from commit e28930a38b3868bc701aeef229831292320b4a3a)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/67d8301e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/67d8301e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/67d8301e

Branch: refs/heads/branch-2.8
Commit: 67d8301e55c19057957173fdfc28e77e070f2787
Parents: 027d76f
Author: Chris Nauroth <cn...@apache.org>
Authored: Tue Sep 6 09:36:21 2016 -0700
Committer: Chris Nauroth <cn...@apache.org>
Committed: Mon Oct 10 14:01:19 2016 -0700

----------------------------------------------------------------------
 hadoop-tools/hadoop-aws/pom.xml                 |   5 +
 .../org/apache/hadoop/fs/s3a/Constants.java     |  29 ++-
 .../hadoop/fs/s3a/S3AFastOutputStream.java      |   6 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 192 ++-------------
 .../apache/hadoop/fs/s3a/S3AInputStream.java    |   6 +-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java |  40 ++++
 .../apache/hadoop/fs/s3a/S3ClientFactory.java   | 232 +++++++++++++++++++
 .../hadoop/fs/s3native/S3xLoginHelper.java      |   3 +
 .../hadoop/fs/s3a/AbstractS3AMockTest.java      |  70 ++++++
 .../hadoop/fs/s3a/ITestS3AConfiguration.java    |  10 +-
 .../hadoop/fs/s3a/MockS3ClientFactory.java      |  40 ++++
 .../hadoop/fs/s3a/TestS3AGetFileStatus.java     | 126 ++++++++++
 12 files changed, 567 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
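
In short, this patch moves all AWS client construction out of S3AFileSystem and behind a new S3ClientFactory interface, selected via the new fs.s3a.s3.client.factory.impl configuration key. That indirection is what enables the new mock-based unit tests at the end of this patch. A minimal sketch of swapping in a factory, based on the AbstractS3AMockTest code below (note: S3ClientFactory and getAmazonS3Client() are package-private, so this only works from within org.apache.hadoop.fs.s3a, and initialize() can throw IOException):

      // Configure the factory that initialize() will instantiate.
      Configuration conf = new Configuration();
      conf.setClass(Constants.S3_CLIENT_FACTORY_IMPL,
          MockS3ClientFactory.class, S3ClientFactory.class);
      S3AFileSystem fs = new S3AFileSystem();
      fs.initialize(URI.create("s3a://mock-bucket"), conf);
      AmazonS3 s3 = fs.getAmazonS3Client();  // the Mockito mock, ready to stub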


http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d8301e/hadoop-tools/hadoop-aws/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index c91081d..e3befd4 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -295,6 +295,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d8301e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 1508675..cf97c35 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -53,7 +53,8 @@ public final class Constants {
   public static final int DEFAULT_MAXIMUM_CONNECTIONS = 15;
 
   // connect to s3 over ssl?
-  public static final String SECURE_CONNECTIONS = "fs.s3a.connection.ssl.enabled";
+  public static final String SECURE_CONNECTIONS =
+      "fs.s3a.connection.ssl.enabled";
   public static final boolean DEFAULT_SECURE_CONNECTIONS = true;
 
   //use a custom endpoint?
@@ -75,7 +76,8 @@ public final class Constants {
   public static final int DEFAULT_MAX_ERROR_RETRIES = 20;
 
   // seconds until we give up trying to establish a connection to s3
-  public static final String ESTABLISH_TIMEOUT = "fs.s3a.connection.establish.timeout";
+  public static final String ESTABLISH_TIMEOUT =
+      "fs.s3a.connection.establish.timeout";
   public static final int DEFAULT_ESTABLISH_TIMEOUT = 50000;
 
   // seconds until we give up on a connection to s3
@@ -116,11 +118,13 @@ public final class Constants {
   public static final long DEFAULT_MULTIPART_SIZE = 104857600; // 100 MB
 
   // minimum size in bytes before we start a multipart uploads or copy
-  public static final String MIN_MULTIPART_THRESHOLD = "fs.s3a.multipart.threshold";
+  public static final String MIN_MULTIPART_THRESHOLD =
+      "fs.s3a.multipart.threshold";
   public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = Integer.MAX_VALUE;
 
   //enable multiobject-delete calls?
-  public static final String ENABLE_MULTI_DELETE = "fs.s3a.multiobjectdelete.enable";
+  public static final String ENABLE_MULTI_DELETE =
+      "fs.s3a.multiobjectdelete.enable";
 
   // comma separated list of directories
   public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
@@ -139,11 +143,13 @@ public final class Constants {
   public static final String DEFAULT_CANNED_ACL = "";
 
   // should we try to purge old multipart uploads when starting up
-  public static final String PURGE_EXISTING_MULTIPART = "fs.s3a.multipart.purge";
+  public static final String PURGE_EXISTING_MULTIPART =
+      "fs.s3a.multipart.purge";
   public static final boolean DEFAULT_PURGE_EXISTING_MULTIPART = false;
 
   // purge any multipart uploads older than this number of seconds
-  public static final String PURGE_EXISTING_MULTIPART_AGE = "fs.s3a.multipart.purge.age";
+  public static final String PURGE_EXISTING_MULTIPART_AGE =
+      "fs.s3a.multipart.purge.age";
   public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 14400;
 
   // s3 server-side encryption
@@ -203,4 +209,15 @@ public final class Constants {
    */
   @InterfaceStability.Unstable
   public static final String INPUT_FADV_RANDOM = "random";
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static final String S3_CLIENT_FACTORY_IMPL =
+      "fs.s3a.s3.client.factory.impl";
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public static final Class<? extends S3ClientFactory>
+      DEFAULT_S3_CLIENT_FACTORY_IMPL =
+          S3ClientFactory.DefaultS3ClientFactory.class;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d8301e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
index 5509d36..c25d0fb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressListener;
-import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
@@ -71,7 +71,7 @@ public class S3AFastOutputStream extends OutputStream {
   private static final Logger LOG = S3AFileSystem.LOG;
   private final String key;
   private final String bucket;
-  private final AmazonS3Client client;
+  private final AmazonS3 client;
   private final int partSize;
   private final int multiPartThreshold;
   private final S3AFileSystem fs;
@@ -102,7 +102,7 @@ public class S3AFastOutputStream extends OutputStream {
    * @param threadPoolExecutor thread factory
    * @throws IOException on any problem
    */
-  public S3AFastOutputStream(AmazonS3Client client,
+  public S3AFastOutputStream(AmazonS3 client,
       S3AFileSystem fs,
       String bucket,
       String key,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d8301e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 24a8d64..15bd23a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -36,11 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.S3ClientOptions;
+import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
@@ -59,7 +55,6 @@ import com.amazonaws.services.s3.transfer.Upload;
 import com.amazonaws.event.ProgressListener;
 import com.amazonaws.event.ProgressEvent;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -84,7 +79,7 @@ import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.VersionInfo;
+import org.apache.hadoop.util.ReflectionUtils;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;
@@ -116,7 +111,7 @@ public class S3AFileSystem extends FileSystem {
   public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
   private URI uri;
   private Path workingDir;
-  private AmazonS3Client s3;
+  private AmazonS3 s3;
   private String bucket;
   private int maxKeys;
   private Listing listing;
@@ -156,37 +151,11 @@ public class S3AFileSystem extends FileSystem {
 
       bucket = name.getHost();
 
-      AWSCredentialsProvider credentials =
-          createAWSCredentialProviderSet(name, conf, uri);
-
-      ClientConfiguration awsConf = new ClientConfiguration();
-      awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
-          DEFAULT_MAXIMUM_CONNECTIONS, 1));
-      boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
-          DEFAULT_SECURE_CONNECTIONS);
-      awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
-      awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
-          DEFAULT_MAX_ERROR_RETRIES, 0));
-      awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
-          DEFAULT_ESTABLISH_TIMEOUT, 0));
-      awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
-          DEFAULT_SOCKET_TIMEOUT, 0));
-      int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
-          DEFAULT_SOCKET_SEND_BUFFER, 2048);
-      int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
-          DEFAULT_SOCKET_RECV_BUFFER, 2048);
-      awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
-      String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
-      if (!signerOverride.isEmpty()) {
-        LOG.debug("Signer override = {}", signerOverride);
-        awsConf.setSignerOverride(signerOverride);
-      }
-
-      initProxySupport(conf, awsConf, secureConnections);
-
-      initUserAgent(conf, awsConf);
-
-      initAmazonS3Client(conf, credentials, awsConf);
+      Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
+          S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
+          S3ClientFactory.class);
+      s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
+          .createS3Client(name, uri);
 
       maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
       listing = new Listing(this);
@@ -282,50 +251,6 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
-  void initProxySupport(Configuration conf, ClientConfiguration awsConf,
-      boolean secureConnections) throws IllegalArgumentException {
-    String proxyHost = conf.getTrimmed(PROXY_HOST, "");
-    int proxyPort = conf.getInt(PROXY_PORT, -1);
-    if (!proxyHost.isEmpty()) {
-      awsConf.setProxyHost(proxyHost);
-      if (proxyPort >= 0) {
-        awsConf.setProxyPort(proxyPort);
-      } else {
-        if (secureConnections) {
-          LOG.warn("Proxy host set without port. Using HTTPS default 443");
-          awsConf.setProxyPort(443);
-        } else {
-          LOG.warn("Proxy host set without port. Using HTTP default 80");
-          awsConf.setProxyPort(80);
-        }
-      }
-      String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
-      String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
-      if ((proxyUsername == null) != (proxyPassword == null)) {
-        String msg = "Proxy error: " + PROXY_USERNAME + " or " +
-            PROXY_PASSWORD + " set without the other.";
-        LOG.error(msg);
-        throw new IllegalArgumentException(msg);
-      }
-      awsConf.setProxyUsername(proxyUsername);
-      awsConf.setProxyPassword(proxyPassword);
-      awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
-      awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
-                "domain {} as workstation {}", awsConf.getProxyHost(),
-            awsConf.getProxyPort(),
-            String.valueOf(awsConf.getProxyUsername()),
-            awsConf.getProxyPassword(), awsConf.getProxyDomain(),
-            awsConf.getProxyWorkstation());
-      }
-    } else if (proxyPort >= 0) {
-      String msg = "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
-      LOG.error(msg);
-      throw new IllegalArgumentException(msg);
-    }
-  }
-
   /**
    * Get S3A Instrumentation. For test purposes.
    * @return this instance's instrumentation.
@@ -334,53 +259,9 @@ public class S3AFileSystem extends FileSystem {
     return instrumentation;
   }
 
-  /**
-   * Initializes the User-Agent header to send in HTTP requests to the S3
-   * back-end.  We always include the Hadoop version number.  The user also may
-   * set an optional custom prefix to put in front of the Hadoop version number.
-   * The AWS SDK interally appends its own information, which seems to include
-   * the AWS SDK version, OS and JVM version.
-   *
-   * @param conf Hadoop configuration
-   * @param awsConf AWS SDK configuration
-   */
-  private void initUserAgent(Configuration conf, ClientConfiguration awsConf) {
-    String userAgent = "Hadoop " + VersionInfo.getVersion();
-    String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
-    if (!userAgentPrefix.isEmpty()) {
-      userAgent = userAgentPrefix + ", " + userAgent;
-    }
-    LOG.debug("Using User-Agent: {}", userAgent);
-    awsConf.setUserAgent(userAgent);
-  }
-
-  private void initAmazonS3Client(Configuration conf,
-      AWSCredentialsProvider credentials, ClientConfiguration awsConf)
-      throws IllegalArgumentException {
-    s3 = new AmazonS3Client(credentials, awsConf);
-    String endPoint = conf.getTrimmed(ENDPOINT, "");
-    if (!endPoint.isEmpty()) {
-      try {
-        s3.setEndpoint(endPoint);
-      } catch (IllegalArgumentException e) {
-        String msg = "Incorrect endpoint: "  + e.getMessage();
-        LOG.error(msg);
-        throw new IllegalArgumentException(msg, e);
-      }
-    }
-    enablePathStyleAccessIfRequired(conf);
-  }
-
-  private void enablePathStyleAccessIfRequired(Configuration conf) {
-    final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
-    if (pathStyleAccess) {
-      LOG.debug("Enabling path style access!");
-      s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));
-    }
-  }
-
   private void initTransferManager() {
-    TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
+    TransferManagerConfiguration transferConfiguration =
+        new TransferManagerConfiguration();
     transferConfiguration.setMinimumUploadPartSize(partSize);
     transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);
     transferConfiguration.setMultipartCopyPartSize(partSize);
@@ -451,7 +332,7 @@ public class S3AFileSystem extends FileSystem {
    * @return AmazonS3Client
    */
   @VisibleForTesting
-  AmazonS3Client getAmazonS3Client() {
+  AmazonS3 getAmazonS3Client() {
     return s3;
   }
 
@@ -475,10 +356,6 @@ public class S3AFileSystem extends FileSystem {
     this.inputPolicy = inputPolicy;
   }
 
-  public S3AFileSystem() {
-    super();
-  }
-
   /**
    * Turns a path (relative or otherwise) into an S3 key.
    *
@@ -822,8 +699,10 @@ public class S3AFileSystem extends FileSystem {
 
       while (true) {
         for (S3ObjectSummary summary : objects.getObjectSummaries()) {
-          keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
-          String newDstKey = dstKey + summary.getKey().substring(srcKey.length());
+          keysToDelete.add(
+              new DeleteObjectsRequest.KeyVersion(summary.getKey()));
+          String newDstKey =
+              dstKey + summary.getKey().substring(srcKey.length());
           copyFile(summary.getKey(), newDstKey, summary.getSize());
 
           if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
@@ -1484,7 +1363,8 @@ public class S3AFileSystem extends FileSystem {
             LOG.debug("Found file (with /): fake directory");
             return new S3AFileStatus(true, true, path);
           } else {
-            LOG.warn("Found file (with /): real file? should not happen: {}", key);
+            LOG.warn("Found file (with /): real file? should not happen: {}",
+                key);
 
             return new S3AFileStatus(meta.getContentLength(),
                 dateToLong(meta.getLastModified()),
@@ -2078,42 +1958,4 @@ public class S3AFileSystem extends FileSystem {
           getFileBlockLocations(status, 0, status.getLen())
           : null);
   }
-
-  /**
-   * Get a integer option >= the minimum allowed value.
-   * @param conf configuration
-   * @param key key to look up
-   * @param defVal default value
-   * @param min minimum value
-   * @return the value
-   * @throws IllegalArgumentException if the value is below the minimum
-   */
-  static int intOption(Configuration conf, String key, int defVal, int min) {
-    int v = conf.getInt(key, defVal);
-    Preconditions.checkArgument(v >= min,
-        String.format("Value of %s: %d is below the minimum value %d",
-            key, v, min));
-    return v;
-  }
-
-  /**
-   * Get a long option >= the minimum allowed value.
-   * @param conf configuration
-   * @param key key to look up
-   * @param defVal default value
-   * @param min minimum value
-   * @return the value
-   * @throws IllegalArgumentException if the value is below the minimum
-   */
-  static long longOption(Configuration conf,
-      String key,
-      long defVal,
-      long min) {
-    long v = conf.getLong(key, defVal);
-    Preconditions.checkArgument(v >= min,
-        String.format("Value of %s: %d is below the minimum value %d",
-            key, v, min));
-    return v;
-  }
-
 }
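
With the factory extracted, initialize() reduces to a lookup-and-instantiate pair. ReflectionUtils.newInstance calls setConf() on any instance implementing Configurable, which is why DefaultS3ClientFactory extends Configured and can read settings via getConf() inside createS3Client(). A sketch restating the new code in the hunk above:

      Class<? extends S3ClientFactory> factoryClass = conf.getClass(
          S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
          S3ClientFactory.class);
      // newInstance() injects conf into Configurable implementations.
      S3ClientFactory factory = ReflectionUtils.newInstance(factoryClass, conf);
      AmazonS3 s3 = factory.createS3Client(name, uri);  // may throw IOException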

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d8301e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index ccb9726..dd6cdd7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import com.google.common.base.Preconditions;
@@ -71,7 +71,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private volatile boolean closed;
   private S3ObjectInputStream wrappedStream;
   private final FileSystem.Statistics stats;
-  private final AmazonS3Client client;
+  private final AmazonS3 client;
   private final String bucket;
   private final String key;
   private final long contentLength;
@@ -101,7 +101,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   public S3AInputStream(String bucket,
       String key,
       long contentLength,
-      AmazonS3Client client,
+      AmazonS3 client,
       FileSystem.Statistics stats,
       S3AInstrumentation instrumentation,
       long readahead,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d8301e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 40d0b1b..93d819b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -25,6 +25,9 @@ import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import com.google.common.base.Preconditions;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -420,4 +423,41 @@ public final class S3AUtils {
     builder.append("size=").append(summary.getSize());
     return builder.toString();
   }
+
+  /**
+   * Get an integer option >= the minimum allowed value.
+   * @param conf configuration
+   * @param key key to look up
+   * @param defVal default value
+   * @param min minimum value
+   * @return the value
+   * @throws IllegalArgumentException if the value is below the minimum
+   */
+  static int intOption(Configuration conf, String key, int defVal, int min) {
+    int v = conf.getInt(key, defVal);
+    Preconditions.checkArgument(v >= min,
+        String.format("Value of %s: %d is below the minimum value %d",
+            key, v, min));
+    return v;
+  }
+
+  /**
+   * Get a long option >= the minimum allowed value.
+   * @param conf configuration
+   * @param key key to look up
+   * @param defVal default value
+   * @param min minimum value
+   * @return the value
+   * @throws IllegalArgumentException if the value is below the minimum
+   */
+  static long longOption(Configuration conf,
+      String key,
+      long defVal,
+      long min) {
+    long v = conf.getLong(key, defVal);
+    Preconditions.checkArgument(v >= min,
+        String.format("Value of %s: %d is below the minimum value %d",
+            key, v, min));
+    return v;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d8301e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
new file mode 100644
index 0000000..0a4dd02
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.VersionInfo;
+
+import org.slf4j.Logger;
+
+/**
+ * Factory for creation of S3 client instances to be used by {@link S3Store}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+interface S3ClientFactory {
+
+  /**
+   * Creates a new {@link AmazonS3} client.  This method accepts the S3A file
+   * system URI both in raw input form and validated form as separate arguments,
+   * because both values may be useful in logging.
+   *
+   * @param name raw input S3A file system URI
+   * @param uri validated form of S3A file system URI
+   * @return S3 client
+   * @throws IOException IO problem
+   */
+  AmazonS3 createS3Client(URI name, URI uri) throws IOException;
+
+  /**
+   * The default factory implementation, which calls the AWS SDK to configure
+   * and create an {@link AmazonS3Client} that communicates with the S3 service.
+   */
+  static class DefaultS3ClientFactory extends Configured
+      implements S3ClientFactory {
+
+    private static final Logger LOG = S3AFileSystem.LOG;
+
+    @Override
+    public AmazonS3 createS3Client(URI name, URI uri) throws IOException {
+      Configuration conf = getConf();
+      AWSCredentialsProvider credentials =
+          createAWSCredentialProviderSet(name, conf, uri);
+      ClientConfiguration awsConf = new ClientConfiguration();
+      initConnectionSettings(conf, awsConf);
+      initProxySupport(conf, awsConf);
+      initUserAgent(conf, awsConf);
+      return createAmazonS3Client(conf, credentials, awsConf);
+    }
+
+    /**
+     * Initializes all AWS SDK settings related to connection management.
+     *
+     * @param conf Hadoop configuration
+     * @param awsConf AWS SDK configuration
+     */
+    private static void initConnectionSettings(Configuration conf,
+        ClientConfiguration awsConf) {
+      awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
+          DEFAULT_MAXIMUM_CONNECTIONS, 1));
+      boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
+          DEFAULT_SECURE_CONNECTIONS);
+      awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
+      awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
+          DEFAULT_MAX_ERROR_RETRIES, 0));
+      awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
+          DEFAULT_ESTABLISH_TIMEOUT, 0));
+      awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
+          DEFAULT_SOCKET_TIMEOUT, 0));
+      int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
+          DEFAULT_SOCKET_SEND_BUFFER, 2048);
+      int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
+          DEFAULT_SOCKET_RECV_BUFFER, 2048);
+      awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
+      String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
+      if (!signerOverride.isEmpty()) {
+        LOG.debug("Signer override = {}", signerOverride);
+        awsConf.setSignerOverride(signerOverride);
+      }
+    }
+
+    /**
+     * Initializes AWS SDK proxy support if configured.
+     *
+     * @param conf Hadoop configuration
+     * @param awsConf AWS SDK configuration
+     * @throws IllegalArgumentException if misconfigured
+     */
+    private static void initProxySupport(Configuration conf,
+        ClientConfiguration awsConf) throws IllegalArgumentException {
+      String proxyHost = conf.getTrimmed(PROXY_HOST, "");
+      int proxyPort = conf.getInt(PROXY_PORT, -1);
+      if (!proxyHost.isEmpty()) {
+        awsConf.setProxyHost(proxyHost);
+        if (proxyPort >= 0) {
+          awsConf.setProxyPort(proxyPort);
+        } else {
+          if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
+            LOG.warn("Proxy host set without port. Using HTTPS default 443");
+            awsConf.setProxyPort(443);
+          } else {
+            LOG.warn("Proxy host set without port. Using HTTP default 80");
+            awsConf.setProxyPort(80);
+          }
+        }
+        String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
+        String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
+        if ((proxyUsername == null) != (proxyPassword == null)) {
+          String msg = "Proxy error: " + PROXY_USERNAME + " or " +
+              PROXY_PASSWORD + " set without the other.";
+          LOG.error(msg);
+          throw new IllegalArgumentException(msg);
+        }
+        awsConf.setProxyUsername(proxyUsername);
+        awsConf.setProxyPassword(proxyPassword);
+        awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
+        awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
+                  "domain {} as workstation {}", awsConf.getProxyHost(),
+              awsConf.getProxyPort(),
+              String.valueOf(awsConf.getProxyUsername()),
+              awsConf.getProxyPassword(), awsConf.getProxyDomain(),
+              awsConf.getProxyWorkstation());
+        }
+      } else if (proxyPort >= 0) {
+        String msg =
+            "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg);
+      }
+    }
+
+    /**
+     * Initializes the User-Agent header to send in HTTP requests to the S3
+     * back-end.  We always include the Hadoop version number.  The user also
+     * may set an optional custom prefix to put in front of the Hadoop version
+     * number.  The AWS SDK internally appends its own information, which seems
+     * to include the AWS SDK version, OS and JVM version.
+     *
+     * @param conf Hadoop configuration
+     * @param awsConf AWS SDK configuration
+     */
+    private static void initUserAgent(Configuration conf,
+        ClientConfiguration awsConf) {
+      String userAgent = "Hadoop " + VersionInfo.getVersion();
+      String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
+      if (!userAgentPrefix.isEmpty()) {
+        userAgent = userAgentPrefix + ", " + userAgent;
+      }
+      LOG.debug("Using User-Agent: {}", userAgent);
+      awsConf.setUserAgent(userAgent);
+    }
+
+    /**
+     * Creates an {@link AmazonS3Client} from the established configuration.
+     *
+     * @param conf Hadoop configuration
+     * @param credentials AWS credentials
+     * @param awsConf AWS SDK configuration
+     * @return S3 client
+     * @throws IllegalArgumentException if misconfigured
+     */
+    private static AmazonS3 createAmazonS3Client(Configuration conf,
+        AWSCredentialsProvider credentials, ClientConfiguration awsConf)
+        throws IllegalArgumentException {
+      AmazonS3 s3 = new AmazonS3Client(credentials, awsConf);
+      String endPoint = conf.getTrimmed(ENDPOINT, "");
+      if (!endPoint.isEmpty()) {
+        try {
+          s3.setEndpoint(endPoint);
+        } catch (IllegalArgumentException e) {
+          String msg = "Incorrect endpoint: "  + e.getMessage();
+          LOG.error(msg);
+          throw new IllegalArgumentException(msg, e);
+        }
+      }
+      enablePathStyleAccessIfRequired(s3, conf);
+      return s3;
+    }
+
+    /**
+     * Enables path-style access to S3 buckets if configured.  By default, the
+     * behavior is to use virtual hosted-style access with URIs of the form
+     * http://bucketname.s3.amazonaws.com.  Enabling path-style access and a
+     * region-specific endpoint switches the behavior to use URIs of the form
+     * http://s3-eu-west-1.amazonaws.com/bucketname.
+     *
+     * @param s3 S3 client
+     * @param conf Hadoop configuration
+     */
+    private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
+        Configuration conf) {
+      final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
+      if (pathStyleAccess) {
+        LOG.debug("Enabling path style access!");
+        s3.setS3ClientOptions(new S3ClientOptions().withPathStyleAccess(true));
+      }
+    }
+  }
+}
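
DefaultS3ClientFactory keeps the fail-fast validation of the code it replaces: a proxy port without a host, or a proxy username without a password (and vice versa), raises IllegalArgumentException at client creation rather than at first request. A sketch of the failure mode, assuming the standard key names behind PROXY_HOST and PROXY_PORT (fs.s3a.proxy.host and fs.s3a.proxy.port):

      conf.setInt("fs.s3a.proxy.port", 8080);  // fs.s3a.proxy.host left unset
      // createS3Client(...) now throws IllegalArgumentException:
      //   "Proxy error: fs.s3a.proxy.port set without fs.s3a.proxy.host"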

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d8301e/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java
index bc8c2e6..97ece37 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3native/S3xLoginHelper.java
@@ -132,6 +132,9 @@ public final class S3xLoginHelper {
    *
    * This strips out login information.
    *
+   * @param uri the URI to canonicalize
+   * @param defaultPort default port to use in canonicalized URI if the input
+   *     URI has no port and this value is greater than 0
    * @return a new, canonicalized URI.
    */
   public static URI canonicalizeUri(URI uri, int defaultPort) {
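
The S3xLoginHelper change only adds the missing javadoc parameters. A sketch of the documented behavior (the exact output form is assumed from the javadoc, not verified here):

      URI in = URI.create("s3a://user:secret@bucket/path");
      URI out = S3xLoginHelper.canonicalizeUri(in, -1);
      // login details stripped, e.g. s3a://bucket/path; with defaultPort <= 0
      // no port is added to the authority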

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d8301e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
new file mode 100644
index 0000000..6734947
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Abstract base class for S3A unit tests using a mock S3 client.
+ */
+public abstract class AbstractS3AMockTest {
+
+  protected static final String BUCKET = "mock-bucket";
+  protected static final AmazonServiceException NOT_FOUND;
+  static {
+    NOT_FOUND = new AmazonServiceException("Not Found");
+    NOT_FOUND.setStatusCode(404);
+  }
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  protected S3AFileSystem fs;
+  protected AmazonS3 s3;
+
+  @Before
+  public void setup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(S3_CLIENT_FACTORY_IMPL, MockS3ClientFactory.class,
+        S3ClientFactory.class);
+    fs = new S3AFileSystem();
+    URI uri = URI.create(FS_S3A + "://" + BUCKET);
+    fs.initialize(uri, conf);
+    s3 = fs.getAmazonS3Client();
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d8301e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
index 9fadc1f..4404c60 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.ClientConfiguration;
-import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.S3ClientOptions;
 
 import org.apache.commons.lang.StringUtils;
@@ -96,7 +96,7 @@ public class ITestS3AConfiguration {
     } else {
       conf.set(Constants.ENDPOINT, endpoint);
       fs = S3ATestUtils.createTestFileSystem(conf);
-      AmazonS3Client s3 = fs.getAmazonS3Client();
+      AmazonS3 s3 = fs.getAmazonS3Client();
       String endPointRegion = "";
       // Differentiate handling of "s3-" and "s3." based endpoint identifiers
       String[] endpointParts = StringUtils.split(endpoint, '.');
@@ -362,7 +362,7 @@ public class ITestS3AConfiguration {
     try {
       fs = S3ATestUtils.createTestFileSystem(conf);
       assertNotNull(fs);
-      AmazonS3Client s3 = fs.getAmazonS3Client();
+      AmazonS3 s3 = fs.getAmazonS3Client();
       assertNotNull(s3);
       S3ClientOptions clientOptions = getField(s3, S3ClientOptions.class,
           "clientOptions");
@@ -386,7 +386,7 @@ public class ITestS3AConfiguration {
     conf = new Configuration();
     fs = S3ATestUtils.createTestFileSystem(conf);
     assertNotNull(fs);
-    AmazonS3Client s3 = fs.getAmazonS3Client();
+    AmazonS3 s3 = fs.getAmazonS3Client();
     assertNotNull(s3);
     ClientConfiguration awsConf = getField(s3, ClientConfiguration.class,
         "clientConfiguration");
@@ -399,7 +399,7 @@ public class ITestS3AConfiguration {
     conf.set(Constants.USER_AGENT_PREFIX, "MyApp");
     fs = S3ATestUtils.createTestFileSystem(conf);
     assertNotNull(fs);
-    AmazonS3Client s3 = fs.getAmazonS3Client();
+    AmazonS3 s3 = fs.getAmazonS3Client();
     assertNotNull(s3);
     ClientConfiguration awsConf = getField(s3, ClientConfiguration.class,
         "clientConfiguration");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d8301e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
new file mode 100644
index 0000000..41f04ee
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/MockS3ClientFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.mockito.Mockito.*;
+
+import java.net.URI;
+
+import com.amazonaws.services.s3.AmazonS3;
+
+/**
+ * An {@link S3ClientFactory} that returns Mockito mocks of the {@link AmazonS3}
+ * interface suitable for unit testing.
+ */
+public class MockS3ClientFactory implements S3ClientFactory {
+
+  @Override
+  public AmazonS3 createS3Client(URI name, URI uri) {
+    String bucket = name.getHost();
+    AmazonS3 s3 = mock(AmazonS3.class);
+    when(s3.doesBucketExist(bucket)).thenReturn(true);
+    return s3;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67d8301e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
new file mode 100644
index 0000000..f9e9c6b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.io.FileNotFoundException;
+import java.util.Collections;
+import java.util.Date;
+
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import org.junit.Test;
+
+/**
+ * S3A tests for getFileStatus using mock S3 client.
+ */
+public class TestS3AGetFileStatus extends AbstractS3AMockTest {
+
+  @Test
+  public void testFile() throws Exception {
+    Path path = new Path("/file");
+    String key = path.toUri().getPath().substring(1);
+    ObjectMetadata meta = new ObjectMetadata();
+    meta.setContentLength(1L);
+    meta.setLastModified(new Date(2L));
+    when(s3.getObjectMetadata(BUCKET, key)).thenReturn(meta);
+    FileStatus stat = fs.getFileStatus(path);
+    assertNotNull(stat);
+    assertEquals(fs.makeQualified(path), stat.getPath());
+    assertTrue(stat.isFile());
+    assertEquals(meta.getContentLength(), stat.getLen());
+    assertEquals(meta.getLastModified().getTime(), stat.getModificationTime());
+  }
+
+  @Test
+  public void testFakeDirectory() throws Exception {
+    Path path = new Path("/dir");
+    String key = path.toUri().getPath().substring(1);
+    when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
+    ObjectMetadata meta = new ObjectMetadata();
+    meta.setContentLength(0L);
+    when(s3.getObjectMetadata(BUCKET, key + "/")).thenReturn(meta);
+    FileStatus stat = fs.getFileStatus(path);
+    assertNotNull(stat);
+    assertEquals(fs.makeQualified(path), stat.getPath());
+    assertTrue(stat.isDirectory());
+  }
+
+  @Test
+  public void testImplicitDirectory() throws Exception {
+    Path path = new Path("/dir");
+    String key = path.toUri().getPath().substring(1);
+    when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+    ObjectListing objects = mock(ObjectListing.class);
+    when(objects.getCommonPrefixes()).thenReturn(
+        Collections.singletonList("dir/"));
+    when(objects.getObjectSummaries()).thenReturn(
+        Collections.<S3ObjectSummary>emptyList());
+    when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
+    FileStatus stat = fs.getFileStatus(path);
+    assertNotNull(stat);
+    assertEquals(fs.makeQualified(path), stat.getPath());
+    assertTrue(stat.isDirectory());
+  }
+
+  @Test
+  public void testRoot() throws Exception {
+    Path path = new Path("/");
+    String key = path.toUri().getPath().substring(1);
+    when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+    ObjectListing objects = mock(ObjectListing.class);
+    when(objects.getCommonPrefixes()).thenReturn(
+        Collections.<String>emptyList());
+    when(objects.getObjectSummaries()).thenReturn(
+        Collections.<S3ObjectSummary>emptyList());
+    when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
+    FileStatus stat = fs.getFileStatus(path);
+    assertNotNull(stat);
+    assertEquals(fs.makeQualified(path), stat.getPath());
+    assertTrue(stat.isDirectory());
+    assertTrue(stat.getPath().isRoot());
+  }
+
+  @Test
+  public void testNotFound() throws Exception {
+    Path path = new Path("/dir");
+    String key = path.toUri().getPath().substring(1);
+    when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+    ObjectListing objects = mock(ObjectListing.class);
+    when(objects.getCommonPrefixes()).thenReturn(
+        Collections.<String>emptyList());
+    when(objects.getObjectSummaries()).thenReturn(
+        Collections.<S3ObjectSummary>emptyList());
+    when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
+    exception.expect(FileNotFoundException.class);
+    fs.getFileStatus(path);
+  }
+}
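
The tests above only stub return values on the mock client; the same fixture also supports interaction checks. A hypothetical extension, not part of this patch:

      fs.getFileStatus(new Path("/dir"));
      // verify the getFileStatus fallback actually issued a LIST request
      verify(s3).listObjects(any(ListObjectsRequest.class));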

