You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2018/01/12 13:30:57 UTC
[05/19] flink git commit: [FLINK-8271] [kinesis] Remove usage of deprecated Kinesis APIs
[FLINK-8271] [kinesis] Remove usage of deprecated Kinesis APIs
This closes #5171.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d53a722e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d53a722e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d53a722e
Branch: refs/heads/master
Commit: d53a722e769e8ff6009d53208bf6702ec3e4a6f5
Parents: 0692275
Author: Bowen Li <bo...@gmail.com>
Authored: Tue Jan 2 11:21:28 2018 -0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Fri Jan 12 19:43:28 2018 +0800
----------------------------------------------------------------------
.../connectors/kinesis/proxy/KinesisProxy.java | 4 +--
.../connectors/kinesis/util/AWSUtil.java | 34 ++++++++++++--------
.../manualtests/ManualExactlyOnceTest.java | 7 ++--
...nualExactlyOnceWithStreamReshardingTest.java | 6 ++--
4 files changed, 29 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d53a722e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 7daaad2..6eb8134 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -22,7 +22,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import com.amazonaws.AmazonServiceException;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
@@ -65,7 +65,7 @@ public class KinesisProxy implements KinesisProxyInterface {
private static final Logger LOG = LoggerFactory.getLogger(KinesisProxy.class);
/** The actual Kinesis client from the AWS SDK that we will be using to make calls. */
- private final AmazonKinesisClient kinesisClient;
+ private final AmazonKinesis kinesisClient;
/** Random seed used to calculate backoff jitter for Kinesis operations. */
private static final Random seed = new Random();
http://git-wip-us.apache.org/repos/asf/flink/blob/d53a722e/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
index 5670526..c2dc5d3 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java
@@ -30,9 +30,10 @@ import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.regions.Region;
+import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import java.util.Properties;
@@ -40,27 +41,34 @@ import java.util.Properties;
* Some utilities specific to Amazon Web Service.
*/
public class AWSUtil {
+ /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+ private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
/**
- * Creates an Amazon Kinesis Client.
+ * Creates an AmazonKinesis client.
* @param configProps configuration properties containing the access key, secret key, and region
- * @return a new Amazon Kinesis Client
+ * @return a new AmazonKinesis client
*/
- public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+ public static AmazonKinesis createKinesisClient(Properties configProps) {
// set a Flink-specific user agent
- ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
- awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
- " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+ ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
+ .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
+ EnvironmentInformation.getVersion(),
+ EnvironmentInformation.getRevisionInformation().commitId));
// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
- AmazonKinesisClient client = new AmazonKinesisClient(
- AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
+ AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
+ .withCredentials(AWSUtil.getCredentialsProvider(configProps))
+ .withClientConfiguration(awsClientConfig)
+ .withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
- client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
- client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+ // Set signingRegion as null, to facilitate mocking Kinesis for local tests
+ builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+ configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
+ null));
}
- return client;
+ return builder.build();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/d53a722e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
index 67ddad2..963002f 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValida
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +37,7 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
/**
- * This test first starts a data generator, producing data into kinesis.
+ * This test first starts a data generator, producing data into Kinesis.
* Then, it starts a consuming topology, ensuring that all records up to a certain
* point have been seen.
*
@@ -45,7 +45,6 @@ import java.util.concurrent.atomic.AtomicReference;
* --region eu-central-1 --accessKey X --secretKey X
*/
public class ManualExactlyOnceTest {
-
private static final Logger LOG = LoggerFactory.getLogger(ManualExactlyOnceTest.class);
static final int TOTAL_EVENT_COUNT = 1000; // the producer writes one per 10 ms, so it runs for 10k ms = 10 seconds
@@ -63,7 +62,7 @@ public class ManualExactlyOnceTest {
configProps.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, accessKey);
configProps.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
configProps.setProperty(AWSConfigConstants.AWS_REGION, region);
- AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps);
+ AmazonKinesis client = AWSUtil.createKinesisClient(configProps);
// create a stream for the test:
client.createStream(streamName, 1);
http://git-wip-us.apache.org/repos/asf/flink/blob/d53a722e/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
index cef8720..93b9caf 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/manualtests/ManualExactlyOnceWithStreamReshardingTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValida
import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisShardIdGenerator;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
@@ -74,7 +74,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
configProps.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, secretKey);
configProps.setProperty(ConsumerConfigConstants.AWS_REGION, region);
configProps.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "0");
- final AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps);
+ final AmazonKinesis client = AWSUtil.createKinesisClient(configProps);
// the stream is first created with 1 shard
client.createStream(streamName, 1);
@@ -107,7 +107,7 @@ public class ManualExactlyOnceWithStreamReshardingTest {
Runnable manualGenerate = new Runnable() {
@Override
public void run() {
- AmazonKinesisClient client = AWSUtil.createKinesisClient(configProps);
+ AmazonKinesis client = AWSUtil.createKinesisClient(configProps);
int count = 0;
final int batchSize = 30;
while (true) {