You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/08/11 10:10:33 UTC

[flink] branch release-1.15 updated: [FLINK-28027][connectors] Implement slow start for AIMDRateLimitingStrategy

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new d22c52f6dda [FLINK-28027][connectors] Implement slow start for AIMDRateLimitingStrategy
d22c52f6dda is described below

commit d22c52f6dda6d5aec37f50b2657293157ca40d96
Author: Hong Teoh <li...@amazon.com>
AuthorDate: Thu Aug 11 05:04:31 2022 +0100

    [FLINK-28027][connectors] Implement slow start for AIMDRateLimitingStrategy
---
 .../flink/connector/base/sink/writer/AsyncSinkWriter.java |  2 +-
 .../connector/base/sink/writer/AsyncSinkWriterTest.java   | 15 +++++++++++++--
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 090504ab8bc..f7a649b1636 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -291,7 +291,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
                         INFLIGHT_MESSAGES_LIMIT_INCREASE_RATE,
                         INFLIGHT_MESSAGES_LIMIT_DECREASE_FACTOR,
                         maxBatchSize * maxInFlightRequests,
-                        maxBatchSize * maxInFlightRequests);
+                        maxBatchSize);
 
         this.metrics = context.metricGroup();
         this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 47c437e01e5..14d9924bdd0 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -871,8 +871,19 @@ public class AsyncSinkWriterTest {
                         delayedStartLatch,
                         false);
 
+        // Write a single successful record to trigger increase in rateLimitingStrategy rate
+        // threshold
+        writeSingleElementUsingProcessingTimeTrigger(sink);
         writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
-        assertThat(res).isEqualTo(Arrays.asList(4, 1, 2, 3));
+        assertThat(res).isEqualTo(Arrays.asList(0, 4, 1, 2, 3));
+    }
+
+    private void writeSingleElementUsingProcessingTimeTrigger(AsyncSinkWriterImpl sink)
+            throws Exception {
+        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+        tpts.setCurrentTime(0L);
+        sink.write("0");
+        tpts.setCurrentTime(100L);
     }
 
     private void writeTwoElementsAndInterleaveTheNextTwoElements(
@@ -898,7 +909,7 @@ public class AsyncSinkWriterTest {
 
         delayedStartLatch.await();
         sink.write("4");
-        tpts.setCurrentTime(100L);
+        tpts.setCurrentTime(200L);
         blockedWriteLatch.countDown();
         es.shutdown();
         assertThat(es.awaitTermination(500, TimeUnit.MILLISECONDS))