Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/05 11:56:48 UTC

[GitHub] [flink] zentol commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

zentol commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r913705135


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   This is overly complicated.
   
   When you have a `CompletionStage` you can just apply `whenComplete`, with the callback rewritten as a `BiConsumer<V, Throwable>`.
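
A minimal sketch of that suggestion, assuming a simplified stand-in for the write path: `WhenCompleteSketch`, its `write` method, and the `asyncError` field are hypothetical names for illustration, not the actual `OutputFormatBase` code. It only shows the callback expressed as a `BiConsumer<V, Throwable>` and attached with `whenComplete`, so no conversion to a `ListenableFuture` is needed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

/** Hypothetical, simplified stand-in for the write path under review. */
class WhenCompleteSketch<V> {
    // Two permits play the role of maxConcurrentRequests.
    final Semaphore semaphore = new Semaphore(2);
    final AtomicReference<Throwable> asyncError = new AtomicReference<>();

    // The former FutureCallback as a BiConsumer: error is non-null only if the stage failed.
    private final BiConsumer<V, Throwable> callback =
            (value, error) -> {
                if (error != null) {
                    // Remember the first failure so a later write can report it.
                    asyncError.compareAndSet(null, error);
                }
                // Release the permit on success and on failure alike.
                semaphore.release();
            };

    /** Takes a permit, then registers the callback directly on the stage. */
    void write(CompletionStage<V> result) {
        semaphore.acquireUninterruptibly();
        result.whenComplete(callback);
    }

    public static void main(String[] args) {
        WhenCompleteSketch<String> sketch = new WhenCompleteSketch<>();
        CompletableFuture<String> pending = new CompletableFuture<>();
        sketch.write(pending);
        pending.complete("ok");
        System.out.println("permits after completion: " + sketch.semaphore.availablePermits());
    }
}
```

Because `whenComplete` is declared on `CompletionStage` itself, this variant never calls `toCompletableFuture()`.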



##########
flink-core/src/main/java/org/apache/flink/api/common/io/SinkUtils.java:
##########
@@ -25,6 +27,7 @@
 import java.util.concurrent.TimeoutException;
 
 /** Utility class for sinks. */
+@Internal

Review Comment:
   Utils used by connectors shouldn't be `@Internal`, because it conflicts with the plans to externalize connectors.



##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -62,7 +70,10 @@ protected OutputFormatBase(int maxConcurrentRequests, Duration maxConcurrentRequ
         this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
     }
 
-    /** Opens the format and initializes the flush system. */
+    /**
+     * Open the format and initializes the flush system. Implementers must call {@code
+     * super.open()}.
+     */

Review Comment:
   You could enforce this by making `open()` final and adding another initialization hook for sub-classes that we call at the end of `open()`.
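
A rough sketch of that idea, under stated assumptions: the class names, the `postOpen()` hook name, and the `events` list are hypothetical and exist only for illustration. Checked exceptions are also left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class: open() is final, so subclasses cannot skip the base setup. */
abstract class HookedFormatSketch {
    // Records the call order so the example can be inspected.
    final List<String> events = new ArrayList<>();

    public final void open() {
        // Base initialization always runs first.
        events.add("base: flush system initialized");
        // The subclass hook is called at the end of open().
        postOpen();
    }

    /** Hypothetical hook name; subclasses override this instead of open(). */
    protected void postOpen() {}
}

/** Hypothetical subclass that only supplies its own initialization. */
class HookedSubclassSketch extends HookedFormatSketch {
    @Override
    protected void postOpen() {
        events.add("subclass: session opened");
    }

    public static void main(String[] args) {
        HookedSubclassSketch format = new HookedSubclassSketch();
        format.open();
        format.events.forEach(System.out::println);
    }
}
```

With this shape, the "must call `super.open()`" sentence in the Javadoc becomes unnecessary, since the compiler rejects any attempt to override `open()`.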



##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -40,6 +47,7 @@
  *
  * @param <OUT> Type of the elements to write.
  */
+@PublicEvolving

Review Comment:
   Let's keep this experimental for now.



##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##########
@@ -86,4 +94,57 @@ public void close() throws IOException {
             }
         }
     }
+
+    protected static <T> CompletableFuture<T> listenableFutureToCompletableFuture(
+            final ListenableFuture<T> listenableFuture) {
+        CompletableFuture<T> completable =
+                new CompletableFuture<T>() {

Review Comment:
   This whole implementation is a bit strange.
   
   a) We don't need to worry about cancel because the base works against the `CompletionStage` interface, which doesn't support cancellation.
   b) `get()` references the listenableFuture, but when the listenableFuture completes we complete the future. This shouldn't work in both directions.
   
   Just create a plain `CompletableFuture`, and keep the behavior in the callback.
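
A one-directional bridge along those lines could look roughly like this. To keep the sketch free of external dependencies, the hypothetical `CallbackFutureSketch` interface stands in for Guava's `ListenableFuture`; with Guava, the same completion calls would go inside a `FutureCallback` registered through `Futures.addCallback`. All class and method names below are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical stand-in for a callback-style future such as Guava's ListenableFuture. */
interface CallbackFutureSketch<T> {
    void addCallback(Consumer<T> onSuccess, Consumer<Throwable> onFailure);
}

final class FutureBridgeSketch {
    /** Completes a plain CompletableFuture from the source's callback; nothing flows back. */
    static <T> CompletableFuture<T> toCompletableFuture(CallbackFutureSketch<T> source) {
        CompletableFuture<T> completable = new CompletableFuture<>();
        // No overridden get() or cancel(): the source drives the result, never the reverse.
        source.addCallback(completable::complete, completable::completeExceptionally);
        return completable;
    }
}

/** Hypothetical manually completed source, used only to exercise the bridge. */
final class ManualSourceSketch<T> implements CallbackFutureSketch<T> {
    private Consumer<T> onSuccess = value -> {};
    private Consumer<Throwable> onFailure = error -> {};

    @Override
    public void addCallback(Consumer<T> onSuccess, Consumer<Throwable> onFailure) {
        this.onSuccess = onSuccess;
        this.onFailure = onFailure;
    }

    void succeed(T value) {
        onSuccess.accept(value);
    }

    void fail(Throwable error) {
        onFailure.accept(error);
    }

    public static void main(String[] args) {
        ManualSourceSketch<String> source = new ManualSourceSketch<>();
        CompletableFuture<String> bridged = FutureBridgeSketch.toCompletableFuture(source);
        source.succeed("row");
        System.out.println("bridged value: " + bridged.join());
    }
}
```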



##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -120,7 +145,10 @@ private void checkAsyncErrors() throws IOException {
         }
     }
 
-    /** Closes the format waiting for pending writes and reports errors. */
+    /**
+     * Close the format waiting for pending writes and reports errors. Implementers must call {@code
+     * super.close()}.

Review Comment:
   Same comment as for `open()`.



##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   This will also remove the need for the odd `send()` javadoc that `toCompletableFuture()` must be properly implemented. (Which is weird in any case since, you know, every method of an interface should be properly implemented.) Realistically this will just be a `CompletableFuture` anyway.


