You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2016/12/15 14:06:04 UTC
flink git commit: [FLINK-4611] [kinesis] Make "AUTO" credential
provider as default for Kinesis Connector
Repository: flink
Updated Branches:
refs/heads/master d84599ea0 -> 4666e65ef
[FLINK-4611] [kinesis] Make "AUTO" credential provider as default for Kinesis Connector
This closes #2914.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4666e65e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4666e65e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4666e65e
Branch: refs/heads/master
Commit: 4666e65ef7b1e42a2bf0bba1d7d08e8d68e1af01
Parents: d84599e
Author: \u9b4f\u5049\u54f2 <to...@tonyweis-MacBook-Pro.local>
Authored: Wed Nov 30 18:17:24 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Dec 15 22:04:33 2016 +0800
----------------------------------------------------------------------
.../connectors/kinesis/util/AWSUtil.java | 24 +++++++++++++++-----
.../kinesis/util/KinesisConfigUtil.java | 12 ++--------
.../kinesis/FlinkKinesisConsumerTest.java | 12 ----------
3 files changed, 20 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4666e65e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index cff69e5..a6aad02 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -69,8 +69,20 @@ public class AWSUtil {
* @return The corresponding AWS Credentials Provider instance
*/
public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) {
- CredentialProvider credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
- AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, CredentialProvider.BASIC.toString()));
+ CredentialProvider credentialProviderType;
+ if (!configProps.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
+ if (configProps.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
+ && configProps.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+ // if the credential provider type is not specified, but the Access Key ID and Secret Key are given, it will default to BASIC
+ credentialProviderType = CredentialProvider.BASIC;
+ } else {
+ // if the credential provider type is not specified, it will default to AUTO
+ credentialProviderType = CredentialProvider.AUTO;
+ }
+ } else {
+ credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
+ AWSConfigConstants.AWS_CREDENTIALS_PROVIDER));
+ }
AWSCredentialsProvider credentialsProvider;
@@ -90,10 +102,6 @@ public class AWSUtil {
? new ProfileCredentialsProvider(profileName)
: new ProfileCredentialsProvider(profileConfigPath, profileName);
break;
- case AUTO:
- credentialsProvider = new DefaultAWSCredentialsProviderChain();
- break;
- default:
case BASIC:
credentialsProvider = new AWSCredentialsProvider() {
@Override
@@ -108,6 +116,10 @@ public class AWSUtil {
// do nothing
}
};
+ break;
+ default:
+ case AUTO:
+ credentialsProvider = new DefaultAWSCredentialsProviderChain();
}
return credentialsProvider;
http://git-wip-us.apache.org/repos/asf/flink/blob/4666e65e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 9aa14ad..d8ea0a2 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -130,15 +130,7 @@ public class KinesisConfigUtil {
* Validate configuration properties related to Amazon AWS service
*/
public static void validateAwsConfiguration(Properties config) {
- if (!config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
- // if the credential provider type is not specified, it will default to BASIC later on,
- // so the Access Key ID and Secret Key must be given
- if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
- || !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
- throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
- "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
- }
- } else {
+ if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
String credentialsProviderType = config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
// value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable
@@ -157,7 +149,7 @@ public class KinesisConfigUtil {
if (providerType == CredentialProvider.BASIC) {
if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
|| !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
- throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+ throw new IllegalArgumentException("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4666e65e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index dbf95f9..a72d8df 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -87,18 +87,6 @@ public class FlinkKinesisConsumerTest {
}
@Test
- public void testCredentialProviderTypeDefaultToBasicButNoCredentialsSetInConfig() {
- exception.expect(IllegalArgumentException.class);
- exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
- "and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
-
- Properties testConfig = new Properties();
- testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-
- KinesisConfigUtil.validateAwsConfiguration(testConfig);
- }
-
- @Test
public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +