You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/02/26 05:00:08 UTC

[druid] branch master updated: Add support for optional aws credentials for s3 for ingestion (#9375)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 92fb837  Add support for optional aws credentials for s3 for ingestion (#9375)
92fb837 is described below

commit 92fb83726b0275cb936fbed21a183c81b55df419
Author: Maytas Monsereenusorn <52...@users.noreply.github.com>
AuthorDate: Tue Feb 25 20:59:53 2020 -0800

    Add support for optional aws credentials for s3 for ingestion (#9375)
    
    * Add support for optional cloud (aws, gcs, etc.) credentials for s3 for ingestion
    
    * Add support for optional cloud (aws, gcs, etc.) credentials for s3 for ingestion
    
    * Add support for optional cloud (aws, gcs, etc.) credentials for s3 for ingestion
    
    * fix build failure
    
    * fix failing build
    
    * fix failing build
    
    * Code cleanup
    
    * fix failing test
    
    * Removed CloudConfigProperties and make specific class for each cloudInputSource
    
    * Removed CloudConfigProperties and make specific class for each cloudInputSource
    
    * pass s3ConfigProperties for split
    
    * lazy init s3client
    
    * update docs
    
    * fix docs check
    
    * address comments
    
    * add ServerSideEncryptingAmazonS3.Builder
    
    * fix failing checkstyle
    
    * fix typo
    
    * wrap the ServerSideEncryptingAmazonS3.Builder in a provider
    
    * added java docs for S3InputSource constructor
    
    * added java docs for S3InputSource constructor
    
    * remove wrap the ServerSideEncryptingAmazonS3.Builder in a provider
---
 docs/development/extensions-core/s3.md             |   3 +-
 docs/ingestion/native-batch.md                     |  10 ++
 .../apache/druid/data/input/s3/S3InputSource.java  |  97 ++++++++++-
 .../druid/data/input/s3/S3InputSourceConfig.java   | 103 +++++++++++
 .../druid/storage/s3/S3StorageDruidModule.java     |  25 ++-
 .../storage/s3/ServerSideEncryptingAmazonS3.java   |  48 +++++
 .../druid/data/input/s3/S3InputSourceTest.java     | 193 +++++++++++++++++++--
 .../storage/s3/TestAWSCredentialsProvider.java     |  12 +-
 website/.spelling                                  |   6 +-
 9 files changed, 468 insertions(+), 29 deletions(-)

diff --git a/docs/development/extensions-core/s3.md b/docs/development/extensions-core/s3.md
index bcc3801..b30ba4c 100644
--- a/docs/development/extensions-core/s3.md
+++ b/docs/development/extensions-core/s3.md
@@ -63,7 +63,8 @@ In addition to this you need to set additional configuration, specific for [deep
 
 ### S3 authentication methods
 
-To connect to your S3 bucket (whether deep storage bucket or source bucket), Druid use the following credentials providers chain
+Druid uses the following credentials provider chain to connect to your S3 bucket (whether a deep storage bucket or source bucket).
+**Note :** *You can override the default credentials provider chain for connecting to source bucket by specifying an access key and secret key using [Properties Object](../../ingestion/native-batch.md#s3-input-source) parameters in the ingestionSpec.*
 
 |order|type|details|
 |--------|-----------|-------|
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 10e57c8..39e838a 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -841,6 +841,7 @@ Sample specs:
 |uris|JSON array of URIs where S3 objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
 |prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
 |objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
+|properties|Properties Object for overriding the default S3 configuration. See below for more information.|None|No (defaults will be used if not given)
 
 S3 Object:
 
@@ -849,6 +850,15 @@ S3 Object:
 |bucket|Name of the S3 bucket|None|yes|
 |path|The path where data is located.|None|yes|
 
+Properties Object:
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|accessKeyId|S3 access key for this S3 InputSource|None|yes if secretAccessKey is given|
+|secretAccessKey|S3 secret key for this S3 InputSource|None|yes if accessKeyId is given|
+
+**Note :** *If accessKeyId and secretAccessKey are not given, the default [S3 credentials provider chain](../development/extensions-core/s3.md#s3-authentication-methods) is used.*
+
 ### Google Cloud Storage Input Source
 
 > You need to include the [`druid-google-extensions`](../development/extensions-core/google.md) as an extension to use the Google Cloud Storage input source.
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
index 659c68f..22c1068 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
@@ -19,11 +19,15 @@
 
 package org.apache.druid.data.input.s3;
 
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputFileAttribute;
 import org.apache.druid.data.input.InputSplit;
@@ -42,32 +46,82 @@ import javax.annotation.Nullable;
 import java.net.URI;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class S3InputSource extends CloudObjectInputSource
 {
-  private final ServerSideEncryptingAmazonS3 s3Client;
+  // We lazily initialize ServerSideEncryptingAmazonS3 to avoid costly s3 operation when we only need S3InputSource
+  // for stored information (such as for task logs) and not for ingestion.
+  // (This cost only applies for new ServerSideEncryptingAmazonS3 created with s3InputSourceConfig given).
+  private final Supplier<ServerSideEncryptingAmazonS3> s3ClientSupplier;
+  @JsonProperty("properties")
+  private final S3InputSourceConfig s3InputSourceConfig;
   private final S3InputDataConfig inputDataConfig;
 
+  /**
+   * Constructor for S3InputSource
+   * @param s3Client                The default ServerSideEncryptingAmazonS3 client built with all default configs
+   *                                from Guice. This injected singleton client is use when {@param s3InputSourceConfig}
+   *                                is not provided and hence, we can skip building a new client from
+   *                                {@param s3ClientBuilder}
+   * @param s3ClientBuilder         Use for building a new s3Client to use instead of the default injected
+   *                                {@param s3Client}. The configurations of the client can be changed
+   *                                before being built
+   * @param inputDataConfig         Stores the configuration for options related to reading input data
+   * @param uris                    User provided uris to read input data
+   * @param prefixes                User provided prefixes to read input data
+   * @param objects                 User provided cloud objects values to read input data
+   * @param s3InputSourceConfig     User provided properties for overriding the default S3 configuration
+   *
+   */
   @JsonCreator
   public S3InputSource(
       @JacksonInject ServerSideEncryptingAmazonS3 s3Client,
+      @JacksonInject ServerSideEncryptingAmazonS3.Builder s3ClientBuilder,
       @JacksonInject S3InputDataConfig inputDataConfig,
       @JsonProperty("uris") @Nullable List<URI> uris,
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
-      @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
+      @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
+      @JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig
   )
   {
     super(S3StorageDruidModule.SCHEME, uris, prefixes, objects);
-    this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client");
     this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "S3DataSegmentPusherConfig");
+    Preconditions.checkNotNull(s3Client, "s3Client");
+    this.s3InputSourceConfig = s3InputSourceConfig;
+    this.s3ClientSupplier = Suppliers.memoize(
+        () -> {
+          if (s3ClientBuilder != null && s3InputSourceConfig != null) {
+            if (s3InputSourceConfig.isCredentialsConfigured()) {
+              AWSStaticCredentialsProvider credentials = new AWSStaticCredentialsProvider(
+                  new BasicAWSCredentials(
+                      s3InputSourceConfig.getAccessKeyId().getPassword(),
+                      s3InputSourceConfig.getSecretAccessKey().getPassword()
+                  )
+              );
+              s3ClientBuilder.getAmazonS3ClientBuilder().withCredentials(credentials);
+            }
+            return s3ClientBuilder.build();
+          } else {
+            return s3Client;
+          }
+        }
+    );
+  }
+
+  @Nullable
+  @JsonProperty("properties")
+  public S3InputSourceConfig getS3InputSourceConfig()
+  {
+    return s3InputSourceConfig;
   }
 
   @Override
   protected InputEntity createEntity(CloudObjectLocation location)
   {
-    return new S3Entity(s3Client, location);
+    return new S3Entity(s3ClientSupplier.get(), location);
   }
 
   @Override
@@ -88,7 +142,37 @@ public class S3InputSource extends CloudObjectInputSource
   @Override
   public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
   {
-    return new S3InputSource(s3Client, inputDataConfig, null, null, split.get());
+    return new S3InputSource(
+        s3ClientSupplier.get(),
+        null,
+        inputDataConfig,
+        null,
+        null,
+        split.get(),
+        getS3InputSourceConfig()
+    );
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), s3InputSourceConfig);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    S3InputSource that = (S3InputSource) o;
+    return Objects.equals(s3InputSourceConfig, that.s3InputSourceConfig);
   }
 
   @Override
@@ -98,11 +182,12 @@ public class S3InputSource extends CloudObjectInputSource
            "uris=" + getUris() +
            ", prefixes=" + getPrefixes() +
            ", objects=" + getObjects() +
+           ", s3InputSourceConfig=" + getS3InputSourceConfig() +
            '}';
   }
 
   private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
   {
-    return () -> S3Utils.objectSummaryIterator(s3Client, getPrefixes(), inputDataConfig.getMaxListingLength());
+    return () -> S3Utils.objectSummaryIterator(s3ClientSupplier.get(), getPrefixes(), inputDataConfig.getMaxListingLength());
   }
 }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceConfig.java
new file mode 100644
index 0000000..fd27b99
--- /dev/null
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceConfig.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.s3;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.metadata.PasswordProvider;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Contains properties for s3 input source.
+ * Properties can be specified by ingestionSpec which will override system default.
+ */
+public class S3InputSourceConfig
+{
+  @JsonCreator
+  public S3InputSourceConfig(
+      @JsonProperty("accessKeyId") @Nullable PasswordProvider accessKeyId,
+      @JsonProperty("secretAccessKey") @Nullable PasswordProvider secretAccessKey
+  )
+  {
+    if (accessKeyId != null || secretAccessKey != null) {
+      this.accessKeyId = Preconditions.checkNotNull(accessKeyId, "accessKeyId cannot be null if secretAccessKey is given");
+      this.secretAccessKey = Preconditions.checkNotNull(secretAccessKey, "secretAccessKey cannot be null if accessKeyId is given");
+    }
+  }
+
+  @JsonProperty
+  private PasswordProvider accessKeyId;
+
+  @JsonProperty
+  private PasswordProvider secretAccessKey;
+
+
+  public PasswordProvider getAccessKeyId()
+  {
+    return accessKeyId;
+  }
+
+  public PasswordProvider getSecretAccessKey()
+  {
+    return secretAccessKey;
+  }
+
+  @JsonIgnore
+  public boolean isCredentialsConfigured()
+  {
+    return accessKeyId != null &&
+           accessKeyId.getPassword() != null &&
+           secretAccessKey != null &&
+           secretAccessKey.getPassword() != null;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "S3InputSourceConfig{" +
+           "accessKeyId=" + accessKeyId +
+           ", secretAccessKey=" + secretAccessKey +
+           '}';
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    S3InputSourceConfig that = (S3InputSourceConfig) o;
+    return Objects.equals(accessKeyId, that.accessKeyId) &&
+           Objects.equals(secretAccessKey, that.secretAccessKey);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(accessKeyId, secretAccessKey);
+  }
+}
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
index 8cc6c25..2676b6d 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java
@@ -166,9 +166,11 @@ public class S3StorageDruidModule implements DruidModule
     binder.bind(S3TaskLogs.class).in(LazySingleton.class);
   }
 
