You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/03/01 11:22:20 UTC

[flink] branch master updated (9055279 -> e16ef6b)

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9055279  [hotfix] [runtime] Remove unused method ResourceManager#getNumberRequiredTaskManagers()
     new 493bf3d  [FLINK-25792][connectors] Only flushing the async sink base if it is possible to do it in a non blocking fashion or if the buffer is full. Added test to verify blocking behaviour when number of max in flight requests has been reached. Bubble up interrupted exception if raised while waiting to clear an in flight request in the async sink.
     new e16ef6b  [FLINK-25792][connectors] Changed repeated yielding in write to non blocking flush after complete request, changed tests so that the mailbox thread is cleared before each write().

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../sink/KinesisFirehoseSinkWriterTest.java        |  14 +-
 .../base/sink/writer/AsyncSinkWriter.java          |  54 ++++--
 .../base/sink/writer/AsyncSinkWriterTest.java      | 198 ++++++++++++++-------
 .../base/sink/writer/TestSinkInitContext.java      |  51 +++---
 .../TestSinkInitContextAnyThreadMailbox.java       |  54 ++++++
 5 files changed, 269 insertions(+), 102 deletions(-)
 create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContextAnyThreadMailbox.java

[flink] 02/02: [FLINK-25792][connectors] Changed repeated yielding in write to non blocking flush after complete request, changed tests so that the mailbox thread is cleared before each write().

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e16ef6bb7ced7c66bd00b7dfe2c7199d7303a54c
Author: Zichen Liu <zi...@amazon.com>
AuthorDate: Thu Feb 24 19:45:23 2022 +0000

    [FLINK-25792][connectors] Changed repeated yielding in write to non blocking flush after complete request, changed tests so that the mailbox thread is cleared before each write().
---
 .../sink/KinesisFirehoseSinkWriterTest.java        | 13 ++--
 .../base/sink/writer/AsyncSinkWriter.java          | 16 +++--
 .../base/sink/writer/AsyncSinkWriterTest.java      | 75 +++++++++++++++++-----
 .../base/sink/writer/TestSinkInitContext.java      | 15 +++--
 4 files changed, 90 insertions(+), 29 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
index 81ebfb1..4542b1c 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
@@ -20,20 +20,22 @@ package org.apache.flink.connector.firehose.sink;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
-import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 
 import org.junit.Before;
 import org.junit.Test;
 import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.services.firehose.model.Record;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Properties;
+import java.util.concurrent.CompletionException;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 
 /** Covers construction, defaults and sanity checking of {@link KinesisFirehoseSinkWriter}. */
 public class KinesisFirehoseSinkWriterTest {
@@ -72,7 +74,7 @@ public class KinesisFirehoseSinkWriterTest {
                 .isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length);
     }
 
