You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2016/12/15 14:06:04 UTC

flink git commit: [FLINK-4611] [kinesis] Make "AUTO" credential provider as default for Kinesis Connector

Repository: flink
Updated Branches:
  refs/heads/master d84599ea0 -> 4666e65ef


[FLINK-4611] [kinesis] Make "AUTO" credential provider as default for Kinesis Connector

This closes #2914.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4666e65e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4666e65e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4666e65e

Branch: refs/heads/master
Commit: 4666e65ef7b1e42a2bf0bba1d7d08e8d68e1af01
Parents: d84599e
Author: 魏偉哲 <to...@tonyweis-MacBook-Pro.local>
Authored: Wed Nov 30 18:17:24 2016 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Thu Dec 15 22:04:33 2016 +0800

----------------------------------------------------------------------
 .../connectors/kinesis/util/AWSUtil.java        | 24 +++++++++++++++-----
 .../kinesis/util/KinesisConfigUtil.java         | 12 ++--------
 .../kinesis/FlinkKinesisConsumerTest.java       | 12 ----------
 3 files changed, 20 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4666e65e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index cff69e5..a6aad02 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -69,8 +69,20 @@ public class AWSUtil {
 	 * @return The corresponding AWS Credentials Provider instance
 	 */
 	public static AWSCredentialsProvider getCredentialsProvider(final Properties configProps) {
-		CredentialProvider credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
-			AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, CredentialProvider.BASIC.toString()));
+		CredentialProvider credentialProviderType;
+		if (!configProps.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
+			if (configProps.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
+				&& configProps.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
+				// if the credential provider type is not specified, but the Access Key ID and Secret Key are given, it will default to BASIC
+				credentialProviderType = CredentialProvider.BASIC;
+			} else {
+				// if the credential provider type is not specified, it will default to AUTO
+				credentialProviderType = CredentialProvider.AUTO;
+			}
+		} else {
+			credentialProviderType = CredentialProvider.valueOf(configProps.getProperty(
+				AWSConfigConstants.AWS_CREDENTIALS_PROVIDER));
+		}
 
 		AWSCredentialsProvider credentialsProvider;
 
@@ -90,10 +102,6 @@ public class AWSUtil {
 					? new ProfileCredentialsProvider(profileName)
 					: new ProfileCredentialsProvider(profileConfigPath, profileName);
 				break;
-			case AUTO:
-				credentialsProvider = new DefaultAWSCredentialsProviderChain();
-				break;
-			default:
 			case BASIC:
 				credentialsProvider = new AWSCredentialsProvider() {
 					@Override
@@ -108,6 +116,10 @@ public class AWSUtil {
 						// do nothing
 					}
 				};
+				break;
+			default:
+			case AUTO:
+				credentialsProvider = new DefaultAWSCredentialsProviderChain();
 		}
 
 		return credentialsProvider;

http://git-wip-us.apache.org/repos/asf/flink/blob/4666e65e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index 9aa14ad..d8ea0a2 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -130,15 +130,7 @@ public class KinesisConfigUtil {
 	 * Validate configuration properties related to Amazon AWS service
 	 */
 	public static void validateAwsConfiguration(Properties config) {
-		if (!config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
-			// if the credential provider type is not specified, it will default to BASIC later on,
-			// so the Access Key ID and Secret Key must be given
-			if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
-				|| !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
-				throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
-					"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
-			}
-		} else {
+		if (config.containsKey(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)) {
 			String credentialsProviderType = config.getProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER);
 
 			// value specified for AWSConfigConstants.AWS_CREDENTIALS_PROVIDER needs to be recognizable
@@ -157,7 +149,7 @@ public class KinesisConfigUtil {
 			if (providerType == CredentialProvider.BASIC) {
 				if (!config.containsKey(AWSConfigConstants.AWS_ACCESS_KEY_ID)
 					|| !config.containsKey(AWSConfigConstants.AWS_SECRET_ACCESS_KEY)) {
-					throw new IllegalArgumentException("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
+					throw new IllegalArgumentException("Please set values for AWS Access Key ID ('" + AWSConfigConstants.AWS_ACCESS_KEY_ID + "') " +
 						"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/4666e65e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index dbf95f9..a72d8df 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -87,18 +87,6 @@ public class FlinkKinesisConsumerTest {
 	}
 
 	@Test
-	public void testCredentialProviderTypeDefaultToBasicButNoCredentialsSetInConfig() {
-		exception.expect(IllegalArgumentException.class);
-		exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +
-				"and Secret Key ('" + AWSConfigConstants.AWS_SECRET_ACCESS_KEY + "') when using the BASIC AWS credential provider type.");
-
-		Properties testConfig = new Properties();
-		testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
-
-		KinesisConfigUtil.validateAwsConfiguration(testConfig);
-	}
-
-	@Test
 	public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
 		exception.expect(IllegalArgumentException.class);
 		exception.expectMessage("Please set values for AWS Access Key ID ('"+ AWSConfigConstants.AWS_ACCESS_KEY_ID +"') " +