You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by li...@apache.org on 2016/12/06 06:55:08 UTC
hadoop git commit: HADOOP-13793. S3guard: add inconsistency injection,
integration tests. Contributed by Aaron Fabbri
Repository: hadoop
Updated Branches:
refs/heads/HADOOP-13345 cfd0fbf13 -> 013a3c454
HADOOP-13793. S3guard: add inconsistency injection, integration tests. Contributed by Aaron Fabbri
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/013a3c45
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/013a3c45
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/013a3c45
Branch: refs/heads/HADOOP-13345
Commit: 013a3c4540f2622fc8182a214e19cdb407f21b8b
Parents: cfd0fbf
Author: Mingliang Liu <li...@apache.org>
Authored: Mon Dec 5 22:10:14 2016 -0800
Committer: Mingliang Liu <li...@apache.org>
Committed: Mon Dec 5 22:53:54 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/fs/s3a/Constants.java | 2 +-
.../hadoop/fs/s3a/DefaultS3ClientFactory.java | 223 +++++++++++++++++++
.../fs/s3a/InconsistentAmazonS3Client.java | 189 ++++++++++++++++
.../apache/hadoop/fs/s3a/S3ClientFactory.java | 186 ----------------
.../fs/s3a/ITestS3GuardListConsistency.java | 79 +++++++
.../fs/s3a/InconsistentS3ClientFactory.java | 35 +++
6 files changed, 527 insertions(+), 187 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 518bd33..c102460 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -280,7 +280,7 @@ public final class Constants {
@InterfaceStability.Unstable
public static final Class<? extends S3ClientFactory>
DEFAULT_S3_CLIENT_FACTORY_IMPL =
- S3ClientFactory.DefaultS3ClientFactory.class;
+ DefaultS3ClientFactory.class;
/**
* Maximum number of partitions in a multipart upload: {@value}.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
new file mode 100644
index 0000000..a43a746
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.VersionInfo;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.createAWSCredentialProviderSet;
+import static org.apache.hadoop.fs.s3a.S3AUtils.intOption;
+
+/**
+ * The default factory implementation, which calls the AWS SDK to configure
+ * and create an {@link AmazonS3Client} that communicates with the S3 service.
+ */
+public class DefaultS3ClientFactory extends Configured implements
+ S3ClientFactory {
+
+ private static final Logger LOG = S3AFileSystem.LOG;
+
+ @Override
+ public AmazonS3 createS3Client(URI name, URI uri) throws IOException {
+ Configuration conf = getConf();
+ AWSCredentialsProvider credentials =
+ createAWSCredentialProviderSet(name, conf, uri);
+ ClientConfiguration awsConf = new ClientConfiguration();
+ initConnectionSettings(conf, awsConf);
+ initProxySupport(conf, awsConf);
+ initUserAgent(conf, awsConf);
+ AmazonS3 s3 = newAmazonS3Client(credentials, awsConf);
+ return createAmazonS3Client(s3, conf, credentials, awsConf);
+ }
+
+ /**
+ * Wrapper around constructor for {@link AmazonS3} client. Override this to
+ * provide an extended version of the client.
+ *
+ * @param credentials credentials to use
+ * @param awsConf AWS configuration
+ * @return new AmazonS3 client
+ */
+ protected AmazonS3 newAmazonS3Client(
+ AWSCredentialsProvider credentials, ClientConfiguration awsConf) {
+ return new AmazonS3Client(credentials, awsConf);
+ }
+
+ /**
+ * Initializes all AWS SDK settings related to connection management.
+ *
+ * @param conf Hadoop configuration
+ * @param awsConf AWS SDK configuration
+ */
+ private static void initConnectionSettings(Configuration conf,
+ ClientConfiguration awsConf) {
+ awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
+ DEFAULT_MAXIMUM_CONNECTIONS, 1));
+ boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
+ DEFAULT_SECURE_CONNECTIONS);
+ awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
+ awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
+ DEFAULT_MAX_ERROR_RETRIES, 0));
+ awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
+ DEFAULT_ESTABLISH_TIMEOUT, 0));
+ awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
+ DEFAULT_SOCKET_TIMEOUT, 0));
+ int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
+ DEFAULT_SOCKET_SEND_BUFFER, 2048);
+ int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
+ DEFAULT_SOCKET_RECV_BUFFER, 2048);
+ awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
+ String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
+ if (!signerOverride.isEmpty()) {
+ LOG.debug("Signer override = {}", signerOverride);
+ awsConf.setSignerOverride(signerOverride);
+ }
+ }
+
+ /**
+ * Initializes AWS SDK proxy support if configured.
+ *
+ * @param conf Hadoop configuration
+ * @param awsConf AWS SDK configuration
+ * @throws IllegalArgumentException if misconfigured
+ */
+ private static void initProxySupport(Configuration conf,
+ ClientConfiguration awsConf) throws IllegalArgumentException {
+ String proxyHost = conf.getTrimmed(PROXY_HOST, "");
+ int proxyPort = conf.getInt(PROXY_PORT, -1);
+ if (!proxyHost.isEmpty()) {
+ awsConf.setProxyHost(proxyHost);
+ if (proxyPort >= 0) {
+ awsConf.setProxyPort(proxyPort);
+ } else {
+ if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
+ LOG.warn("Proxy host set without port. Using HTTPS default 443");
+ awsConf.setProxyPort(443);
+ } else {
+ LOG.warn("Proxy host set without port. Using HTTP default 80");
+ awsConf.setProxyPort(80);
+ }
+ }
+ String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
+ String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
+ if ((proxyUsername == null) != (proxyPassword == null)) {
+ String msg = "Proxy error: " + PROXY_USERNAME + " or " +
+ PROXY_PASSWORD + " set without the other.";
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ awsConf.setProxyUsername(proxyUsername);
+ awsConf.setProxyPassword(proxyPassword);
+ awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
+ awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
+ "domain {} as workstation {}", awsConf.getProxyHost(),
+ awsConf.getProxyPort(),
+ String.valueOf(awsConf.getProxyUsername()),
+ awsConf.getProxyPassword(), awsConf.getProxyDomain(),
+ awsConf.getProxyWorkstation());
+ }
+ } else if (proxyPort >= 0) {
+ String msg =
+ "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
+ /**
+ * Initializes the User-Agent header to send in HTTP requests to the S3
+ * back-end. We always include the Hadoop version number. The user also
+ * may set an optional custom prefix to put in front of the Hadoop version
+ * number. The AWS SDK internally appends its own information, which seems
+ * to include the AWS SDK version, OS and JVM version.
+ *
+ * @param conf Hadoop configuration
+ * @param awsConf AWS SDK configuration
+ */
+ private static void initUserAgent(Configuration conf,
+ ClientConfiguration awsConf) {
+ String userAgent = "Hadoop " + VersionInfo.getVersion();
+ String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
+ if (!userAgentPrefix.isEmpty()) {
+ userAgent = userAgentPrefix + ", " + userAgent;
+ }
+ LOG.debug("Using User-Agent: {}", userAgent);
+ awsConf.setUserAgentPrefix(userAgent);
+ }
+
+ /**
+ * Configures the supplied {@link AmazonS3} client with the endpoint and
+ * path-style access settings taken from the Hadoop configuration.
+ *
+ * @param s3 S3 client to configure
+ * @param conf Hadoop configuration
+ * @param credentials AWS credentials
+ * @param awsConf AWS SDK configuration
+ * @return the configured S3 client
+ * @throws IllegalArgumentException if misconfigured
+ */
+ private static AmazonS3 createAmazonS3Client(AmazonS3 s3, Configuration conf,
+ AWSCredentialsProvider credentials, ClientConfiguration awsConf)
+ throws IllegalArgumentException {
+ String endPoint = conf.getTrimmed(ENDPOINT, "");
+ if (!endPoint.isEmpty()) {
+ try {
+ s3.setEndpoint(endPoint);
+ } catch (IllegalArgumentException e) {
+ String msg = "Incorrect endpoint: " + e.getMessage();
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg, e);
+ }
+ }
+ enablePathStyleAccessIfRequired(s3, conf);
+ return s3;
+ }
+
+ /**
+ * Enables path-style access to S3 buckets if configured. By default, the
+ * behavior is to use virtual hosted-style access with URIs of the form
+ * http://bucketname.s3.amazonaws.com. Enabling path-style access and a
+ * region-specific endpoint switches the behavior to use URIs of the form
+ * http://s3-eu-west-1.amazonaws.com/bucketname.
+ *
+ * @param s3 S3 client
+ * @param conf Hadoop configuration
+ */
+ private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
+ Configuration conf) {
+ final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
+ if (pathStyleAccess) {
+ LOG.debug("Enabling path style access!");
+ s3.setS3ClientOptions(S3ClientOptions.builder()
+ .setPathStyleAccess(true)
+ .build());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
new file mode 100644
index 0000000..ebca268
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A wrapper around {@link com.amazonaws.services.s3.AmazonS3} that injects
+ * inconsistency and/or errors. Used for testing S3Guard.
+ * Currently only delays listing visibility, not affecting GET.
+ */
+public class InconsistentAmazonS3Client extends AmazonS3Client {
+
+ /**
+ * Keys containing this substring will be subject to delayed visibility.
+ */
+ public static final String DELAY_KEY_SUBSTRING = "DELAY_LISTING_ME";
+
+ /**
+ * How long, in milliseconds, affected keys will be delayed from appearing
+ * in a listing.
+ * This should probably be a config value.
+ */
+ public static final long DELAY_KEY_MILLIS = 5 * 1000;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(InconsistentAmazonS3Client.class);
+
+ /** Map from a key whose visibility is delayed to the time it was put. */
+ private Map<String, Long> delayedKeys = new HashMap<>();
+
+ public InconsistentAmazonS3Client(AWSCredentialsProvider credentials,
+ ClientConfiguration clientConfiguration) {
+ super(credentials, clientConfiguration);
+ }
+
+ /* We should only need to override this version of putObject() */
+ @Override
+ public PutObjectResult putObject(PutObjectRequest putObjectRequest)
+ throws AmazonClientException, AmazonServiceException {
+ LOG.debug("key {}", putObjectRequest.getKey());
+ registerPutObject(putObjectRequest);
+ return super.putObject(putObjectRequest);
+ }
+
+ /* We should only need to override this version of listObjects() */
+ @Override
+ public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
+ throws AmazonClientException, AmazonServiceException {
+ LOG.debug("prefix {}", listObjectsRequest.getPrefix());
+ ObjectListing listing = super.listObjects(listObjectsRequest);
+ return filterListObjects(listObjectsRequest,
+ listing);
+ }
+
+
+ private ObjectListing filterListObjects(ListObjectsRequest request,
+ ObjectListing rawListing) {
+
+ // Filter object listing
+ List<S3ObjectSummary> outputList = new ArrayList<>();
+ for (S3ObjectSummary s : rawListing.getObjectSummaries()) {
+ if (!isVisibilityDelayed(s.getKey())) {
+ outputList.add(s);
+ }
+ }
+
+ // Filter prefixes (directories)
+ List<String> outputPrefixes = new ArrayList<>();
+ for (String key : rawListing.getCommonPrefixes()) {
+ if (!isVisibilityDelayed(key)) {
+ outputPrefixes.add(key);
+ }
+ }
+
+ return new CustomObjectListing(rawListing, outputList, outputPrefixes);
+ }
+
+ private boolean isVisibilityDelayed(String key) {
+ Long createTime = delayedKeys.get(key);
+ if (createTime == null) {
+ LOG.debug("no delay for key {}", key);
+ return false;
+ }
+ long currentTime = System.currentTimeMillis();
+ long deadline = createTime + DELAY_KEY_MILLIS;
+ if (currentTime >= deadline) {
+ delayedKeys.remove(key);
+ LOG.debug("{} no longer delayed", key);
+ return false;
+ } else {
+ LOG.info("{} delaying visibility", key);
+ return true;
+ }
+ }
+
+ private void registerPutObject(PutObjectRequest req) {
+ String key = req.getKey();
+ if (shouldDelay(key)) {
+ enqueueDelayKey(key);
+ }
+ }
+
+ /**
+ * Should we delay listing visibility for this key?
+ * @param key key which is being put
+ * @return true if we should delay
+ */
+ private boolean shouldDelay(String key) {
+ boolean delay = key.contains(DELAY_KEY_SUBSTRING);
+ LOG.debug("{} -> {}", key, delay);
+ return delay;
+ }
+
+ /**
+ * Record this key as something that should not become visible in
+ * listObject replies for a while, to simulate eventual list consistency.
+ * @param key key to delay visibility of
+ */
+ private void enqueueDelayKey(String key) {
+ LOG.debug("key {}", key);
+ delayedKeys.put(key, System.currentTimeMillis());
+ }
+
+ /** Since ObjectListing is immutable, we just override it with wrapper. */
+ private static class CustomObjectListing extends ObjectListing {
+
+ private final List<S3ObjectSummary> customListing;
+ private final List<String> customPrefixes;
+
+ public CustomObjectListing(ObjectListing rawListing,
+ List<S3ObjectSummary> customListing, List<String> customPrefixes) {
+ super();
+ this.customListing = customListing;
+ this.customPrefixes = customPrefixes;
+
+ this.setBucketName(rawListing.getBucketName());
+ this.setCommonPrefixes(rawListing.getCommonPrefixes());
+ this.setDelimiter(rawListing.getDelimiter());
+ this.setEncodingType(rawListing.getEncodingType());
+ this.setMarker(rawListing.getMarker());
+ this.setMaxKeys(rawListing.getMaxKeys());
+ this.setNextMarker(rawListing.getNextMarker());
+ this.setPrefix(rawListing.getPrefix());
+ this.setTruncated(rawListing.isTruncated());
+ }
+
+ @Override
+ public List<S3ObjectSummary> getObjectSummaries() {
+ return customListing;
+ }
+
+ @Override
+ public List<String> getCommonPrefixes() {
+ return customPrefixes;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
index 871322d..5169840 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java
@@ -18,26 +18,13 @@
package org.apache.hadoop.fs.s3a;
-import static org.apache.hadoop.fs.s3a.Constants.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;
-
import java.io.IOException;
import java.net.URI;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
-import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.S3ClientOptions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.util.VersionInfo;
-
-import org.slf4j.Logger;
/**
* Factory for creation of S3 client instances to be used by {@link S3Store}.
@@ -58,177 +45,4 @@ interface S3ClientFactory {
*/
AmazonS3 createS3Client(URI name, URI uri) throws IOException;
- /**
- * The default factory implementation, which calls the AWS SDK to configure
- * and create an {@link AmazonS3Client} that communicates with the S3 service.
- */
- static class DefaultS3ClientFactory extends Configured
- implements S3ClientFactory {
-
- private static final Logger LOG = S3AFileSystem.LOG;
-
- @Override
- public AmazonS3 createS3Client(URI name, URI uri) throws IOException {
- Configuration conf = getConf();
- AWSCredentialsProvider credentials =
- createAWSCredentialProviderSet(name, conf, uri);
- ClientConfiguration awsConf = new ClientConfiguration();
- initConnectionSettings(conf, awsConf);
- initProxySupport(conf, awsConf);
- initUserAgent(conf, awsConf);
- return createAmazonS3Client(conf, credentials, awsConf);
- }
-
- /**
- * Initializes all AWS SDK settings related to connection management.
- *
- * @param conf Hadoop configuration
- * @param awsConf AWS SDK configuration
- */
- private static void initConnectionSettings(Configuration conf,
- ClientConfiguration awsConf) {
- awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
- DEFAULT_MAXIMUM_CONNECTIONS, 1));
- boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
- DEFAULT_SECURE_CONNECTIONS);
- awsConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
- awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
- DEFAULT_MAX_ERROR_RETRIES, 0));
- awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
- DEFAULT_ESTABLISH_TIMEOUT, 0));
- awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
- DEFAULT_SOCKET_TIMEOUT, 0));
- int sockSendBuffer = intOption(conf, SOCKET_SEND_BUFFER,
- DEFAULT_SOCKET_SEND_BUFFER, 2048);
- int sockRecvBuffer = intOption(conf, SOCKET_RECV_BUFFER,
- DEFAULT_SOCKET_RECV_BUFFER, 2048);
- awsConf.setSocketBufferSizeHints(sockSendBuffer, sockRecvBuffer);
- String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
- if (!signerOverride.isEmpty()) {
- LOG.debug("Signer override = {}", signerOverride);
- awsConf.setSignerOverride(signerOverride);
- }
- }
-
- /**
- * Initializes AWS SDK proxy support if configured.
- *
- * @param conf Hadoop configuration
- * @param awsConf AWS SDK configuration
- * @throws IllegalArgumentException if misconfigured
- */
- private static void initProxySupport(Configuration conf,
- ClientConfiguration awsConf) throws IllegalArgumentException {
- String proxyHost = conf.getTrimmed(PROXY_HOST, "");
- int proxyPort = conf.getInt(PROXY_PORT, -1);
- if (!proxyHost.isEmpty()) {
- awsConf.setProxyHost(proxyHost);
- if (proxyPort >= 0) {
- awsConf.setProxyPort(proxyPort);
- } else {
- if (conf.getBoolean(SECURE_CONNECTIONS, DEFAULT_SECURE_CONNECTIONS)) {
- LOG.warn("Proxy host set without port. Using HTTPS default 443");
- awsConf.setProxyPort(443);
- } else {
- LOG.warn("Proxy host set without port. Using HTTP default 80");
- awsConf.setProxyPort(80);
- }
- }
- String proxyUsername = conf.getTrimmed(PROXY_USERNAME);
- String proxyPassword = conf.getTrimmed(PROXY_PASSWORD);
- if ((proxyUsername == null) != (proxyPassword == null)) {
- String msg = "Proxy error: " + PROXY_USERNAME + " or " +
- PROXY_PASSWORD + " set without the other.";
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
- awsConf.setProxyUsername(proxyUsername);
- awsConf.setProxyPassword(proxyPassword);
- awsConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN));
- awsConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
- "domain {} as workstation {}", awsConf.getProxyHost(),
- awsConf.getProxyPort(),
- String.valueOf(awsConf.getProxyUsername()),
- awsConf.getProxyPassword(), awsConf.getProxyDomain(),
- awsConf.getProxyWorkstation());
- }
- } else if (proxyPort >= 0) {
- String msg =
- "Proxy error: " + PROXY_PORT + " set without " + PROXY_HOST;
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
- }
-
- /**
- * Initializes the User-Agent header to send in HTTP requests to the S3
- * back-end. We always include the Hadoop version number. The user also
- * may set an optional custom prefix to put in front of the Hadoop version
- * number. The AWS SDK interally appends its own information, which seems
- * to include the AWS SDK version, OS and JVM version.
- *
- * @param conf Hadoop configuration
- * @param awsConf AWS SDK configuration
- */
- private static void initUserAgent(Configuration conf,
- ClientConfiguration awsConf) {
- String userAgent = "Hadoop " + VersionInfo.getVersion();
- String userAgentPrefix = conf.getTrimmed(USER_AGENT_PREFIX, "");
- if (!userAgentPrefix.isEmpty()) {
- userAgent = userAgentPrefix + ", " + userAgent;
- }
- LOG.debug("Using User-Agent: {}", userAgent);
- awsConf.setUserAgentPrefix(userAgent);
- }
-
- /**
- * Creates an {@link AmazonS3Client} from the established configuration.
- *
- * @param conf Hadoop configuration
- * @param credentials AWS credentials
- * @param awsConf AWS SDK configuration
- * @return S3 client
- * @throws IllegalArgumentException if misconfigured
- */
- private static AmazonS3 createAmazonS3Client(Configuration conf,
- AWSCredentialsProvider credentials, ClientConfiguration awsConf)
- throws IllegalArgumentException {
- AmazonS3 s3 = new AmazonS3Client(credentials, awsConf);
- String endPoint = conf.getTrimmed(ENDPOINT, "");
- if (!endPoint.isEmpty()) {
- try {
- s3.setEndpoint(endPoint);
- } catch (IllegalArgumentException e) {
- String msg = "Incorrect endpoint: " + e.getMessage();
- LOG.error(msg);
- throw new IllegalArgumentException(msg, e);
- }
- }
- enablePathStyleAccessIfRequired(s3, conf);
- return s3;
- }
-
- /**
- * Enables path-style access to S3 buckets if configured. By default, the
- * behavior is to use virtual hosted-style access with URIs of the form
- * http://bucketname.s3.amazonaws.com. Enabling path-style access and a
- * region-specific endpoint switches the behavior to use URIs of the form
- * http://s3-eu-west-1.amazonaws.com/bucketname.
- *
- * @param s3 S3 client
- * @param conf Hadoop configuration
- */
- private static void enablePathStyleAccessIfRequired(AmazonS3 s3,
- Configuration conf) {
- final boolean pathStyleAccess = conf.getBoolean(PATH_STYLE_ACCESS, false);
- if (pathStyleAccess) {
- LOG.debug("Enabling path style access!");
- s3.setS3ClientOptions(S3ClientOptions.builder()
- .setPathStyleAccess(true)
- .build());
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
new file mode 100644
index 0000000..0a9ee4f
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.junit.Assume;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Test S3Guard list consistency feature by injecting delayed listObjects()
+ * visibility via {@link InconsistentAmazonS3Client}.
+ */
+public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
+ S3ClientFactory.class);
+ return new S3AContract(conf);
+ }
+
+ @Test
+ public void testConsistentList() throws Exception {
+
+ S3AFileSystem fs = getFileSystem();
+
+ // This test will fail if NullMetadataStore (the default) is configured:
+ // skip it.
+ Assume.assumeTrue(fs.isMetadataStoreConfigured());
+
+ // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+ // in listObjects() results via InconsistentS3Client
+ Path inconsistentPath =
+ path("a/b/dir3-" + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING);
+
+ Path[] testDirs = {path("a/b/dir1"),
+ path("a/b/dir2"),
+ inconsistentPath};
+
+ for (Path path : testDirs) {
+ assertTrue(fs.mkdirs(path));
+ }
+
+ FileStatus[] paths = fs.listStatus(path("a/b/"));
+ List<Path> list = new ArrayList<>();
+ for (FileStatus fileState : paths) {
+ list.add(fileState.getPath());
+ }
+ assertTrue(list.contains(path("a/b/dir1")));
+ assertTrue(list.contains(path("a/b/dir2")));
+ // This should fail without S3Guard, and succeed with it.
+ assertTrue(list.contains(inconsistentPath));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/013a3c45/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
new file mode 100644
index 0000000..88a9c78
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/InconsistentS3ClientFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+
+/**
+ * S3 Client factory used for testing with eventual consistency fault injection.
+ */
+public class InconsistentS3ClientFactory extends DefaultS3ClientFactory {
+
+ @Override
+ protected AmazonS3 newAmazonS3Client(AWSCredentialsProvider credentials,
+ ClientConfiguration awsConf) {
+ return new InconsistentAmazonS3Client(credentials, awsConf);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org