Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/01/03 05:44:02 UTC
[jira] [Commented] (FLINK-8271) upgrade from deprecated classes to AmazonKinesis
[ https://issues.apache.org/jira/browse/FLINK-8271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309184#comment-16309184 ]
ASF GitHub Bot commented on FLINK-8271:
---------------------------------------
Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5171#discussion_r159363827
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java ---
@@ -30,37 +30,44 @@
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.regions.Region;
+import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
-import com.amazonaws.services.kinesis.AmazonKinesisClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import java.util.Properties;
/**
* Some utilities specific to Amazon Web Service.
*/
public class AWSUtil {
+ /** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+ private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) Kinesis Connector";
/**
- * Creates an Amazon Kinesis Client.
+ * Creates an AmazonKinesis client.
* @param configProps configuration properties containing the access key, secret key, and region
- * @return a new Amazon Kinesis Client
+ * @return a new AmazonKinesis client
*/
- public static AmazonKinesisClient createKinesisClient(Properties configProps) {
+ public static AmazonKinesis createKinesisClient(Properties configProps) {
// set a Flink-specific user agent
- ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig();
- awsClientConfig.setUserAgent("Apache Flink " + EnvironmentInformation.getVersion() +
- " (" + EnvironmentInformation.getRevisionInformation().commitId + ") Kinesis Connector");
+ ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
+ .withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
+ EnvironmentInformation.getVersion(),
+ EnvironmentInformation.getRevisionInformation().commitId));
// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
- AmazonKinesisClient client = new AmazonKinesisClient(
- AWSUtil.getCredentialsProvider(configProps), awsClientConfig);
+ AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
+ .withCredentials(AWSUtil.getCredentialsProvider(configProps))
+ .withClientConfiguration(awsClientConfig)
+ .withRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION)));
- client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(AWSConfigConstants.AWS_REGION))));
if (configProps.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
- client.setEndpoint(configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+ builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+ configProps.getProperty(AWSConfigConstants.AWS_ENDPOINT),
+ configProps.getProperty(AWSConfigConstants.AWS_REGION)));
--- End diff --
@tzulitai You are right. After some research, I found that the `region` field in `EndpointConfiguration` ([here](https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/client/builder/AwsClientBuilder.java#L363)) is used the same way as in the [old code here](https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-core/src/main/java/com/amazonaws/AmazonWebServiceClient.java#L368). I will set it to null.
Taking this a step further: if what you described is a frequent use case, we should add a unit test for it so that it stays validated in the future. Would you like to create a ticket and a PR for it?
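To make the unit-test idea concrete, here is a rough sketch of how the endpoint handling could be isolated so it can be checked without the AWS SDK on the classpath. Everything in it is an assumption for illustration: `EndpointOverrideSketch`, `EndpointArgs`, and `resolveEndpointArgs` are hypothetical names, and the two string keys are stand-ins for `AWSConfigConstants.AWS_REGION` and `AWSConfigConstants.AWS_ENDPOINT`. It only models the two arguments that would be handed to `EndpointConfiguration`, with the signing region left null as described above.

```java
import java.util.Properties;

public class EndpointOverrideSketch {

    // Assumed property keys; stand-ins for AWSConfigConstants.AWS_REGION / AWS_ENDPOINT.
    static final String AWS_REGION = "aws.region";
    static final String AWS_ENDPOINT = "aws.endpoint";

    /** Hypothetical holder for the two arguments that would go to EndpointConfiguration. */
    static final class EndpointArgs {
        final String serviceEndpoint;
        final String signingRegion;

        EndpointArgs(String serviceEndpoint, String signingRegion) {
            this.serviceEndpoint = serviceEndpoint;
            this.signingRegion = signingRegion;
        }
    }

    /**
     * Returns the endpoint arguments to use, or null when no endpoint override is configured.
     */
    static EndpointArgs resolveEndpointArgs(Properties configProps) {
        if (!configProps.containsKey(AWS_ENDPOINT)) {
            return null;
        }
        // As discussed in the comment: the signing region is left null for a custom endpoint.
        return new EndpointArgs(configProps.getProperty(AWS_ENDPOINT), null);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(AWS_REGION, "us-east-1");
        // Made-up local endpoint, e.g. a mocked Kinesis for tests.
        props.setProperty(AWS_ENDPOINT, "http://localhost:4567");

        EndpointArgs resolved = resolveEndpointArgs(props);
        System.out.println("endpoint=" + resolved.serviceEndpoint
                + ", signingRegion=" + resolved.signingRegion);
    }
}
```

A real test in the connector would presumably assert on the built client instead; this only shows the shape of the check.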
> upgrade from deprecated classes to AmazonKinesis
> ------------------------------------------------
>
> Key: FLINK-8271
> URL: https://issues.apache.org/jira/browse/FLINK-8271
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Affects Versions: 1.4.0
> Reporter: Bowen Li
> Assignee: Bowen Li
> Fix For: 1.5.0, 1.4.1
>
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)