Posted to issues@flink.apache.org by "hlteoh37 (via GitHub)" <gi...@apache.org> on 2023/04/03 10:42:02 UTC

[GitHub] [flink-connector-aws] hlteoh37 opened a new pull request, #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

hlteoh37 opened a new pull request, #65:
URL: https://github.com/apache/flink-connector-aws/pull/65

   …me for Kinesis API calls
   
   ## Purpose of the change
   - Change Kinesis API calls to use streamARN instead of streamName. This allows for lower request latency on the Kinesis service side.
   - We could also have made a bigger change to let users specify streamARN rather than streamName. However, that would require state migration, so we have passed on it for now.
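
For context on how the two identifiers relate: a Kinesis stream ARN has the shape `arn:<partition>:kinesis:<region>:<account-id>:stream/<stream-name>`, so the stream name is embedded in it. The sketch below is illustrative only; the class and method names are hypothetical and are not part of this PR.

```java
// Hypothetical helper (not from the PR): extracts the stream name from a Kinesis stream ARN.
final class StreamArnSketch {
    private StreamArnSketch() {}

    /** Returns the stream name embedded in a Kinesis stream ARN, or throws if the shape is unexpected. */
    static String streamNameFromArn(String streamArn) {
        // Expected shape: arn:<partition>:kinesis:<region>:<account-id>:stream/<stream-name>
        String[] parts = streamArn.split(":", 6);
        if (parts.length != 6 || !"arn".equals(parts[0]) || !"kinesis".equals(parts[2])) {
            throw new IllegalArgumentException("Not a Kinesis stream ARN: " + streamArn);
        }
        String prefix = "stream/";
        String resource = parts[5];
        if (!resource.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a Kinesis stream ARN: " + streamArn);
        }
        String name = resource.substring(prefix.length());
        // Reject empty names and nested resources such as stream consumers.
        if (name.isEmpty() || name.contains("/")) {
            throw new IllegalArgumentException("Not a plain stream ARN: " + streamArn);
        }
        return name;
    }
}
```

Going the other way (name to ARN) needs the region and account ID, which is why the change resolves the ARN through a service call rather than building it locally.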
   
   ## Verifying this change
   This change added tests and can be verified as follows:
   - Modified existing unit tests for ListShards and GetShardIterator
   
   ## Significant changes
   *(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)*
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [ ] New feature has been introduced
     - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "hlteoh37 (via GitHub)" <gi...@apache.org>.
hlteoh37 commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1155997741


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -253,4 +263,26 @@ private boolean isRetryable(Throwable err) {
 
         return true;
     }
+
+    private String lookupStreamArn(final String streamName) {
+        if (streamArn.get() == null) {
+            try {
+                final String arn =
+                        kinesisClient
+                                .describeStream(
+                                        DescribeStreamRequest.builder()
+                                                .streamName(streamName)
+                                                .build())
+                                .get()
+                                .streamDescription()
+                                .streamARN();

Review Comment:
   You're right. In this case, we can actually just extend the public API to allow users to specify streamARN instead.





[GitHub] [flink-connector-aws] dannycranmer commented on pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer commented on PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#issuecomment-1531177783

   LGTM, thanks @hlteoh37, will merge after actions pass




[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "hlteoh37 (via GitHub)" <gi...@apache.org>.
hlteoh37 commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1155998033


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -253,4 +263,26 @@ private boolean isRetryable(Throwable err) {
 
         return true;
     }
+
+    private String lookupStreamArn(final String streamName) {
+        if (streamArn.get() == null) {
+            try {
+                final String arn =
+                        kinesisClient
+                                .describeStream(
+                                        DescribeStreamRequest.builder()
+                                                .streamName(streamName)
+                                                .build())
+                                .get()
+                                .streamDescription()
+                                .streamARN();
+                streamArn.set(arn);
+                return arn;
+            } catch (InterruptedException | ExecutionException e) {
+                throw new KinesisStreamsException(

Review Comment:
   Side-stepped this issue by allowing users to specify the stream ARN.





[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1155880425


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -94,6 +97,9 @@
     /* Flag to whether fatally fail any time we encounter an exception when persisting records */
     private final boolean failOnError;
 
+    /* Stores the identified stream ARN */
+    private AtomicReference<String> streamArn = new AtomicReference<>();

Review Comment:
   This can be `final`



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java:
##########
@@ -140,6 +141,9 @@ public class KinesisProxy implements KinesisProxyInterface {
     /** Exponential backoff power constant for the describe stream operation. */
     private final double describeStreamExpConstant;
 
+    /** Caches retrieved stream ARNs for give stream names. */
+    private ConcurrentHashMap<String, String> streamNameToArnLookup = new ConcurrentHashMap<>();

Review Comment:
   Left side can be a standard `Map`



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java:
##########
@@ -498,18 +514,18 @@ private List<StreamShardHandle> getShardsOfStream(
      * subtask's fetcher. This jitter backoff approach will help distribute calls across the
      * fetchers over time.
      *
-     * @param streamName the stream to describe
+     * @param streamArn the stream to describe
      * @param startShardId which shard to start with for this describe operation (earlier shard's
      *     infos will not appear in result)
      * @return the result of the describe stream operation
      */
     private ListShardsResult listShards(
-            String streamName, @Nullable String startShardId, @Nullable String startNextToken)
+            String streamArn, @Nullable String startShardId, @Nullable String startNextToken)
             throws InterruptedException {
         final ListShardsRequest listShardsRequest = new ListShardsRequest();
         if (startNextToken == null) {
             listShardsRequest.setExclusiveStartShardId(startShardId);
-            listShardsRequest.setStreamName(streamName);
+            listShardsRequest.setStreamARN(streamArn);

Review Comment:
   Does this impact the DDB streams adapter? I assume it is compatible with this SDK?



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -253,4 +263,26 @@ private boolean isRetryable(Throwable err) {
 
         return true;
     }
+
+    private String lookupStreamArn(final String streamName) {
+        if (streamArn.get() == null) {
+            try {
+                final String arn =
+                        kinesisClient
+                                .describeStream(
+                                        DescribeStreamRequest.builder()
+                                                .streamName(streamName)
+                                                .build())
+                                .get()
+                                .streamDescription()
+                                .streamARN();

Review Comment:
   Is this possibly backwards incompatible for users that are using only the sink today? After this change they might require additional IAM permissions.





[GitHub] [flink-connector-aws] dannycranmer merged pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer merged PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65




[GitHub] [flink-connector-aws] vahmed-hamdy commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "vahmed-hamdy (via GitHub)" <gi...@apache.org>.
vahmed-hamdy commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1166944534


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java:
##########
@@ -89,13 +92,23 @@
                 maxBatchSizeInBytes,
                 maxTimeInBufferMS,
                 maxRecordSizeInBytes);
-        this.streamName =
-                Preconditions.checkNotNull(
-                        streamName,
-                        "The stream name must not be null when initializing the KDS Sink.");
-        Preconditions.checkArgument(
-                !this.streamName.isEmpty(),
-                "The stream name must be set when initializing the KDS Sink.");
+
+        // Ensure that either streamName or streamArn is set. If both are set, streamArn takes
+        // precedence.
+        if (streamArn != null) {

Review Comment:
   Do you think this could be reused by Firehose? Maybe we should move it to aws-util.
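
The precedence rule stated in the code comment above (either identifier must be set; the ARN wins when both are present) can be sketched as a small standalone helper. This is a minimal illustration under assumptions, not the PR's actual implementation: the class name, method name, and exception message are hypothetical, and treating an empty string as unset is an assumption.

```java
// Hypothetical helper (not from the PR): picks which stream identifier to use.
final class StreamIdentifierSketch {
    private StreamIdentifierSketch() {}

    /** Returns streamArn when it is set, otherwise streamName; fails when neither is set. */
    static String resolve(String streamName, String streamArn) {
        if (streamArn != null && !streamArn.isEmpty()) {
            return streamArn; // the ARN takes precedence when both are provided
        }
        if (streamName != null && !streamName.isEmpty()) {
            return streamName;
        }
        throw new IllegalArgumentException(
                "Either the stream name or the stream ARN must be set.");
    }
}
```

A helper of this shape has no Kinesis-specific dependencies, which is what would make it easy to move into a shared module if another sink needed the same check.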





[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "hlteoh37 (via GitHub)" <gi...@apache.org>.
hlteoh37 commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1182318616


##########
flink-connector-kinesis/src/main/resources/META-INF/NOTICE:
##########
@@ -45,15 +45,15 @@ This project bundles the following dependencies under the Apache Software Licens
 - com.fasterxml.jackson.core:jackson-databind:2.13.4.2
 - com.fasterxml.jackson.core:jackson-core:2.13.4
 - com.fasterxml.jackson.core:jackson-annotations:2.13.4
-- com.amazonaws:jmespath-java:1.12.276
+- com.amazonaws:jmespath-java:1.12.439
 - com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.3
-- com.amazonaws:aws-java-sdk-sts:1.12.276
-- com.amazonaws:aws-java-sdk-s3:1.12.276
-- com.amazonaws:aws-java-sdk-kms:1.12.276
-- com.amazonaws:aws-java-sdk-kinesis:1.12.276
-- com.amazonaws:aws-java-sdk-dynamodb:1.12.276
-- com.amazonaws:aws-java-sdk-core:1.12.276
-- com.amazonaws:aws-java-sdk-cloudwatch:1.12.276
+- com.amazonaws:aws-java-sdk-sts:1.12.439

Review Comment:
   good shout - updated



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java:
##########
@@ -140,6 +141,9 @@ public class KinesisProxy implements KinesisProxyInterface {
     /** Exponential backoff power constant for the describe stream operation. */
     private final double describeStreamExpConstant;
 
+    /** Caches retrieved stream ARNs for give stream names. */
+    private Map<String, String> streamNameToArnLookup = new ConcurrentHashMap<>();

Review Comment:
   done



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java:
##########
@@ -488,6 +495,15 @@ private List<StreamShardHandle> getShardsOfStream(
         return shardsOfStream;
     }
 
+    private synchronized String lookupStreamArn(String streamName) throws InterruptedException {
+        if (streamNameToArnLookup.containsKey(streamName)) {
+            return streamNameToArnLookup.get(streamName);
+        }
+        String streamArn = describeStream(streamName, null).getStreamDescription().getStreamARN();

Review Comment:
   It would first respect the `DESCRIBE_STREAM` retry configuration set by the customer (defaults to 3 max retries). If it still fails after that, it would restart the job. Given that we are caching the result, we should only see this on job startup.
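
The behaviour described above (retry a bounded number of times, then let the failure propagate so the job restarts) can be sketched as follows. This is a simplified stand-in, not the connector's code: the class and method names are hypothetical, backoff pauses between attempts are omitted, and counting the limit as "one initial attempt plus `maxRetries` retries" is an assumption.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not from the PR): bounded retries, then propagate the last failure.
final class BoundedRetrySketch {
    private BoundedRetrySketch() {}

    /** Runs the call once, retrying up to maxRetries more times; rethrows the last failure. */
    static <T> T callWithRetries(Callable<T> call, int maxRetries) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again if attempts remain
            }
        }
        // All attempts failed: the caller sees the exception, which in a Flink job
        // would surface as a task failure.
        throw last;
    }
}
```

Because the resolved ARN is cached after the first success, a lookup wrapped like this would only exercise the retry path while the cache is still empty.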





[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "hlteoh37 (via GitHub)" <gi...@apache.org>.
hlteoh37 commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1155912639


##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java:
##########
@@ -488,6 +495,15 @@ private List<StreamShardHandle> getShardsOfStream(
         return shardsOfStream;
     }
 
+    private String lookupStreamArn(String streamName) throws InterruptedException {

Review Comment:
   Ok added





[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1155890908


##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java:
##########
@@ -488,6 +495,15 @@ private List<StreamShardHandle> getShardsOfStream(
         return shardsOfStream;
     }
 
+    private String lookupStreamArn(String streamName) throws InterruptedException {

Review Comment:
   We could `synchronize` this call to prevent multiple requests in parallel
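
A minimal sketch of this suggestion, assuming the remote DescribeStream call is represented by a plain `Function` so the example needs no AWS dependency. The class name and constructor are hypothetical; only the `synchronized` lookup-then-cache pattern is taken from the discussion.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in (not from the PR) for a proxy that caches stream ARNs by stream name.
final class ArnLookupSketch {
    private final Map<String, String> streamNameToArn = new ConcurrentHashMap<>();
    // Stand-in for the remote describe-stream call.
    private final Function<String, String> describeStream;

    ArnLookupSketch(Function<String, String> describeStream) {
        this.describeStream = describeStream;
    }

    /** Synchronized so that concurrent callers cannot issue duplicate remote requests. */
    synchronized String lookupStreamArn(String streamName) {
        String cached = streamNameToArn.get(streamName);
        if (cached != null) {
            return cached;
        }
        String arn = describeStream.apply(streamName);
        streamNameToArn.put(streamName, arn);
        return arn;
    }
}
```

Without `synchronized`, two threads could both miss the cache and both call the remote service; with it, the second caller waits and then reads the cached value.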





[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1155889298


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -253,4 +263,26 @@ private boolean isRetryable(Throwable err) {
 
         return true;
     }
+
+    private String lookupStreamArn(final String streamName) {
+        if (streamArn.get() == null) {
+            try {
+                final String arn =
+                        kinesisClient
+                                .describeStream(
+                                        DescribeStreamRequest.builder()
+                                                .streamName(streamName)
+                                                .build())
+                                .get()
+                                .streamDescription()
+                                .streamARN();
+                streamArn.set(arn);
+                return arn;
+            } catch (InterruptedException | ExecutionException e) {
+                throw new KinesisStreamsException(

Review Comment:
   What happens if we get throttled? A Flink app with large parallelism could easily breach the 10 TPS limit on DescribeStream.
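
One common way to soften this kind of startup burst is exponential backoff with random jitter, so that parallel subtasks do not retry in lockstep. The sketch below only computes the delay and is not what this PR does (the thread resolves the concern by letting users supply the stream ARN directly). All names and parameters are hypothetical.

```java
import java.util.Random;

// Hypothetical helper (not from the PR): jittered exponential backoff delay.
final class JitterBackoffSketch {
    private JitterBackoffSketch() {}

    /**
     * Returns a random delay in [0, min(maxMillis, baseMillis * 2^attempt)).
     * The attempt counter starts at 0.
     */
    static long jitteredDelayMillis(int attempt, long baseMillis, long maxMillis, Random random) {
        // Exponential ceiling, capped so that late attempts do not wait unboundedly long.
        double ceiling = Math.min((double) maxMillis, baseMillis * Math.pow(2, attempt));
        // Pick a uniformly random point below the ceiling to spread callers out.
        return (long) (random.nextDouble() * ceiling);
    }
}
```

A caller would sleep for the returned number of milliseconds before retrying the throttled request.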





[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1182228611


##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java:
##########
@@ -140,6 +141,9 @@ public class KinesisProxy implements KinesisProxyInterface {
     /** Exponential backoff power constant for the describe stream operation. */
     private final double describeStreamExpConstant;
 
+    /** Caches retrieved stream ARNs for give stream names. */
+    private Map<String, String> streamNameToArnLookup = new ConcurrentHashMap<>();

Review Comment:
   nit: can be `final`



##########
flink-connector-kinesis/src/main/resources/META-INF/NOTICE:
##########
@@ -45,15 +45,15 @@ This project bundles the following dependencies under the Apache Software Licens
 - com.fasterxml.jackson.core:jackson-databind:2.13.4.2
 - com.fasterxml.jackson.core:jackson-core:2.13.4
 - com.fasterxml.jackson.core:jackson-annotations:2.13.4
-- com.amazonaws:jmespath-java:1.12.276
+- com.amazonaws:jmespath-java:1.12.439
 - com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.3
-- com.amazonaws:aws-java-sdk-sts:1.12.276
-- com.amazonaws:aws-java-sdk-s3:1.12.276
-- com.amazonaws:aws-java-sdk-kms:1.12.276
-- com.amazonaws:aws-java-sdk-kinesis:1.12.276
-- com.amazonaws:aws-java-sdk-dynamodb:1.12.276
-- com.amazonaws:aws-java-sdk-core:1.12.276
-- com.amazonaws:aws-java-sdk-cloudwatch:1.12.276
+- com.amazonaws:aws-java-sdk-sts:1.12.439

Review Comment:
   Should we have included `arns` here?



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java:
##########
@@ -488,6 +495,15 @@ private List<StreamShardHandle> getShardsOfStream(
         return shardsOfStream;
     }
 
+    private synchronized String lookupStreamArn(String streamName) throws InterruptedException {
+        if (streamNameToArnLookup.containsKey(streamName)) {
+            return streamNameToArnLookup.get(streamName);
+        }
+        String streamArn = describeStream(streamName, null).getStreamDescription().getStreamARN();

Review Comment:
   What happens if this fails? Does it restart the job or retry?





[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "hlteoh37 (via GitHub)" <gi...@apache.org>.
hlteoh37 commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1155889829


##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java:
##########
@@ -140,6 +141,9 @@ public class KinesisProxy implements KinesisProxyInterface {
     /** Exponential backoff power constant for the describe stream operation. */
     private final double describeStreamExpConstant;
 
+    /** Caches retrieved stream ARNs for give stream names. */
+    private ConcurrentHashMap<String, String> streamNameToArnLookup = new ConcurrentHashMap<>();

Review Comment:
   ok, changed



##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java:
##########
@@ -498,18 +514,18 @@ private List<StreamShardHandle> getShardsOfStream(
      * subtask's fetcher. This jitter backoff approach will help distribute calls across the
      * fetchers over time.
      *
-     * @param streamName the stream to describe
+     * @param streamArn the stream to describe
      * @param startShardId which shard to start with for this describe operation (earlier shard's
      *     infos will not appear in result)
      * @return the result of the describe stream operation
      */
     private ListShardsResult listShards(
-            String streamName, @Nullable String startShardId, @Nullable String startNextToken)
+            String streamArn, @Nullable String startShardId, @Nullable String startNextToken)
             throws InterruptedException {
         final ListShardsRequest listShardsRequest = new ListShardsRequest();
         if (startNextToken == null) {
             listShardsRequest.setExclusiveStartShardId(startShardId);
-            listShardsRequest.setStreamName(streamName);
+            listShardsRequest.setStreamARN(streamArn);

Review Comment:
   Good to check. It is compatible.
   
   There are changes to two places:
   - `listShards()` -> `DynamoDBStreamsProxy` overrides the `getShardList()` method, so this code path is not used
   - `getShardIterator()` -> `DynamoDBStreamsProxy` calls `super.getShardIterator()`, so it's important to check. The `AdapterClient` is compatible, because it treats `streamName` and `streamArn` interchangeably.
      - [DescribeStreamResultAdapter returns stream ARN as stream Name](https://github.com/awslabs/dynamodb-streams-kinesis-adapter/blob/4fc3d79a0a0ea51b05175510b4cbd7daaa286587/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/model/StreamDescriptionAdapter.java#L55-L57)
      - [GetShardIteratorRequestAdapter sets stream Name when stream ARN is set](https://github.com/awslabs/dynamodb-streams-kinesis-adapter/blob/4fc3d79a0a0ea51b05175510b4cbd7daaa286587/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/model/GetShardIteratorRequestAdapter.java#L49-L51)



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -94,6 +97,9 @@
     /* Flag to whether fatally fail any time we encounter an exception when persisting records */
     private final boolean failOnError;
 
+    /* Stores the identified stream ARN */
+    private AtomicReference<String> streamArn = new AtomicReference<>();

Review Comment:
   Ok added.





[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1155819229


##########
pom.xml:
##########
@@ -53,7 +53,7 @@ under the License.
     </scm>
 
     <properties>
-        <aws.sdkv1.version>1.12.276</aws.sdkv1.version>
+        <aws.sdkv1.version>1.12.439</aws.sdkv1.version>

Review Comment:
   Need to update NOTICE files for this





[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "hlteoh37 (via GitHub)" <gi...@apache.org>.
hlteoh37 commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1174902579


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java:
##########
@@ -89,13 +92,23 @@
                 maxBatchSizeInBytes,
                 maxTimeInBufferMS,
                 maxRecordSizeInBytes);
-        this.streamName =
-                Preconditions.checkNotNull(
-                        streamName,
-                        "The stream name must not be null when initializing the KDS Sink.");
-        Preconditions.checkArgument(
-                !this.streamName.isEmpty(),
-                "The stream name must be set when initializing the KDS Sink.");
+
+        // Ensure that either streamName or streamArn is set. If both are set, streamArn takes
+        // precedence.
+        if (streamArn != null) {

Review Comment:
   mmm... We could move it once we re-use the logic. 





[GitHub] [flink-connector-aws] vahmed-hamdy commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "vahmed-hamdy (via GitHub)" <gi...@apache.org>.
vahmed-hamdy commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1166947224


##########
flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java:
##########
@@ -343,18 +344,21 @@ void wrongCredentialProviderNameShouldResultInFailureWhenInFailOnErrorIsOff() {
                 false, "WRONG", "Invalid AWS Credential Provider Type");
     }
 
-    private void credentialsProvidedThroughProfilePathShouldResultInFailure(
-            boolean failOnError,
-            String credentialsProvider,
-            String credentialsProfileLocation,
-            String expectedMessage) {
-        Properties properties =
-                getDefaultPropertiesWithoutCredentialsSetAndCredentialProvider(credentialsProvider);
-        properties.put(
-                AWSConfigConstants.profilePath(AWS_CREDENTIALS_PROVIDER),
-                credentialsProfileLocation);
-        assertRunWithPropertiesAndStreamShouldFailWithExceptionOfType(
-                failOnError, properties, expectedMessage);
+    @Test
+    void streamArnShouldTakePrecedenceOverStreamName() {

Review Comment:
   Great use case!





[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1155890908


##########
flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java:
##########
@@ -488,6 +495,15 @@ private List<StreamShardHandle> getShardsOfStream(
         return shardsOfStream;
     }
 
+    private String lookupStreamArn(String streamName) throws InterruptedException {

Review Comment:
   We could `synchronize` this method to prevent multiple requests in parallel





[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #65: [FLINK-31108][Connectors/Kinesis] Use Stream ARN instead of Stream Na…

Posted by "hlteoh37 (via GitHub)" <gi...@apache.org>.
hlteoh37 commented on code in PR #65:
URL: https://github.com/apache/flink-connector-aws/pull/65#discussion_r1155847861


##########
pom.xml:
##########
@@ -53,7 +53,7 @@ under the License.
     </scm>
 
     <properties>
-        <aws.sdkv1.version>1.12.276</aws.sdkv1.version>
+        <aws.sdkv1.version>1.12.439</aws.sdkv1.version>

Review Comment:
   good catch. Updated


