You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/12 13:30:57 UTC

[05/19] flink git commit: [FLINK-8271] [kinesis] Remove usage of deprecated Kinesis APIs

[FLINK-8271] [kinesis] Remove usage of deprecated Kinesis APIs

This closes #5171.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d53a722e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d53a722e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d53a722e

Branch: refs/heads/master
Commit: d53a722e769e8ff6009d53208bf6702ec3e4a6f5
Parents: 0692275
Author: Bowen Li <bo...@gmail.com>
Authored: Tue Jan 2 11:21:28 2018 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 12 19:43:28 2018 +0800

----------------------------------------------------------------------
 .../connectors/kinesis/proxy/KinesisProxy.java  |  4 +--
 .../connectors/kinesis/util/AWSUtil.java        | 34 ++++++++++++--------
 .../manualtests/ManualExactlyOnceTest.java      |  7 ++--
 ...nualExactlyOnceWithStreamReshardingTest.java |  6 ++--
 4 files changed, 29 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d53a722e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 7daaad2..6eb8134 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 
 import com.amazonaws.AmazonServiceException;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
@@ -65,7 +65,7 @@ public class KinesisProxy implements KinesisProxyInterface {
 	private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
 
 	/** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
-	private final AmazonKinesisClient kinesisClient;
+	private final AmazonKinesis kinesisClient;
 
 	/** Random seed used to calculate backoff jitter for Kinesis operations. */
 	private static final Random seed = new Random();

http://git-wip-us.apache.org/repos/asf/flink/blob/d53a722e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 5670526..c2dc5d3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -30,9 +30,10 @@ import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.regions.Region;
+import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 
 import java.util.Properties;
 
@@ -40,27 +41,34 @@ import java.util.Properties;
  * Some utilities specific to Amazon Web Service.
  */
 public class AWSUtil {
+	/** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+	private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
 
 	/**
-	 * Creates an Amazon Kinesis Client.
+	 * Creates an AmazonKinesis client.
 	 * @param configProps configuration properties containing the access key, secret key, and region
-	 * @return a new Amazon Kinesis Client
+	 * @return a new AmazonKinesis client
 	 */
-	public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+	public static AmazonKinesis createKinesisClient(Properties configProps) {
 		// set a Flink-specific user agent
-		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
-		awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
-			" (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+		ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
+				.withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
+														EnvironmentInformation.getVersion(),
+														EnvironmentInformation.getRevisionInformation().commitId));
 
 		// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
-		AmazonKinesisClient client = new AmazonKinesisClient(
-			AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
+		AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
+				.withCredentials(AWSUtil.getCredentialsProvider(configProps))
+				.withClientConfiguration(awsClientConfig)
+				.withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
 
-		client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
 		if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
-			client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+			// Set signingRegion as null, to facilitate mocking Kinesis for local tests
+			builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+													configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
+													null));
 		}
-		return client;
+		return builder.build();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/d53a722e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 67ddad2..963002f 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValida
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,7 +37,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * This test first starts a data generator, producing data into kinesis.
+ * This test first starts a data generator, producing data into Kinesis.
  * Then, it starts a consuming topology, ensuring that all records up to a certain
  * point have been seen.
  *
@@ -45,7 +45,6 @@ import java.util.concurrent.atomic.AtomicReference;
  * --region eu-central-1 --accessKey X --secretKey X
  */
 public class ManualExactlyOnceTest {
-
 	private static final Logger LOG = LoggerFactory.getLogger(ManualExactlyOnceTest.class);
 
 	static final int TOTAL_EVENT_COUNT = 1000; // the producer writes one per 10 ms, so it runs for 10k ms = 10 seconds
@@ -63,7 +62,7 @@ public class ManualExactlyOnceTest {
 		configProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
 		configProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
 		configProps.setProperty(AWSConfigConstants.AWS_REGION, region);
-		AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps);
+		AmazonKinesis client = AWSUtil.createKinesisClient(configProps);
 
 		// create a stream for the test:
 		client.createStream(streamName, 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/d53a722e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index cef8720..93b9caf 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValida
 import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.LimitExceededException;
 import com.amazonaws.services.kinesis.model.PutRecordsRequest;
@@ -74,7 +74,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 		configProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
 		configProps.setProperty(ConsumerConfigConstants.AWS_REGION, region);
 		configProps.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "0");
-		final AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps);
+		final AmazonKinesis client = AWSUtil.createKinesisClient(configProps);
 
 		// the stream is first created with 1 shard
 		client.createStream(streamName, 1);
@@ -107,7 +107,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
 			Runnable manualGenerate = new Runnable() {
 				@Override
 				public void run() {
-					AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps);
+					AmazonKinesis client = AWSUtil.createKinesisClient(configProps);
 					int count = 0;
 					final int batchSize = 30;
 					while (true) {