You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/09/05 10:30:05 UTC

[GitHub] [arrow] rymurr commented on a change in pull request #8010: ARROW-9587: [FlightRPC][Java] clean up FlightStream/DoPut

rymurr commented on a change in pull request #8010:
URL: https://github.com/apache/arrow/pull/8010#discussion_r483935015



##########
File path: java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
##########
@@ -41,7 +41,7 @@
 abstract class BaseAllocator extends Accountant implements BufferAllocator {
 
   public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator";
-  public static final int DEBUG_LOG_LENGTH = 6;
+  public static final int DEBUG_LOG_LENGTH = 30;

Review comment:
       was this a permanent change or accidentally left in?

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
##########
@@ -158,29 +167,52 @@ public FlightDescriptor getDescriptor() {
   /**
    * Closes the stream (freeing any existing resources).
    *
-   * <p>If the stream isn't complete and is cancellable, this method will cancel the stream first.</p>
+   * <p>If the stream isn't complete and is cancellable, this method will cancel and drain the stream first.
    */
   public void close() throws Exception {
     final List<AutoCloseable> closeables = new ArrayList<>();
-    // cancellation can throw, but we still want to clean up resources, so make it an AutoCloseable too
-    closeables.add(() -> {
-      if (!completed.isDone() && cancellable != null) {
-        cancel("Stream closed before end.", /* no exception to report */ null);
+    Throwable suppressor = null;
+    if (cancellable != null) {
+      // Client-side stream. Cancel the call, to help ensure gRPC doesn't deliver a message after close() ends.
+      // On the server side, we can't rely on draining the stream , because this gRPC bug means the completion callback
+      // may never run https://github.com/grpc/grpc-java/issues/5882
+      try {
+        synchronized (cancellable) {
+          if (!cancelled.isDone()) {
+            // Only cancel if the call is not done on the gRPC side
+            cancellable.cancel("Stream closed before end", /* no exception to report */null);
+          }
+        }
+        // Drain the stream without the lock (as next() implicitly needs the lock)
+        while (next()) { }
+      } catch (FlightRuntimeException e) {
+        suppressor = e;
       }
-    });
-    if (fulfilledRoot != null) {
-      closeables.add(fulfilledRoot);
     }
-    closeables.add(applicationMetadata);
-    closeables.addAll(queue);
-    if (dictionaries != null) {
-      dictionaries.getDictionaryIds().forEach(id -> closeables.add(dictionaries.lookup(id).getVector()));
+    // Perform these operations under a lock. This way the observer can't enqueue new messages while we're in the

Review comment:
       Maybe its just the wording of the comments but this seems like its theoretically possible for an observer to put a message between 187 and the lock gets aquired in 195. Is that true? The chance is prob pretty small and not easy to code for. Just wanted a bit of clarification




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org