You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/07/21 08:47:49 UTC

[GitHub] [flink] dannycranmer opened a new pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

dannycranmer opened a new pull request #12944:
URL: https://github.com/apache/flink/pull/12944


   ## What is the purpose of the change
   
   Adding AWS SDK v2.x dependency and skeleton proxy interface for Fan Out consumption.
   
   ## Brief change log
   
   - Update pom file to include AWS SDK v2.x dependency
     - Dependencies are shaded and relocated into the connector jar (similar to AWS SDK V1)
   - Added a `KinesisProxyV2Interface` skeleton (methods and implementation will come in follow up tasks)
   - Add utility methods to create an AWS SDK v2.x Kinesis Client from connector properties, including:
     - Region
     - Credential Provider 
     - HTTP Client config
     - Other properties
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   - AWSUtil > AWSUtilTest
   - AwsV2Util > AwsV2UtilTest
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): yes
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dannycranmer commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r459476087



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AwsV2Util}.
+ */
+public class AwsV2UtilTest {
+
+	@Test
+	public void testGetCredentialsProviderEnvironmentVariables() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderSystemProperties() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof SystemPropertyCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof WebIdentityTokenFileCredentialsProvider);
+	}
+
+	@Test
+	public void testGetWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		properties.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+		properties.setProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+		AwsV2Util.getWebIdentityTokenFileCredentialsProvider(builder, properties, AWS_CREDENTIALS_PROVIDER);
+
+		verify(builder).roleArn("roleArn");
+		verify(builder).roleSessionName("roleSessionName");
+		verify(builder, never()).webIdentityTokenFile(any());
+	}
+
+	@Test
+	public void testGetWebIdentityTokenFileCredentialsProviderWithWebIdentityFile() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		properties.setProperty(webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER), "webIdentityTokenFile");
+
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+		AwsV2Util.getWebIdentityTokenFileCredentialsProvider(builder, properties, AWS_CREDENTIALS_PROVIDER);
+
+		verify(builder).webIdentityTokenFile(Paths.get("webIdentityTokenFile"));
+	}
+
+	@Test
+	public void testGetCredentialsProviderAuto() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "AUTO");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof DefaultCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderAssumeRole() {
+		Properties properties = spy(properties(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE"));
+		properties.setProperty(AWS_REGION, "eu-west-2");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof StsAssumeRoleCredentialsProvider);
+
+		verify(properties).getProperty(AWSConfigConstants.roleArn(AWS_CREDENTIALS_PROVIDER));
+		verify(properties).getProperty(AWSConfigConstants.roleSessionName(AWS_CREDENTIALS_PROVIDER));
+		verify(properties).getProperty(AWSConfigConstants.externalId(AWS_CREDENTIALS_PROVIDER));
+		verify(properties).getProperty(AWS_REGION);
+	}
+
+	@Test
+	public void testGetCredentialsProviderBasic() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "BASIC");
+		properties.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+		properties.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+		AwsCredentials credentials = AwsV2Util.getCredentialsProvider(properties).resolveCredentials();
+
+		assertEquals("ak", credentials.accessKeyId());
+		assertEquals("sk", credentials.secretAccessKey());
+	}
+
+	@Test
+	public void testGetCredentialsProviderProfile() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+		properties.put(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "default");
+		properties.put(AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER), "src/test/resources/profile");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof ProfileCredentialsProvider);
+
+		AwsCredentials credentials = credentialsProvider.resolveCredentials();
+		assertEquals("11111111111111111111", credentials.accessKeyId());
+		assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111", credentials.secretAccessKey());
+	}
+
+	@Test
+	public void testGetCredentialsProviderNamedProfile() {

Review comment:
       I am not sure what you mean, can you please elaborate? It already throws an exception if the `CredentialProvider` is not defined in the enum or supported by the `getCredentialsProvider` method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dannycranmer commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r459016637



##########
File path: flink-connectors/flink-connector-kinesis/pom.xml
##########
@@ -204,6 +224,12 @@ under the License.
 									<include>com.amazonaws:*</include>
 									<include>com.google.protobuf:*</include>
 									<include>org.apache.httpcomponents:*</include>
+									<include>software.amazon.awssdk:*</include>
+									<include>software.amazon.eventstream:*</include>
+									<include>software.amazon.ion:*</include>
+									<include>org.reactivestreams:*</include>
+									<include>io.netty:*</include>
+									<include>com.typesafe.netty:*</include>

Review comment:
       Ok thanks I will take a look and update that




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tzulitai edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
tzulitai edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-686192678


   @dannycranmer 
   
   I've been fixing a few bugs in the PR that has failed CI, before merging it.
   So far have bumped into this, could you take a look?
   
   ```
   2020-09-02T10:18:54.5913921Z [ERROR] Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:3.1.1:shade (shade-flink) on project flink-connector-kinesis_2.11: Error creating shaded jar: duplicate entry: META-INF/services/org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.async.SdkAsyncHttpService -> [Help 1]
   2020-09-02T10:18:54.5915252Z [ERROR] 
   2020-09-02T10:18:54.5916062Z [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
   2020-09-02T10:18:54.5916918Z [ERROR] Re-run Maven using the -X switch to enable full debug logging.
   2020-09-02T10:18:54.5917384Z [ERROR] 
   2020-09-02T10:18:54.5918070Z [ERROR] For more information about the errors and possible solutions, please read the following articles:
   2020-09-02T10:18:54.5918907Z [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
   2020-09-02T10:18:54.5919331Z [ERROR] 
   2020-09-02T10:18:54.5919700Z [ERROR] After correcting the problems, you can resume the build with the command
   2020-09-02T10:18:54.5920396Z [ERROR]   mvn <goals> -rf :flink-connector-kinesis_2.11
   ```
   
   Original build logs:
   https://dev.azure.com/tzulitai/tzulitai-flink/_build/results?buildId=35&view=logs&jobId=66592496-52df-56bb-d03e-37509e1d9d0f&j=66592496-52df-56bb-d03e-37509e1d9d0f&t=ae0269db-6796-5583-2e5f-d84757d711aa


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825",
       "triggerID" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827",
       "triggerID" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4936",
       "triggerID" : "b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4936) 
   * 4c03a87c2e09b3bd716346252c7bd96174ccdb44 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825",
       "triggerID" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827",
       "triggerID" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 42b655e62dda50684e6143cf8a9fb464bfd9acb9 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661724994


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit e754d55a091b78c3647c47b227edb0b7870f0c09 (Tue Jul 21 08:49:36 UTC 2020)
   
   **Warnings:**
    * **1 pom.xml files were touched**: Check for build and licensing issues.
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into to Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dannycranmer commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r459474390



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.profiles.ProfileFile;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Utility methods specific to Amazon Web Service SDK v2.x.
+ */
+@Internal
+public class AwsV2Util {
+
+	/**
+	 * Creates an Amazon Kinesis Async Client from the provided properties.
+	 * Configuration is copied from AWS SDK v1 configuration class as per:
+	 * - https://github.com/aws/aws-sdk-java-v2/blob/2.13.52/docs/LaunchChangelog.md#134-client-override-retry-configuration
+	 *
+	 * @param configProps configuration properties
+	 * @param config the AWS SDK v1.x client configuration used to create the client
+	 * @return a new Amazon Kinesis Client
+	 */
+	public static KinesisAsyncClient createKinesisAsyncClient(final Properties configProps, final ClientConfiguration config) {
+		final SdkAsyncHttpClient httpClient = createHttpClient(config, NettyNioAsyncHttpClient.builder());
+		final ClientOverrideConfiguration overrideConfiguration = createClientOverrideConfiguration(config, ClientOverrideConfiguration.builder());
+		final KinesisAsyncClientBuilder clientBuilder = KinesisAsyncClient.builder();
+
+		return createKinesisAsyncClient(configProps, clientBuilder, httpClient, overrideConfiguration);
+	}
+
+	@VisibleForTesting
+	static SdkAsyncHttpClient createHttpClient(
+			final ClientConfiguration config,
+			final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+		httpClientBuilder
+			.maxConcurrency(config.getMaxConnections())
+			.connectionTimeout(Duration.ofMillis(config.getConnectionTimeout()))
+			.writeTimeout(Duration.ofMillis(config.getSocketTimeout()))
+			.connectionMaxIdleTime(Duration.ofMillis(config.getConnectionMaxIdleMillis()))
+			.useIdleConnectionReaper(config.useReaper());
+
+		if (config.getConnectionTTL() > -1) {
+			httpClientBuilder.connectionTimeToLive(Duration.ofMillis(config.getConnectionTTL()));
+		}
+
+		return httpClientBuilder.build();
+	}
+
+	@VisibleForTesting
+	static ClientOverrideConfiguration createClientOverrideConfiguration(
+			final ClientConfiguration config,
+			final ClientOverrideConfiguration.Builder overrideConfigurationBuilder) {
+
+		overrideConfigurationBuilder
+			.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWSUtil.formatFlinkUserAgentPrefix())
+			.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, config.getUserAgentSuffix());
+
+		if (config.getRequestTimeout() > 0) {
+			overrideConfigurationBuilder.apiCallAttemptTimeout(Duration.ofMillis(config.getRequestTimeout()));
+		}
+
+		if (config.getClientExecutionTimeout() > 0) {
+			overrideConfigurationBuilder.apiCallTimeout(Duration.ofMillis(config.getClientExecutionTimeout()));
+		}
+
+		return overrideConfigurationBuilder.build();
+	}
+
+	@VisibleForTesting
+	static KinesisAsyncClient createKinesisAsyncClient(
+			final Properties configProps,
+			final KinesisAsyncClientBuilder clientBuilder,
+			final SdkAsyncHttpClient httpClient,
+			final ClientOverrideConfiguration overrideConfiguration) {
+
+		if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
+			final URI endpointOverride = URI.create(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+			clientBuilder.endpointOverride(endpointOverride);
+		}
+
+		return clientBuilder
+			.httpClient(httpClient)
+			.overrideConfiguration(overrideConfiguration)
+			.credentialsProvider(getCredentialsProvider(configProps))
+			.region(getRegion(configProps))
+			.build();
+	}
+
+	/**
+	 * Return a {@link AWSCredentialsProvider} instance corresponding to the configuration properties.
+	 *
+	 * @param configProps the configuration properties
+	 * @return The corresponding AWS Credentials Provider instance
+	 */
+	public static AwsCredentialsProvider getCredentialsProvider(final Properties configProps) {

Review comment:
       Yes when using `AssumeRole` it is called from line 205 with a different prefix.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tzulitai commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r480921657



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AwsV2Util}.
+ */
+public class AwsV2UtilTest {
+
+	@Test
+	public void testGetCredentialsProviderEnvironmentVariables() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderSystemProperties() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof SystemPropertyCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof WebIdentityTokenFileCredentialsProvider);
+	}
+
+	@Test
+	public void testGetWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		properties.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+		properties.setProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();

