Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/10 16:26:11 UTC

[GitHub] [flink] echauchot opened a new pull request, #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

echauchot opened a new pull request, #19935:
URL: https://github.com/apache/flink/pull/19935

   ## What is the purpose of the change
   
   Move OutputFormatBase to flink-core to offer flush mechanism to all output formats
   
   ## Brief change log
   
   Move OutputFormatBase from connectors-cassandra module to flink-core to offer flush mechanism to all output formats.
   The main change is to the (not yet public) `send` method signature: it now returns a `CompletionStage` instead of a Guava `ListenableFuture`. Implementers can choose their `CompletionStage` implementation as long as it supports `CompletionStage#toCompletableFuture()`.
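
As a rough illustration of the new signature (a minimal sketch, not code from this PR): the class name `InMemorySendSketch` and the `"ack:"` result are hypothetical, and the method is static only to keep the example self-contained. The one requirement it shows is that the returned stage can be converted with `toCompletableFuture()`; the JDK allows implementations to throw `UnsupportedOperationException` there, which would not satisfy that requirement.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-in for an output format subclass; only the shape of send() matters.
class InMemorySendSketch {

    /** Pretends to write a record asynchronously and returns a stage for the result. */
    static CompletionStage<String> send(String record) {
        // CompletableFuture is one CompletionStage implementation that supports
        // toCompletableFuture(); any other implementation that supports it would do.
        return CompletableFuture.supplyAsync(() -> "ack:" + record);
    }

    public static void main(String[] args) {
        // The caller only relies on toCompletableFuture() being supported.
        CompletableFuture<String> future = send("r1").toCompletableFuture();
        System.out.println(future.join());
    }
}
```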
   
   
   ## Verifying this change
   
   
   This change is already covered by existing tests, such as OutputFormatBaseTest and CassandraConnectorITCase (for first implementation)
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes (not applicable / docs / JavaDocs / not documented)
     - If yes, how is the feature documented? javadocs
   
   R: @zentol  as discussed in the other PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] echauchot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1154956837

   @flinkbot run azure
   




[GitHub] [flink] zentol merged pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
zentol merged PR #19935:
URL: https://github.com/apache/flink/pull/19935




[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915889897


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   > > I know but using whenComplete won't work as get() is never called on the result future
   > 
   > Sure it does. `whenComplete` is run once the future is completed (by the thread completing the future (unless it's already completed at the time that `whenComplete` is called)), not when the result is requested.
   
   You're right I was too quick with my answer, I confused `whenComplete` with `thenAccept`.
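
For reference, a small sketch (not code from this PR; the class and method names are hypothetical) of how the two methods differ on a plain `CompletableFuture`: `thenAccept` runs only on normal completion, while `whenComplete` runs on both normal and exceptional completion. Neither one requires a call to `get()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class WhenCompleteVsThenAccept {

    /** Returns the names of the callbacks that fired for a failed or successful future. */
    static List<String> observe(boolean failed) {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> future = new CompletableFuture<>();

        // Both callbacks are registered before completion; get() is never called.
        future.thenAccept(value -> events.add("thenAccept"));
        future.whenComplete((value, failure) -> events.add("whenComplete"));

        if (failed) {
            future.completeExceptionally(new RuntimeException("boom"));
        } else {
            future.complete("ok");
        }
        // Callbacks ran synchronously on this thread during completion.
        return events;
    }

    public static void main(String[] args) {
        System.out.println("success: " + observe(false));
        System.out.println("failure: " + observe(true));
    }
}
```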





[GitHub] [flink] echauchot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1181755076

   @zentol thanks for your review and for adding the visibility I forgot. Build is green.




[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915890066


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   Fixed





[GitHub] [flink] zentol commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915857842


##########
flink-core/src/main/java/org/apache/flink/api/common/io/SinkUtils.java:
##########
@@ -18,16 +18,13 @@
 
 package org.apache.flink.api.common.io;
 
-import org.apache.flink.annotation.Internal;
-
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /** Utility class for sinks. */
-@Internal
 public class SinkUtils implements Serializable {

Review Comment:
   Let's mark this as experimental, in line with the sink.
   (everything not annotated is implicitly internal ;))





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915061580


##########
flink-core/src/main/java/org/apache/flink/api/common/io/SinkUtils.java:
##########
@@ -25,6 +27,7 @@
 import java.util.concurrent.TimeoutException;
 
 /** Utility class for sinks. */
+@Internal

Review Comment:
   Thanks for the clarification. Noted.





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r917800509


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##########
@@ -59,31 +63,45 @@ public void configure(Configuration parameters) {
 
     /** Opens a Session to Cassandra . */
     @Override
-    public void open(int taskNumber, int numTasks) {
+    protected void postOpen() {
         this.session = cluster.connect();
-        super.open(taskNumber, numTasks);
     }
 
     /** Closes all resources used by Cassandra connection. */
     @Override
-    public void close() throws IOException {
+    protected void postClose() {
         try {
-            super.close();
-        } finally {
-            try {
-                if (session != null) {
-                    session.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing session.", e);
+            if (session != null) {
+                session.close();
             }
-            try {
-                if (cluster != null) {
-                    cluster.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing cluster.", e);
+        } catch (Exception e) {
+            LOG.error("Error while closing session.", e);
+        }
+        try {
+            if (cluster != null) {
+                cluster.close();
             }
+        } catch (Exception e) {
+            LOG.error("Error while closing cluster.", e);
         }
     }
+
+    protected static <T> CompletableFuture<T> listenableFutureToCompletableFuture(
+            final ListenableFuture<T> listenableFuture) {
+        CompletableFuture<T> completable = new CompletableFuture<T>();
+        Futures.addCallback(
+                listenableFuture,
+                new FutureCallback<T>() {
+                    @Override
+                    public void onSuccess(T result) {
+                        completable.complete(result);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        completable.completeExceptionally(t);
+                    }
+                });

Review Comment:
   Thanks for catching this. My bad! How could I write code that allocates on each `send` call?! :scream:





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r917973672


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##########
@@ -59,31 +63,45 @@ public void configure(Configuration parameters) {
 
     /** Opens a Session to Cassandra . */
     @Override
-    public void open(int taskNumber, int numTasks) {
+    protected void postOpen() {
         this.session = cluster.connect();
-        super.open(taskNumber, numTasks);
     }
 
     /** Closes all resources used by Cassandra connection. */
     @Override
-    public void close() throws IOException {
+    protected void postClose() {
         try {
-            super.close();
-        } finally {
-            try {
-                if (session != null) {
-                    session.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing session.", e);
+            if (session != null) {
+                session.close();
             }
-            try {
-                if (cluster != null) {
-                    cluster.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing cluster.", e);
+        } catch (Exception e) {
+            LOG.error("Error while closing session.", e);
+        }
+        try {
+            if (cluster != null) {
+                cluster.close();
             }
+        } catch (Exception e) {
+            LOG.error("Error while closing cluster.", e);
         }
     }
+
+    protected static <T> CompletableFuture<T> listenableFutureToCompletableFuture(
+            final ListenableFuture<T> listenableFuture) {
+        CompletableFuture<T> completable = new CompletableFuture<T>();
+        Futures.addCallback(
+                listenableFuture,
+                new FutureCallback<T>() {
+                    @Override
+                    public void onSuccess(T result) {
+                        completable.complete(result);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        completable.completeExceptionally(t);
+                    }
+                });

Review Comment:
   Fixed





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915754903


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   > This is overly complicated.
   > 
   > When you have a `CompletionStage` you can just apply a `whenComplete`. with the callback being re-written to a `BiConsumer<V, Throwable>`.
   
   I know but using `whenComplete` won't work as `get()` is never called on the result future. And we don't call `get()` on this future because otherwise the `writeRecord` method would become synchronous. This is why I set a listener so that the `callback` is called and the method still stays asynchronous.
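
To make the `whenComplete` suggestion quoted above concrete, here is a simplified sketch under stated assumptions. `AsyncWriterSketch`, its `sender` function and the `asyncError` field are hypothetical stand-ins, not the merged Flink code. It keeps the shape of `writeRecord` from the diff (check for earlier errors, acquire a permit, call `send`, release on completion) but registers a `BiConsumer` on the stage instead of converting it to a `ListenableFuture`. The method stays asynchronous because nothing blocks on the result.

```java
import java.io.IOException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Simplified, hypothetical stand-in for the class under review.
class AsyncWriterSketch {
    private final Semaphore semaphore;
    private final AtomicReference<Throwable> asyncError = new AtomicReference<>();
    private final Function<String, CompletionStage<Void>> sender;

    AsyncWriterSketch(int maxConcurrentRequests, Function<String, CompletionStage<Void>> sender) {
        this.semaphore = new Semaphore(maxConcurrentRequests);
        this.sender = sender;
    }

    void writeRecord(String record) throws IOException {
        // Surface a failure reported by an earlier asynchronous write.
        Throwable previous = asyncError.get();
        if (previous != null) {
            throw new IOException("A previous write failed", previous);
        }
        semaphore.acquireUninterruptibly();
        final CompletionStage<Void> result;
        try {
            result = sender.apply(record);
        } catch (Throwable e) {
            semaphore.release();
            throw e;
        }
        // Runs when the stage completes, successfully or not; get() is never called.
        result.whenComplete((ignored, failure) -> {
            if (failure != null) {
                asyncError.compareAndSet(null, failure);
            }
            semaphore.release();
        });
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }
}
```

With a limit of 2 and one write in flight, `availablePermits()` is 1 until the returned stage completes, at which point the callback releases the permit.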





[GitHub] [flink] echauchot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1179030111

   @zentol All discussions are resolved and CI passes. Is this PR ready for merging?




[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915880267


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   You're right I was too quick with my answer, I confused `whenComplete` with `thenAccept`. 



##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   Fixed
   



##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   > > I know but using whenComplete won't work as get() is never called on the result future
   > 
   > Sure it does. `whenComplete` is run once the future is completed (by the thread completing the future (unless it's already completed at the time that `whenComplete` is called)), not when the result is requested.
   
   You're right I was too quick with my answer, I confused whenComplete with thenAccept.





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915880905


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   Fixed
   





[GitHub] [flink] zentol commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915817226


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   >I know but using whenComplete won't work as get() is never called on the result future
   
   Sure it does. `whenComplete` is run once the future is completed (by the thread completing the future), not when the result is requested.
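
A small sketch of this behaviour (hypothetical class and thread names, not code from this PR): the callback is registered before completion, a separate thread completes the future, and the callback runs on that thread even though nothing ever calls `get()` or `join()` on the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class WhenCompleteThreadDemo {

    /** Returns the name of the thread that ran the whenComplete callback. */
    static String callbackThreadName() {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<String> callbackThread = new AtomicReference<>();

        // Registered while the future is still incomplete.
        future.whenComplete(
                (value, failure) -> callbackThread.set(Thread.currentThread().getName()));

        // A separate thread completes the future; the callback runs inside complete().
        Thread completer = new Thread(() -> future.complete("ok"), "completer");
        completer.start();
        try {
            // Waits for the thread, not for the future's result.
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return callbackThread.get();
    }

    public static void main(String[] args) {
        System.out.println("callback ran on: " + callbackThreadName());
    }
}
```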





[GitHub] [flink] echauchot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1154844395

   @flinkbot run azure
   




[GitHub] [flink] zentol commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r917626972


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##########
@@ -59,31 +63,45 @@ public void configure(Configuration parameters) {
 
     /** Opens a Session to Cassandra . */
     @Override
-    public void open(int taskNumber, int numTasks) {
+    protected void postOpen() {
         this.session = cluster.connect();
-        super.open(taskNumber, numTasks);
     }
 
     /** Closes all resources used by Cassandra connection. */
     @Override
-    public void close() throws IOException {
+    protected void postClose() {
         try {
-            super.close();
-        } finally {
-            try {
-                if (session != null) {
-                    session.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing session.", e);
+            if (session != null) {
+                session.close();
             }
-            try {
-                if (cluster != null) {
-                    cluster.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing cluster.", e);
+        } catch (Exception e) {
+            LOG.error("Error while closing session.", e);
+        }
+        try {
+            if (cluster != null) {
+                cluster.close();
             }
+        } catch (Exception e) {
+            LOG.error("Error while closing cluster.", e);
         }
     }
+
+    protected static <T> CompletableFuture<T> listenableFutureToCompletableFuture(
+            final ListenableFuture<T> listenableFuture) {
+        CompletableFuture<T> completable = new CompletableFuture<T>();
+        Futures.addCallback(
+                listenableFuture,
+                new FutureCallback<T>() {
+                    @Override
+                    public void onSuccess(T result) {
+                        completable.complete(result);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        completable.completeExceptionally(t);
+                    }
+                });

Review Comment:
   this generates a new class on every call. Create a dedicated class that you pass the future to via the constructor.
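
One possible shape for such a dedicated class, as a sketch only. To compile without Guava it uses a local `Callback` interface as a stand-in with the same two methods as Guava's `FutureCallback`; `CompletingCallbackSketch` and `CompletingCallback` are hypothetical names, and the code that ended up in the PR may differ. Each conversion still creates one callback instance; what changes is that the class is named and receives the target future through its constructor.

```java
import java.util.concurrent.CompletableFuture;

class CompletingCallbackSketch {

    /** Local stand-in mirroring the two methods of Guava's FutureCallback. */
    interface Callback<T> {
        void onSuccess(T result);

        void onFailure(Throwable t);
    }

    /** Named callback that forwards the outcome to the future passed to its constructor. */
    static final class CompletingCallback<T> implements Callback<T> {
        private final CompletableFuture<T> target;

        CompletingCallback(CompletableFuture<T> target) {
            this.target = target;
        }

        @Override
        public void onSuccess(T result) {
            target.complete(result);
        }

        @Override
        public void onFailure(Throwable t) {
            target.completeExceptionally(t);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> target = new CompletableFuture<>();
        new CompletingCallback<String>(target).onSuccess("done");
        System.out.println(target.join());
    }
}
```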





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r917800509


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##########
@@ -59,31 +63,45 @@ public void configure(Configuration parameters) {
 
     /** Opens a Session to Cassandra . */
     @Override
-    public void open(int taskNumber, int numTasks) {
+    protected void postOpen() {
         this.session = cluster.connect();
-        super.open(taskNumber, numTasks);
     }
 
     /** Closes all resources used by Cassandra connection. */
     @Override
-    public void close() throws IOException {
+    protected void postClose() {
         try {
-            super.close();
-        } finally {
-            try {
-                if (session != null) {
-                    session.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing session.", e);
+            if (session != null) {
+                session.close();
             }
-            try {
-                if (cluster != null) {
-                    cluster.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing cluster.", e);
+        } catch (Exception e) {
+            LOG.error("Error while closing session.", e);
+        }
+        try {
+            if (cluster != null) {
+                cluster.close();
             }
+        } catch (Exception e) {
+            LOG.error("Error while closing cluster.", e);
         }
     }
+
+    protected static <T> CompletableFuture<T> listenableFutureToCompletableFuture(
+            final ListenableFuture<T> listenableFuture) {
+        CompletableFuture<T> completable = new CompletableFuture<T>();
+        Futures.addCallback(
+                listenableFuture,
+                new FutureCallback<T>() {
+                    @Override
+                    public void onSuccess(T result) {
+                        completable.complete(result);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        completable.completeExceptionally(t);
+                    }
+                });

Review Comment:
   Thanks for catching this. My bad! How could I write code that allocates on each `send` call?! :scream:





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r917962878


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##########
@@ -59,31 +63,45 @@ public void configure(Configuration parameters) {
 
     /** Opens a Session to Cassandra . */
     @Override
-    public void open(int taskNumber, int numTasks) {
+    protected void postOpen() {
         this.session = cluster.connect();
-        super.open(taskNumber, numTasks);
     }
 
     /** Closes all resources used by Cassandra connection. */
     @Override
-    public void close() throws IOException {
+    protected void postClose() {
         try {
-            super.close();
-        } finally {
-            try {
-                if (session != null) {
-                    session.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing session.", e);
+            if (session != null) {
+                session.close();
             }
-            try {
-                if (cluster != null) {
-                    cluster.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing cluster.", e);
+        } catch (Exception e) {
+            LOG.error("Error while closing session.", e);
+        }
+        try {
+            if (cluster != null) {
+                cluster.close();
             }
+        } catch (Exception e) {
+            LOG.error("Error while closing cluster.", e);
         }
     }
+
+    protected static <T> CompletableFuture<T> listenableFutureToCompletableFuture(
+            final ListenableFuture<T> listenableFuture) {
+        CompletableFuture<T> completable = new CompletableFuture<T>();
+        Futures.addCallback(
+                listenableFuture,
+                new FutureCallback<T>() {
+                    @Override
+                    public void onSuccess(T result) {
+                        completable.complete(result);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        completable.completeExceptionally(t);
+                    }
+                });

Review Comment:
   My bad, thanks for pointing this out.





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915889897


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   > > I know, but using `whenComplete` won't work, as `get()` is never called on the result future.
   > 
   > Sure it does. `whenComplete` is run once the future is completed (by the thread completing the future, unless it's already completed at the time that `whenComplete` is called), not when the result is requested.
   
   You're right, I was too quick with my answer: I confused `whenComplete` with `thenAccept`.





[GitHub] [flink] echauchot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1174709407

   @zentol friendly ping




[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915880267


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   You're right, I was too quick with my answer: I confused `whenComplete` with `thenAccept`.





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915085601


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -62,7 +70,10 @@ protected OutputFormatBase(int maxConcurrentRequests, Duration maxConcurrentRequ
         this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
     }
 
-    /** Opens the format and initializes the flush system. */
+    /**
+     * Open the format and initializes the flush system. Implementers must call {@code
+     * super.open()}.
+     */

Review Comment:
   :+1: 





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915089430


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -40,6 +47,7 @@
  *
  * @param <OUT> Type of the elements to write.
  */
+@PublicEvolving

Review Comment:
   :+1: 





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915104590


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##########
@@ -86,4 +94,57 @@ public void close() throws IOException {
             }
         }
     }
+
+    protected static <T> CompletableFuture<T> listenableFutureToCompletableFuture(
+            final ListenableFuture<T> listenableFuture) {
+        CompletableFuture<T> completable =
+                new CompletableFuture<T>() {

Review Comment:
   Agree





[GitHub] [flink] zentol commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915817226


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   > I know, but using `whenComplete` won't work, as `get()` is never called on the result future.
   
   Sure it does. `whenComplete` is run once the future is completed (by the thread completing the future, unless it's already completed at the time that `whenComplete` is called), not when the result is requested.
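
The two cases described above can be observed with a small experiment. The following is only an illustrative sketch, not code from the PR: the class name `WhenCompleteTimingSketch` and its helper methods are hypothetical, and the behavior shown is that of the JDK's `CompletableFuture` for non-async callbacks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: which thread runs a whenComplete callback? */
class WhenCompleteTimingSketch {

    /** Starts the thread and waits for it to finish. */
    private static void runAndWait(Thread thread) {
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /**
     * Registers a whenComplete callback and returns the name of the thread that ran it.
     * Nobody ever calls get() or join() on the future.
     */
    static String callbackThread(CompletableFuture<String> future, boolean completeFirst) {
        AtomicReference<String> ranOn = new AtomicReference<>();
        Thread completer = new Thread(() -> future.complete("sup"), "completer");

        if (completeFirst) {
            // The future is already completed when the callback is registered.
            runAndWait(completer);
        }
        future.whenComplete((value, error) -> ranOn.set(Thread.currentThread().getName()));
        if (!completeFirst) {
            // The callback is registered first and runs when the completer thread completes the future.
            runAndWait(completer);
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("registered first: " + callbackThread(new CompletableFuture<>(), false));
        System.out.println("completed first:  " + callbackThread(new CompletableFuture<>(), true));
    }
}
```

In the first call the callback runs on the completing thread; in the second it runs immediately on the thread that calls `whenComplete`. Neither case depends on the result being requested.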





[GitHub] [flink] zentol commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915840596


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   ```
       @Test
       void testFutureCompletion() throws InterruptedException {
           CompletableFuture<String> future = new CompletableFuture<>();
   
           future.whenComplete((s, ignored) -> System.out.println(s));
   
           final Thread thread = new Thread(() -> future.complete("sup"));
           thread.start();
           thread.join();
       }
   ```





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915099337


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -120,7 +145,10 @@ private void checkAsyncErrors() throws IOException {
         }
     }
 
-    /** Closes the format waiting for pending writes and reports errors. */
+    /**
+     * Close the format waiting for pending writes and reports errors. Implementers must call {@code
+     * super.close()}.

Review Comment:
   :+1: 





[GitHub] [flink] echauchot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1177448930

   rebased on master




[GitHub] [flink] zentol commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r918708203


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##########
@@ -59,31 +65,52 @@ public void configure(Configuration parameters) {
 
     /** Opens a Session to Cassandra . */
     @Override
-    public void open(int taskNumber, int numTasks) {
+    protected void postOpen() {
         this.session = cluster.connect();
-        super.open(taskNumber, numTasks);
     }
 
     /** Closes all resources used by Cassandra connection. */
     @Override
-    public void close() throws IOException {
+    protected void postClose() {
         try {
-            super.close();
-        } finally {
-            try {
-                if (session != null) {
-                    session.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing session.", e);
+            if (session != null) {
+                session.close();
             }
-            try {
-                if (cluster != null) {
-                    cluster.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing cluster.", e);
+        } catch (Exception e) {
+            LOG.error("Error while closing session.", e);
+        }
+        try {
+            if (cluster != null) {
+                cluster.close();
             }
+        } catch (Exception e) {
+            LOG.error("Error while closing cluster.", e);
+        }
+    }
+
+    protected static <T> CompletableFuture<T> listenableFutureToCompletableFuture(
+            final ListenableFuture<T> listenableFuture) {
+        CompletableFuture<T> completable = new CompletableFuture<T>();
+        Futures.addCallback(listenableFuture, new CompletableFutureCallback<>(completable));
+        return completable;
+    }
+
+    private static class CompletableFutureCallback<T> implements FutureCallback<T> {
+
+        CompletableFuture<T> completableFuture;

Review Comment:
   ```suggestion
           private final CompletableFuture<T> completableFuture;
   ```





[GitHub] [flink] flinkbot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1152536310

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "6e6751cbf94c00f35b023e2d945aaf76714ea3e6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "6e6751cbf94c00f35b023e2d945aaf76714ea3e6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 6e6751cbf94c00f35b023e2d945aaf76714ea3e6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] echauchot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1167372186

   @zentol do you have time to review this PR, or do you want me to ping another reviewer?




[GitHub] [flink] zentol commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r913705135


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   This is overly complicated.
   
   When you have a `CompletionStage`, you can just apply `whenComplete`, with the callback rewritten as a `BiConsumer<V, Throwable>`.
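
A minimal sketch of what this suggestion could look like, assuming a semaphore that bounds in-flight writes and a field that remembers the first asynchronous error. The class `WhenCompleteSketch`, the method `track`, and the field names are hypothetical and are not taken from `OutputFormatBase`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

/** Hypothetical sketch of completion handling via whenComplete; not the actual Flink code. */
class WhenCompleteSketch {
    // At most two writes may be in flight (assumed limit for the example).
    final Semaphore semaphore = new Semaphore(2);
    final AtomicReference<Throwable> firstError = new AtomicReference<>();

    // One shared callback: remember the first failure, then release the permit.
    final BiConsumer<Object, Throwable> callback =
            (ignoredResult, throwable) -> {
                if (throwable != null) {
                    firstError.compareAndSet(null, throwable);
                }
                semaphore.release();
            };

    /** Takes a permit and attaches the callback directly to the stage, with no Guava conversion. */
    void track(CompletionStage<?> pendingWrite) {
        semaphore.acquireUninterruptibly();
        pendingWrite.whenComplete(callback);
    }

    public static void main(String[] args) {
        WhenCompleteSketch sketch = new WhenCompleteSketch();
        CompletableFuture<String> ok = new CompletableFuture<>();
        CompletableFuture<String> failed = new CompletableFuture<>();
        sketch.track(ok);
        sketch.track(failed);

        ok.complete("done");
        failed.completeExceptionally(new IllegalStateException("write failed"));

        System.out.println("permits available: " + sketch.semaphore.availablePermits());
        System.out.println("first error: " + sketch.firstError.get().getMessage());
    }
}
```

Because the callback is attached to the `CompletionStage` itself, this variant never needs to call `toCompletableFuture()`.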



##########
flink-core/src/main/java/org/apache/flink/api/common/io/SinkUtils.java:
##########
@@ -25,6 +27,7 @@
 import java.util.concurrent.TimeoutException;
 
 /** Utility class for sinks. */
+@Internal

Review Comment:
   Utils used by connectors shouldn't be `@Internal`, because it conflicts with the plans to externalize connectors.



##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -62,7 +70,10 @@ protected OutputFormatBase(int maxConcurrentRequests, Duration maxConcurrentRequ
         this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
     }
 
-    /** Opens the format and initializes the flush system. */
+    /**
+     * Open the format and initializes the flush system. Implementers must call {@code
+     * super.open()}.
+     */

Review Comment:
   You could enforce this by making open() final and adding another initialization hook for sub-classes that we call at the end of `open()`.
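
The idea can be sketched as a template method. This is only an illustration: the hook name `postOpen()` follows the one that appears in later revisions quoted in this thread, while `OpenHookSketch`, `Base`, `CassandraLike`, and the `events` list are hypothetical names used for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a final open() with a subclass hook; not the actual Flink code. */
class OpenHookSketch {

    abstract static class Base {
        final List<String> events = new ArrayList<>();

        // final: subclasses cannot override open() and forget the base initialization.
        public final void open() {
            events.add("base initialized");
            postOpen();
        }

        // Hook called at the end of open(); the default does nothing.
        protected void postOpen() {}
    }

    static class CassandraLike extends Base {
        @Override
        protected void postOpen() {
            events.add("session connected");
        }
    }

    public static void main(String[] args) {
        CassandraLike format = new CassandraLike();
        format.open();
        System.out.println(format.events);
    }
}
```

With this shape there is no `super.open()` call for implementers to forget, so the Javadoc requirement becomes unnecessary.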



##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -40,6 +47,7 @@
  *
  * @param <OUT> Type of the elements to write.
  */
+@PublicEvolving

Review Comment:
   Let's keep this experimental for now.



##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##########
@@ -86,4 +94,57 @@ public void close() throws IOException {
             }
         }
     }
+
+    protected static <T> CompletableFuture<T> listenableFutureToCompletableFuture(
+            final ListenableFuture<T> listenableFuture) {
+        CompletableFuture<T> completable =
+                new CompletableFuture<T>() {

Review Comment:
   This whole implementation is a bit strange.
   
   a) We don't need to worry about cancellation, because the base class works against the `CompletionStage` interface, which doesn't support cancellation.
   b) `get()` references the `listenableFuture`, but when the `listenableFuture` completes we complete the future. This shouldn't work in both directions.
   
   Just create a plain `CompletableFuture`, and keep the behavior in the callback.
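
A rough sketch of that shape, under stated assumptions: the `Callback` interface below is a hypothetical stand-in for Guava's `FutureCallback` (same two methods) so that the example compiles without Guava, and the wiring to a real `ListenableFuture` via `Futures.addCallback` is left out.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of a plain CompletableFuture completed from a dedicated callback class. */
class FutureBridgeSketch {

    /** Stand-in for Guava's FutureCallback; an assumption made to keep the sketch self-contained. */
    interface Callback<T> {
        void onSuccess(T result);

        void onFailure(Throwable t);
    }

    /** Forwards the outcome in one direction only: from the callback to the CompletableFuture. */
    static final class CompletableFutureCallback<T> implements Callback<T> {
        private final CompletableFuture<T> completableFuture;

        CompletableFutureCallback(CompletableFuture<T> completableFuture) {
            this.completableFuture = completableFuture;
        }

        @Override
        public void onSuccess(T result) {
            completableFuture.complete(result);
        }

        @Override
        public void onFailure(Throwable t) {
            completableFuture.completeExceptionally(t);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        Callback<String> callback = new CompletableFutureCallback<>(future);

        // In the connector, the driver's future would invoke this callback.
        callback.onSuccess("row written");
        System.out.println(future.join());
    }
}
```

The `CompletableFuture` is not subclassed and never reads from the source future, which avoids the two-way coupling described in (b).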



##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -120,7 +145,10 @@ private void checkAsyncErrors() throws IOException {
         }
     }
 
-    /** Closes the format waiting for pending writes and reports errors. */
+    /**
+     * Close the format waiting for pending writes and reports errors. Implementers must call {@code
+     * super.close()}.

Review Comment:
   Same comment as for `open()`.



##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   This will also remove the need for the odd `send()` Javadoc stating that `toCompletableFuture()` must be properly implemented (which is weird in any case since, you know, every method of an interface should be properly implemented). Realistically this will just be a `CompletableFuture` anyway.





[GitHub] [flink] echauchot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1178703675

   @flinkbot run azure
   




[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r917800509


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##########
@@ -59,31 +63,45 @@ public void configure(Configuration parameters) {
 
     /** Opens a Session to Cassandra . */
     @Override
-    public void open(int taskNumber, int numTasks) {
+    protected void postOpen() {
         this.session = cluster.connect();
-        super.open(taskNumber, numTasks);
     }
 
     /** Closes all resources used by Cassandra connection. */
     @Override
-    public void close() throws IOException {
+    protected void postClose() {
         try {
-            super.close();
-        } finally {
-            try {
-                if (session != null) {
-                    session.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing session.", e);
+            if (session != null) {
+                session.close();
             }
-            try {
-                if (cluster != null) {
-                    cluster.close();
-                }
-            } catch (Exception e) {
-                LOG.error("Error while closing cluster.", e);
+        } catch (Exception e) {
+            LOG.error("Error while closing session.", e);
+        }
+        try {
+            if (cluster != null) {
+                cluster.close();
             }
+        } catch (Exception e) {
+            LOG.error("Error while closing cluster.", e);
         }
     }
+
+    protected static <T> CompletableFuture<T> listenableFutureToCompletableFuture(
+            final ListenableFuture<T> listenableFuture) {
+        CompletableFuture<T> completable = new CompletableFuture<T>();
+        Futures.addCallback(
+                listenableFuture,
+                new FutureCallback<T>() {
+                    @Override
+                    public void onSuccess(T result) {
+                        completable.complete(result);
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        completable.completeExceptionally(t);
+                    }
+                });

Review Comment:
   Thanks for catching this, my bad! How could I write code that allocates on every `send` call?! :scream:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] echauchot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1156532710

   Passes now, it was a flaky test.




[GitHub] [flink] echauchot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1154920901

   I don't understand the `test_ci finegrained_resource_management` failure; it seems unrelated to this PR. Anyway, I ran the corresponding command line locally, with the alibaba mentions removed, and it passes.




[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915755957


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   > This will also remove the need for the odd `send()` javadoc that `toCompletableFuture()` must be properly implemented. (Which is weird in any case since , you know, every method of an interface should be properly implemented.) Realistically this will just be a CompletableFuture anyway.
   
   I agree, that javadoc note is redundant and therefore useless.





[GitHub] [flink] zentol commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
zentol commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915817226


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   > I know but using `whenComplete` won't work as `get()` is never called on the result future
   
   Sure it does. `whenComplete` is run once the future is completed, not when the result is requested.
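
To illustrate the point above, here is a minimal standalone sketch using only `java.util.concurrent`. The class and method names are hypothetical and are not part of the PR. It registers a `whenComplete` callback on a pending future, completes the future, and never calls `get()` or `join()`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class WhenCompleteDemo {

    /** Returns what the callback observed; get() and join() are never called. */
    static String completeWithoutGet() {
        CompletableFuture<String> future = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>("not called");

        // Register the callback while the future is still pending.
        future.whenComplete((value, error) -> seen.set(error == null ? value : "failed"));

        // Completing the future runs the non-async callback on the completing thread.
        future.complete("done");
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(completeWithoutGet());
    }
}
```

The callback is triggered by completion itself, so the caller never has to block on the result.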





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915754903


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -97,21 +108,35 @@ private void tryAcquire(int permits) throws IOException {
         }
     }
 
+    /**
+     * Asynchronously write a record and deal with {@link OutputFormatBase#maxConcurrentRequests}.
+     * To specify how a record is written, please override the {@link OutputFormatBase#send(Object)}
+     * method.
+     */
     @Override
-    public void writeRecord(OUT record) throws IOException {
+    public final void writeRecord(OUT record) throws IOException {
         checkAsyncErrors();
         tryAcquire(1);
-        final ListenableFuture<V> result;
+        final CompletionStage<V> result;
         try {
             result = send(record);
         } catch (Throwable e) {
             semaphore.release();
             throw e;
         }
-        Futures.addCallback(result, callback);
+        Futures.addCallback(
+                completableFutureToListenableFuture(result.toCompletableFuture()),
+                callback,
+                Executors.directExecutor());

Review Comment:
   > This is overly complicated.
   > 
   > When you have a `CompletionStage` you can just apply a `whenComplete`, with the callback being rewritten as a `BiConsumer<V, Throwable>`.
   
   I know, but using `whenComplete` won't work because `get()` is never called on the result future. We don't call `get()` on this future because that would make the `writeRecord` method synchronous. This is why I set a listener: the `callback` gets called and the method stays asynchronous.
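
For reference, the quoted suggestion could look roughly like the sketch below. This is only an illustration under assumptions: `AsyncWriteSketch`, its fields, and its accessor methods are hypothetical, the pending send is passed in directly instead of coming from `send(record)`, and the permit is taken with a plain `tryAcquire()` rather than the timeout-based acquisition in `OutputFormatBase`. The callback is a single `BiConsumer<V, Throwable>` created once and reused for every record.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

public class AsyncWriteSketch<V> {
    private final int maxConcurrentRequests;
    private final Semaphore semaphore;
    private final AtomicReference<Throwable> firstError = new AtomicReference<>();

    // Created once: records the first failure and always returns the permit.
    private final BiConsumer<V, Throwable> callback;

    public AsyncWriteSketch(int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.semaphore = new Semaphore(maxConcurrentRequests);
        this.callback = (ignored, error) -> {
            if (error != null) {
                firstError.compareAndSet(null, error);
            }
            semaphore.release();
        };
    }

    /** Takes a permit and attaches the shared callback; does not wait for the result. */
    public void writeRecord(CompletionStage<V> pendingSend) {
        if (!semaphore.tryAcquire()) {
            throw new IllegalStateException("more than " + maxConcurrentRequests + " requests in flight");
        }
        pendingSend.whenComplete(callback);
    }

    public int availablePermits() {
        return semaphore.availablePermits();
    }

    public Throwable firstError() {
        return firstError.get();
    }

    public static void main(String[] args) {
        AsyncWriteSketch<String> sketch = new AsyncWriteSketch<>(2);
        CompletableFuture<String> pending = new CompletableFuture<>();
        sketch.writeRecord(pending);                    // returns immediately
        System.out.println(sketch.availablePermits());  // one permit is held
        pending.complete("written");                    // callback releases the permit
        System.out.println(sketch.availablePermits());
    }
}
```

In this sketch `writeRecord` returns before the send finishes, and the permit comes back when the stage completes, whether normally or exceptionally.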





[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915883185


##########
flink-core/src/main/java/org/apache/flink/api/common/io/SinkUtils.java:
##########
@@ -18,16 +18,13 @@
 
 package org.apache.flink.api.common.io;
 
-import org.apache.flink.annotation.Internal;
-
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 /** Utility class for sinks. */
-@Internal
 public class SinkUtils implements Serializable {

Review Comment:
   :+1: 





[GitHub] [flink] echauchot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1177697600

   @flinkbot run azure
   




[GitHub] [flink] echauchot commented on a diff in pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on code in PR #19935:
URL: https://github.com/apache/flink/pull/19935#discussion_r915085359


##########
flink-core/src/main/java/org/apache/flink/api/common/io/OutputFormatBase.java:
##########
@@ -62,7 +70,10 @@ protected OutputFormatBase(int maxConcurrentRequests, Duration maxConcurrentRequ
         this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
     }
 
-    /** Opens the format and initializes the flush system. */
+    /**
+     * Open the format and initializes the flush system. Implementers must call {@code
+     * super.open()}.
+     */

Review Comment:
   Thanks for the suggestion.





[GitHub] [flink] echauchot commented on pull request #19935: [FLINK-27884] Move OutputFormatBase to flink-core to offer flush mechanism to all output formats

Posted by GitBox <gi...@apache.org>.
echauchot commented on PR #19935:
URL: https://github.com/apache/flink/pull/19935#issuecomment-1156535246

   > I don't understand the `test_ci finegrained_resource_management` failure; it seems unrelated to this PR. Anyway, I ran the corresponding command line locally, with the alibaba mentions removed, and it passes.
   
   Passes now, it was a flaky test.

