You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by da...@apache.org on 2022/11/10 08:37:04 UTC
[flink] branch release-1.16 updated: [FLINK-29827][Connectors/Base] Registering callback for asyncWriter on restore from state.
This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new f5f806044db [FLINK-29827][Connectors/Base] Registering callback for asyncWriter on restore from state.
f5f806044db is described below
commit f5f806044db373971b3cd611180d1e0227ceaedd
Author: Ahmed Hamdy <va...@amazon.com>
AuthorDate: Wed Nov 2 18:21:00 2022 +0000
[FLINK-29827][Connectors/Base] Registering callback for asyncWriter on restore from state.
---
.../base/sink/writer/AsyncSinkWriter.java | 34 ++++++------------
.../base/sink/writer/AsyncSinkWriterTest.java | 40 ++++++++++++++++++++--
2 files changed, 49 insertions(+), 25 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index 17db7a7db03..f6c2cc86e3b 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -441,27 +441,28 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
}
private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
+ addEntryToBuffer(new RequestEntryWrapper<>(entry, getSizeInBytes(entry)), insertAtHead);
+ }
+
+ private void addEntryToBuffer(RequestEntryWrapper<RequestEntryT> entry, boolean insertAtHead) {
if (bufferedRequestEntries.isEmpty() && !existsActiveTimerCallback) {
registerCallback();
}
- RequestEntryWrapper<RequestEntryT> wrappedEntry =
- new RequestEntryWrapper<>(entry, getSizeInBytes(entry));
-
- if (wrappedEntry.getSize() > maxRecordSizeInBytes) {
+ if (entry.getSize() > maxRecordSizeInBytes) {
throw new IllegalArgumentException(
String.format(
"The request entry sent to the buffer was of size [%s], when the maxRecordSizeInBytes was set to [%s].",
- wrappedEntry.getSize(), maxRecordSizeInBytes));
+ entry.getSize(), maxRecordSizeInBytes));
}
if (insertAtHead) {
- bufferedRequestEntries.addFirst(wrappedEntry);
+ bufferedRequestEntries.addFirst(entry);
} else {
- bufferedRequestEntries.add(wrappedEntry);
+ bufferedRequestEntries.add(entry);
}
- bufferedRequestEntriesTotalSizeInBytes += wrappedEntry.getSize();
+ bufferedRequestEntriesTotalSizeInBytes += entry.getSize();
}
/**
@@ -501,23 +502,10 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
private void initializeState(Collection<BufferedRequestState<RequestEntryT>> states) {
for (BufferedRequestState<RequestEntryT> state : states) {
- initializeState(state);
- }
- }
-
- private void initializeState(BufferedRequestState<RequestEntryT> state) {
- this.bufferedRequestEntries.addAll(state.getBufferedRequestEntries());
-
- for (RequestEntryWrapper<RequestEntryT> wrapper : bufferedRequestEntries) {
- if (wrapper.getSize() > maxRecordSizeInBytes) {
- throw new IllegalStateException(
- String.format(
- "State contains record of size %d which exceeds sink maximum record size %d.",
- wrapper.getSize(), maxRecordSizeInBytes));
+ for (RequestEntryWrapper<RequestEntryT> wrapper : state.getBufferedRequestEntries()) {
+ addEntryToBuffer(wrapper, false);
}
}
-
- this.bufferedRequestEntriesTotalSizeInBytes += state.getStateSize();
}
@Override
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 5045e1e3d6d..bd25179b4ca 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -665,7 +665,7 @@ public class AsyncSinkWriterTest {
sink.write(String.valueOf(225)); // Buffer: 100/110B; 1/10 elements; 0 inflight
sink.flush(false);
List<BufferedRequestState<Integer>> states = sink.snapshotState(1);
- assertThatExceptionOfType(IllegalStateException.class)
+ assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(
() ->
new AsyncSinkWriterImplBuilder()
@@ -673,7 +673,7 @@ public class AsyncSinkWriterTest {
.maxRecordSizeInBytes(15)
.buildWithState(states))
.withMessageContaining(
- "State contains record of size 100 which exceeds sink maximum record size 15.");
+ "The request entry sent to the buffer was of size [100], when the maxRecordSizeInBytes was set to [15].");
}
@Test
@@ -709,6 +709,42 @@ public class AsyncSinkWriterTest {
.containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6);
}
+ @Test
+ public void testWriterInitializedWithStateHasCallbackRegistered() throws Exception {
+ AsyncSinkWriterImpl initialSinkWriter =
+ new AsyncSinkWriterImplBuilder()
+ .context(sinkInitContext)
+ .maxBatchSize(10)
+ .maxInFlightRequests(20)
+ .maxBatchSizeInBytes(10_000)
+ .maxTimeInBufferMS(100)
+ .maxRecordSizeInBytes(10_000)
+ .simulateFailures(true)
+ .build();
+ assertThat(res.size()).isEqualTo(0);
+ TestProcessingTimeService tpts = sinkInitContext.getTestProcessingTimeService();
+ tpts.setCurrentTime(0L);
+ initialSinkWriter.write("1");
+ initialSinkWriter.write("2");
+ initialSinkWriter.write("3");
+ tpts.setCurrentTime(10L);
+
+ AsyncSinkWriterImpl restoredSinkWriter =
+ new AsyncSinkWriterImplBuilder()
+ .context(sinkInitContext)
+ .maxBatchSize(10)
+ .maxInFlightRequests(20)
+ .maxBatchSizeInBytes(10_000)
+ .maxTimeInBufferMS(10)
+ .maxRecordSizeInBytes(10_000)
+ .simulateFailures(true)
+ .buildWithState(initialSinkWriter.snapshotState(1));
+ restoredSinkWriter.write("4");
+
+ tpts.setCurrentTime(30L);
+ assertThat(res.size()).isEqualTo(4);
+ }
+
@Test
public void testThatOneAndOnlyOneCallbackIsEverRegistered() throws Exception {
AsyncSinkWriterImpl sink =