Review comment:
       @dannycranmer thanks for the update, lets leave as is for the time being then.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tzulitai commented on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
tzulitai commented on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-686192678


   @dannycranmer 
   
   I've been fixing a few bugs in the PR that has failed CI, before merging it.
   So far have bumped into this:
   
   ```
   2020-09-02T10:18:54.5913921Z [ERROR] Failed to execute goal org.apache.maven.plugins:maven-shade-plugin:3.1.1:shade (shade-flink) on project flink-connector-kinesis_2.11: Error creating shaded jar: duplicate entry: META-INF/services/org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.async.SdkAsyncHttpService -> [Help 1]
   2020-09-02T10:18:54.5915252Z [ERROR] 
   2020-09-02T10:18:54.5916062Z [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
   2020-09-02T10:18:54.5916918Z [ERROR] Re-run Maven using the -X switch to enable full debug logging.
   2020-09-02T10:18:54.5917384Z [ERROR] 
   2020-09-02T10:18:54.5918070Z [ERROR] For more information about the errors and possible solutions, please read the following articles:
   2020-09-02T10:18:54.5918907Z [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
   2020-09-02T10:18:54.5919331Z [ERROR] 
   2020-09-02T10:18:54.5919700Z [ERROR] After correcting the problems, you can resume the build with the command
   2020-09-02T10:18:54.5920396Z [ERROR]   mvn <goals> -rf :flink-connector-kinesis_2.11
   ```
   
   Original build logs:
   https://dev.azure.com/tzulitai/tzulitai-flink/_build/results?buildId=35&view=logs&jobId=66592496-52df-56bb-d03e-37509e1d9d0f&j=66592496-52df-56bb-d03e-37509e1d9d0f&t=ae0269db-6796-5583-2e5f-d84757d711aa


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825",
       "triggerID" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827",
       "triggerID" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4936",
       "triggerID" : "b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6151",
       "triggerID" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1809c6eac63c818a34b56d0adc50ab0fd07596b7",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1809c6eac63c818a34b56d0adc50ab0fd07596b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6156",
       "triggerID" : "686411010",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 4c03a87c2e09b3bd716346252c7bd96174ccdb44 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6151) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6156) 
   * 1809c6eac63c818a34b56d0adc50ab0fd07596b7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tzulitai commented on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