+  // This provides ServerSideEncryptingAmazonS3.Builder with default configs from Guice injection initially set.
+  // However, this builder can then be modified and have configuration(s) inside
+  // AmazonS3ClientBuilder and/or S3StorageConfig overridden before being built.
   @Provides
-  @LazySingleton
-  public ServerSideEncryptingAmazonS3 getAmazonS3Client(
+  public ServerSideEncryptingAmazonS3.Builder getServerSideEncryptingAmazonS3Builder(
       AWSCredentialsProvider provider,
       AWSProxyConfig proxyConfig,
       AWSEndpointConfig endpointConfig,
@@ -178,7 +180,7 @@ public class S3StorageDruidModule implements DruidModule
   {
     final ClientConfiguration configuration = new ClientConfigurationFactory().getConfig();
     final Protocol protocol = determineProtocol(clientConfig, endpointConfig);
-    final AmazonS3ClientBuilder builder = AmazonS3Client
+    final AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3Client
         .builder()
         .withCredentials(provider)
         .withClientConfiguration(setProxyConfig(configuration, proxyConfig).withProtocol(protocol))
@@ -187,11 +189,24 @@ public class S3StorageDruidModule implements DruidModule
         .withForceGlobalBucketAccessEnabled(clientConfig.isForceGlobalBucketAccessEnabled());
 
     if (StringUtils.isNotEmpty(endpointConfig.getUrl())) {
-      builder.setEndpointConfiguration(
+      amazonS3ClientBuilder.setEndpointConfiguration(
           new EndpointConfiguration(endpointConfig.getUrl(), endpointConfig.getSigningRegion())
       );
     }
 
-    return new ServerSideEncryptingAmazonS3(builder.build(), storageConfig.getServerSideEncryption());
+    return ServerSideEncryptingAmazonS3.builder()
+                                       .setAmazonS3ClientBuilder(amazonS3ClientBuilder)
+                                       .setS3StorageConfig(storageConfig);
+
+  }
+
+  // This provides ServerSideEncryptingAmazonS3 built with all default configs from Guice injection
+  @Provides
+  @LazySingleton
+  public ServerSideEncryptingAmazonS3 getAmazonS3Client(
+      ServerSideEncryptingAmazonS3.Builder serverSideEncryptingAmazonS3Builder
+  )
+  {
+    return serverSideEncryptingAmazonS3Builder.build();
   }
 }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java
index f7e8fa6..236cb2c 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java
@@ -20,6 +20,8 @@
 package org.apache.druid.storage.s3;
 
 import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.AccessControlList;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.CopyObjectResult;
@@ -31,6 +33,7 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.S3Object;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 
 import java.io.ByteArrayInputStream;
@@ -47,6 +50,11 @@ import java.io.InputStream;
  */
 public class ServerSideEncryptingAmazonS3
 {
+  public static Builder builder()
+  {
+    return new Builder();
+  }
+
   private final AmazonS3 amazonS3;
   private final ServerSideEncryption serverSideEncryption;
 
@@ -119,4 +127,44 @@ public class ServerSideEncryptingAmazonS3
   {
     amazonS3.deleteObject(bucket, key);
   }
+
+  public static class Builder
+  {
+    private AmazonS3ClientBuilder amazonS3ClientBuilder = AmazonS3Client.builder();
+    private S3StorageConfig s3StorageConfig = new S3StorageConfig(new NoopServerSideEncryption());
+
+    public Builder setAmazonS3ClientBuilder(AmazonS3ClientBuilder amazonS3ClientBuilder)
+    {
+      this.amazonS3ClientBuilder = amazonS3ClientBuilder;
+      return this;
+    }
+
+    public Builder setS3StorageConfig(S3StorageConfig s3StorageConfig)
+    {
+      this.s3StorageConfig = s3StorageConfig;
+      return this;
+    }
+
+    public AmazonS3ClientBuilder getAmazonS3ClientBuilder()
+    {
+      return this.amazonS3ClientBuilder;
+    }
+
+    public S3StorageConfig getS3StorageConfig()
+    {
+      return this.s3StorageConfig;
+    }
+
+    public ServerSideEncryptingAmazonS3 build()
+    {
+      if (amazonS3ClientBuilder == null) {
+        throw new ISE("AmazonS3ClientBuilder cannot be null!");
+      }
+      if (s3StorageConfig == null) {
+        throw new ISE("S3StorageConfig cannot be null!");
+      }
+
+      return new ServerSideEncryptingAmazonS3(amazonS3ClientBuilder.build(), s3StorageConfig.getServerSideEncryption());
+    }
+  }
 }
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index a7ab23b..59b6303 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.data.input.s3;
 
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
@@ -54,6 +55,7 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.metadata.DefaultPasswordProvider;
 import org.apache.druid.storage.s3.NoopServerSideEncryption;
 import org.apache.druid.storage.s3.S3InputDataConfig;
 import org.apache.druid.storage.s3.S3Utils;
@@ -85,6 +87,9 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
 {
   private static final ObjectMapper MAPPER = createS3ObjectMapper();
   private static final AmazonS3Client S3_CLIENT = EasyMock.createMock(AmazonS3Client.class);
+  private static final ServerSideEncryptingAmazonS3.Builder SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER =
+      EasyMock.createMock(ServerSideEncryptingAmazonS3.Builder.class);
+  private static final AmazonS3ClientBuilder AMAZON_S3_CLIENT_BUILDER = AmazonS3Client.builder();
   private static final ServerSideEncryptingAmazonS3 SERVICE = new ServerSideEncryptingAmazonS3(
       S3_CLIENT,
       new NoopServerSideEncryption()
@@ -112,6 +117,9 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
       URI.create("s3://bar/foo")
   );
 
+  private static final S3InputSourceConfig CLOUD_CONFIG_PROPERTIES = new S3InputSourceConfig(
+      new DefaultPasswordProvider("myKey"), new DefaultPasswordProvider("mySecret"));
+
   private static final List<CloudObjectLocation> EXPECTED_LOCATION =
       ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
 
@@ -133,7 +141,15 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   @Test
   public void testSerdeWithUris() throws Exception
   {
-    final S3InputSource withUris = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, EXPECTED_URIS, null, null);
+    final S3InputSource withUris = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        EXPECTED_URIS,
+        null,
+        null,
+        null
+    );
     final S3InputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class);
     Assert.assertEquals(withUris, serdeWithUris);
   }
