Posted to common-commits@hadoop.apache.org by li...@apache.org on 2016/12/06 06:55:08 UTC

hadoop git commit: HADOOP-13793. S3guard: add inconsistency injection, integration tests. Contributed by Aaron Fabbri

Repository: hadoop
Updated Branches:
  refs/heads/HADOOP-13345 cfd0fbf13 -> 013a3c454


HADOOP-13793. S3guard: add inconsistency injection, integration tests. Contributed by Aaron Fabbri


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/013a3c45
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/013a3c45
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/013a3c45

Branch: refs/heads/HADOOP-13345
Commit: 013a3c4540f2622fc8182a214e19cdb407f21b8b
Parents: cfd0fbf
Author: Mingliang Liu <li...@apache.org>
Authored: Mon Dec 5 22:10:14 2016 -0800
Committer: Mingliang Liu <li...@apache.org>
Committed: Mon Dec 5 22:53:54 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/s3a/Constants.java     |   2 +-
 .../hadoop/fs/s3a/DefaultS3ClientFactory.java   | 223 +++++++++++++++++++
 .../fs/s3a/InconsistentAmazonS3Client.java      | 189 ++++++++++++++++
 .../apache/hadoop/fs/s3a/S3ClientFactory.java   | 186 ----------------
 .../fs/s3a/ITestS3GuardListConsistency.java     |  79 +++++++
 .../fs/s3a/InconsistentS3ClientFactory.java     |  35 +++
 6 files changed, 527 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 518bd33..c102460 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -280,7 +280,7 @@ public final class Constants {
   @InterfaceStability.Unstable
   public static final Class<? extends S3ClientFactory>
       DEFAULT_S3_CLIENT_FACTORY_IMPL =
-          S3ClientFactory.DefaultS3ClientFactory.class;
+          DefaultS3ClientFactory.class;
 
   /**
    * Maximum number of partitions in a multipart upload: {@value}.
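
The default factory constant now points at the new top-level
DefaultS3ClientFactory rather than the nested class removed from
S3ClientFactory below. A minimal sketch, not part of this commit, of
selecting a custom factory through the Configuration; the integration test
at the end of this message does the equivalent in createContract():

    // Sketch only: build a Configuration that selects a custom client
    // factory.  Placed in org.apache.hadoop.fs.s3a because the
    // S3ClientFactory interface is package-private.
    package org.apache.hadoop.fs.s3a;

    import org.apache.hadoop.conf.Configuration;

    final class ClientFactoryConfigSketch {
      static Configuration withFactory(
          Class<? extends S3ClientFactory> factory) {
        Configuration conf = new Configuration();
        conf.setClass(Constants.S3_CLIENT_FACTORY_IMPL,
            factory, S3ClientFactory.class);
        return conf;
      }
    }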

http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
new file mode 100644
index 0000000..a43a746
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.VersionInfo;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
+import static org.apache.hadoop.fs.s3a.S3AUtils.intOption;
+
+/**
+ * The default factory implementation, which calls the AWS SDK to configure
+ * and create an {@link AmazonS3Client} that communicates with the S3 service.
+ */
+public class DefaultS3ClientFactory extends Configured implements
+    S3ClientFactory {
+
+  private static final Logger LOG = S3AFileSystem.LOG;
+
+  @Override
+  public AmazonS3 createS3Client(URI name, URI uri) throws IOException {
+    Configuration conf = getConf();
+    AWSCredentialsProvider credentials =
+        createAWSCredentialProviderSet(name, conf, uri);
+    ClientConfiguration awsConf = new ClientConfiguration();
+    initConnectionSettings(conf, awsConf);
+    initProxySupport(conf, awsConf);
+    initUserAgent(conf, awsConf);
+    AmazonS3 s3 = newAmazonS3Client(credentials, awsConf);
+    return createAmazonS3Client(s3, conf, credentials, awsConf);
+  }
+
+  /**
+   * Wrapper around the constructor for the {@link AmazonS3} client.
+   * Override this to provide an extended version of the client.
+   * @param credentials credentials to use
+   * @param awsConf AWS configuration
+   * @return a new AmazonS3 client
+   */
+  protected AmazonS3 newAmazonS3Client(
+      AWSCredentialsProvider credentials, ClientConfiguration awsConf) {
+    return new AmazonS3Client(credentials, awsConf);
+  }
+
+  /**
+   * Initializes all AWS SDK settings related to connection management.
+   *
+   * @param conf Hadoop configuration
+   * @param awsConf AWS SDK configuration
+   */
+  private static void initConnectionSettings(Configuration conf,
+      ClientConfiguration awsConf) {
+    awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
+        DEFAULT_MAXIMUM_CONNECTIONS, 1));
+    boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
+        DEFAULT_SECURE_CONNECTIONS);
+    awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
+    awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
+        DEFAULT_MAX_ERROR_RETRIES, 0));
+    awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
+        DEFAULT_ESTABLISH_TIMEOUT, 0));
+    awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
+        DEFAULT_SOCKET_TIMEOUT, 0));
+    int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
+        DEFAULT_SOCKET_SEND_BUFFER, 2048);
+    int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
+        DEFAULT_SOCKET_RECV_BUFFER, 2048);
+    awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
+    String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
+    if (!signerOverride.isEmpty()) {
+      LOG.debug("Signer override = {}", signerOverride);
+      awsConf.setSignerOverride(signerOverride);
+    }
+  }
+
+  /**
+   * Initializes AWS SDK proxy support if configured.
+   *
+   * @param conf Hadoop configuration
+   * @param awsConf AWS SDK configuration
+   * @throws IllegalArgumentException if misconfigured
+   */
+  private static void initProxySupport(Configuration conf,
+      ClientConfiguration awsConf) throws IllegalArgumentException {
+    String proxyHost = conf.getTrimmed(PROXY_HOST, "");
+    int proxyPort = conf.getInt(PROXY_PORT, -1);
+    if (!proxyHost.isEmpty()) {
+      awsConf.setProxyHost(proxyHost);
+      if (proxyPort >= 0) {
+        awsConf.setProxyPort(proxyPort);
+      } else {
+        if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
+          LOG.warn("Proxy host set without port. Using HTTPS default 443");
+          awsConf.setProxyPort(443);
+        } else {
+          LOG.warn("Proxy host set without port. Using HTTP default 80");
+          awsConf.setProxyPort(80);
+        }
+      }
+      String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
+      String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
+      if ((proxyUsername == null) != (proxyPassword == null)) {
+        String msg = "Proxy error: " + PROXY_USERNAME + " or " +
+            PROXY_PASSWORD + " set without the other.";
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg);
+      }
+      awsConf.setProxyUsername(proxyUsername);
+      awsConf.setProxyPassword(proxyPassword);
+      awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
+      awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
+                "domain {} as workstation {}", awsConf.getProxyHost(),
+            awsConf.getProxyPort(),
+            String.valueOf(awsConf.getProxyUsername()),
+            awsConf.getProxyPassword(), awsConf.getProxyDomain(),
+            awsConf.getProxyWorkstation());
+      }
+    } else if (proxyPort >= 0) {
+      String msg =
+          "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
+      LOG.error(msg);
+      throw new IllegalArgumentException(msg);
+    }
+  }
+
+  /**
+   * Initializes the User-Agent header to send in HTTP requests to the S3
+   * back-end.  We always include the Hadoop version number.  The user also
+   * may set an optional custom prefix to put in front of the Hadoop version
+   * number.  The AWS SDK internally appends its own information, which seems
+   * to include the AWS SDK version, OS and JVM version.
+   *
+   * @param conf Hadoop configuration
+   * @param awsConf AWS SDK configuration
+   */
+  private static void initUserAgent(Configuration conf,
+      ClientConfiguration awsConf) {
+    String userAgent = "Hadoop " + VersionInfo.getVersion();
+    String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
+    if (!userAgentPrefix.isEmpty()) {
+      userAgent = userAgentPrefix + ", " + userAgent;
+    }
+    LOG.debug("Using User-Agent: {}", userAgent);
+    awsConf.setUserAgentPrefix(userAgent);
+  }
+
+  /**
+   * Configures the supplied {@link AmazonS3} client: endpoint, path style.
+   * @param s3 S3 client to configure
+   * @param conf Hadoop configuration
+   * @param credentials AWS credentials
+   * @param awsConf AWS SDK configuration
+   * @return the configured S3 client
+   * @throws IllegalArgumentException if misconfigured
+   */
+  private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf,
+      AWSCredentialsProvider credentials, ClientConfiguration awsConf)
+      throws IllegalArgumentException {
+    String endPoint = conf.getTrimmed(ENDPOINT, "");
+    if (!endPoint.isEmpty()) {
+      try {
+        s3.setEndpoint(endPoint);
+      } catch (IllegalArgumentException e) {
+        String msg = "Incorrect endpoint: "  + e.getMessage();
+        LOG.error(msg);
+        throw new IllegalArgumentException(msg, e);
+      }
+    }
+    enablePathStyleAccessIfRequired(s3, conf);
+    return s3;
+  }
+
+  /**
+   * Enables path-style access to S3 buckets if configured.  By default, the
+   * behavior is to use virtual hosted-style access with URIs of the form
+   * http://bucketname.s3.amazonaws.com.  Enabling path-style access and a
+   * region-specific endpoint switches the behavior to use URIs of the form
+   * http://s3-eu-west-1.amazonaws.com/bucketname.
+   *
+   * @param s3 S3 client
+   * @param conf Hadoop configuration
+   */
+  private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
+      Configuration conf) {
+    final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
+    if (pathStyleAccess) {
+      LOG.debug("Enabling path style access!");
+      s3.setS3ClientOptions(S3ClientOptions.builder()
+          .setPathStyleAccess(true)
+          .build());
+    }
+  }
+}
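
How this factory is consumed is outside this diff. The following is a
hedged sketch of the expected call sequence, assuming the usual Hadoop
pattern of resolving the class from configuration and instantiating it with
ReflectionUtils.newInstance(), which injects the Configuration into a
Configured instance; the bucket URI is a placeholder, and running this
needs resolvable AWS credentials:

    package org.apache.hadoop.fs.s3a;

    import java.io.IOException;
    import java.net.URI;

    import com.amazonaws.services.s3.AmazonS3;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ReflectionUtils;

    final class ClientFactoryUsageSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Resolve the configured factory class, falling back to the default.
        Class<? extends S3ClientFactory> factoryClass = conf.getClass(
            Constants.S3_CLIENT_FACTORY_IMPL,
            Constants.DEFAULT_S3_CLIENT_FACTORY_IMPL,
            S3ClientFactory.class);
        // newInstance() passes the Configuration to the Configured factory.
        S3ClientFactory factory =
            ReflectionUtils.newInstance(factoryClass, conf);
        URI uri = URI.create("s3a://example-bucket/");  // placeholder URI
        AmazonS3 s3 = factory.createS3Client(uri, uri);
        System.out.println("Created " + s3.getClass().getSimpleName());
      }
    }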

http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
new file mode 100644
index 0000000..ebca268
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper around {@link com.amazonaws.services.s3.AmazonS3} that injects
+ * inconsistency and/or errors, for testing S3Guard.  Currently it only
+ * delays listing visibility; GET requests are unaffected.
+ */
+public class InconsistentAmazonS3Client extends AmazonS3Client {
+
+  /**
+   * Keys containing this substring will be subject to delayed visibility.
+   */
+  public static final String DELAY_KEY_SUBSTRING = "DELAY_LISTING_ME";
+
+  /**
+   * How long, in milliseconds, affected keys are delayed from appearing
+   * in listings.  This should probably be a config value.
+   */
+  public static final long DELAY_KEY_MILLIS = 5 * 1000;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(InconsistentAmazonS3Client.class);
+
+  /** Map from delayed key to the time (in milliseconds) it was put. */
+  private Map<String, Long> delayedKeys = new HashMap<>();
+
+  public InconsistentAmazonS3Client(AWSCredentialsProvider credentials,
+      ClientConfiguration clientConfiguration) {
+    super(credentials, clientConfiguration);
+  }
+
+  /* We should only need to override this version of putObject() */
+  @Override
+  public PutObjectResult putObject(PutObjectRequest putObjectRequest)
+      throws AmazonClientException, AmazonServiceException {
+    LOG.debug("key {}", putObjectRequest.getKey());
+    registerPutObject(putObjectRequest);
+    return super.putObject(putObjectRequest);
+  }
+
+  /* We should only need to override this version of listObjects() */
+  @Override
+  public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
+      throws AmazonClientException, AmazonServiceException {
+    LOG.debug("prefix {}", listObjectsRequest.getPrefix());
+    ObjectListing listing = super.listObjects(listObjectsRequest);
+    return filterListObjects(listObjectsRequest,
+        listing);
+  }
+
+
+  private ObjectListing filterListObjects(ListObjectsRequest request,
+      ObjectListing rawListing) {
+
+    // Filter object listing
+    List<S3ObjectSummary> outputList = new ArrayList<>();
+    for (S3ObjectSummary s : rawListing.getObjectSummaries()) {
+      if (!isVisibilityDelayed(s.getKey())) {
+        outputList.add(s);
+      }
+    }
+
+    // Filter prefixes (directories)
+    List<String> outputPrefixes = new ArrayList<>();
+    for (String key : rawListing.getCommonPrefixes()) {
+      if (!isVisibilityDelayed(key)) {
+        outputPrefixes.add(key);
+      }
+    }
+
+    return new CustomObjectListing(rawListing, outputList, outputPrefixes);
+  }
+
+  private boolean isVisibilityDelayed(String key) {
+    Long createTime = delayedKeys.get(key);
+    if (createTime == null) {
+      LOG.debug("no delay for key {}", key);
+      return false;
+    }
+    long currentTime = System.currentTimeMillis();
+    long deadline = createTime + DELAY_KEY_MILLIS;
+    if (currentTime >= deadline) {
+      delayedKeys.remove(key);
+      LOG.debug("{} no longer delayed", key);
+      return false;
+    } else {
+      LOG.info("{} delaying visibility", key);
+      return true;
+    }
+  }
+
+  private void registerPutObject(PutObjectRequest req) {
+    String key = req.getKey();
+    if (shouldDelay(key)) {
+      enqueueDelayKey(key);
+    }
+  }
+
+  /**
+   * Should we delay listing visibility for this key?
+   * @param key key which is being put
+   * @return true if we should delay
+   */
+  private boolean shouldDelay(String key) {
+    boolean delay = key.contains(DELAY_KEY_SUBSTRING);
+    LOG.debug("{} -> {}", key, delay);
+    return delay;
+  }
+
+  /**
+   * Record this key as something that should not become visible in
+   * listObject replies for a while, to simulate eventual list consistency.
+   * @param key key to delay visibility of
+   */
+  private void enqueueDelayKey(String key) {
+    LOG.debug("key {}", key);
+    delayedKeys.put(key, System.currentTimeMillis());
+  }
+
+  /** Since ObjectListing is immutable, we wrap it and override its accessors. */
+  private static class CustomObjectListing extends ObjectListing {
+
+    private final List<S3ObjectSummary> customListing;
+    private final List<String> customPrefixes;
+
+    public CustomObjectListing(ObjectListing rawListing,
+        List<S3ObjectSummary> customListing, List<String> customPrefixes) {
+      super();
+      this.customListing = customListing;
+      this.customPrefixes = customPrefixes;
+
+      this.setBucketName(rawListing.getBucketName());
+      this.setCommonPrefixes(rawListing.getCommonPrefixes());
+      this.setDelimiter(rawListing.getDelimiter());
+      this.setEncodingType(rawListing.getEncodingType());
+      this.setMarker(rawListing.getMarker());
+      this.setMaxKeys(rawListing.getMaxKeys());
+      this.setNextMarker(rawListing.getNextMarker());
+      this.setPrefix(rawListing.getPrefix());
+      this.setTruncated(rawListing.isTruncated());
+    }
+
+    @Override
+    public List<S3ObjectSummary> getObjectSummaries() {
+      return customListing;
+    }
+
+    @Override
+    public List<String> getCommonPrefixes() {
+      return customPrefixes;
+    }
+  }
+}
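
From the caller's point of view, the injected behaviour looks like the
sketch below. It assumes default-chain AWS credentials and a writable
bucket named "example-bucket"; both are placeholders, and nothing here is
part of the commit itself:

    package org.apache.hadoop.fs.s3a;

    import java.io.ByteArrayInputStream;

    import com.amazonaws.ClientConfiguration;
    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
    import com.amazonaws.services.s3.model.ListObjectsRequest;
    import com.amazonaws.services.s3.model.ObjectMetadata;
    import com.amazonaws.services.s3.model.PutObjectRequest;

    final class InconsistencySketch {
      public static void main(String[] args) throws InterruptedException {
        InconsistentAmazonS3Client s3 = new InconsistentAmazonS3Client(
            new DefaultAWSCredentialsProviderChain(),
            new ClientConfiguration());
        String bucket = "example-bucket";  // placeholder bucket name
        String key = "demo/" + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING;
        ObjectMetadata meta = new ObjectMetadata();
        meta.setContentLength(0);
        s3.putObject(new PutObjectRequest(bucket, key,
            new ByteArrayInputStream(new byte[0]), meta));
        // Immediately after the put, the key is filtered out of listings.
        System.out.println(listCount(s3, bucket));  // expect 0
        Thread.sleep(InconsistentAmazonS3Client.DELAY_KEY_MILLIS + 1000);
        // Once the delay window passes, the key becomes visible again.
        System.out.println(listCount(s3, bucket));  // expect 1
      }

      private static int listCount(InconsistentAmazonS3Client s3,
          String bucket) {
        return s3.listObjects(new ListObjectsRequest()
            .withBucketName(bucket).withPrefix("demo/"))
            .getObjectSummaries().size();
      }
    }

Note the delay applies only to listings; a GET of the key would succeed
immediately, matching the class javadoc above.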

http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 871322d..5169840 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -18,26 +18,13 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;
-
 import java.io.IOException;
 import java.net.URI;
 
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
-import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.S3ClientOptions;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.VersionInfo;
-
-import org.slf4j.Logger;
 
 /**
  * Factory for creation of S3 client instances to be used by {@link S3Store}.
@@ -58,177 +45,4 @@ interface S3ClientFactory {
    */
   AmazonS3 createS3Client(URI name, URI uri) throws IOException;
 
-  /**
-   * The default factory implementation, which calls the AWS SDK to configure
-   * and create an {@link AmazonS3Client} that communicates with the S3 service.
-   */
-  static class DefaultS3ClientFactory extends Configured
-      implements S3ClientFactory {
-
-    private static final Logger LOG = S3AFileSystem.LOG;
-
-    @Override
-    public AmazonS3 createS3Client(URI name, URI uri) throws IOException {
-      Configuration conf = getConf();
-      AWSCredentialsProvider credentials =
-          createAWSCredentialProviderSet(name, conf, uri);
-      ClientConfiguration awsConf = new ClientConfiguration();
-      initConnectionSettings(conf, awsConf);
-      initProxySupport(conf, awsConf);
-      initUserAgent(conf, awsConf);
-      return createAmazonS3Client(conf, credentials, awsConf);
-    }
-
-    /**
-     * Initializes all AWS SDK settings related to connection management.
-     *
-     * @param conf Hadoop configuration
-     * @param awsConf AWS SDK configuration
-     */
-    private static void initConnectionSettings(Configuration conf,
-        ClientConfiguration awsConf) {
-      awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
-          DEFAULT_MAXIMUM_CONNECTIONS, 1));
-      boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
-          DEFAULT_SECURE_CONNECTIONS);
-      awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
-      awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
-          DEFAULT_MAX_ERROR_RETRIES, 0));
-      awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
-          DEFAULT_ESTABLISH_TIMEOUT, 0));
-      awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
-          DEFAULT_SOCKET_TIMEOUT, 0));
-      int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
-          DEFAULT_SOCKET_SEND_BUFFER, 2048);
-      int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
-          DEFAULT_SOCKET_RECV_BUFFER, 2048);
-      awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
-      String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
-      if (!signerOverride.isEmpty()) {
-        LOG.debug("Signer override = {}", signerOverride);
-        awsConf.setSignerOverride(signerOverride);
-      }
-    }
-
-    /**
-     * Initializes AWS SDK proxy support if configured.
-     *
-     * @param conf Hadoop configuration
-     * @param awsConf AWS SDK configuration
-     * @throws IllegalArgumentException if misconfigured
-     */
-    private static void initProxySupport(Configuration conf,
-        ClientConfiguration awsConf) throws IllegalArgumentException {
-      String proxyHost = conf.getTrimmed(PROXY_HOST, "");
-      int proxyPort = conf.getInt(PROXY_PORT, -1);
-      if (!proxyHost.isEmpty()) {
-        awsConf.setProxyHost(proxyHost);
-        if (proxyPort >= 0) {
-          awsConf.setProxyPort(proxyPort);
-        } else {
-          if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
-            LOG.warn("Proxy host set without port. Using HTTPS default 443");
-            awsConf.setProxyPort(443);
-          } else {
-            LOG.warn("Proxy host set without port. Using HTTP default 80");
-            awsConf.setProxyPort(80);
-          }
-        }
-        String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
-        String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
-        if ((proxyUsername == null) != (proxyPassword == null)) {
-          String msg = "Proxy error: " + PROXY_USERNAME + " or " +
-              PROXY_PASSWORD + " set without the other.";
-          LOG.error(msg);
-          throw new IllegalArgumentException(msg);
-        }
-        awsConf.setProxyUsername(proxyUsername);
-        awsConf.setProxyPassword(proxyPassword);
-        awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
-        awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
-                  "domain {} as workstation {}", awsConf.getProxyHost(),
-              awsConf.getProxyPort(),
-              String.valueOf(awsConf.getProxyUsername()),
-              awsConf.getProxyPassword(), awsConf.getProxyDomain(),
-              awsConf.getProxyWorkstation());
-        }
-      } else if (proxyPort >= 0) {
-        String msg =
-            "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
-        LOG.error(msg);
-        throw new IllegalArgumentException(msg);
-      }
-    }
-
-    /**
-     * Initializes the User-Agent header to send in HTTP requests to the S3
-     * back-end.  We always include the Hadoop version number.  The user also
-     * may set an optional custom prefix to put in front of the Hadoop version
-     * number.  The AWS SDK internally appends its own information, which seems
-     * to include the AWS SDK version, OS and JVM version.
-     *
-     * @param conf Hadoop configuration
-     * @param awsConf AWS SDK configuration
-     */
-    private static void initUserAgent(Configuration conf,
-        ClientConfiguration awsConf) {
-      String userAgent = "Hadoop " + VersionInfo.getVersion();
-      String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
-      if (!userAgentPrefix.isEmpty()) {
-        userAgent = userAgentPrefix + ", " + userAgent;
-      }
-      LOG.debug("Using User-Agent: {}", userAgent);
-      awsConf.setUserAgentPrefix(userAgent);
-    }
-
-    /**
-     * Creates an {@link AmazonS3Client} from the established configuration.
-     *
-     * @param conf Hadoop configuration
-     * @param credentials AWS credentials
-     * @param awsConf AWS SDK configuration
-     * @return S3 client
-     * @throws IllegalArgumentException if misconfigured
-     */
-    private static AmazonS3 createAmazonS3Client(Configuration conf,
-        AWSCredentialsProvider credentials, ClientConfiguration awsConf)
-        throws IllegalArgumentException {
-      AmazonS3 s3 = new AmazonS3Client(credentials, awsConf);
-      String endPoint = conf.getTrimmed(ENDPOINT, "");
-      if (!endPoint.isEmpty()) {
-        try {
-          s3.setEndpoint(endPoint);
-        } catch (IllegalArgumentException e) {
-          String msg = "Incorrect endpoint: "  + e.getMessage();
-          LOG.error(msg);
-          throw new IllegalArgumentException(msg, e);
-        }
-      }
-      enablePathStyleAccessIfRequired(s3, conf);
-      return s3;
-    }
-
-    /**
-     * Enables path-style access to S3 buckets if configured.  By default, the
-     * behavior is to use virtual hosted-style access with URIs of the form
-     * http://bucketname.s3.amazonaws.com.  Enabling path-style access and a
-     * region-specific endpoint switches the behavior to use URIs of the form
-     * http://s3-eu-west-1.amazonaws.com/bucketname.
-     *
-     * @param s3 S3 client
-     * @param conf Hadoop configuration
-     */
-    private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
-        Configuration conf) {
-      final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
-      if (pathStyleAccess) {
-        LOG.debug("Enabling path style access!");
-        s3.setS3ClientOptions(S3ClientOptions.builder()
-            .setPathStyleAccess(true)
-            .build());
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
new file mode 100644
index 0000000..0a9ee4f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Test the S3Guard list consistency feature by injecting delayed
+ * listObjects() visibility via {@link InconsistentAmazonS3Client}.
+ */
+public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
+        S3ClientFactory.class);
+    return new S3AContract(conf);
+  }
+
+  @Test
+  public void testConsistentList() throws Exception {
+
+    S3AFileSystem fs = getFileSystem();
+
+    // This test will fail if the default NullMetadataStore is configured,
+    // so skip it in that case.
+    Assume.assumeTrue(fs.isMetadataStoreConfigured());
+
+    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+    // in listObjects() results via InconsistentAmazonS3Client.
+    Path inconsistentPath =
+        path("a/b/dir3-" + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING);
+
+    Path[] testDirs = {path("a/b/dir1"),
+        path("a/b/dir2"),
+        inconsistentPath};
+
+    for (Path path : testDirs) {
+      assertTrue(fs.mkdirs(path));
+    }
+
+    FileStatus[] paths = fs.listStatus(path("a/b/"));
+    List<Path> list = new ArrayList<>();
+    for (FileStatus fileState : paths) {
+      list.add(fileState.getPath());
+    }
+    assertTrue(list.contains(path("a/b/dir1")));
+    assertTrue(list.contains(path("a/b/dir2")));
+    // This should fail without S3Guard, and succeed with it.
+    assertTrue(list.contains(inconsistentPath));
+  }
+}
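
Like the other ITest* suites in hadoop-aws, this is an integration test
that needs real S3 credentials and a test bucket, and the Assume above
skips it unless a MetadataStore other than the default NullMetadataStore is
configured. With the standard failsafe properties, something like
"mvn verify -Dit.test=ITestS3GuardListConsistency -Dtest=none" from
hadoop-tools/hadoop-aws should run it in isolation.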

http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
new file mode 100644
index 0000000..88a9c78
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+
+/**
+ * S3 client factory used for testing, with eventual-consistency fault injection.
+ */
+public class InconsistentS3ClientFactory extends DefaultS3ClientFactory {
+
+  @Override
+  protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials,
+      ClientConfiguration awsConf) {
+    return new InconsistentAmazonS3Client(credentials, awsConf);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org