tzulitai commented on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-686413947


   Thanks for the update @dannycranmer! I'll wait for this CI run.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tzulitai commented on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
tzulitai commented on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-678939481


   I'll address my own comments and proceed to merge this. Thanks @dannycranmer !


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825",
       "triggerID" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827",
       "triggerID" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4936",
       "triggerID" : "b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6151",
       "triggerID" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1809c6eac63c818a34b56d0adc50ab0fd07596b7",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1809c6eac63c818a34b56d0adc50ab0fd07596b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6156",
       "triggerID" : "686411010",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 4c03a87c2e09b3bd716346252c7bd96174ccdb44 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6151) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6156) 
   * 1809c6eac63c818a34b56d0adc50ab0fd07596b7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825",
       "triggerID" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827",
       "triggerID" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4936",
       "triggerID" : "b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6151",
       "triggerID" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6156",
       "triggerID" : "686411010",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "1809c6eac63c818a34b56d0adc50ab0fd07596b7",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6157",
       "triggerID" : "1809c6eac63c818a34b56d0adc50ab0fd07596b7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4c03a87c2e09b3bd716346252c7bd96174ccdb44 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6151) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6156) 
   * 1809c6eac63c818a34b56d0adc50ab0fd07596b7 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6157) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e754d55a091b78c3647c47b227edb0b7870f0c09 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817) 
   * 2d843f9d90c3252e6a22c1a12749af60a82cce02 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e754d55a091b78c3647c47b227edb0b7870f0c09 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825",
       "triggerID" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827",
       "triggerID" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e754d55a091b78c3647c47b227edb0b7870f0c09 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817) 
   * 2d843f9d90c3252e6a22c1a12749af60a82cce02 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825) 
   * 42b655e62dda50684e6143cf8a9fb464bfd9acb9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825",
       "triggerID" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827",
       "triggerID" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4936",
       "triggerID" : "b7e8c0631ff1484905ff1ecf8f89a8b81036bc1a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6151",
       "triggerID" : "4c03a87c2e09b3bd716346252c7bd96174ccdb44",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1809c6eac63c818a34b56d0adc50ab0fd07596b7",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1809c6eac63c818a34b56d0adc50ab0fd07596b7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 4c03a87c2e09b3bd716346252c7bd96174ccdb44 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=6151) 
   * 1809c6eac63c818a34b56d0adc50ab0fd07596b7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tzulitai commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r469030309



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AwsV2Util}.
+ */
+public class AwsV2UtilTest {
+
+	@Test
+	public void testGetCredentialsProviderEnvironmentVariables() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderSystemProperties() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof SystemPropertyCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof WebIdentityTokenFileCredentialsProvider);
+	}
+
+	@Test
+	public void testGetWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		properties.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+		properties.setProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();

