Posted to issues@flink.apache.org by "vahmed-hamdy (via GitHub)" <gi...@apache.org> on 2023/04/14 17:29:41 UTC

[GitHub] [flink-connector-aws] vahmed-hamdy opened a new pull request, #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

vahmed-hamdy opened a new pull request, #70:
URL: https://github.com/apache/flink-connector-aws/pull/70

   
   
   <!--
   *Thank you for contributing to Apache Flink AWS Connectors - we are happy that you want to help us improve our Flink connectors. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
   
   ## Contribution Checklist
   
   - The name of the pull request should correspond to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
   - Commits should be in the form of "[FLINK-XXXX][component] Title of the pull request", where [FLINK-XXXX] should be replaced by the actual issue number. 
       Generally, [component] should be the connector you are working on.
       For example: "[FLINK-XXXX][Connectors/Kinesis] XXXX" if you are working on the Kinesis connector or "[FLINK-XXXX][Connectors/AWS] XXXX" if you are working on components shared among all the connectors.
   - Each pull request should only have one JIRA issue.
   - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   -->
   
   ## Purpose of the change
   
   *For example: Implements the Table API for the Kinesis Source.*
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   - *Added integration tests for end-to-end deployment*
   - *Added unit tests*
   - *Manually verified by running the Kinesis connector on a local Flink cluster.*
   
   ## Significant changes
   *(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)*
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [ ] New feature has been introduced
     - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-connector-aws] hlteoh37 commented on pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "hlteoh37 (via GitHub)" <gi...@apache.org>.
hlteoh37 commented on PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#issuecomment-1510930733

   Should we also update the PR summary?


[GitHub] [flink-connector-aws] hlteoh37 commented on pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "hlteoh37 (via GitHub)" <gi...@apache.org>.
hlteoh37 commented on PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#issuecomment-1519527926

   Note that the performance improvement results have been posted as comments on the JIRA issue:
   https://issues.apache.org/jira/browse/FLINK-31772


[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "hlteoh37 (via GitHub)" <gi...@apache.org>.
hlteoh37 commented on code in PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#discussion_r1168359667


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -166,6 +174,20 @@ private KinesisAsyncClient buildClient(
                 KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
     }
 
+    private static RateLimitingStrategy buildRateLimitingStrategy(
+            int maxInFlightRequests, int maxBatchSize) {
+        return CongestionControlRateLimitingStrategy.builder()
+                .setMaxInFlightRequests(maxInFlightRequests)
+                .setInitialMaxInFlightMessages(maxBatchSize)
+                .setScalingStrategy(
+                        AIMDScalingStrategy.builder(maxInFlightRequests * maxInFlightRequests)

Review Comment:
   Should this be `maxBatchSize * maxInFlightRequests`?
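The question turns on what the AIMD rate threshold caps. Below is a minimal, self-contained sketch of additive-increase/multiplicative-decrease scaling, assuming the threshold bounds the number of in-flight messages (the diff seeds the strategy with `setInitialMaxInFlightMessages(maxBatchSize)`), so allowing one full batch per in-flight request gives `maxBatchSize * maxInFlightRequests`. The class `AimdLimitSketch` and its constants are hypothetical; they only mimic the idea behind Flink's `AIMDScalingStrategy` and are not its implementation.

```java
/**
 * Hypothetical AIMD (additive-increase / multiplicative-decrease) sketch of an
 * in-flight message limit. Not Flink's AIMDScalingStrategy.
 */
final class AimdLimitSketch {
    // Illustrative values only; not taken from the PR.
    static final int INCREASE_RATE = 10;
    static final double DECREASE_FACTOR = 0.5;

    private AimdLimitSketch() {}

    /** Upper bound on in-flight messages: one full batch per allowed in-flight request. */
    static int rateThreshold(int maxBatchSize, int maxInFlightRequests) {
        return maxBatchSize * maxInFlightRequests;
    }

    /** Additive increase after a successful request, capped at the threshold. */
    static int scaleUp(int currentLimit, int rateThreshold) {
        return Math.min(currentLimit + INCREASE_RATE, rateThreshold);
    }

    /** Multiplicative decrease after a throttled request, never below one message. */
    static int scaleDown(int currentLimit) {
        return Math.max(1, (int) Math.round(currentLimit * DECREASE_FACTOR));
    }
}
```

With example values `maxBatchSize = 500` and `maxInFlightRequests = 50`, the intended cap is 25,000 messages, whereas `maxInFlightRequests * maxInFlightRequests` would cap it at 2,500, i.e. only five full batches in flight.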



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -166,6 +174,20 @@ private KinesisAsyncClient buildClient(
                 KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX);
     }
 
+    private static RateLimitingStrategy buildRateLimitingStrategy(
+            int maxInFlightRequests, int maxBatchSize) {
+        return CongestionControlRateLimitingStrategy.builder()
+                .setMaxInFlightRequests(maxInFlightRequests)
+                .setInitialMaxInFlightMessages(maxBatchSize)
+                .setScalingStrategy(
+                        AIMDScalingStrategy.builder(maxInFlightRequests * maxInFlightRequests)
+                                .setIncreaseRate(KINESIS_AIMD_RATE_LIMITING_STRATEGY_INCREASE_RATE)

Review Comment:
   Nit: we could drop the `KINESIS_` prefix from this name, since it's scoped to the class.



[GitHub] [flink-connector-aws] vahmed-hamdy commented on a diff in pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "vahmed-hamdy (via GitHub)" <gi...@apache.org>.
vahmed-hamdy commented on code in PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#discussion_r1185454980


##########
flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;

Review Comment:
   Thanks @hlteoh37! I agree, as per the coding guidelines.
   



[GitHub] [flink-connector-aws] vahmed-hamdy commented on a diff in pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "vahmed-hamdy (via GitHub)" <gi...@apache.org>.
vahmed-hamdy commented on code in PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#discussion_r1175360642


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -144,6 +150,8 @@
                         .setMaxBufferedRequests(maxBufferedRequests)
                         .setMaxTimeInBufferMS(maxTimeInBufferMS)
                         .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .setRateLimitingStrategy(

Review Comment:
   We have only verified the regression for Kinesis Data Streams.
   



[GitHub] [flink-connector-aws] boring-cyborg[bot] commented on pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#issuecomment-1549697513

   Awesome work, congrats on your first merged pull request!
   


[GitHub] [flink-connector-aws] vahmed-hamdy commented on a diff in pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "vahmed-hamdy (via GitHub)" <gi...@apache.org>.
vahmed-hamdy commented on code in PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#discussion_r1175722682


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -144,6 +150,8 @@
                         .setMaxBufferedRequests(maxBufferedRequests)
                         .setMaxTimeInBufferMS(maxTimeInBufferMS)
                         .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .setRateLimitingStrategy(

Review Comment:
   @dannycranmer, do you think we should follow up with regression testing for KDF/DDB?
   
   I am hesitant to enforce this in `AsyncSink`, since the previous value is the standard one for AIMD and sink implementers can tune it as they see fit.
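To illustrate why a sink implementer might tune AIMD parameters rather than rely on one shared default, here is a hypothetical simulation. `AimdPolicy` is an illustrative class, not a Flink API, and the numbers are not taken from the benchmarks on FLINK-31772. It compares how far the in-flight limit falls after a few consecutive throttling events, and how many successful requests an additive increase of 10 needs to climb back, for a halving decrease factor versus a gentler one.

```java
/**
 * Hypothetical AIMD policy with tunable parameters; a simulation aid, not a Flink API.
 */
final class AimdPolicy {
    private final int increaseRate;
    private final double decreaseFactor;
    private final int rateThreshold;

    AimdPolicy(int increaseRate, double decreaseFactor, int rateThreshold) {
        if (increaseRate <= 0 || rateThreshold <= 0 || decreaseFactor <= 0.0 || decreaseFactor >= 1.0) {
            throw new IllegalArgumentException("Invalid AIMD parameters");
        }
        this.increaseRate = increaseRate;
        this.decreaseFactor = decreaseFactor;
        this.rateThreshold = rateThreshold;
    }

    /** Additive increase, capped at the threshold. */
    int scaleUp(int limit) {
        return Math.min(limit + increaseRate, rateThreshold);
    }

    /** Multiplicative decrease, never below one message. */
    int scaleDown(int limit) {
        return Math.max(1, (int) Math.round(limit * decreaseFactor));
    }

    /** Limit after {@code throttles} consecutive decreases, starting from the threshold. */
    int limitAfterThrottles(int throttles) {
        int limit = rateThreshold;
        for (int i = 0; i < throttles; i++) {
            limit = scaleDown(limit);
        }
        return limit;
    }

    /** Successful requests needed to climb from {@code limit} back up to the threshold. */
    int requestsToRecover(int limit) {
        int requests = 0;
        while (limit < rateThreshold) {
            limit = scaleUp(limit);
            requests++;
        }
        return requests;
    }
}
```

With a threshold of 25,000 messages, three throttles leave the limit at 3,125 with a factor of 0.5 (2,188 successful requests to recover) versus 18,225 with a factor of 0.9 (678 requests to recover). Which trade-off is appropriate depends on the destination service, which is the argument for leaving the choice to each sink.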



[GitHub] [flink-connector-aws] vahmed-hamdy closed pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "vahmed-hamdy (via GitHub)" <gi...@apache.org>.
vahmed-hamdy closed pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression
URL: https://github.com/apache/flink-connector-aws/pull/70


[GitHub] [flink-connector-aws] vahmed-hamdy commented on pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "vahmed-hamdy (via GitHub)" <gi...@apache.org>.
vahmed-hamdy commented on PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#issuecomment-1539815250

   @dannycranmer 


[GitHub] [flink-connector-aws] vahmed-hamdy commented on pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "vahmed-hamdy (via GitHub)" <gi...@apache.org>.
vahmed-hamdy commented on PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#issuecomment-1539815851

   @dannycranmer Could we rerun the pipeline and merge if it passes, please?


[GitHub] [flink-connector-aws] boring-cyborg[bot] commented on pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "boring-cyborg[bot] (via GitHub)" <gi...@apache.org>.
boring-cyborg[bot] commented on PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#issuecomment-1508997457

   Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


[GitHub] [flink-connector-aws] hlteoh37 commented on a diff in pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "hlteoh37 (via GitHub)" <gi...@apache.org>.
hlteoh37 commented on code in PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#discussion_r1182191285


##########
flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriterTest.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;

Review Comment:
   Could we avoid using Mockito here?
   
   Instead, let's consider using JUnit's reflection utilities to validate this. It's not great that we are relying on reflection, but it feels better than mocking.
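A Mockito-free check along these lines might read the configured strategy straight off the writer instance. The sketch below uses plain `java.lang.reflect` as a stand-in for JUnit's reflection helpers; the writer classes and the field name `rateLimitingStrategy` are hypothetical. It walks up the class hierarchy because, in a layout like this one, the field would be declared on a generic base writer rather than on the connector-specific subclass.

```java
import java.lang.reflect.Field;

/** Hypothetical base writer that owns the strategy in a private field. */
class BaseWriterSketch {
    private final Object rateLimitingStrategy;

    BaseWriterSketch(Object rateLimitingStrategy) {
        this.rateLimitingStrategy = rateLimitingStrategy;
    }
}

/** Hypothetical connector-specific writer that only passes the strategy up. */
final class ConnectorWriterSketch extends BaseWriterSketch {
    ConnectorWriterSketch(Object rateLimitingStrategy) {
        super(rateLimitingStrategy);
    }
}

final class ReflectionTestSupport {
    private ReflectionTestSupport() {}

    /** Reads a (possibly private) field by name, searching superclasses as well. */
    static Object readField(Object target, String fieldName) {
        Class<?> type = target.getClass();
        while (type != null) {
            try {
                Field field = type.getDeclaredField(fieldName);
                field.setAccessible(true);
                return field.get(target);
            } catch (NoSuchFieldException e) {
                type = type.getSuperclass(); // not declared here; try the parent class
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + fieldName, e);
            }
        }
        throw new IllegalArgumentException("No field named " + fieldName + " on " + target.getClass());
    }
}
```

A test would then assert on the returned object (for example, on the parameters of the strategy it holds). The trade-off the comment mentions still applies: renaming the private field breaks such a test at runtime rather than at compile time.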



[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer commented on code in PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#discussion_r1175081394


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -144,6 +150,8 @@
                         .setMaxBufferedRequests(maxBufferedRequests)
                         .setMaxTimeInBufferMS(maxTimeInBufferMS)
                         .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .setRateLimitingStrategy(

Review Comment:
   Why are we only tweaking KDS? Are KDF and DDB not impacted by this?



[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer commented on code in PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#discussion_r1177504112


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -144,6 +150,8 @@
                         .setMaxBufferedRequests(maxBufferedRequests)
                         .setMaxTimeInBufferMS(maxTimeInBufferMS)
                         .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .setRateLimitingStrategy(
+                                buildRateLimitingStrategy(maxInFlightRequests, maxBatchSize))

Review Comment:
   Since you did not change any unit tests, this code path must be either uncovered or unasserted. Can you add tests to prevent regressions?
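One point worth covering in such a test: the `maxInFlightRequests * maxInFlightRequests` expression from the diff only differs from `maxBatchSize * maxInFlightRequests` when the two inputs differ, so a test that uses equal values for both would pass even with the bug. Here is a hypothetical, framework-free sketch of that check; `ThresholdRegressionCheck` is an illustrative name, and the expected formula is the one proposed in review, not confirmed from the merged code.

```java
import java.util.function.IntBinaryOperator;

/** Hypothetical framework-free check for the AIMD rate threshold wiring. */
final class ThresholdRegressionCheck {
    private ThresholdRegressionCheck() {}

    /** Intended cap on in-flight messages: one full batch per allowed in-flight request. */
    static int expectedThreshold(int maxInFlightRequests, int maxBatchSize) {
        return maxBatchSize * maxInFlightRequests;
    }

    /**
     * True if {@code formula.applyAsInt(maxInFlightRequests, maxBatchSize)} yields the
     * intended cap. Use distinct inputs: equal values cannot tell the operands apart.
     */
    static boolean matches(IntBinaryOperator formula, int maxInFlightRequests, int maxBatchSize) {
        return formula.applyAsInt(maxInFlightRequests, maxBatchSize)
                == expectedThreshold(maxInFlightRequests, maxBatchSize);
    }
}
```

With `(requests, batch) -> requests * requests`, `matches(..., 10, 10)` returns true and hides the bug, while `matches(..., 50, 500)` returns false.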



[GitHub] [flink-connector-aws] dannycranmer commented on a diff in pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer commented on code in PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#discussion_r1177501504


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -144,6 +150,8 @@
                         .setMaxBufferedRequests(maxBufferedRequests)
                         .setMaxTimeInBufferMS(maxTimeInBufferMS)
                         .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .setRateLimitingStrategy(

Review Comment:
   Yes, @vahmed-hamdy, let's follow up to perform the same testing and tweak the rate limiters if needed.



[GitHub] [flink-connector-aws] vahmed-hamdy commented on a diff in pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "vahmed-hamdy (via GitHub)" <gi...@apache.org>.
vahmed-hamdy commented on code in PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70#discussion_r1177508026


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java:
##########
@@ -144,6 +150,8 @@
                         .setMaxBufferedRequests(maxBufferedRequests)
                         .setMaxTimeInBufferMS(maxTimeInBufferMS)
                         .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
+                        .setRateLimitingStrategy(
+                                buildRateLimitingStrategy(maxInFlightRequests, maxBatchSize))

Review Comment:
   Yes, it was uncovered.
   Thanks, Danny, I will add unit tests.



[GitHub] [flink-connector-aws] dannycranmer merged pull request #70: [FLINK-31772] Adjusting Kinesis Ratelimiting strategy to fix performance regression

Posted by "dannycranmer (via GitHub)" <gi...@apache.org>.
dannycranmer merged PR #70:
URL: https://github.com/apache/flink-connector-aws/pull/70

