You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by da...@apache.org on 2022/11/10 08:37:04 UTC

[flink] branch release-1.16 updated: [FLINK-29827][Connectors/Base] Registering callback for asyncWriter on restore from state.

This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new f5f806044db [FLINK-29827][Connectors/Base] Registering callback for asyncWriter on restore from state.
f5f806044db is described below

commit f5f806044db373971b3cd611180d1e0227ceaedd
Author: Ahmed Hamdy <va...@amazon.com>
AuthorDate: Wed Nov 2 18:21:00 2022 +0000

    [FLINK-29827][Connectors/Base] Registering callback for asyncWriter on restore from state.
---
 .../base/sink/writer/AsyncSinkWriter.java          | 34 ++++++------------
 .../base/sink/writer/AsyncSinkWriterTest.java      | 40 ++++++++++++++++++++--
 2 files changed, 49 insertions(+), 25 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 17db7a7db03..f6c2cc86e3b 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -441,27 +441,28 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
     }
 
     private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
+        addEntryToBuffer(new RequestEntryWrapper<>(entry, getSizeInBytes(entry)), insertAtHead);
+    }
+
+    private void addEntryToBuffer(RequestEntryWrapper<RequestEntryT> entry, boolean insertAtHead) {
         if (bufferedRequestEntries.isEmpty() && !existsActiveTimerCallback) {
             registerCallback();
         }
 
-        RequestEntryWrapper<RequestEntryT> wrappedEntry =
-                new RequestEntryWrapper<>(entry, getSizeInBytes(entry));
-
-        if (wrappedEntry.getSize() > maxRecordSizeInBytes) {
+        if (entry.getSize() > maxRecordSizeInBytes) {
             throw new IllegalArgumentException(
                     String.format(
                             "The request entry sent to the buffer was of size [%s], when the maxRecordSizeInBytes was set to [%s].",
-                            wrappedEntry.getSize(), maxRecordSizeInBytes));
+                            entry.getSize(), maxRecordSizeInBytes));
         }
 
         if (insertAtHead) {
-            bufferedRequestEntries.addFirst(wrappedEntry);
+            bufferedRequestEntries.addFirst(entry);
         } else {
-            bufferedRequestEntries.add(wrappedEntry);
+            bufferedRequestEntries.add(entry);
         }
 
-        bufferedRequestEntriesTotalSizeInBytes += wrappedEntry.getSize();
+        bufferedRequestEntriesTotalSizeInBytes += entry.getSize();
     }
 
     /**
@@ -501,23 +502,10 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
 
     private void initializeState(Collection<BufferedRequestState<RequestEntryT>> states) {
         for (BufferedRequestState<RequestEntryT> state : states) {
-            initializeState(state);
-        }
-    }
-
-    private void initializeState(BufferedRequestState<RequestEntryT> state) {
-        this.bufferedRequestEntries.addAll(state.getBufferedRequestEntries());
-
-        for (RequestEntryWrapper<RequestEntryT> wrapper : bufferedRequestEntries) {
-            if (wrapper.getSize() > maxRecordSizeInBytes) {
-                throw new IllegalStateException(
-                        String.format(
-                                "State contains record of size %d which exceeds sink maximum record size %d.",
-                                wrapper.getSize(), maxRecordSizeInBytes));
+            for (RequestEntryWrapper<RequestEntryT> wrapper : state.getBufferedRequestEntries()) {
+                addEntryToBuffer(wrapper, false);
             }
         }
-
-        this.bufferedRequestEntriesTotalSizeInBytes += state.getStateSize();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 5045e1e3d6d..bd25179b4ca 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -665,7 +665,7 @@ public class AsyncSinkWriterTest {
         sink.write(String.valueOf(225)); // Buffer: 100/110B; 1/10 elements; 0 inflight
         sink.flush(false);
         List<BufferedRequestState<Integer>> states = sink.snapshotState(1);
-        assertThatExceptionOfType(IllegalStateException.class)
+        assertThatExceptionOfType(IllegalArgumentException.class)
                 .isThrownBy(
                         () ->
                                 new AsyncSinkWriterImplBuilder()
@@ -673,7 +673,7 @@ public class AsyncSinkWriterTest {
                                         .maxRecordSizeInBytes(15)
                                         .buildWithState(states))
                 .withMessageContaining(
-                        "State contains record of size 100 which exceeds sink maximum record size 15.");
+                        "The request entry sent to the buffer was of size [100], when the maxRecordSizeInBytes was set to [15].");
     }
 
     @Test
@@ -709,6 +709,42 @@ public class AsyncSinkWriterTest {
                 .containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
     }
 
+    @Test
+    public void testWriterInitializedWithStateHasCallbackRegistered() throws Exception {
+        AsyncSinkWriterImpl initialSinkWriter =
+                new AsyncSinkWriterImplBuilder()
+                        .context(sinkInitContext)
+                        .maxBatchSize(10)
+                        .maxInFlightRequests(20)
+                        .maxBatchSizeInBytes(10_000)
+                        .maxTimeInBufferMS(100)
+                        .maxRecordSizeInBytes(10_000)
+                        .simulateFailures(true)
+                        .build();
+        assertThat(res.size()).isEqualTo(0);
+        TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+        tpts.setCurrentTime(0L);
+        initialSinkWriter.write("1");
+        initialSinkWriter.write("2");
+        initialSinkWriter.write("3");
+        tpts.setCurrentTime(10L);
+
+        AsyncSinkWriterImpl restoredSinkWriter =
+                new AsyncSinkWriterImplBuilder()
+                        .context(sinkInitContext)
+                        .maxBatchSize(10)
+                        .maxInFlightRequests(20)
+                        .maxBatchSizeInBytes(10_000)
+                        .maxTimeInBufferMS(10)
+                        .maxRecordSizeInBytes(10_000)
+                        .simulateFailures(true)
+                        .buildWithState(initialSinkWriter.snapshotState(1));
+        restoredSinkWriter.write("4");
+
+        tpts.setCurrentTime(30L);
+        assertThat(res.size()).isEqualTo(4);
+    }
+
     @Test
     public void testThatOneAndOnlyOneCallbackIsEverRegistered() throws Exception {
         AsyncSinkWriterImpl sink =