Review comment:
       Do these builders being tested against in these tests have a valid `equals` implementation?
   Or getter methods to obtain the configured values?
   
   I'm asking because if that's the case, then we can maybe remove the use of Mockito mocks in these tests by having `configure*Builder(builder, properties)` which are made package-private just for test visibility.
   In that case, these tests would just be testing the `configure*Builder` methods which only just sets the config values on the builders.
   
   The verification in the test would then simply be an equals check / Matcher check:
   `assertThat(builderUnderTest, is(expectedBuilder))`.

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2.java
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+
+/**
+ * Kinesis proxy implementation using AWS SDK v2.x - a utility class that is used as a proxy to make
+ * calls to AWS Kinesis for several EFO (Enhanced Fan Out) functions, such as de-/registering stream consumers,
+ * subscribing to a shard and receiving records from a shard.
+ */
+@Internal
+public class KinesisProxyV2 implements KinesisProxyV2Interface {
+
+	private final KinesisAsyncClient kinesisAsyncClient;
+
+	/**
+	 * Create a new KinesisProxyV2 based on the supplied configuration properties.
+	 *
+	 * @param kinesisAsyncClient the kinesis async client used to communicate with Kinesis
+	 */
+	public KinesisProxyV2(final KinesisAsyncClient kinesisAsyncClient) {
+		this.kinesisAsyncClient = kinesisAsyncClient;

Review comment:
       Preconditions check

##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AwsV2Util}.
+ */
+public class AwsV2UtilTest {
+
+	@Test
+	public void testGetCredentialsProviderEnvironmentVariables() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderSystemProperties() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof SystemPropertyCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof WebIdentityTokenFileCredentialsProvider);
+	}
+
+	@Test
+	public void testGetWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		properties.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+		properties.setProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();

Review comment:
       We are trying to get rid of Mockito in tests as much as possible for future code additions.
   
   However, in this scenario if we don't have a valid `equals` or can't really implement custom matchers for the builders, then we probably don't have a way around using mocks.
   In that case, can leave it as is.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] xiaolong-sn commented on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-662312492


   @flinkbot run azure


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rmetzger commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
rmetzger commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r459523888



