Posted to commits@beam.apache.org by ie...@apache.org on 2019/03/25 14:17:31 UTC
[beam] branch master updated: [BEAM-6266] Don't require --awsRegion when S3 isn't used on the new AWS SDK.
This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b036fc7 [BEAM-6266] Don't require --awsRegion when S3 isn't used on the new AWS SDK.
new 69b8bc1 Merge pull request #8126: [BEAM-6266] Don't require --awsRegion when S3 isn't used on the new AWS SDK
b036fc7 is described below
commit b036fc78bbbd90ae16dc3f6d7c5203f07d078905
Author: Mike Kaplinskiy <mi...@ladderlife.com>
AuthorDate: Sun Mar 24 19:03:53 2019 -0700
[BEAM-6266] Don't require --awsRegion when S3 isn't used on the new AWS SDK.
---
.../io/aws/s3/DefaultS3ClientBuilderFactory.java | 14 ++++++--
.../apache/beam/sdk/io/aws/s3/S3FileSystem.java | 41 +++++++++++-----------
2 files changed, 31 insertions(+), 24 deletions(-)
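In short, the patch stops S3FileSystem from eagerly calling AmazonS3ClientBuilder.build() (the call that actually needs a region) and instead defers construction behind a memoized Guava Supplier, so --awsRegion only matters once S3 is really touched. A minimal, self-contained sketch of that pattern follows; it uses plain Guava for readability, whereas Beam itself uses its vendored copy under org.apache.beam.vendor.guava.v20_0, and the class and strings below are purely illustrative:

    import com.google.common.base.Supplier;
    import com.google.common.base.Suppliers;

    class LazyClient {
      // Construction is deferred: the lambda runs at most once, on first get().
      private final Supplier<String> client =
          Suppliers.memoize(() -> {
            System.out.println("building client"); // in S3FileSystem this is builder.build()
            return "the client";
          });

      String use() {
        // First call builds and caches; later calls return the same instance.
        return client.get();
      }
    }

The diff below applies exactly this idea: the S3 client builder is still created in the constructor, but build() is wrapped in Suppliers.memoize and only invoked from the S3 code paths.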
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/DefaultS3ClientBuilderFactory.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/DefaultS3ClientBuilderFactory.java
index eaca226..8416304 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/DefaultS3ClientBuilderFactory.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/DefaultS3ClientBuilderFactory.java
@@ -22,6 +22,8 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.apache.beam.sdk.io.aws.options.S3ClientBuilderFactory;
import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Construct AmazonS3ClientBuilder with default values of S3 client properties like path style
@@ -29,6 +31,8 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
*/
public class DefaultS3ClientBuilderFactory implements S3ClientBuilderFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultS3ClientBuilderFactory.class);
+
@Override
public AmazonS3ClientBuilder createBuilder(S3Options s3Options) {
AmazonS3ClientBuilder builder =
@@ -38,13 +42,17 @@ public class DefaultS3ClientBuilderFactory implements S3ClientBuilderFactory {
builder = builder.withClientConfiguration(s3Options.getClientConfiguration());
}
- if (Strings.isNullOrEmpty(s3Options.getAwsServiceEndpoint())) {
- builder = builder.withRegion(s3Options.getAwsRegion());
- } else {
+ if (!Strings.isNullOrEmpty(s3Options.getAwsServiceEndpoint())) {
builder =
builder.withEndpointConfiguration(
new AwsClientBuilder.EndpointConfiguration(
s3Options.getAwsServiceEndpoint(), s3Options.getAwsRegion()));
+ } else if (!Strings.isNullOrEmpty(s3Options.getAwsRegion())) {
+ builder = builder.withRegion(s3Options.getAwsRegion());
+ } else {
+ LOG.info(
+ "The AWS S3 Beam extension was included in this build, but the awsRegion flag "
+ + "was not specified. If you don't plan to use S3, then ignore this message.");
}
return builder;
}
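The factory above now resolves configuration in a fixed order: an explicit service endpoint wins (paired with whatever region was supplied), otherwise a bare --awsRegion is applied, and if neither is set the factory only logs an informational message instead of failing. A hedged usage sketch, using the Beam and AWS classes that appear in this diff; the scenario itself is illustrative and not part of the commit:

    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import org.apache.beam.sdk.io.aws.options.S3Options;
    import org.apache.beam.sdk.io.aws.s3.DefaultS3ClientBuilderFactory;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class RegionOptionalExample {
      public static void main(String[] args) {
        // No --awsRegion on the command line: fine as long as S3 is never used.
        S3Options options = PipelineOptionsFactory.fromArgs(args).as(S3Options.class);

        // createBuilder(...) no longer calls withRegion(null); with both the endpoint
        // and the region absent it just logs the informational message shown above.
        AmazonS3ClientBuilder builder =
            new DefaultS3ClientBuilderFactory().createBuilder(options);

        // Calling builder.build() here is what would still require a region (or a
        // default from the AWS region provider chain); S3FileSystem now defers that
        // call until an s3:// path is actually accessed.
      }
    }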
diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
index 39ebcbb..276d64b 100644
--- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
+++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java
@@ -23,6 +23,7 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Precondi
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -69,6 +70,8 @@ import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
@@ -95,23 +98,19 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
ImmutableSet.of("gzip");
// Non-final for testing.
- private AmazonS3 amazonS3;
+ private Supplier<AmazonS3> amazonS3;
private final S3Options options;
private final ListeningExecutorService executorService;
S3FileSystem(S3Options options) {
this.options = checkNotNull(options, "options");
- if (Strings.isNullOrEmpty(options.getAwsRegion())) {
- LOG.info(
- "The AWS S3 Beam extension was included in this build, but the awsRegion flag "
- + "was not specified. If you don't plan to use S3, then ignore this message.");
- }
- this.amazonS3 =
+ AmazonS3ClientBuilder builder =
InstanceBuilder.ofType(S3ClientBuilderFactory.class)
.fromClass(options.getS3ClientFactoryClass())
.build()
- .createBuilder(options)
- .build();
+ .createBuilder(options);
+ // The Supplier is to make sure we don't call .build() unless we are actually using S3.
+ amazonS3 = Suppliers.memoize(builder::build);
checkNotNull(options.getS3StorageClass(), "storageClass");
checkArgument(options.getS3ThreadPoolSize() > 0, "threadPoolSize");
@@ -128,12 +127,12 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
@VisibleForTesting
void setAmazonS3Client(AmazonS3 amazonS3) {
- this.amazonS3 = amazonS3;
+ this.amazonS3 = Suppliers.ofInstance(amazonS3);
}
@VisibleForTesting
AmazonS3 getAmazonS3Client() {
- return this.amazonS3;
+ return this.amazonS3.get();
}
@Override
@@ -308,7 +307,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
.withContinuationToken(continuationToken);
ListObjectsV2Result result;
try {
- result = amazonS3.listObjectsV2(request);
+ result = amazonS3.get().listObjectsV2(request);
} catch (AmazonClientException e) {
return ExpandedGlob.create(glob, new IOException(e));
}
@@ -356,7 +355,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
GetObjectMetadataRequest request =
new GetObjectMetadataRequest(s3ResourceId.getBucket(), s3ResourceId.getKey());
request.setSSECustomerKey(options.getSSECustomerKey());
- return amazonS3.getObjectMetadata(request);
+ return amazonS3.get().getObjectMetadata(request);
}
@VisibleForTesting
@@ -459,12 +458,12 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
@Override
protected WritableByteChannel create(S3ResourceId resourceId, CreateOptions createOptions)
throws IOException {
- return new S3WritableByteChannel(amazonS3, resourceId, createOptions.mimeType(), options);
+ return new S3WritableByteChannel(amazonS3.get(), resourceId, createOptions.mimeType(), options);
}
@Override
protected ReadableByteChannel open(S3ResourceId resourceId) throws IOException {
- return new S3ReadableSeekableByteChannel(amazonS3, resourceId, options);
+ return new S3ReadableSeekableByteChannel(amazonS3.get(), resourceId, options);
}
@Override
@@ -520,7 +519,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
copyObjectRequest.setStorageClass(options.getS3StorageClass());
copyObjectRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
copyObjectRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
- return amazonS3.copyObject(copyObjectRequest);
+ return amazonS3.get().copyObject(copyObjectRequest);
}
@VisibleForTesting
@@ -534,7 +533,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
initiateUploadRequest.setSSECustomerKey(options.getSSECustomerKey());
InitiateMultipartUploadResult initiateUploadResult =
- amazonS3.initiateMultipartUpload(initiateUploadRequest);
+ amazonS3.get().initiateMultipartUpload(initiateUploadRequest);
final String uploadId = initiateUploadResult.getUploadId();
List<PartETag> eTags = new ArrayList<>();
@@ -554,7 +553,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
copyPartRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
- CopyPartResult copyPartResult = amazonS3.copyPart(copyPartRequest);
+ CopyPartResult copyPartResult = amazonS3.get().copyPart(copyPartRequest);
eTags.add(copyPartResult.getPartETag());
} else {
long bytePosition = 0;
@@ -574,7 +573,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
copyPartRequest.setSourceSSECustomerKey(options.getSSECustomerKey());
copyPartRequest.setDestinationSSECustomerKey(options.getSSECustomerKey());
- CopyPartResult copyPartResult = amazonS3.copyPart(copyPartRequest);
+ CopyPartResult copyPartResult = amazonS3.get().copyPart(copyPartRequest);
eTags.add(copyPartResult.getPartETag());
bytePosition += uploadBufferSizeBytes;
@@ -587,7 +586,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
.withKey(destinationPath.getKey())
.withUploadId(uploadId)
.withPartETags(eTags);
- return amazonS3.completeMultipartUpload(completeUploadRequest);
+ return amazonS3.get().completeMultipartUpload(completeUploadRequest);
}
@Override
@@ -634,7 +633,7 @@ class S3FileSystem extends FileSystem<S3ResourceId> {
keys.stream().map(KeyVersion::new).collect(Collectors.toList());
DeleteObjectsRequest request = new DeleteObjectsRequest(bucket).withKeys(deleteKeyVersions);
try {
- amazonS3.deleteObjects(request);
+ amazonS3.get().deleteObjects(request);
} catch (AmazonClientException e) {
throw new IOException(e);
}
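A note on the test hook: setAmazonS3Client(...) keeps its old signature by wrapping the injected client in Suppliers.ofInstance, so existing tests can still hand the file system a concrete or mocked AmazonS3 without ever triggering the lazy build. An illustrative sketch, assuming a Mockito-style mock; the mocking framework and the test class name are assumptions, not something this commit prescribes:

    import static org.mockito.Mockito.mock;

    import com.amazonaws.services.s3.AmazonS3;
    import org.apache.beam.sdk.io.aws.options.S3Options;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // S3FileSystem and setAmazonS3Client are package-private, so a real test
    // would live in org.apache.beam.sdk.io.aws.s3.
    class S3FileSystemTestSketch {
      void injectClient() {
        S3Options options = PipelineOptionsFactory.as(S3Options.class);
        S3FileSystem fs = new S3FileSystem(options);

        // Replaces the memoized Supplier with Suppliers.ofInstance(mockClient),
        // so every amazonS3.get() call in the methods above returns the mock.
        AmazonS3 mockClient = mock(AmazonS3.class);
        fs.setAmazonS3Client(mockClient);
      }
    }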