@@ -141,7 +157,15 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   @Test
   public void testSerdeWithPrefixes() throws Exception
   {
-    final S3InputSource withPrefixes = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, null, PREFIXES, null);
+    final S3InputSource withPrefixes = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        null,
+        PREFIXES,
+        null,
+        null
+    );
     final S3InputSource serdeWithPrefixes =
         MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
     Assert.assertEquals(withPrefixes, serdeWithPrefixes);
@@ -150,17 +174,114 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   @Test
   public void testSerdeWithObjects() throws Exception
   {
+    final S3InputSource withPrefixes = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        null,
+        null,
+        EXPECTED_LOCATION,
+        null
+    );
+    final S3InputSource serdeWithPrefixes =
+        MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
+    Assert.assertEquals(withPrefixes, serdeWithPrefixes);
+  }
 
+  @Test
+  public void testSerdeWithCloudConfigPropertiesWithKeyAndSecret() throws Exception
+  {
+    EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+    EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.getAmazonS3ClientBuilder())
+            .andStubReturn(AMAZON_S3_CLIENT_BUILDER);
+    EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
+            .andReturn(SERVICE);
+    EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
     final S3InputSource withPrefixes = new S3InputSource(
         SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
         INPUT_DATA_CONFIG,
         null,
         null,
-        EXPECTED_LOCATION
+        EXPECTED_LOCATION,
+        CLOUD_CONFIG_PROPERTIES
     );
     final S3InputSource serdeWithPrefixes =
         MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