-    @Test(expected = KinesisFirehoseException.KinesisFirehoseFailFastException.class)
+    @Test
     public void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
             throws IOException, InterruptedException {
         TestSinkInitContext ctx = new TestSinkInitContext();
@@ -93,8 +95,11 @@ public class KinesisFirehoseSinkWriterTest {
         for (int i = 0; i < 12; i++) {
             writer.write("data_bytes", null);
         }
-        writer.flush(true);
-
+        assertThatExceptionOfType(CompletionException.class)
+                .isThrownBy(() -> writer.flush(true))
+                .withCauseInstanceOf(SdkClientException.class)
+                .withMessageContaining(
+                        "Unable to execute HTTP request: Connection refused: localhost/127.0.0.1:443");
         assertThat(ctx.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(12);
     }
 }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 43fc110..f45a3a5 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -344,10 +344,6 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
      * </ul>
      */
     private void nonBlockingFlush() throws InterruptedException {
-        boolean uncompletedInFlightResponses = true;
-        while (uncompletedInFlightResponses) {
-            uncompletedInFlightResponses = mailboxExecutor.tryYield();
-        }
         while (!isInFlightRequestOrMessageLimitExceeded()
                 && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
                         || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
@@ -434,7 +430,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
      * @param failedRequestEntries requestEntries that need to be retried
      */
     private void completeRequest(
-            List<RequestEntryT> failedRequestEntries, int batchSize, long requestStartTime) {
+            List<RequestEntryT> failedRequestEntries, int batchSize, long requestStartTime)
+            throws InterruptedException {
         lastSendTimestamp = requestStartTime;
         ackTime = System.currentTimeMillis();
 
@@ -448,6 +445,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
         while (iterator.hasPrevious()) {
             addEntryToBuffer(iterator.previous(), true);
         }
+        nonBlockingFlush();
     }
 
     private void updateInFlightMessagesLimit(boolean isSuccessfulRequest) {
@@ -493,13 +491,19 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
     @Override
     public void flush(boolean flush) throws InterruptedException {
         while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 && flush)) {
-            mailboxExecutor.tryYield();
+            yieldIfThereExistsInFlightRequests();
             if (flush) {
                 flush();
             }
         }
     }
 
+    private void yieldIfThereExistsInFlightRequests() throws InterruptedException {
+        if (inFlightRequestsCount > 0) {
+            mailboxExecutor.yield();
+        }
+    }
+
     /**
      * All in-flight requests that are relevant for the snapshot have been completed, but there may
      * still be request entries in the internal buffers that are yet to be sent to the endpoint.
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 18390c0..c6e95c7 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.connector.base.sink.writer;
 
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 
@@ -556,26 +557,29 @@ public class AsyncSinkWriterTest {
     }
 
     @Test
-    public void prepareCommitFlushesInflightElementsIfFlushIsSetToFalse() throws Exception {
+    public void prepareCommitFlushesInflightElementsAndDoesNotFlushIfFlushIsSetToFalse()
+            throws Exception {
         AsyncSinkWriterImpl sink =
                 new AsyncSinkWriterImplBuilder()
                         .context(sinkInitContext)
-                        .maxBatchSize(4)
+                        .maxBatchSize(8)
                         .maxBufferedRequests(10)
                         .simulateFailures(true)
                         .build();
         sink.write(String.valueOf(225)); // buffer: [225]
         sink.write(String.valueOf(0)); // buffer: [225, 0]
         sink.write(String.valueOf(1)); // buffer: [225, 0, 1]
-        sink.write(String.valueOf(2)); // buffer: [225, 0, 1, 2] // flushing next round
-        sink.write(String.valueOf(3)); // flushing, request is [225, 0, 1, 2], [225] fails
-        sink.write(String.valueOf(4)); // buffer: [225, 3, 4]
-
-        assertEquals(4, res.size());
-        sink.flush(false); // inflight should be added to  buffer still [225, 2]
-        assertEquals(4, res.size());
-        sink.flush(true); // buffer now flushed []
-        assertEquals(Arrays.asList(0, 1, 225, 2, 3, 4), res);
+        sink.write(String.valueOf(2)); // buffer: [2], inflight: [225], destination: [0, 1]
+
+        assertEquals(Arrays.asList(0, 1), res);
+        assertThatBufferStatesAreEqual(sink.wrapRequests(2), getWriterState(sink));
+
+        sink.flush(false); // buffer: [225, 2], inflight: [], destination: [0, 1]
+        assertEquals(Arrays.asList(0, 1), res);
+        assertThatBufferStatesAreEqual(sink.wrapRequests(225, 2), getWriterState(sink));
+
+        sink.flush(true); // buffer: [], inflight: [], destination: [0, 1, 225, 2]
+        assertEquals(Arrays.asList(0, 1, 225, 2), res);
     }
 
     @Test
@@ -851,7 +855,7 @@ public class AsyncSinkWriterTest {
         es.submit(
                 () -> {
                     try {
-                        sink.write("3");
+                        sink.writeAsNonMailboxThread("3");
                     } catch (IOException | InterruptedException e) {
                         e.printStackTrace();
                     }
@@ -867,6 +871,17 @@ public class AsyncSinkWriterTest {
                 "Executor Service stuck at termination, not terminated after 500ms!");
     }
 
+    /**
+     * A thread separate to the main thread is used to write 3 records to the destination and is
+     * blocked using the latch mechanism just before it writes to the destination, simulating a
+     * long-running in flight request.
+     *
+     * <p>Another thread separate to the main thread is then created and instructed to flush. The
+     * idea is to assert that this action is blocking because there is an in flight request it must
+     * wait to complete. Since the maximum number of inflight requests allowed is 1, we desire a
+     * blocking behaviour here. If the blocking behaviour is not achieved, then the test will
+     * immediately fail.
+     */
     @Test
     public void ifTheNumberOfUncompletedInFlightRequestsIsTooManyThenBlockInFlushMethod()
             throws Exception {
@@ -884,16 +899,33 @@ public class AsyncSinkWriterTest {
                 new Thread(
                         () -> {
                             try {
-                                sink.write("1");
-                                sink.write("2");
-                                sink.write("3");
+                                sink.writeAsNonMailboxThread("1");
+                                sink.writeAsNonMailboxThread("2");
+                                sink.writeAsNonMailboxThread("3");
                             } catch (IOException | InterruptedException e) {
                                 e.printStackTrace();
+                                fail(
+                                        "Auxiliary thread encountered an exception when writing to the sink",
+                                        e);
                             }
                         });
         t.start();
 
         delayedStartLatch.await();
+        Thread s =
+                new Thread(
+                        () -> {
+                            try {
+                                sink.flush(true);
+                                fail(
+                                        "Sink did not block successfully and reached here when it shouldn't have.");
+                            } catch (InterruptedException ignored) {
+
+                            }
+                        });
+        Thread.sleep(300);
+        assertFalse(s.isInterrupted());
+        s.interrupt();
         blockedWriteLatch.countDown();
 
         t.join();
@@ -964,6 +996,19 @@ public class AsyncSinkWriterTest {
         }
 
         public void write(String val) throws IOException, InterruptedException {
+            yieldMailbox(sinkInitContext.getMailboxExecutor());
+            yieldMailbox(sinkInitContextAnyThreadMailbox.getMailboxExecutor());
+            write(val, null);
+        }
+
+        public void yieldMailbox(MailboxExecutor mailbox) {
+            boolean canYield = true;
+            while (canYield) {
+                canYield = mailbox.tryYield();
+            }
+        }
+
+        public void writeAsNonMailboxThread(String val) throws IOException, InterruptedException {
             write(val, null);
         }
 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index a5bd015..b146190 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -51,6 +51,8 @@ public class TestSinkInitContext implements Sink.InitContext {
     private final SinkWriterMetricGroup metricGroup =
             InternalSinkWriterMetricGroup.mock(
                     metricListener.getMetricGroup(), operatorIOMetricGroup);
+    private final MailboxExecutor mailboxExecutor;
+
     StreamTaskActionExecutor streamTaskActionExecutor =
             new StreamTaskActionExecutor() {
                 @Override
@@ -70,6 +72,14 @@ public class TestSinkInitContext implements Sink.InitContext {
                 }
             };
 
+    public TestSinkInitContext() {
+        mailboxExecutor =
+                new MailboxExecutorImpl(
+                        new TaskMailboxImpl(Thread.currentThread()),
+                        Integer.MAX_VALUE,
+                        streamTaskActionExecutor);
+    }
+
     static {
         processingTimeService = new TestProcessingTimeService();
     }
@@ -81,10 +91,7 @@ public class TestSinkInitContext implements Sink.InitContext {
 
     @Override
     public MailboxExecutor getMailboxExecutor() {
-        return new MailboxExecutorImpl(
-                new TaskMailboxImpl(Thread.currentThread()),
-                Integer.MAX_VALUE,
-                streamTaskActionExecutor);
+        return mailboxExecutor;
     }
 
     @Override

[flink] 01/02: [FLINK-25792][connectors] Only flushing the async sink base if it is possible to do it in a non blocking fashion or if the buffer is full. Added test to verify blocking behaviour when number of max in flight requests has been reached. Bubble up interrupted exception if raised while waiting to clear an in flight request in the async sink.

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 493bf3d05e43dfa24f04b4c4f06458837ad399b6
Author: Zichen Liu <zi...@amazon.com>
AuthorDate: Mon Feb 7 13:17:28 2022 +0000

    [FLINK-25792][connectors] Only flushing the async sink base if it is possible to do it in a non blocking fashion or if the buffer is full. Added test to verify blocking behaviour when number of max in flight requests has been reached. Bubble up interrupted exception if raised while waiting to clear an in flight request in the async sink.
---
 .../sink/KinesisFirehoseSinkWriterTest.java        |   9 +-
 .../base/sink/writer/AsyncSinkWriter.java          |  46 +++++--
 .../base/sink/writer/AsyncSinkWriterTest.java      | 147 ++++++++++++---------
 .../base/sink/writer/TestSinkInitContext.java      |  36 ++---
 .../TestSinkInitContextAnyThreadMailbox.java       |  54 ++++++++
 5 files changed, 199 insertions(+), 93 deletions(-)

diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
index 0366d0f..81ebfb1 100644
--- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
+++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriterTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.firehose.sink;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
+import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 
@@ -71,15 +72,14 @@ public class KinesisFirehoseSinkWriterTest {
                 .isEqualTo(testString.getBytes(StandardCharsets.US_ASCII).length);
     }
 
-    @Test
+    @Test(expected = KinesisFirehoseException.KinesisFirehoseFailFastException.class)
     public void getNumRecordsOutErrorsCounterRecordsCorrectNumberOfFailures()
             throws IOException, InterruptedException {
-        Properties prop = AWSServicesTestUtils.createConfig("https://fake_aws_endpoint");
         TestSinkInitContext ctx = new TestSinkInitContext();
         KinesisFirehoseSink<String> kinesisFirehoseSink =
                 new KinesisFirehoseSink<>(
                         ELEMENT_CONVERTER_PLACEHOLDER,
-                        6,
+                        12,
                         16,
                         10000,
                         4 * 1024 * 1024L,
@@ -87,12 +87,13 @@ public class KinesisFirehoseSinkWriterTest {
                         1000 * 1024L,
                         true,
                         "test-stream",
-                        prop);
+                        AWSServicesTestUtils.createConfig("https://localhost"));
         SinkWriter<String> writer = kinesisFirehoseSink.createWriter(ctx);
 
         for (int i = 0; i < 12; i++) {
             writer.write("data_bytes", null);
         }
+        writer.flush(true);
 
         assertThat(ctx.metricGroup().getNumRecordsOutErrorsCounter().getCount()).isEqualTo(12);
     }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index bb9da6d..43fc110 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -324,31 +324,57 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
     @Override
     public void write(InputT element, Context context) throws IOException, InterruptedException {
         while (bufferedRequestEntries.size() >= maxBufferedRequests) {
-            mailboxExecutor.tryYield();
+            flush();
         }
 
         addEntryToBuffer(elementConverter.apply(element, context), false);
 
-        flushIfAble();
+        nonBlockingFlush();
     }
 
-    private void flushIfAble() {
-        while (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
-                || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes) {
+    /**
+     * Determines if a call to flush will be non-blocking (i.e. {@code inFlightRequestsCount} is
+     * strictly smaller than {@code maxInFlightRequests}). Also requires one of the following
+     * requirements to be met:
+     *
+     * <ul>
+     *   <li>The number of elements buffered is greater than or equal to the {@code maxBatchSize}
+     *   <li>The sum of the size in bytes of all records in the buffer is greater than or equal to
+     *       {@code maxBatchSizeInBytes}
+     * </ul>
+     */
+    private void nonBlockingFlush() throws InterruptedException {
+        boolean uncompletedInFlightResponses = true;
+        while (uncompletedInFlightResponses) {
+            uncompletedInFlightResponses = mailboxExecutor.tryYield();
+        }
+        while (!isInFlightRequestOrMessageLimitExceeded()
+                && (bufferedRequestEntries.size() >= getNextBatchSizeLimit()
+                        || bufferedRequestEntriesTotalSizeInBytes >= maxBatchSizeInBytes)) {
             flush();
         }
     }
 
     /**
+     * Determines if the sink should block and complete existing in flight requests before it may
+     * prudently create any new ones. This is exactly determined by if the number of requests
+     * currently in flight exceeds the maximum supported by the sink OR if the number of in flight
+     * messages exceeds the maximum determined to be appropriate by the rate limiting strategy.
+     */
+    private boolean isInFlightRequestOrMessageLimitExceeded() {
+        return inFlightRequestsCount >= maxInFlightRequests
+                || inFlightMessages >= rateLimitingStrategy.getRateLimit();
+    }
+
+    /**
      * Persists buffered RequestsEntries into the destination by invoking {@code
      * submitRequestEntries} with batches according to the user specified buffering hints.
      *
      * <p>The method blocks if too many async requests are in flight.
      */
-    private void flush() {
-        while (inFlightRequestsCount >= maxInFlightRequests
-                || inFlightMessages >= rateLimitingStrategy.getRateLimit()) {
-            mailboxExecutor.tryYield();
+    private void flush() throws InterruptedException {
+        while (isInFlightRequestOrMessageLimitExceeded()) {
+            mailboxExecutor.yield();
         }
 
         List<RequestEntryT> batch = createNextAvailableBatch();
@@ -465,7 +491,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
      * <p>To this end, all in-flight requests need to completed before proceeding with the commit.
      */
     @Override
-    public void flush(boolean flush) {
+    public void flush(boolean flush) throws InterruptedException {
         while (inFlightRequestsCount > 0 || (bufferedRequestEntries.size() > 0 && flush)) {
             mailboxExecutor.tryYield();
             if (flush) {
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index bbb649d..18390c0 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -54,11 +54,13 @@ public class AsyncSinkWriterTest {
 
     private final List<Integer> res = new ArrayList<>();
     private TestSinkInitContext sinkInitContext;
+    private TestSinkInitContextAnyThreadMailbox sinkInitContextAnyThreadMailbox;
 
     @Before
     public void before() {
         res.clear();
         sinkInitContext = new TestSinkInitContext();
+        sinkInitContextAnyThreadMailbox = new TestSinkInitContextAnyThreadMailbox();
     }
 
     private void performNormalWriteOfEightyRecordsToMock()
@@ -241,10 +243,10 @@ public class AsyncSinkWriterTest {
                 sink, "965", Arrays.asList(25, 55), Arrays.asList());
 
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink, "75", Arrays.asList(25, 55), Arrays.asList(75));
+                sink, "75", Arrays.asList(25, 55, 965, 75), Arrays.asList());
 
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink, "95", Arrays.asList(25, 55), Arrays.asList(75, 95));
+                sink, "95", Arrays.asList(25, 55, 965, 75), Arrays.asList(95));
 
         /*
          * Writing 955 to the sink increases the buffer to size 3 containing [75, 95, 955]. This
@@ -257,16 +259,16 @@ public class AsyncSinkWriterTest {
          * 955 is in flight after failure.
          */
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink, "955", Arrays.asList(25, 55, 965, 75, 95), Arrays.asList());
+                sink, "955", Arrays.asList(25, 55, 965, 75), Arrays.asList(95, 955));
 
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink, "550", Arrays.asList(25, 55, 965, 75, 95), Arrays.asList(550));
+                sink, "550", Arrays.asList(25, 55, 965, 75, 95), Arrays.asList());
 
         /*
          * [550, 45] are attempted to be persisted
          */
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink, "45", Arrays.asList(25, 55, 965, 75, 95), Arrays.asList(550, 45));
+                sink, "45", Arrays.asList(25, 55, 965, 75, 95, 955, 550), Arrays.asList(45));
 
         /*
          * [550,45,35] triggers inflight request to be added, buffer should be [955,550,45,35]
@@ -276,17 +278,14 @@ public class AsyncSinkWriterTest {
          * All are persisted and batch size is 3.
          */
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink, "35", Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35), Arrays.asList());
+                sink, "35", Arrays.asList(25, 55, 965, 75, 95, 955, 550), Arrays.asList(45, 35));
 
         /* ] should be in the bufferedRequestEntries
          * [ 550] should be in the inFlightRequest, ready to be added
          * [25, 55, 965, 75, 95, 995, 45, 35] should be downstream already
          */
         writeXToSinkAssertDestinationIsInStateYAndBufferHasZ(
-                sink,
-                "535",
-                Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35),
-                Arrays.asList(535));
+                sink, "535", Arrays.asList(25, 55, 965, 75, 95, 955, 550, 45, 35), Arrays.asList());
 
         // Checkpoint occurs
         sink.flush(true);
@@ -297,35 +296,52 @@ public class AsyncSinkWriterTest {
     }
 
     @Test
-    public void testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater()
+    public void
+            testFailedEntriesAreRetriedInTheNextPossiblePersistRequestIfPrepareCommitIsTriggered()
+                    throws IOException, InterruptedException {
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkWriterImplBuilder()
+                        .context(sinkInitContext)
+                        .maxBatchSize(3)
+                        .simulateFailures(true)
+                        .build();
+        testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater(sink);
+    }
+
+    @Test
+    public void testFailedEntriesAreRetriedInTheNextPossiblePersistRequestIfBufferFillsToFull()
             throws IOException, InterruptedException {
         AsyncSinkWriterImpl sink =
                 new AsyncSinkWriterImplBuilder()
                         .context(sinkInitContext)
                         .maxBatchSize(3)
+                        .maxInFlightRequests(1)
+                        .maxBufferedRequests(8)
                         .simulateFailures(true)
                         .build();
+        testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater(sink);
+    }
 
+    private void testFailedEntriesAreRetriedInTheNextPossiblePersistRequestAndNoLater(
+            AsyncSinkWriterImpl sink) throws IOException, InterruptedException {
         sink.write("25");
         sink.write("55");
-        sink.write("965");
+        sink.write("965"); // Flush, 25, 55 succeeds, 965 fails and is moved in flight
         sink.write("75");
         sink.write("95");
         sink.write("955");
-        assertTrue(res.contains(965));
+        // Buffer has filled up to size 3, does not flush since there is an in flight request and
+        // the buffer still has space - in terms of both number of records and bytes
         sink.write("550");
         sink.write("645");
         sink.write("545");
         sink.write("535");
         sink.write("515");
-        assertTrue(res.contains(955));
         sink.write("505");
-        assertTrue(res.contains(550));
-        assertTrue(res.contains(645));
+        // Buffer continues to fill up without blocking on write, until eventually yield is called
+        // on the mailbox thread during the prepare commit
         sink.flush(true);
-        assertTrue(res.contains(545));
-        assertTrue(res.contains(535));
-        assertTrue(res.contains(515));
+        assertEquals(Arrays.asList(25, 55, 965, 75, 95, 955, 550, 645, 545, 535, 515, 505), res);
     }
 
     @Test
@@ -399,13 +415,10 @@ public class AsyncSinkWriterTest {
          * should occur once 7 elements have been written - an 8th element cannot be added since
          * that would make the buffer 32 bytes, which is over the threshold.
          */
-        for (int i = 0; i < 13; i++) {
+        for (int i = 0; i < 100; i++) {
             sink.write(String.valueOf(i));
+            assertEquals((i / 7) * 7, res.size());
         }
-        assertEquals(7, res.size());
-        sink.write(String.valueOf(13));
-        sink.write(String.valueOf(14));
-        assertEquals(14, res.size());
     }
 
     @Test
@@ -547,20 +560,22 @@ public class AsyncSinkWriterTest {
         AsyncSinkWriterImpl sink =
                 new AsyncSinkWriterImplBuilder()
                         .context(sinkInitContext)
-                        .maxBatchSize(3)
+                        .maxBatchSize(4)
                         .maxBufferedRequests(10)
                         .simulateFailures(true)
                         .build();
-        sink.write(String.valueOf(225)); // buffer :[225]
-        sink.write(String.valueOf(0)); // buffer [225,0]
-        sink.write(String.valueOf(1)); // buffer [225,0,1] -- flushing
-        sink.write(String.valueOf(2)); // flushing -- request should have [225,0,1], [225] fails,
-        // buffer has [2]
-        assertEquals(2, res.size());
+        sink.write(String.valueOf(225)); // buffer: [225]
+        sink.write(String.valueOf(0)); // buffer: [225, 0]
+        sink.write(String.valueOf(1)); // buffer: [225, 0, 1]
+        sink.write(String.valueOf(2)); // buffer: [225, 0, 1, 2] // flushing next round
+        sink.write(String.valueOf(3)); // flushing, request is [225, 0, 1, 2], [225] fails
+        sink.write(String.valueOf(4)); // buffer: [225, 3, 4]
+
+        assertEquals(4, res.size());
         sink.flush(false); // inflight should be added to  buffer still [225, 2]
-        assertEquals(2, res.size());
+        assertEquals(4, res.size());
         sink.flush(true); // buffer now flushed []
-        assertEquals(Arrays.asList(0, 1, 225, 2), res);
+        assertEquals(Arrays.asList(0, 1, 225, 2, 3, 4), res);
     }
 
     @Test
@@ -780,13 +795,8 @@ public class AsyncSinkWriterTest {
         CountDownLatch delayedStartLatch = new CountDownLatch(1);
         AsyncSinkWriterImpl sink =
                 new AsyncSinkReleaseAndBlockWriterImpl(
-                        sinkInitContext,
-                        3,
+                        sinkInitContextAnyThreadMailbox,
                         1,
-                        20,
-                        100,
-                        100,
-                        100,
                         blockedWriteLatch,
                         delayedStartLatch,
                         true);
@@ -816,19 +826,14 @@ public class AsyncSinkWriterTest {
         CountDownLatch delayedStartLatch = new CountDownLatch(1);
         AsyncSinkWriterImpl sink =
                 new AsyncSinkReleaseAndBlockWriterImpl(
-                        sinkInitContext,
-                        3,
+                        sinkInitContextAnyThreadMailbox,
                         2,
-                        20,
-                        100,
-                        100,
-                        100,
                         blockedWriteLatch,
                         delayedStartLatch,
                         false);
 
         writeTwoElementsAndInterleaveTheNextTwoElements(sink, blockedWriteLatch, delayedStartLatch);
-        assertEquals(new ArrayList<>(Arrays.asList(4, 1, 2, 3)), res);
+        assertEquals(Arrays.asList(4, 1, 2, 3), res);
     }
 
     private void writeTwoElementsAndInterleaveTheNextTwoElements(
@@ -862,6 +867,40 @@ public class AsyncSinkWriterTest {
                 "Executor Service stuck at termination, not terminated after 500ms!");
     }
 
+    @Test
+    public void ifTheNumberOfUncompletedInFlightRequestsIsTooManyThenBlockInFlushMethod()
+            throws Exception {
+        CountDownLatch blockedWriteLatch = new CountDownLatch(1);
+        CountDownLatch delayedStartLatch = new CountDownLatch(1);
+        AsyncSinkWriterImpl sink =
+                new AsyncSinkReleaseAndBlockWriterImpl(
+                        sinkInitContextAnyThreadMailbox,
+                        1,
+                        blockedWriteLatch,
+                        delayedStartLatch,
+                        false);
+
+        Thread t =
+                new Thread(
+                        () -> {
+                            try {
+                                sink.write("1");
+                                sink.write("2");
+                                sink.write("3");
+                            } catch (IOException | InterruptedException e) {
+                                e.printStackTrace();
+                            }
+                        });
+        t.start();
+
+        delayedStartLatch.await();
+        blockedWriteLatch.countDown();
+
+        t.join();
+
+        assertEquals(Arrays.asList(1, 2, 3), res);
+    }
+
     private BufferedRequestState<Integer> getWriterState(
             AsyncSinkWriter<String, Integer> sinkWriter) {
         List<BufferedRequestState<Integer>> states = sinkWriter.snapshotState(1);
@@ -1104,25 +1143,11 @@ public class AsyncSinkWriterTest {
 
         public AsyncSinkReleaseAndBlockWriterImpl(
                 Sink.InitContext context,
-                int maxBatchSize,
                 int maxInFlightRequests,
-                int maxBufferedRequests,
-                long maxBatchSizeInBytes,
-                long maxTimeInBufferMS,
-                long maxRecordSizeInBytes,
                 CountDownLatch blockedThreadLatch,
                 CountDownLatch delayedStartLatch,
                 boolean blockForLimitedTime) {
-            super(
-                    context,
-                    maxBatchSize,
-                    maxInFlightRequests,
-                    maxBufferedRequests,
-                    maxBatchSizeInBytes,
-                    maxTimeInBufferMS,
-                    maxRecordSizeInBytes,
-                    false,
-                    0);
+            super(context, 3, maxInFlightRequests, 20, 100, 100, 100, false, 0);
             this.blockedThreadLatch = blockedThreadLatch;
             this.delayedStartLatch = delayedStartLatch;
             this.blockForLimitedTime = blockForLimitedTime;
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
index 076da61..a5bd015 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java
@@ -51,6 +51,24 @@ public class TestSinkInitContext implements Sink.InitContext {
     private final SinkWriterMetricGroup metricGroup =
             InternalSinkWriterMetricGroup.mock(
                     metricListener.getMetricGroup(), operatorIOMetricGroup);
+    StreamTaskActionExecutor streamTaskActionExecutor =
+            new StreamTaskActionExecutor() {
+                @Override
+                public void run(RunnableWithException e) throws Exception {
+                    e.run();
+                }
+
+                @Override
+                public <E extends Throwable> void runThrowing(ThrowingRunnable<E> throwingRunnable)
+                        throws E {
+                    throwingRunnable.run();
+                }
+
+                @Override
+                public <R> R call(Callable<R> callable) throws Exception {
+                    return callable.call();
+                }
+            };
 
     static {
         processingTimeService = new TestProcessingTimeService();
@@ -63,24 +81,6 @@ public class TestSinkInitContext implements Sink.InitContext {
 
     @Override
     public MailboxExecutor getMailboxExecutor() {
-        StreamTaskActionExecutor streamTaskActionExecutor =
-                new StreamTaskActionExecutor() {
-                    @Override
-                    public void run(RunnableWithException e) throws Exception {
-                        e.run();
-                    }
-
-                    @Override
-                    public <E extends Throwable> void runThrowing(
-                            ThrowingRunnable<E> throwingRunnable) throws E {
-                        throwingRunnable.run();
-                    }
-
-                    @Override
-                    public <R> R call(Callable<R> callable) throws Exception {
-                        return callable.call();
-                    }
-                };
         return new MailboxExecutorImpl(
                 new TaskMailboxImpl(Thread.currentThread()),
                 Integer.MAX_VALUE,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContextAnyThreadMailbox.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContextAnyThreadMailbox.java
new file mode 100644
index 0000000..ed8c5ef
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContextAnyThreadMailbox.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink.writer;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
+
+/**
+ * A mock implementation of a {@code Sink.InitContext} to be used in sink unit tests.
+ *
+ * <p>The only difference between this and {@link TestSinkInitContext} is that the mailbox thread
+ * methods of this context may be accessed from any thread. This is useful for testing fine-grained
+ * interleaving of threads that may be in the asynchronous part of {@code submitRequestEntries()} in
+ * the concrete sink against new mailbox threads entering {@code write()} in the base sink.
+ *
+ * <p>However, care must be taken to ensure deadlocks do not form in the test code, since we are
+ * artificially allowing multiple mailbox threads, when only one is supposed to exist.
+ */
+public class TestSinkInitContextAnyThreadMailbox extends TestSinkInitContext {
+    @Override
+    public MailboxExecutor getMailboxExecutor() {
+        return new MailboxExecutorImpl(
+                new AnyThreadTaskMailboxImpl(Thread.currentThread()),
+                Integer.MAX_VALUE,
+                streamTaskActionExecutor);
+    }
+
+    private static class AnyThreadTaskMailboxImpl extends TaskMailboxImpl {
+        public AnyThreadTaskMailboxImpl(Thread currentThread) {
+            super(currentThread);
+        }
+
+        @Override
+        public boolean isMailboxThread() {
+            return true;
+        }
+    }
+}