You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ie...@apache.org on 2019/03/25 14:17:31 UTC

[beam] branch master updated: [BEAM-6266] Don't require --awsRegion when S3 isn't used on the new AWS SDK.

This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b036fc7  [BEAM-6266] Don't require --awsRegion when S3 isn't used on the new AWS SDK.
     new 69b8bc1  Merge pull request #8126: [BEAM-6266] Don't require --awsRegion when S3 isn't used on the new AWS SDK
b036fc7 is described below

commit b036fc78bbbd90ae16dc3f6d7c5203f07d078905
Author: Mike Kaplinskiy <mi...@ladderlife.com>
AuthorDate: Sun Mar 24 19:03:53 2019 -0700

    [BEAM-6266] Don't require --awsRegion when S3 isn't used on the new AWS SDK.
---
 .../io/aws/s3/DefaultS3ClientBuilderFactory.java   | 14 ++++++--
 .../apache/beam/sdk/io/aws/s3/S3FileSystem.java    | 41 +++++++++++-----------
 2 files changed, 31 insertions(+), 24 deletions(-)

diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/DefaultS3ClientBuilderFactory.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/DefaultS3ClientBuilderFactory.java
index eaca226..8416304 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/DefaultS3ClientBuilderFactory.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/DefaultS3ClientBuilderFactory.java
@@ -22,6 +22,8 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import org.apache.beam.sdk.io.aws.options.S3ClientBuilderFactory;
 import org.apache.beam.sdk.io.aws.options.S3Options;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Construct AmazonS3ClientBuilder with default values of S3 client properties like path style
@@ -29,6 +31,8 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
  */
 public class DefaultS3ClientBuilderFactory implements S3ClientBuilderFactory {
 
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultS3ClientBuilderFactory.class);
+
   @Override
   public AmazonS3ClientBuilder createBuilder(S3Options s3Options) {
     AmazonS3ClientBuilder builder =
@@ -38,13 +42,17 @@ public class DefaultS3ClientBuilderFactory implements S3ClientBuilderFactory {
       builder = builder.withClientConfiguration(s3Options.getClientConfiguration());
     }
 
-    if (Strings.isNullOrEmpty(s3Options.getAwsServiceEndpoint())) {
-      builder = builder.withRegion(s3Options.getAwsRegion());
-    } else {
+    if (!Strings.isNullOrEmpty(s3Options.getAwsServiceEndpoint())) {
       builder =
           builder.withEndpointConfiguration(
               new AwsClientBuilder.EndpointConfiguration(
                   s3Options.getAwsServiceEndpoint(), s3Options.getAwsRegion()));
+    } else if (!Strings.isNullOrEmpty(s3Options.getAwsRegion())) {
+      builder = builder.withRegion(s3Options.getAwsRegion());
+    } else {
+      LOG.info(
+          "The AWS S3 Beam extension was included in this build, but the awsRegion flag "
+              + "was not specified. If you don't plan to use S3, then ignore this message.");
     }
     return builder;
   }
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
index 39ebcbb..276d64b 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
@@ -23,6 +23,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -69,6 +70,8 @@ import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MoreFutures;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Suppliers;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
@@ -95,23 +98,19 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
       ImmutableSet.of("gzip");
 
   // Non-final for testing.
-  private AmazonS3 amazonS3;
+  private Supplier<AmazonS3> amazonS3;
   private final S3Options options;
   private final ListeningExecutorService executorService;
 
   S3FileSystem(S3Options options) {
     this.options = checkNotNull(options, "options");
-    if (Strings.isNullOrEmpty(options.getAwsRegion())) {
-      LOG.info(
-          "The AWS S3 Beam extension was included in this build, but the awsRegion flag "
-              + "was not specified. If you don't plan to use S3, then ignore this message.");
-    }
-    this.amazonS3 =
+    AmazonS3ClientBuilder builder =
         InstanceBuilder.ofType(S3ClientBuilderFactory.class)
             .fromClass(options.getS3ClientFactoryClass())
             .build()
-            .createBuilder(options)
-            .build();
+            .createBuilder(options);
+    // The Supplier is to make sure we don't call .build() unless we are actually using S3.
+    amazonS3 = Suppliers.memoize(builder::build);
 
     checkNotNull(options.getS3StorageClass(), "storageClass");
     checkArgument(options.getS3ThreadPoolSize() > 0, "threadPoolSize");
@@ -128,12 +127,12 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
 
   @VisibleForTesting
   void setAmazonS3Client(AmazonS3 amazonS3) {
-    this.amazonS3 = amazonS3;
+    this.amazonS3 = Suppliers.ofInstance(amazonS3);
   }
 
   @VisibleForTesting
   AmazonS3 getAmazonS3Client() {
-    return this.amazonS3;
+    return this.amazonS3.get();
   }
 
   @Override
@@ -308,7 +307,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
               .withContinuationToken(continuationToken);
       ListObjectsV2Result result;
       try {
-        result = amazonS3.listObjectsV2(request);
+        result = amazonS3.get().listObjectsV2(request);
       } catch (AmazonClientException e) {
         return ExpandedGlob.create(glob, new IOException(e));
       }
@@ -356,7 +355,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
     GetObjectMetadataRequest request =
         new GetObjectMetadataRequest(s3ResourceId.getBucket(), s3ResourceId.getKey());
     request.setSSECustomerKey(options.getSSECustomerKey());
-    return amazonS3.getObjectMetadata(request);
+    return amazonS3.get().getObjectMetadata(request);
   }
 
   @VisibleForTesting
@@ -459,12 +458,12 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
   @Override
   protected WritableByteChannel create(S3ResourceId resourceId, CreateOptions createOptions)
       throws IOException {
-    return new S3WritableByteChannel(amazonS3, resourceId, createOptions.mimeType(), options);
+    return new S3WritableByteChannel(amazonS3.get(), resourceId, createOptions.mimeType(), options);
   }
 
   @Override
   protected ReadableByteChannel open(S3ResourceId resourceId) throws IOException {
-    return new S3ReadableSeekableByteChannel(amazonS3, resourceId, options);
+    return new S3ReadableSeekableByteChannel(amazonS3.get(), resourceId, options);
   }
 
   @Override
@@ -520,7 +519,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
     copyObjectRequest.setStorageClass(options.getS3StorageClass());
     copyObjectRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
     copyObjectRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
-    return amazonS3.copyObject(copyObjectRequest);
+    return amazonS3.get().copyObject(copyObjectRequest);
   }
 
   @VisibleForTesting