##########
File path: flink-connectors/flink-connector-kinesis/pom.xml
##########
@@ -204,6 +224,12 @@ under the License.
 									<include>com.amazonaws:*</include>
 									<include>com.google.protobuf:*</include>
 									<include>org.apache.httpcomponents:*</include>
+									<include>software.amazon.awssdk:*</include>
+									<include>software.amazon.eventstream:*</include>
+									<include>software.amazon.ion:*</include>
+									<include>org.reactivestreams:*</include>
+									<include>io.netty:*</include>
+									<include>com.typesafe.netty:*</include>

Review comment:
       Thanks a lot. We can include this dependency as well (see: https://www.apache.org/legal/resolved.html#handling-public-domain-licensed-works)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] asfgit closed pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #12944:
URL: https://github.com/apache/flink/pull/12944


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825",
       "triggerID" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827",
       "triggerID" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2d843f9d90c3252e6a22c1a12749af60a82cce02 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825) 
   * 42b655e62dda50684e6143cf8a9fb464bfd9acb9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825",
       "triggerID" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e754d55a091b78c3647c47b227edb0b7870f0c09 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817) 
   * 2d843f9d90c3252e6a22c1a12749af60a82cce02 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] rmetzger commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
rmetzger commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r458843982



##########
File path: flink-connectors/flink-connector-kinesis/pom.xml
##########
@@ -204,6 +224,12 @@ under the License.
 									<include>com.amazonaws:*</include>
 									<include>com.google.protobuf:*</include>
 									<include>org.apache.httpcomponents:*</include>
+									<include>software.amazon.awssdk:*</include>
+									<include>software.amazon.eventstream:*</include>
+									<include>software.amazon.ion:*</include>
+									<include>org.reactivestreams:*</include>
+									<include>io.netty:*</include>
+									<include>com.typesafe.netty:*</include>

Review comment:
       I guess you'll have to update the `NOTICE` file of the Kinesis connector as well. (The FAQ section here should give you some pointers: https://cwiki.apache.org/confluence/display/FLINK/Licensing )




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dannycranmer commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r459489336



##########
File path: flink-connectors/flink-connector-kinesis/pom.xml
##########
@@ -204,6 +224,12 @@ under the License.
 									<include>com.amazonaws:*</include>
 									<include>com.google.protobuf:*</include>
 									<include>org.apache.httpcomponents:*</include>
+									<include>software.amazon.awssdk:*</include>
+									<include>software.amazon.eventstream:*</include>
+									<include>software.amazon.ion:*</include>
+									<include>org.reactivestreams:*</include>
+									<include>io.netty:*</include>
+									<include>com.typesafe.netty:*</include>

Review comment:
       @rmetzger I have updated the NOTICE file as requested. All of the new dependencies are Apache 2.0 except reactive streams is creative commons. Is that ok to bundle? I could not shade that one?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] xiaolong-sn commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