+    // This is to force the s3ClientSupplier to initialize the ServerSideEncryptingAmazonS3
+    serdeWithPrefixes.createEntity(new CloudObjectLocation("bucket", "path"));
     Assert.assertEquals(withPrefixes, serdeWithPrefixes);
+    EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+  }
+
+  @Test
+  public void testS3InputSourceUseDefaultPasswordWhenCloudConfigPropertiesWithoutCrediential()
+  {
+    S3InputSourceConfig mockConfigPropertiesWithoutKeyAndSecret = EasyMock.createMock(S3InputSourceConfig.class);
+    EasyMock.reset(mockConfigPropertiesWithoutKeyAndSecret);
+    EasyMock.expect(mockConfigPropertiesWithoutKeyAndSecret.isCredentialsConfigured())
+            .andStubReturn(false);
+    EasyMock.replay(mockConfigPropertiesWithoutKeyAndSecret);
+    EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+    EasyMock.expect(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER.build())
+            .andReturn(SERVICE);
+    EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+    final S3InputSource withPrefixes = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        null,
+        null,
+        EXPECTED_LOCATION,
+        mockConfigPropertiesWithoutKeyAndSecret
+    );
+    Assert.assertNotNull(withPrefixes);
+    // This is to force the s3ClientSupplier to initialize the ServerSideEncryptingAmazonS3
+    withPrefixes.createEntity(new CloudObjectLocation("bucket", "path"));
+    EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+    EasyMock.verify(mockConfigPropertiesWithoutKeyAndSecret);
+  }
+
+  @Test
+  public void testSerdeS3ClientLazyInitializedWithCrediential() throws Exception
+  {
+    // Amazon S3 builder should not build anything as we did not make any call that requires the S3 client
+    EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+    EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+    final S3InputSource withPrefixes = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        null,
+        null,
+        EXPECTED_LOCATION,
+        CLOUD_CONFIG_PROPERTIES
+    );
+    final S3InputSource serdeWithPrefixes =
+        MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
+    Assert.assertEquals(withPrefixes, serdeWithPrefixes);
+    EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+  }
+
+  @Test
+  public void testSerdeS3ClientLazyInitializedWithoutCrediential() throws Exception
+  {
+    // Amazon S3 builder should not build anything as we did not make any call that requires the S3 client
+    EasyMock.reset(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+    EasyMock.replay(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
+    final S3InputSource withPrefixes = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        null,
+        null,
+        EXPECTED_LOCATION,
+        null
+    );
+    final S3InputSource serdeWithPrefixes =
+        MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
+    Assert.assertEquals(withPrefixes, serdeWithPrefixes);
+    EasyMock.verify(SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER);
   }
 
   @Test
@@ -168,10 +289,12 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   {
     final S3InputSource withPrefixes = new S3InputSource(
         SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
         INPUT_DATA_CONFIG,
         ImmutableList.of(),
         ImmutableList.of(),
-        EXPECTED_LOCATION
+        EXPECTED_LOCATION,
+        null
     );
     final S3InputSource serdeWithPrefixes =
         MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
@@ -185,10 +308,12 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     // constructor will explode
     new S3InputSource(
         SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
         INPUT_DATA_CONFIG,
         EXPECTED_URIS,
         PREFIXES,
-        EXPECTED_LOCATION
+        EXPECTED_LOCATION,
+        null
     );
   }
 
@@ -199,10 +324,12 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     // constructor will explode
     new S3InputSource(
         SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
         INPUT_DATA_CONFIG,
         EXPECTED_URIS,
         PREFIXES,
-        ImmutableList.of()
+        ImmutableList.of(),
+        null
     );
   }
 