@@ -534,7 +533,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
     initiateUploadRequest.setSSECustomerKey(options.getSSECustomerKey());
 
     InitiateMultipartUploadResult initiateUploadResult =
-        amazonS3.initiateMultipartUpload(initiateUploadRequest);
+        amazonS3.get().initiateMultipartUpload(initiateUploadRequest);
     final String uploadId = initiateUploadResult.getUploadId();
 
     List<PartETag> eTags = new ArrayList<>();
@@ -554,7 +553,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
       copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
       copyPartRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
 
-      CopyPartResult copyPartResult = amazonS3.copyPart(copyPartRequest);
+      CopyPartResult copyPartResult = amazonS3.get().copyPart(copyPartRequest);
       eTags.add(copyPartResult.getPartETag());
     } else {
       long bytePosition = 0;
@@ -574,7 +573,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
         copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
         copyPartRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
 
-        CopyPartResult copyPartResult = amazonS3.copyPart(copyPartRequest);
+        CopyPartResult copyPartResult = amazonS3.get().copyPart(copyPartRequest);
         eTags.add(copyPartResult.getPartETag());
 
         bytePosition += uploadBufferSizeBytes;
@@ -587,7 +586,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
             .withKey(destinationPath.getKey())
             .withUploadId(uploadId)
             .withPartETags(eTags);
-    return amazonS3.completeMultipartUpload(completeUploadRequest);
+    return amazonS3.get().completeMultipartUpload(completeUploadRequest);
   }
 
   @Override
@@ -634,7 +633,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
         keys.stream().map(KeyVersion::new).collect(Collectors.toList());
     DeleteObjectsRequest request = new DeleteObjectsRequest(bucket).withKeys(deleteKeyVersions);
     try {
-      amazonS3.deleteObjects(request);
+      amazonS3.get().deleteObjects(request);
     } catch (AmazonClientException e) {
       throw new IOException(e);
     }