Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/08/19 18:50:29 UTC

[GitHub] [arrow] lidavidm opened a new pull request #8010: ARROW-9587: [FlightRPC][Java] clean up FlightStream/DoPut

lidavidm opened a new pull request #8010:
URL: https://github.com/apache/arrow/pull/8010


   - Fix a bug where writes would hang forever for DoExchange
   - Make FlightRuntimeException#toString easier to read
   - Have DoPut reliably clean up the FlightStream when the call ends (instead of potentially closing it after gRPC thinks the call ends - this will be important for [ARROW-9586](https://issues.apache.org/jira/browse/ARROW-9586))
   - Don't automatically send a cancellation in FlightStream if an application doesn't read the entire stream. (This is another source of spurious exceptions on the server side.) In particular, a server may want to cancel a call without draining the stream (e.g. if it rejects a client upload).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #8010: ARROW-9587: [FlightRPC][Java] clean up FlightStream/DoPut

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8010:
URL: https://github.com/apache/arrow/pull/8010#issuecomment-676605445


   https://issues.apache.org/jira/browse/ARROW-9587





[GitHub] [arrow] lidavidm commented on pull request #8010: ARROW-9587: [FlightRPC][Java] clean up FlightStream/DoPut

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #8010:
URL: https://github.com/apache/arrow/pull/8010#issuecomment-692023570


   @emkornfield I've rebased this and it should be good once tests pass.





[GitHub] [arrow] lidavidm commented on pull request #8010: ARROW-9587: [FlightRPC][Java] clean up FlightStream/DoPut

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #8010:
URL: https://github.com/apache/arrow/pull/8010#issuecomment-685908702


   @rymurr any thoughts here? This is my attempt at trying to stomp out various memory leak/usage issues in Flight, and a precursor to a way to measure per-RPC Arrow allocations, which is useful in deployments of Flight (e.g. to pinpoint problematic queries & identify if we've accidentally leaked allocations).





[GitHub] [arrow] lidavidm commented on pull request #8010: ARROW-9587: [FlightRPC][Java] clean up FlightStream/DoPut

Posted by GitBox <gi...@apache.org>.
lidavidm commented on pull request #8010:
URL: https://github.com/apache/arrow/pull/8010#issuecomment-683755698


   Some more fixes are needed here...





[GitHub] [arrow] lidavidm closed pull request #8010: ARROW-9587: [FlightRPC][Java] clean up FlightStream/DoPut

Posted by GitBox <gi...@apache.org>.
lidavidm closed pull request #8010:
URL: https://github.com/apache/arrow/pull/8010


   





[GitHub] [arrow] emkornfield commented on pull request #8010: ARROW-9587: [FlightRPC][Java] clean up FlightStream/DoPut

Posted by GitBox <gi...@apache.org>.
emkornfield commented on pull request #8010:
URL: https://github.com/apache/arrow/pull/8010#issuecomment-691812239


   @lidavidm do you want another review or are you comfortable merging this?





[GitHub] [arrow] lidavidm commented on a change in pull request #8010: ARROW-9587: [FlightRPC][Java] clean up FlightStream/DoPut

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #8010:
URL: https://github.com/apache/arrow/pull/8010#discussion_r483976105



##########
File path: java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
##########
@@ -41,7 +41,7 @@
 abstract class BaseAllocator extends Accountant implements BufferAllocator {
 
   public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator";
-  public static final int DEBUG_LOG_LENGTH = 6;
+  public static final int DEBUG_LOG_LENGTH = 30;

Review comment:
       Good catch, this was leftover from debugging.







[GitHub] [arrow] rymurr commented on a change in pull request #8010: ARROW-9587: [FlightRPC][Java] clean up FlightStream/DoPut

Posted by GitBox <gi...@apache.org>.
rymurr commented on a change in pull request #8010:
URL: https://github.com/apache/arrow/pull/8010#discussion_r483935015



##########
File path: java/memory/memory-core/src/main/java/org/apache/arrow/memory/BaseAllocator.java
##########
@@ -41,7 +41,7 @@
 abstract class BaseAllocator extends Accountant implements BufferAllocator {
 
   public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator";
-  public static final int DEBUG_LOG_LENGTH = 6;
+  public static final int DEBUG_LOG_LENGTH = 30;

Review comment:
       Was this a permanent change, or was it accidentally left in?

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
##########
@@ -158,29 +167,52 @@ public FlightDescriptor getDescriptor() {
   /**
    * Closes the stream (freeing any existing resources).
    *
-   * <p>If the stream isn't complete and is cancellable, this method will cancel the stream first.</p>
+   * <p>If the stream isn't complete and is cancellable, this method will cancel and drain the stream first.
    */
   public void close() throws Exception {
     final List<AutoCloseable> closeables = new ArrayList<>();
-    // cancellation can throw, but we still want to clean up resources, so make it an AutoCloseable too
-    closeables.add(() -> {
-      if (!completed.isDone() && cancellable != null) {
-        cancel("Stream closed before end.", /* no exception to report */ null);
+    Throwable suppressor = null;
+    if (cancellable != null) {
+      // Client-side stream. Cancel the call, to help ensure gRPC doesn't deliver a message after close() ends.
+      // On the server side, we can't rely on draining the stream , because this gRPC bug means the completion callback
+      // may never run https://github.com/grpc/grpc-java/issues/5882
+      try {
+        synchronized (cancellable) {
+          if (!cancelled.isDone()) {
+            // Only cancel if the call is not done on the gRPC side
+            cancellable.cancel("Stream closed before end", /* no exception to report */null);
+          }
+        }
+        // Drain the stream without the lock (as next() implicitly needs the lock)
+        while (next()) { }
+      } catch (FlightRuntimeException e) {
+        suppressor = e;
       }
-    });
-    if (fulfilledRoot != null) {
-      closeables.add(fulfilledRoot);
     }
-    closeables.add(applicationMetadata);
-    closeables.addAll(queue);
-    if (dictionaries != null) {
-      dictionaries.getDictionaryIds().forEach(id -> closeables.add(dictionaries.lookup(id).getVector()));
+    // Perform these operations under a lock. This way the observer can't enqueue new messages while we're in the

Review comment:
       Maybe it's just the wording of the comments, but it seems theoretically possible for an observer to enqueue a message between line 187 and the lock being acquired on line 195. Is that true? The chance is probably pretty small and not easy to code for. Just wanted a bit of clarification.







[GitHub] [arrow] lidavidm commented on a change in pull request #8010: ARROW-9587: [FlightRPC][Java] clean up FlightStream/DoPut

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #8010:
URL: https://github.com/apache/arrow/pull/8010#discussion_r483976063



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
##########
@@ -158,29 +167,52 @@ public FlightDescriptor getDescriptor() {
   /**
    * Closes the stream (freeing any existing resources).
    *
-   * <p>If the stream isn't complete and is cancellable, this method will cancel the stream first.</p>
+   * <p>If the stream isn't complete and is cancellable, this method will cancel and drain the stream first.
    */
   public void close() throws Exception {
     final List<AutoCloseable> closeables = new ArrayList<>();
-    // cancellation can throw, but we still want to clean up resources, so make it an AutoCloseable too
-    closeables.add(() -> {
-      if (!completed.isDone() && cancellable != null) {
-        cancel("Stream closed before end.", /* no exception to report */ null);
+    Throwable suppressor = null;
+    if (cancellable != null) {
+      // Client-side stream. Cancel the call, to help ensure gRPC doesn't deliver a message after close() ends.
+      // On the server side, we can't rely on draining the stream , because this gRPC bug means the completion callback
+      // may never run https://github.com/grpc/grpc-java/issues/5882
+      try {
+        synchronized (cancellable) {
+          if (!cancelled.isDone()) {
+            // Only cancel if the call is not done on the gRPC side
+            cancellable.cancel("Stream closed before end", /* no exception to report */null);
+          }
+        }
+        // Drain the stream without the lock (as next() implicitly needs the lock)
+        while (next()) { }
+      } catch (FlightRuntimeException e) {
+        suppressor = e;
       }
-    });
-    if (fulfilledRoot != null) {
-      closeables.add(fulfilledRoot);
     }
-    closeables.add(applicationMetadata);
-    closeables.addAll(queue);
-    if (dictionaries != null) {
-      dictionaries.getDictionaryIds().forEach(id -> closeables.add(dictionaries.lookup(id).getVector()));
+    // Perform these operations under a lock. This way the observer can't enqueue new messages while we're in the

Review comment:
       Yes, it's possible: gRPC runs the observer on a different thread from the application, so the observer can fire while the application is in the middle of close(). On the client side, draining the stream (as done here) prevents this case, but on the server side we unfortunately can't rely on that, hence the lock to protect things.
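
For readers following the thread, here is a minimal sketch of the locking idea discussed above. It is not the actual FlightStream code: `GuardedStream`, `Message`, `onNext`, and `pending` are hypothetical names made up for illustration, and the real class tracks more state (dictionaries, roots, completion futures). The sketch only shows how taking one lock in both the observer callback and close() keeps a late message from being enqueued and leaked after cleanup.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical message type that holds a resource needing explicit release. */
interface Message {
  void release();
}

/** Hypothetical stand-in for a stream fed by an observer running on another thread. */
final class GuardedStream implements AutoCloseable {
  private final Object lock = new Object();
  private final Queue<Message> queue = new ArrayDeque<>();
  private boolean closed = false;

  /** Called by the observer thread for each incoming message. */
  void onNext(Message message) {
    synchronized (lock) {
      if (!closed) {
        queue.add(message);
        return;
      }
    }
    // close() already ran: release immediately rather than leak the message.
    message.release();
  }

  /** Number of messages currently buffered. */
  int pending() {
    synchronized (lock) {
      return queue.size();
    }
  }

  /** Called by the application thread; safe to call more than once. */
  @Override
  public void close() {
    synchronized (lock) {
      closed = true;
      // The observer cannot enqueue while we hold the lock, so nothing is missed.
      Message message;
      while ((message = queue.poll()) != null) {
        message.release();
      }
    }
  }
}
```

With this shape, a message that arrives before close() takes the lock is released by close(), and one that arrives afterwards is released by onNext() itself, so neither ordering leaks.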



