You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/01 15:02:19 UTC

[GitHub] [flink] dannycranmer commented on a change in pull request #18945: [FLINK-26417][connector/base] Add failed Inflight requests in async writer snapshot state

dannycranmer commented on a change in pull request #18945:
URL: https://github.com/apache/flink/pull/18945#discussion_r816850728



##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -504,6 +506,16 @@ private void yieldIfThereExistsInFlightRequests() throws InterruptedException {
         }
     }
 
+    /**
+     * This method is called before a snapshot is requested. All inflight requests should be
+     * completed to capture failed requests in the snapshot.
+     */
+    @Override
+    public Collection<Void> prepareCommit() throws InterruptedException {
+        yieldIfThereExistsInFlightRequests();

Review comment:
       This should be done in a loop? Looks like this only yields once https://github.com/vahmed-hamdy/flink/blob/0ad125ee07e6030ac1505cfac1ed72349e1b7d9f/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java#L503

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -504,6 +506,16 @@ private void yieldIfThereExistsInFlightRequests() throws InterruptedException {
         }
     }
 
+    /**
+     * This method is called before a snapshot is requested. All inflight requests should be
+     * completed to capture failed requests in the snapshot.
+     */
+    @Override
+    public Collection<Void> prepareCommit() throws InterruptedException {
+        yieldIfThereExistsInFlightRequests();

Review comment:
       Based on this, looks like there is a gap in testing. Can you add a unit test to address this gap

##########
File path: flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -133,7 +133,7 @@ public void sinkToAllowBatchSizesEqualToByteWiseLimit()
     }
 
     @Test
-    public void testPreparingCommitAtSnapshotTimeEnsuresBufferedRecordsArePersistedToDestination()
+    public void testFlushingAtEndEnsuresBufferedRecordsArePersistedToDestination()
             throws IOException, InterruptedException {
         AsyncSinkWriterImpl sink =

Review comment:
       Looks like you have repurposed existing tests rather than adding new ones? Are the existing tests no longer valid? Why did you modify them rather than adding new ones?

##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -51,7 +52,8 @@
  */
 @PublicEvolving
 public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
-        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>>,
+                TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, Void> {

Review comment:
       Did you verify that Flink actually calls prepareCommit for non `TwoPhaseCommittingSink` Sinks?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org