@@ -213,17 +340,27 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     // constructor will explode
     new S3InputSource(
         SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
         INPUT_DATA_CONFIG,
         ImmutableList.of(),
         PREFIXES,
-        EXPECTED_LOCATION
+        EXPECTED_LOCATION,
+        null
     );
   }
 
   @Test
   public void testWithUrisSplit()
   {
-    S3InputSource inputSource = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, EXPECTED_URIS, null, null);
+    S3InputSource inputSource = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        EXPECTED_URIS,
+        null,
+        null,
+        null
+    );
 
     Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
         new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@@ -241,7 +378,15 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
     EasyMock.replay(S3_CLIENT);
 
-    S3InputSource inputSource = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, null, PREFIXES, null);
+    S3InputSource inputSource = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        null,
+        PREFIXES,
+        null,
+        null
+    );
 
     Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
         new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@@ -260,7 +405,14 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
     EasyMock.replay(S3_CLIENT);
 
-    S3InputSource inputSource = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, null, PREFIXES, null);
+    S3InputSource inputSource = new S3InputSource(
+        SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
+        INPUT_DATA_CONFIG,
+        null,
+        PREFIXES,
+        null,
+        null);
 
     Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
         new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@@ -284,9 +436,11 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
 
     S3InputSource inputSource = new S3InputSource(
         SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
         INPUT_DATA_CONFIG,
         null,
         ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
+        null,
         null
     );
 