xiaolong-sn commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r458720418



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AwsV2Util}.
+ */
+public class AwsV2UtilTest {
+
+	@Test
+	public void testGetCredentialsProviderEnvironmentVariables() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderSystemProperties() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof SystemPropertyCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof WebIdentityTokenFileCredentialsProvider);
+	}
+
+	@Test
+	public void testGetWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		properties.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+		properties.setProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+		AwsV2Util.getWebIdentityTokenFileCredentialsProvider(builder, properties, AWS_CREDENTIALS_PROVIDER);
+
+		verify(builder).roleArn("roleArn");
+		verify(builder).roleSessionName("roleSessionName");
+		verify(builder, never()).webIdentityTokenFile(any());
+	}
+
+	@Test
+	public void testGetWebIdentityTokenFileCredentialsProviderWithWebIdentityFile() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		properties.setProperty(webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER), "webIdentityTokenFile");
+
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();
+
+		AwsV2Util.getWebIdentityTokenFileCredentialsProvider(builder, properties, AWS_CREDENTIALS_PROVIDER);
+
+		verify(builder).webIdentityTokenFile(Paths.get("webIdentityTokenFile"));
+	}
+
+	@Test
+	public void testGetCredentialsProviderAuto() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "AUTO");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof DefaultCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderAssumeRole() {
+		Properties properties = spy(properties(AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE"));
+		properties.setProperty(AWS_REGION, "eu-west-2");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof StsAssumeRoleCredentialsProvider);
+
+		verify(properties).getProperty(AWSConfigConstants.roleArn(AWS_CREDENTIALS_PROVIDER));
+		verify(properties).getProperty(AWSConfigConstants.roleSessionName(AWS_CREDENTIALS_PROVIDER));
+		verify(properties).getProperty(AWSConfigConstants.externalId(AWS_CREDENTIALS_PROVIDER));
+		verify(properties).getProperty(AWS_REGION);
+	}
+
+	@Test
+	public void testGetCredentialsProviderBasic() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "BASIC");
+		properties.setProperty(AWSConfigConstants.accessKeyId(AWS_CREDENTIALS_PROVIDER), "ak");
+		properties.setProperty(AWSConfigConstants.secretKey(AWS_CREDENTIALS_PROVIDER), "sk");
+
+		AwsCredentials credentials = AwsV2Util.getCredentialsProvider(properties).resolveCredentials();
+
+		assertEquals("ak", credentials.accessKeyId());
+		assertEquals("sk", credentials.secretAccessKey());
+	}
+
+	@Test
+	public void testGetCredentialsProviderProfile() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "PROFILE");
+		properties.put(AWSConfigConstants.profileName(AWS_CREDENTIALS_PROVIDER), "default");
+		properties.put(AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER), "src/test/resources/profile");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof ProfileCredentialsProvider);
+
+		AwsCredentials credentials = credentialsProvider.resolveCredentials();
+		assertEquals("11111111111111111111", credentials.accessKeyId());
+		assertEquals("wJalrXUtnFEMI/K7MDENG/bPxRfiCY1111111111", credentials.secretAccessKey());
+	}
+
+	@Test
+	public void testGetCredentialsProviderNamedProfile() {

Review comment:
       Should we just throw an exception when the `CredentialsProvicer` is set to an illegal name, rather than give it a default value?

##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2Util.java
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.CredentialProvider;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.profiles.ProfileFile;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Utility methods specific to Amazon Web Service SDK v2.x.
+ */
+@Internal
+public class AwsV2Util {
+
+	/**
+	 * Creates an Amazon Kinesis Async Client from the provided properties.
+	 * Configuration is copied from AWS SDK v1 configuration class as per:
+	 * - https://github.com/aws/aws-sdk-java-v2/blob/2.13.52/docs/LaunchChangelog.md#134-client-override-retry-configuration
+	 *
+	 * @param configProps configuration properties
+	 * @param config the AWS SDK v1.x client configuration used to create the client
+	 * @return a new Amazon Kinesis Client
+	 */
+	public static KinesisAsyncClient createKinesisAsyncClient(final Properties configProps, final ClientConfiguration config) {
+		final SdkAsyncHttpClient httpClient = createHttpClient(config, NettyNioAsyncHttpClient.builder());
+		final ClientOverrideConfiguration overrideConfiguration = createClientOverrideConfiguration(config, ClientOverrideConfiguration.builder());
+		final KinesisAsyncClientBuilder clientBuilder = KinesisAsyncClient.builder();
+
+		return createKinesisAsyncClient(configProps, clientBuilder, httpClient, overrideConfiguration);
+	}
+
+	@VisibleForTesting
+	static SdkAsyncHttpClient createHttpClient(
+			final ClientConfiguration config,
+			final NettyNioAsyncHttpClient.Builder httpClientBuilder) {
+		httpClientBuilder
+			.maxConcurrency(config.getMaxConnections())
+			.connectionTimeout(Duration.ofMillis(config.getConnectionTimeout()))
+			.writeTimeout(Duration.ofMillis(config.getSocketTimeout()))
+			.connectionMaxIdleTime(Duration.ofMillis(config.getConnectionMaxIdleMillis()))
+			.useIdleConnectionReaper(config.useReaper());
+
+		if (config.getConnectionTTL() > -1) {
+			httpClientBuilder.connectionTimeToLive(Duration.ofMillis(config.getConnectionTTL()));
+		}
+
+		return httpClientBuilder.build();
+	}
+
+	@VisibleForTesting
+	static ClientOverrideConfiguration createClientOverrideConfiguration(
+			final ClientConfiguration config,
+			final ClientOverrideConfiguration.Builder overrideConfigurationBuilder) {
+
+		overrideConfigurationBuilder
+			.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, AWSUtil.formatFlinkUserAgentPrefix())
+			.putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, config.getUserAgentSuffix());
+
+		if (config.getRequestTimeout() > 0) {
+			overrideConfigurationBuilder.apiCallAttemptTimeout(Duration.ofMillis(config.getRequestTimeout()));
+		}
+
+		if (config.getClientExecutionTimeout() > 0) {
+			overrideConfigurationBuilder.apiCallTimeout(Duration.ofMillis(config.getClientExecutionTimeout()));
+		}
+
+		return overrideConfigurationBuilder.build();
+	}
+
+	@VisibleForTesting
+	static KinesisAsyncClient createKinesisAsyncClient(
+			final Properties configProps,
+			final KinesisAsyncClientBuilder clientBuilder,
+			final SdkAsyncHttpClient httpClient,
+			final ClientOverrideConfiguration overrideConfiguration) {
+
+		if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
+			final URI endpointOverride = URI.create(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+			clientBuilder.endpointOverride(endpointOverride);
+		}
+
+		return clientBuilder
+			.httpClient(httpClient)
+			.overrideConfiguration(overrideConfiguration)
+			.credentialsProvider(getCredentialsProvider(configProps))
+			.region(getRegion(configProps))
+			.build();
+	}
+
+	/**
+	 * Return a {@link AWSCredentialsProvider} instance corresponding to the configuration properties.
+	 *
+	 * @param configProps the configuration properties
+	 * @return The corresponding AWS Credentials Provider instance
+	 */
+	public static AwsCredentialsProvider getCredentialsProvider(final Properties configProps) {

Review comment:
       Why should we overload this method? Will there be an alternative prefix?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tzulitai commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r469769449



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AwsV2Util}.
+ */
+public class AwsV2UtilTest {
+
+	@Test
+	public void testGetCredentialsProviderEnvironmentVariables() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderSystemProperties() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof SystemPropertyCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof WebIdentityTokenFileCredentialsProvider);
+	}
+
+	@Test
+	public void testGetWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		properties.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+		properties.setProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();

