You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/02/26 05:00:08 UTC
[druid] branch master updated: Add support for optional aws
credentials for s3 for ingestion (#9375)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 92fb837 Add support for optional aws credentials for s3 for ingestion (#9375)
92fb837 is described below
commit 92fb83726b0275cb936fbed21a183c81b55df419
Author: Maytas Monsereenusorn <52...@users.noreply.github.com>
AuthorDate: Tue Feb 25 20:59:53 2020 -0800
Add support for optional aws credentials for s3 for ingestion (#9375)
* Add support for optional cloud (aws, gcs, etc.) credentials for s3 for ingestion
* Add support for optional cloud (aws, gcs, etc.) credentials for s3 for ingestion
* Add support for optional cloud (aws, gcs, etc.) credentials for s3 for ingestion
* fix build failure
* fix failing build
* fix failing build
* Code cleanup
* fix failing test
* Removed CloudConfigProperties and make specific class for each cloudInputSource
* Removed CloudConfigProperties and make specific class for each cloudInputSource
* pass s3ConfigProperties for split
* lazy init s3client
* update docs
* fix docs check
* address comments
* add ServerSideEncryptingAmazonS3.Builder
* fix failing checkstyle
* fix typo
* wrap the ServerSideEncryptingAmazonS3.Builder in a provider
* added java docs for S3InputSource constructor
* added java docs for S3InputSource constructor
* remove wrap the ServerSideEncryptingAmazonS3.Builder in a provider
---
docs/development/extensions-core/s3.md | 3 +-
docs/ingestion/native-batch.md | 10 ++
.../apache/druid/data/input/s3/S3InputSource.java | 97 ++++++++++-
.../druid/data/input/s3/S3InputSourceConfig.java | 103 +++++++++++
.../druid/storage/s3/S3StorageDruidModule.java | 25 ++-
.../storage/s3/ServerSideEncryptingAmazonS3.java | 48 +++++
.../druid/data/input/s3/S3InputSourceTest.java | 193 +++++++++++++++++++--
.../storage/s3/TestAWSCredentialsProvider.java | 12 +-
website/.spelling | 6 +-
9 files changed, 468 insertions(+), 29 deletions(-)
diff --git a/docs/development/extensions-core/s3.md b/docs/development/extensions-core/s3.md
index bcc3801..b30ba4c 100644
--- a/docs/development/extensions-core/s3.md
+++ b/docs/development/extensions-core/s3.md
@@ -63,7 +63,8 @@ In addition to this you need to set additional configuration, specific for [deep
### S3 authentication methods
-To connect to your S3 bucket (whether deep storage bucket or source bucket), Druid use the following credentials providers chain
+Druid uses the following credentials provider chain to connect to your S3 bucket (whether a deep storage bucket or source bucket).
+**Note :** *You can override the default credentials provider chain for connecting to source bucket by specifying an access key and secret key using [Properties Object](../../ingestion/native-batch.md#s3-input-source) parameters in the ingestionSpec.*
|order|type|details|
|--------|-----------|-------|
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 10e57c8..39e838a 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -841,6 +841,7 @@ Sample specs:
|uris|JSON array of URIs where S3 objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
+|properties|Properties Object for overriding the default S3 configuration. See below for more information.|None|No (defaults will be used if not given)
S3 Object:
@@ -849,6 +850,15 @@ S3 Object:
|bucket|Name of the S3 bucket|None|yes|
|path|The path where data is located.|None|yes|
+Properties Object:
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|accessKeyId|S3 access key for this S3 InputSource|None|yes if secretAccessKey is given|
+|secretAccessKey|S3 secret key for this S3 InputSource|None|yes if accessKeyId is given|
+
+**Note :** *If accessKeyId and secretAccessKey are not given, the default [S3 credentials provider chain](../development/extensions-core/s3.md#s3-authentication-methods) is used.*
+
### Google Cloud Storage Input Source
> You need to include the [`druid-google-extensions`](../development/extensions-core/google.md) as an extension to use the Google Cloud Storage input source.
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
index 659c68f..22c1068 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
@@ -19,11 +19,15 @@
package org.apache.druid.data.input.s3;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
@@ -42,32 +46,82 @@ import javax.annotation.Nullable;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class S3InputSource extends CloudObjectInputSource
{
- private final ServerSideEncryptingAmazonS3 s3Client;
+ // We lazily initialize ServerSideEncryptingAmazonS3 to avoid costly s3 operation when we only need S3InputSource
+ // for stored information (such as for task logs) and not for ingestion.
+ // (This cost only applies for new ServerSideEncryptingAmazonS3 created with s3InputSourceConfig given).
+ private final Supplier<ServerSideEncryptingAmazonS3> s3ClientSupplier;
+ @JsonProperty("properties")
+ private final S3InputSourceConfig s3InputSourceConfig;
private final S3InputDataConfig inputDataConfig;
+ /**
+ * Constructor for S3InputSource
+ * @param s3Client The default ServerSideEncryptingAmazonS3 client built with all default configs
+ * from Guice. This injected singleton client is use when {@param s3InputSourceConfig}
+ * is not provided and hence, we can skip building a new client from
+ * {@param s3ClientBuilder}
+ * @param s3ClientBuilder Use for building a new s3Client to use instead of the default injected
+ * {@param s3Client}. The configurations of the client can be changed
+ * before being built
+ * @param inputDataConfig Stores the configuration for options related to reading input data
+ * @param uris User provided uris to read input data
+ * @param prefixes User provided prefixes to read input data
+ * @param objects User provided cloud objects values to read input data
+ * @param s3InputSourceConfig User provided properties for overriding the default S3 configuration
+ *
+ */
@JsonCreator
public S3InputSource(
@JacksonInject ServerSideEncryptingAmazonS3 s3Client,
+ @JacksonInject ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
@JacksonInject S3InputDataConfig inputDataConfig,
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
- @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
+ @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
+ @JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig
)
{
super(S3StorageDruidModule.SCHEME, uris, prefixes, objects);
- this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client");
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "S3DataSegmentPusherConfig");
+ Preconditions.checkNotNull(s3Client, "s3Client");
+ this.s3InputSourceConfig = s3InputSourceConfig;
+ this.s3ClientSupplier = Suppliers.memoize(
+ () -> {
+ if (s3ClientBuilder != null && s3InputSourceConfig != null) {
+ if (s3InputSourceConfig.isCredentialsConfigured()) {
+ AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider(
+ new BasicAWSCredentials(
+ s3InputSourceConfig.getAccessKeyId().getPassword(),
+ s3InputSourceConfig.getSecretAccessKey().getPassword()
+ )
+ );
+ s3ClientBuilder.getAmazonS3ClientBuilder().withCredentials(credentials);
+ }
+ return s3ClientBuilder.build();
+ } else {
+ return s3Client;
+ }
+ }
+ );
+ }
+
+ @Nullable
+ @JsonProperty("properties")
+ public S3InputSourceConfig getS3InputSourceConfig()
+ {
+ return s3InputSourceConfig;
}
@Override
protected InputEntity createEntity(CloudObjectLocation location)
{
- return new S3Entity(s3Client, location);
+ return new S3Entity(s3ClientSupplier.get(), location);
}
@Override
@@ -88,7 +142,37 @@ public class S3InputSource extends CloudObjectInputSource
@Override
public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
{
- return new S3InputSource(s3Client, inputDataConfig, null, null, split.get());
+ return new S3InputSource(
+ s3ClientSupplier.get(),
+ null,
+ inputDataConfig,
+ null,
+ null,
+ split.get(),
+ getS3InputSourceConfig()
+ );
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), s3InputSourceConfig);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ S3InputSource that = (S3InputSource) o;
+ return Objects.equals(s3InputSourceConfig, that.s3InputSourceConfig);
}
@Override
@@ -98,11 +182,12 @@ public class S3InputSource extends CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
+ ", s3InputSourceConfig=" + getS3InputSourceConfig() +
'}';
}
private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
{
- return () -> S3Utils.objectSummaryIterator(s3Client, getPrefixes(), inputDataConfig.getMaxListingLength());
+ return () -> S3Utils.objectSummaryIterator(s3ClientSupplier.get(), getPrefixes(), inputDataConfig.getMaxListingLength());
}
}
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceConfig.java
new file mode 100644
index 0000000..fd27b99
--- /dev/null
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceConfig.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.s3;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.metadata.PasswordProvider;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Contains properties for s3 input source.
+ * Properties can be specified by ingestionSpec which will override system default.
+ */
+public class S3InputSourceConfig
+{
+ @JsonCreator
+ public S3InputSourceConfig(
+ @JsonProperty("accessKeyId") @Nullable PasswordProvider accessKeyId,
+ @JsonProperty("secretAccessKey") @Nullable PasswordProvider secretAccessKey
+ )
+ {
+ if (accessKeyId != null || secretAccessKey != null) {
+ this.accessKeyId = Preconditions.checkNotNull(accessKeyId, "accessKeyId cannot be null if secretAccessKey is given");
+ this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey, "secretAccessKey cannot be null if accessKeyId is given");
+ }
+ }
+
+ @JsonProperty
+ private PasswordProvider accessKeyId;
+
+ @JsonProperty
+ private PasswordProvider secretAccessKey;
+
+
+ public PasswordProvider getAccessKeyId()
+ {
+ return accessKeyId;
+ }
+
+ public PasswordProvider getSecretAccessKey()
+ {
+ return secretAccessKey;
+ }
+
+ @JsonIgnore
+ public boolean isCredentialsConfigured()
+ {
+ return accessKeyId != null &&
+ accessKeyId.getPassword() != null &&
+ secretAccessKey != null &&
+ secretAccessKey.getPassword() != null;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "S3InputSourceConfig{" +
+ "accessKeyId=" + accessKeyId +
+ ", secretAccessKey=" + secretAccessKey +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ S3InputSourceConfig that = (S3InputSourceConfig) o;
+ return Objects.equals(accessKeyId, that.accessKeyId) &&
+ Objects.equals(secretAccessKey, that.secretAccessKey);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(accessKeyId, secretAccessKey);
+ }
+}
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
index 8cc6c25..2676b6d 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
@@ -166,9 +166,11 @@ public class S3StorageDruidModule implements DruidModule
binder.bind(S3TaskLogs.class).in(LazySingleton.class);
}
+ // This provides ServerSideEncryptingAmazonS3.Builder with default configs from Guice injection initially set.
+ // However, this builder can then be modified and have configuration(s) inside
+ // AmazonS3ClientBuilder and/or S3StorageConfig overridden before being built.
@Provides
- @LazySingleton
- public ServerSideEncryptingAmazonS3 getAmazonS3Client(
+ public ServerSideEncryptingAmazonS3.Builder getServerSideEncryptingAmazonS3Builder(
AWSCredentialsProvider provider,
AWSProxyConfig proxyConfig,
AWSEndpointConfig endpointConfig,
@@ -178,7 +180,7 @@ public class S3StorageDruidModule implements DruidModule
{
final ClientConfiguration configuration = new ClientConfigurationFactory().getConfig();
final Protocol protocol = determineProtocol(clientConfig, endpointConfig);
- final AmazonS3ClientBuilder builder = AmazonS3Client
+ final AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3Client
.builder()
.withCredentials(provider)
.withClientConfiguration(setProxyConfig(configuration, proxyConfig).withProtocol(protocol))
@@ -187,11 +189,24 @@ public class S3StorageDruidModule implements DruidModule
.withForceGlobalBucketAccessEnabled(clientConfig.isForceGlobalBucketAccessEnabled());
if (StringUtils.isNotEmpty(endpointConfig.getUrl())) {
- builder.setEndpointConfiguration(
+ amazonS3ClientBuilder.setEndpointConfiguration(
new EndpointConfiguration(endpointConfig.getUrl(), endpointConfig.getSigningRegion())
);
}
- return new ServerSideEncryptingAmazonS3(builder.build(), storageConfig.getServerSideEncryption());
+ return ServerSideEncryptingAmazonS3.builder()
+ .setAmazonS3ClientBuilder(amazonS3ClientBuilder)
+ .setS3StorageConfig(storageConfig);
+
+ }
+
+ // This provides ServerSideEncryptingAmazonS3 built with all default configs from Guice injection
+ @Provides
+ @LazySingleton
+ public ServerSideEncryptingAmazonS3 getAmazonS3Client(
+ ServerSideEncryptingAmazonS3.Builder serverSideEncryptingAmazonS3Builder
+ )
+ {
+ return serverSideEncryptingAmazonS3Builder.build();
}
}
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java
index f7e8fa6..236cb2c 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java
@@ -20,6 +20,8 @@
package org.apache.druid.storage.s3;
import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CopyObjectResult;
@@ -31,6 +33,7 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import java.io.ByteArrayInputStream;
@@ -47,6 +50,11 @@ import java.io.InputStream;
*/
public class ServerSideEncryptingAmazonS3
{
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
private final AmazonS3 amazonS3;
private final ServerSideEncryption serverSideEncryption;
@@ -119,4 +127,44 @@ public class ServerSideEncryptingAmazonS3
{
amazonS3.deleteObject(bucket, key);
}
+
+ public static class Builder
+ {
+ private AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3Client.builder();
+ private S3StorageConfig s3StorageConfig = new S3StorageConfig(new NoopServerSideEncryption());
+
+ public Builder setAmazonS3ClientBuilder(AmazonS3ClientBuilder amazonS3ClientBuilder)
+ {
+ this.amazonS3ClientBuilder = amazonS3ClientBuilder;
+ return this;
+ }
+
+ public Builder setS3StorageConfig(S3StorageConfig s3StorageConfig)
+ {
+ this.s3StorageConfig = s3StorageConfig;
+ return this;
+ }
+
+ public AmazonS3ClientBuilder getAmazonS3ClientBuilder()
+ {
+ return this.amazonS3ClientBuilder;
+ }
+
+ public S3StorageConfig getS3StorageConfig()
+ {
+ return this.s3StorageConfig;
+ }
+
+ public ServerSideEncryptingAmazonS3 build()
+ {
+ if (amazonS3ClientBuilder == null) {
+ throw new ISE("AmazonS3ClientBuilder cannot be null!");
+ }
+ if (s3StorageConfig == null) {
+ throw new ISE("S3StorageConfig cannot be null!");
+ }
+
+ return new ServerSideEncryptingAmazonS3(amazonS3ClientBuilder.build(), s3StorageConfig.getServerSideEncryption());
+ }
+ }
}
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index a7ab23b..59b6303 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
@@ -54,6 +55,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.S3InputDataConfig;
import org.apache.druid.storage.s3.S3Utils;
@@ -85,6 +87,9 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
{
private static final ObjectMapper MAPPER = createS3ObjectMapper();
private static final AmazonS3Client S3_CLIENT = EasyMock.createMock(AmazonS3Client.class);
+ private static final ServerSideEncryptingAmazonS3.Builder SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER =
+ EasyMock.createMock(ServerSideEncryptingAmazonS3.Builder.class);
+ private static final AmazonS3ClientBuilder AMAZON_S3_CLIENT_BUILDER = AmazonS3Client.builder();
private static final ServerSideEncryptingAmazonS3 SERVICE = new ServerSideEncryptingAmazonS3(
S3_CLIENT,
new NoopServerSideEncryption()
@@ -112,6 +117,9 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
URI.create("s3://bar/foo")
);
+ private static final S3InputSourceConfig CLOUD_CONFIG_PROPERTIES = new S3InputSourceConfig(
+ new DefaultPasswordProvider("myKey"), new DefaultPasswordProvider("mySecret"));
+
private static final List<CloudObjectLocation> EXPECTED_LOCATION =
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
@@ -133,7 +141,15 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
@Test
public void testSerdeWithUris() throws Exception
{
- final S3InputSource withUris = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, EXPECTED_URIS, null, null);
+ final S3InputSource withUris = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ EXPECTED_URIS,
+ null,
+ null,
+ null
+ );
final S3InputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class);
Assert.assertEquals(withUris, serdeWithUris);
}
@@ -141,7 +157,15 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
@Test
public void testSerdeWithPrefixes() throws Exception
{
- final S3InputSource withPrefixes = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, null, PREFIXES, null);
+ final S3InputSource withPrefixes = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ PREFIXES,
+ null,
+ null
+ );
final S3InputSource serdeWithPrefixes =
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
@@ -150,17 +174,114 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
@Test
public void testSerdeWithObjects() throws Exception
{
+ final S3InputSource withPrefixes = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ null,
+ EXPECTED_LOCATION,
+ null
+ );
+ final S3InputSource serdeWithPrefixes =
+ MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
+ Assert.assertEquals(withPrefixes, serdeWithPrefixes);
+ }
+ @Test
+ public void testSerdeWithCloudConfigPropertiesWithKeyAndSecret() throws Exception
+ {
+ EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getAmazonS3ClientBuilder())
+ .andStubReturn(AMAZON_S3_CLIENT_BUILDER);
+ EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
+ .andReturn(SERVICE);
+ EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
final S3InputSource withPrefixes = new S3InputSource(
SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
null,
- EXPECTED_LOCATION
+ EXPECTED_LOCATION,
+ CLOUD_CONFIG_PROPERTIES
);
final S3InputSource serdeWithPrefixes =
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
+ // This is to force the s3ClientSupplier to initialize the ServerSideEncryptingAmazonS3
+ serdeWithPrefixes.createEntity(new CloudObjectLocation("bucket", "path"));
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
+ EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ }
+
+ @Test
+ public void testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutCrediential()
+ {
+ S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret = EasyMock.createMock(S3InputSourceConfig.class);
+ EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret);
+ EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured())
+ .andStubReturn(false);
+ EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret);
+ EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
+ .andReturn(SERVICE);
+ EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ final S3InputSource withPrefixes = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ null,
+ EXPECTED_LOCATION,
+ mockConfigPropertiesWithoutKeyAndSecret
+ );
+ Assert.assertNotNull(withPrefixes);
+ // This is to force the s3ClientSupplier to initialize the ServerSideEncryptingAmazonS3
+ withPrefixes.createEntity(new CloudObjectLocation("bucket", "path"));
+ EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ EasyMock.verify(mockConfigPropertiesWithoutKeyAndSecret);
+ }
+
+ @Test
+ public void testSerdeS3ClientLazyInitializedWithCrediential() throws Exception
+ {
+ // Amazon S3 builder should not build anything as we did not make any call that requires the S3 client
+ EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ final S3InputSource withPrefixes = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ null,
+ EXPECTED_LOCATION,
+ CLOUD_CONFIG_PROPERTIES
+ );
+ final S3InputSource serdeWithPrefixes =
+ MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
+ Assert.assertEquals(withPrefixes, serdeWithPrefixes);
+ EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ }
+
+ @Test
+ public void testSerdeS3ClientLazyInitializedWithoutCrediential() throws Exception
+ {
+ // Amazon S3 builder should not build anything as we did not make any call that requires the S3 client
+ EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+ final S3InputSource withPrefixes = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ null,
+ EXPECTED_LOCATION,
+ null
+ );
+ final S3InputSource serdeWithPrefixes =
+ MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
+ Assert.assertEquals(withPrefixes, serdeWithPrefixes);
+ EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
}
@Test
@@ -168,10 +289,12 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
{
final S3InputSource withPrefixes = new S3InputSource(
SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
ImmutableList.of(),
ImmutableList.of(),
- EXPECTED_LOCATION
+ EXPECTED_LOCATION,
+ null
);
final S3InputSource serdeWithPrefixes =
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
@@ -185,10 +308,12 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
// constructor will explode
new S3InputSource(
SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
PREFIXES,
- EXPECTED_LOCATION
+ EXPECTED_LOCATION,
+ null
);
}
@@ -199,10 +324,12 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
// constructor will explode
new S3InputSource(
SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
PREFIXES,
- ImmutableList.of()
+ ImmutableList.of(),
+ null
);
}
@@ -213,17 +340,27 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
// constructor will explode
new S3InputSource(
SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
ImmutableList.of(),
PREFIXES,
- EXPECTED_LOCATION
+ EXPECTED_LOCATION,
+ null
);
}
@Test
public void testWithUrisSplit()
{
- S3InputSource inputSource = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, EXPECTED_URIS, null, null);
+ S3InputSource inputSource = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ EXPECTED_URIS,
+ null,
+ null,
+ null
+ );
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@@ -241,7 +378,15 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
EasyMock.replay(S3_CLIENT);
- S3InputSource inputSource = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, null, PREFIXES, null);
+ S3InputSource inputSource = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ PREFIXES,
+ null,
+ null
+ );
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@@ -260,7 +405,14 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
EasyMock.replay(S3_CLIENT);
- S3InputSource inputSource = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, null, PREFIXES, null);
+ S3InputSource inputSource = new S3InputSource(
+ SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+ INPUT_DATA_CONFIG,
+ null,
+ PREFIXES,
+ null,
+ null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@@ -284,9 +436,11 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
S3InputSource inputSource = new S3InputSource(
SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
+ null,
null
);
@@ -313,9 +467,11 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
S3InputSource inputSource = new S3InputSource(
SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
+ null,
null
);
@@ -355,9 +511,11 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
S3InputSource inputSource = new S3InputSource(
SERVICE,
+ SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
+ null,
null
);
@@ -489,13 +647,22 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
{
// Deserializer is need for AmazonS3Client even though it is injected.
// See https://github.com/FasterXML/jackson-databind/issues/962.
- return ImmutableList.of(new SimpleModule().addDeserializer(AmazonS3.class, new ItemDeserializer()));
+ return ImmutableList.of(
+ new SimpleModule()
+ .addDeserializer(AmazonS3.class, new ItemDeserializer<AmazonS3>())
+ .addDeserializer(AmazonS3ClientBuilder.class, new ItemDeserializer<AmazonS3ClientBuilder>())
+ );
}
@Override
public void configure(Binder binder)
{
+ }
+ @Provides
+ public ServerSideEncryptingAmazonS3.Builder getServerSideEncryptingAmazonS3Builder()
+ {
+ return SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER;
}
@Provides
@@ -505,7 +672,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
}
}
- public static class ItemDeserializer extends StdDeserializer<AmazonS3>
+ public static class ItemDeserializer<T> extends StdDeserializer<T>
{
ItemDeserializer()
{
@@ -518,7 +685,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
}
@Override
- public AmazonS3 deserialize(JsonParser jp, DeserializationContext ctxt)
+ public T deserialize(JsonParser jp, DeserializationContext ctxt)
{
throw new UnsupportedOperationException();
}
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/TestAWSCredentialsProvider.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/TestAWSCredentialsProvider.java
index 7f8e267..3685fc6 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/TestAWSCredentialsProvider.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/TestAWSCredentialsProvider.java
@@ -62,13 +62,17 @@ public class TestAWSCredentialsProvider
Assert.assertEquals("secretKeySample", credentials.getAWSSecretKey());
// try to create
- s3Module.getAmazonS3Client(
+ ServerSideEncryptingAmazonS3.Builder amazonS3ClientBuilder = s3Module.getServerSideEncryptingAmazonS3Builder(
provider,
new AWSProxyConfig(),
new AWSEndpointConfig(),
new AWSClientConfig(),
new S3StorageConfig(new NoopServerSideEncryption())
);
+
+ s3Module.getAmazonS3Client(
+ amazonS3ClientBuilder
+ );
}
@Test
@@ -93,12 +97,16 @@ public class TestAWSCredentialsProvider
Assert.assertEquals("sessionTokenSample", sessionCredentials.getSessionToken());
// try to create
- s3Module.getAmazonS3Client(
+ ServerSideEncryptingAmazonS3.Builder amazonS3ClientBuilder = s3Module.getServerSideEncryptingAmazonS3Builder(
provider,
new AWSProxyConfig(),
new AWSEndpointConfig(),
new AWSClientConfig(),
new S3StorageConfig(new NoopServerSideEncryption())
);
+
+ s3Module.getAmazonS3Client(
+ amazonS3ClientBuilder
+ );
}
}
diff --git a/website/.spelling b/website/.spelling
index 275854f..eed68ee 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -251,6 +251,7 @@ http
https
i.e.
influxdb
+ingestionSpec
injective
inlined
interruptible
@@ -907,7 +908,6 @@ ignoreInvalidRows
ignoreWhenNoSegments
indexSpecForIntermediatePersists
index_hadoop
-ingestionSpec
inputPath
inputSpecs
interval1
@@ -954,6 +954,7 @@ PartitionsSpec
PasswordProviders
SegmentsSplitHintSpec
SplitHintSpec
+accessKeyId
appendToExisting
baseDir
chatHandlerNumRetries
@@ -973,6 +974,7 @@ maxNumSegmentsToMerge
maxRetry
pushTimeout
reportParseExceptions
+secretAccessKey
segmentWriteOutMediumFactory
sql
sqls
@@ -1711,4 +1713,4 @@ isRobot
isUnpatrolled
metroCode
regionIsoCode
-regionName
+regionName
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org