Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/09/05 18:31:48 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #8010: ARROW-9587: [FlightRPC][Java] clean up FlightStream/DoPut

lidavidm commented on a change in pull request #8010:
URL: https://github.com/apache/arrow/pull/8010#discussion_r483976063



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
##########
@@ -158,29 +167,52 @@ public FlightDescriptor getDescriptor() {
   /**
    * Closes the stream (freeing any existing resources).
    *
-   * <p>If the stream isn't complete and is cancellable, this method will cancel the stream first.</p>
+   * <p>If the stream isn't complete and is cancellable, this method will cancel and drain the stream first.
    */
   public void close() throws Exception {
     final List<AutoCloseable> closeables = new ArrayList<>();
-    // cancellation can throw, but we still want to clean up resources, so make it an AutoCloseable too
-    closeables.add(() -> {
-      if (!completed.isDone() && cancellable != null) {
-        cancel("Stream closed before end.", /* no exception to report */ null);
+    Throwable suppressor = null;
+    if (cancellable != null) {
+      // Client-side stream. Cancel the call, to help ensure gRPC doesn't deliver a message after close() ends.
+      // On the server side, we can't rely on draining the stream, because this gRPC bug means the completion callback
+      // may never run https://github.com/grpc/grpc-java/issues/5882
+      try {
+        synchronized (cancellable) {
+          if (!cancelled.isDone()) {
+            // Only cancel if the call is not done on the gRPC side
+            cancellable.cancel("Stream closed before end", /* no exception to report */null);
+          }
+        }
+        // Drain the stream without the lock (as next() implicitly needs the lock)
+        while (next()) { }
+      } catch (FlightRuntimeException e) {
+        suppressor = e;
       }
-    });
-    if (fulfilledRoot != null) {
-      closeables.add(fulfilledRoot);
     }
-    closeables.add(applicationMetadata);
-    closeables.addAll(queue);
-    if (dictionaries != null) {
-      dictionaries.getDictionaryIds().forEach(id -> closeables.add(dictionaries.lookup(id).getVector()));
+    // Perform these operations under a lock. This way the observer can't enqueue new messages while we're in the

Review comment:
       Yes, it's possible: gRPC runs the observer on a different thread from the application, so the observer can fire while the application is in the middle of close(). On the client side, draining the stream (as done here) prevents this case. On the server side we unfortunately can't rely on draining, hence the lock to protect the cleanup.
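
To illustrate the race, here is a minimal standalone sketch, not the actual FlightStream code. All names in it (`GuardedStreamSketch`, `Stream`, `Message`, `onNext`, `runDemo`) are hypothetical, and a plain thread stands in for the gRPC observer thread. It assumes that both the enqueue path and close() take the same lock, so a message is either queued before close() runs (and released by close()) or arrives afterwards and is released on the spot.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; none of these names come from FlightStream itself.
class GuardedStreamSketch {
  /** Stand-in for a buffer-backed message that must be released exactly once. */
  static final class Message implements AutoCloseable {
    private final AtomicInteger open;

    Message(AtomicInteger open) {
      this.open = open;
      open.incrementAndGet();
    }

    @Override
    public void close() {
      open.decrementAndGet();
    }
  }

  static final class Stream implements AutoCloseable {
    private final Object lock = new Object();
    private final Queue<Message> queue = new ArrayDeque<>();
    private boolean closed = false;

    /** Called from the transport's thread (the gRPC observer in the real code). */
    void onNext(Message message) {
      synchronized (lock) {
        if (!closed) {
          queue.add(message);
          return;
        }
      }
      // Arrived after close(): nobody will read it, so release it here.
      message.close();
    }

    /** Called from the application thread. */
    @Override
    public void close() {
      final List<Message> toRelease;
      synchronized (lock) {
        closed = true;
        toRelease = new ArrayList<>(queue);
        queue.clear();
      }
      toRelease.forEach(Message::close);
    }
  }

  /** Races an observer thread against close(); returns the number of unreleased messages. */
  static int runDemo() {
    final AtomicInteger open = new AtomicInteger();
    final Stream stream = new Stream();
    final Thread observer = new Thread(() -> {
      for (int i = 0; i < 10_000; i++) {
        stream.onNext(new Message(open));
      }
    });
    observer.start();
    stream.close(); // may run while the observer is mid-delivery
    try {
      observer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return open.get();
  }

  public static void main(String[] args) {
    System.out.println("unreleased messages: " + runDemo());
  }
}
```

Without the shared lock, `onNext` could add a message to the queue after close() had already emptied it, and that message would never be released. With the lock, `runDemo()` should report zero unreleased messages regardless of how the two threads interleave.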



