You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/08/11 10:10:33 UTC
[flink] branch release-1.15 updated: [FLINK-28027][connectors] Implement slow start for AIMDRateLimitingStrategy
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new d22c52f6dda [FLINK-28027][connectors] Implement slow start for AIMDRateLimitingStrategy
d22c52f6dda is described below
commit d22c52f6dda6d5aec37f50b2657293157ca40d96
Author: Hong Teoh <li...@amazon.com>
AuthorDate: Thu Aug 11 05:04:31 2022 +0100
[FLINK-28027][connectors] Implement slow start for AIMDRateLimitingStrategy
---
.../flink/connector/base/sink/writer/AsyncSinkWriter.java | 2 +-
.../connector/base/sink/writer/AsyncSinkWriterTest.java | 15 +++++++++++++--
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 090504ab8bc..f7a649b1636 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -291,7 +291,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
INFLIGHT_MESSAGES_LIMIT_INCREASE_RATE,
INFLIGHT_MESSAGES_LIMIT_DECREASE_FACTOR,
maxBatchSize * maxInFlightRequests,
- maxBatchSize * maxInFlightRequests);
+ maxBatchSize);
this.metrics = context.metricGroup();
this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 47c437e01e5..14d9924bdd0 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -871,8 +871,19 @@ public class AsyncSinkWriterTest {
delayedStartLatch,
false);
+ // Write a single successful record to trigger increase in rateLimitingStrategy rate
+ // threshold
+ writeSingleElementUsingProcessingTimeTrigger(sink);
writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
- assertThat(res).isEqualTo(Arrays.asList(4, 1, 2, 3));
+ assertThat(res).isEqualTo(Arrays.asList(0, 4, 1, 2, 3));
+ }
+
+ private void writeSingleElementUsingProcessingTimeTrigger(AsyncSinkWriterImpl sink)
+ throws Exception {
+ TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+ tpts.setCurrentTime(0L);
+ sink.write("0");
+ tpts.setCurrentTime(100L);
}
private void writeTwoElementsAndInterleaveTheNextTwoElements(
@@ -898,7 +909,7 @@ public class AsyncSinkWriterTest {
delayedStartLatch.await();
sink.write("4");
- tpts.setCurrentTime(100L);
+ tpts.setCurrentTime(200L);
blockedWriteLatch.countDown();
es.shutdown();
assertThat(es.awaitTermination(500, TimeUnit.MILLISECONDS))