@@ -313,9 +467,11 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
 
     S3InputSource inputSource = new S3InputSource(
         SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
         INPUT_DATA_CONFIG,
         null,
         ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
+        null,
         null
     );
 
@@ -355,9 +511,11 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
 
     S3InputSource inputSource = new S3InputSource(
         SERVICE,
+        SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
         INPUT_DATA_CONFIG,
         null,
         ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
+        null,
         null
     );
 
@@ -489,13 +647,22 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     {
       // Deserializer is need for AmazonS3Client even though it is injected.
       // See https://github.com/FasterXML/jackson-databind/issues/962.
-      return ImmutableList.of(new SimpleModule().addDeserializer(AmazonS3.class, new ItemDeserializer()));
+      return ImmutableList.of(
+          new SimpleModule()
+              .addDeserializer(AmazonS3.class, new ItemDeserializer<AmazonS3>())
+              .addDeserializer(AmazonS3ClientBuilder.class, new ItemDeserializer<AmazonS3ClientBuilder>())
+      );
     }
 
     @Override
     public void configure(Binder binder)
     {
+    }
 
+    @Provides
+    public ServerSideEncryptingAmazonS3.Builder getServerSideEncryptingAmazonS3Builder()
+    {
+      return SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER;
     }
 
     @Provides
@@ -505,7 +672,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     }
   }
 