Review comment:
       After some offline discussion with @dannycranmer, we decided to move ahead with the PR without addressing the use of Mockito. There will be follow-ups to address that afterwards.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825",
       "triggerID" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827",
       "triggerID" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e754d55a091b78c3647c47b227edb0b7870f0c09 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676) Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817) 
   * 2d843f9d90c3252e6a22c1a12749af60a82cce02 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825) 
   * 42b655e62dda50684e6143cf8a9fb464bfd9acb9 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4827) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] tzulitai commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
tzulitai commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r469030309



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AwsV2Util}.
+ */
+public class AwsV2UtilTest {
+
+	@Test
+	public void testGetCredentialsProviderEnvironmentVariables() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderSystemProperties() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof SystemPropertyCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof WebIdentityTokenFileCredentialsProvider);
+	}
+
+	@Test
+	public void testGetWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		properties.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+		properties.setProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();

Review comment:
       Do these builders being tested against in these tests have a valid `equals` implementation?
   Or getter methods to obtain the configured values?
   
   I'm asking because if that's the case, then we can maybe remove the use of Mockito mocks in these tests by having `configure*Builder(builder, properties)` which are made package-private just for test visibility.
   In that case, these tests can be modified to just test against the `configure*Builder` methods which only just sets the config values on the builders.
   
   The verification in the test would then simply be an equals check / Matcher check:
   `assertThat(builderUnderTest, is(expectedBuilder))`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e754d55a091b78c3647c47b227edb0b7870f0c09 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dannycranmer commented on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-686411010


   @flinkbot run azure


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dannycranmer commented on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-686388071


   @tzulitai the build issue was caused by running multiple builds without `clean`. For example:
   ```
   mvn package
   mvn package
   ```
   
   The files left hanging around were not being overridden by the shade plugin. The `org.apache.maven.plugins.shade.resource.ServicesResourceTransformer` configuration is inherited by an ancestor pom. I have verified that these service manifest files are not required by the connector and removed them from the shaded jar (if you know a different/better way to configure this plugin, happy to swap it out):
   
   ```
     META-INF/services/reactor.blockhound.integration.BlockHoundIntegration
   - META-INF/services/org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.async.SdkAsyncHttpService
   - META-INF/services/org.apache.flink.kinesis.shaded.software.amazon.awssdk.http.SdkHttpService
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825",
       "triggerID" : "2d843f9d90c3252e6a22c1a12749af60a82cce02",
       "triggerType" : "PUSH"
     }, {
       "hash" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "42b655e62dda50684e6143cf8a9fb464bfd9acb9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * e754d55a091b78c3647c47b227edb0b7870f0c09 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817) 
   * 2d843f9d90c3252e6a22c1a12749af60a82cce02 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4825) 
   * 42b655e62dda50684e6143cf8a9fb464bfd9acb9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] dannycranmer commented on a change in pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
dannycranmer commented on a change in pull request #12944:
URL: https://github.com/apache/flink/pull/12944#discussion_r475410998



##########
File path: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/AwsV2UtilTest.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import org.junit.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
+import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
+import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
+
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_CREDENTIALS_PROVIDER;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleArn;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.roleSessionName;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.webIdentityTokenFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AwsV2Util}.
+ */
+public class AwsV2UtilTest {
+
+	@Test
+	public void testGetCredentialsProviderEnvironmentVariables() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof EnvironmentVariableCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderSystemProperties() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "SYS_PROP");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof SystemPropertyCredentialsProvider);
+	}
+
+	@Test
+	public void testGetCredentialsProviderWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+
+		AwsCredentialsProvider credentialsProvider = AwsV2Util.getCredentialsProvider(properties);
+
+		assertTrue(credentialsProvider instanceof WebIdentityTokenFileCredentialsProvider);
+	}
+
+	@Test
+	public void testGetWebIdentityTokenFileCredentialsProvider() {
+		Properties properties = properties(AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");
+		properties.setProperty(roleArn(AWS_CREDENTIALS_PROVIDER), "roleArn");
+		properties.setProperty(roleSessionName(AWS_CREDENTIALS_PROVIDER), "roleSessionName");
+
+		WebIdentityTokenFileCredentialsProvider.Builder builder = mockWebIdentityTokenFileCredentialsProviderBuilder();

Review comment:
       @tzulitai I had a look at [one of the builders](https://github.com/aws/aws-sdk-java-v2/blob/master/core/auth/src/main/java/software/amazon/awssdk/auth/credentials/WebIdentityTokenFileCredentialsProvider.java), and unfortunately it does not have an `equals` implementation. That was a good idea though. 
   
   The other option is to create our own implementation of the builders. But that would be a lot of boilerplate and essentially doing the same thing Mockito does, albeit more verbose. I will assume we are leaving these as is, but let me know if you would like me to remove Mockito and implement test builders.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #12944: [FLINK-18513][Kinesis] Add AWS SDK v2.x dependency and KinesisProxyV2

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12944:
URL: https://github.com/apache/flink/pull/12944#issuecomment-661736413


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676",
       "triggerID" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "triggerType" : "PUSH"
     }, {
       "hash" : "e754d55a091b78c3647c47b227edb0b7870f0c09",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817",
       "triggerID" : "662312492",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * e754d55a091b78c3647c47b227edb0b7870f0c09 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4676) Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=4817) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org