-  public static class ItemDeserializer extends StdDeserializer<AmazonS3>
+  public static class ItemDeserializer<T> extends StdDeserializer<T>
   {
     ItemDeserializer()
     {
@@ -518,7 +685,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     }
 
     @Override
-    public AmazonS3 deserialize(JsonParser jp, DeserializationContext ctxt)
+    public T deserialize(JsonParser jp, DeserializationContext ctxt)
     {
       throw new UnsupportedOperationException();
     }
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/TestAWSCredentialsProvider.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/TestAWSCredentialsProvider.java
index 7f8e267..3685fc6 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/TestAWSCredentialsProvider.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/TestAWSCredentialsProvider.java
@@ -62,13 +62,17 @@ public class TestAWSCredentialsProvider
     Assert.assertEquals("secretKeySample", credentials.getAWSSecretKey());
 
     // try to create
-    s3Module.getAmazonS3Client(
+    ServerSideEncryptingAmazonS3.Builder amazonS3ClientBuilder = s3Module.getServerSideEncryptingAmazonS3Builder(
         provider,
         new AWSProxyConfig(),
         new AWSEndpointConfig(),
         new AWSClientConfig(),
         new S3StorageConfig(new NoopServerSideEncryption())
     );
+
+    s3Module.getAmazonS3Client(
+        amazonS3ClientBuilder
+    );
   }
 
   @Test
@@ -93,12 +97,16 @@ public class TestAWSCredentialsProvider
     Assert.assertEquals("sessionTokenSample", sessionCredentials.getSessionToken());
 
     // try to create
-    s3Module.getAmazonS3Client(
+    ServerSideEncryptingAmazonS3.Builder amazonS3ClientBuilder = s3Module.getServerSideEncryptingAmazonS3Builder(
         provider,
         new AWSProxyConfig(),
         new AWSEndpointConfig(),
         new AWSClientConfig(),
         new S3StorageConfig(new NoopServerSideEncryption())
     );
+
+    s3Module.getAmazonS3Client(
+        amazonS3ClientBuilder
+    );
   }
 }
diff --git a/website/.spelling b/website/.spelling
index 275854f..eed68ee 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -251,6 +251,7 @@ http
 https
 i.e.
 influxdb
+ingestionSpec
 injective
 inlined
 interruptible
@@ -907,7 +908,6 @@ ignoreInvalidRows
 ignoreWhenNoSegments
 indexSpecForIntermediatePersists
 index_hadoop
-ingestionSpec
 inputPath
 inputSpecs
 interval1
@@ -954,6 +954,7 @@ PartitionsSpec
 PasswordProviders
 SegmentsSplitHintSpec
 SplitHintSpec
+accessKeyId
 appendToExisting
 baseDir
 chatHandlerNumRetries
@@ -973,6 +974,7 @@ maxNumSegmentsToMerge
 maxRetry
 pushTimeout
 reportParseExceptions
+secretAccessKey
 segmentWriteOutMediumFactory
 sql
 sqls
@@ -1711,4 +1713,4 @@ isRobot
 isUnpatrolled
 metroCode
 regionIsoCode
-regionName
+regionName
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org