Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/10/15 23:53:08 UTC

[GitHub] [arrow] jduo opened a new pull request #8476: ARROW-10106: [FlightRPC][Java] Expose onIsReady() callback

jduo opened a new pull request #8476:
URL: https://github.com/apache/arrow/pull/8476


   Expose onIsReady() callback on OutboundStreamListener
   
   - Add callback to run when the client is ready to receive new data during getStream.
   - This removes the need for FlightProducers to implement polling code on isReady.
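
A minimal sketch of the difference a readiness callback makes, assuming a hypothetical `ReadySink` interface and `ToySink` test double that only stand in for an outbound stream (they are not the Arrow Flight API). Instead of sleeping in a loop on isReady(), the producer writes while the sink is ready and resumes from the callback:

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: resume sending from a readiness callback instead of polling. */
class OnReadySketch {
  /** Hypothetical stand-in for an outbound stream; not the Arrow Flight interface. */
  interface ReadySink {
    boolean isReady();
    void setOnReadyHandler(Runnable handler);
    void putNext(int batch);
    void completed();
  }

  /** Toy sink that accepts `capacity` batches, then reports not-ready until drained. */
  static final class ToySink implements ReadySink {
    final List<Integer> received = new ArrayList<>();
    boolean done;
    private final int capacity;
    private int buffered;
    private Runnable onReady = () -> { };

    ToySink(int capacity) {
      this.capacity = capacity;
    }

    @Override public boolean isReady() { return buffered < capacity; }
    @Override public void setOnReadyHandler(Runnable handler) { this.onReady = handler; }
    @Override public void putNext(int batch) { received.add(batch); buffered++; }
    @Override public void completed() { done = true; }

    /** Simulates the client consuming the buffered data, which fires the callback. */
    void drain() {
      buffered = 0;
      onReady.run();
    }
  }

  /** Sends batches 0..total-1, writing only while the sink reports ready. */
  static void stream(ReadySink sink, int total) {
    final int[] next = {0};
    final boolean[] finished = {false};
    Runnable pump = () -> {
      while (next[0] < total && sink.isReady()) {
        sink.putNext(next[0]++);
      }
      // Complete exactly once, even if the callback fires again afterwards.
      if (next[0] == total && !finished[0]) {
        finished[0] = true;
        sink.completed();
      }
    };
    sink.setOnReadyHandler(pump);
    pump.run(); // send whatever fits right now; the callback handles the rest
  }
}
```

No thread sleeps or spins here: all remaining work happens inside the callback each time the toy sink is drained.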


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jduo commented on pull request #8476: ARROW-10106: [FlightRPC][Java] Expose onIsReady() callback

Posted by GitBox <gi...@apache.org>.
jduo commented on pull request #8476:
URL: https://github.com/apache/arrow/pull/8476#issuecomment-710156896


   > LGTM, one minor typo - and is it worth adding a test? (Though this is a very thin wrapper around gRPC.)
   
   Thanks. I'm looking to repurpose some existing tests that loop on isReady.





[GitHub] [arrow] lidavidm commented on a change in pull request #8476: ARROW-10106: [FlightRPC][Java] Expose onIsReady() callback

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #8476:
URL: https://github.com/apache/arrow/pull/8476#discussion_r505948417



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java
##########
@@ -36,6 +36,18 @@
    */
   boolean isReady();
 
+  /**
+   * Set a callback for when the client cancels is ready for new calls to putNext(), i.e. {@link #isReady()} ()}

Review comment:
       ```suggestion
      * Set a callback for when the listener is ready for new calls to putNext(), i.e. {@link #isReady()} ()}
   ```







[GitHub] [arrow] lidavidm commented on a change in pull request #8476: ARROW-10106: [FlightRPC][Java] Expose onIsReady() callback

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #8476:
URL: https://github.com/apache/arrow/pull/8476#discussion_r507708211



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/BackpressureStrategy.java
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Helper interface to dynamically handle backpressure when implementing FlightProducers.
+ */
+public interface BackpressureStrategy {
+  /**
+   * The state of the client after a call to waitForListener.
+   */
+  enum WaitResult {
+    /**
+     * Listener is ready.
+     */
+    READY,
+
+    /**
+     * Listener was cancelled by the client.
+     */
+    CANCELLED,
+
+    /**
+     * Timed out waiting for the listener to change state.
+     */
+    TIMEOUT
+  }
+
+  /**
+   * Set up operations to work against the given listener.
+   *
+   * This must be called exactly once and before any calls to {@link #waitForListener(long)} and
+   * {@link OutboundStreamListener#start(VectorSchemaRoot)}
+   * @param listener The listener this strategy applies to.
+   */
+  void register(FlightProducer.ServerStreamListener listener);
+
+  /**
+   * Waits for the listener to be ready or cancelled up to the given timeout.
+   *
+   * @param timeout The timeout in milliseconds. Infinite if timeout is <= 0.
+   * @return The result of the wait.
+   */
+  WaitResult waitForListener(long timeout);
+
+  /**
+   * A back pressure strategy that uses callbacks to notify when the client is ready or cancelled.
+   */
+  class CallbackBackpressureStrategy implements BackpressureStrategy {
+    private final Object lock = new Object();
+    private FlightProducer.ServerStreamListener listener;
+
+    @Override
+    public void register(FlightProducer.ServerStreamListener listener) {
+      this.listener = listener;
+      listener.setOnReadyHandler(this::onReadyOrCancel);
+      listener.setOnCancelHandler(this::onReadyOrCancel);
+    }
+
+    @Override
+    public WaitResult waitForListener(long timeout) {
+      Preconditions.checkNotNull(listener);
+      final long startTime = System.currentTimeMillis();
+      synchronized (lock) {
+        while (!listener.isReady() && !listener.isCancelled()) {
+          try {
+            lock.wait(timeout);
+            if (System.currentTimeMillis() > startTime + timeout) {
+              return WaitResult.TIMEOUT;
+            }
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            return WaitResult.CANCELLED;
+          }
+        }
+
+        if (listener.isReady()) {
+          return WaitResult.READY;
+        } else if (listener.isCancelled()) {
+          return WaitResult.CANCELLED;
+        } else if (System.currentTimeMillis() > startTime + timeout) {
+          return WaitResult.TIMEOUT;
+        }
+        throw new RuntimeException("Invalid state when waiting for listener.");

Review comment:
       Ah yes, you're right. Slight nit then, maybe IllegalStateException is more appropriate here?

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java
##########
@@ -88,12 +87,15 @@ public void listFlights(Flight.Criteria criteria, StreamObserver<Flight.FlightIn
   public void doGetCustom(Flight.Ticket ticket, StreamObserver<ArrowMessage> responseObserverSimple) {
     final ServerCallStreamObserver<ArrowMessage> responseObserver =
         (ServerCallStreamObserver<ArrowMessage>) responseObserverSimple;
+
     final GetListener listener = new GetListener(responseObserver, this::handleExceptionWithMiddleware);
-    try {
-      producer.getStream(makeContext(responseObserver), new Ticket(ticket), listener);
-    } catch (Exception ex) {
-      listener.error(ex);
-    }
+    executors.submit(() -> {

Review comment:
       Sorry, yes, I meant within GetListener, but I suppose setting the handler unconditionally doesn't matter to gRPC - it'll never get called in the blocking case.







[GitHub] [arrow] lidavidm commented on a change in pull request #8476: ARROW-10106: [FlightRPC][Java] Expose onIsReady() callback

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #8476:
URL: https://github.com/apache/arrow/pull/8476#discussion_r506941155



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/BackpressureStrategy.java
##########
@@ -0,0 +1,113 @@
+    @Override
+    public WaitResult waitForListener(long timeout) {
+      Preconditions.checkNotNull(listener);
+      final long startTime = System.currentTimeMillis();
+      synchronized (lock) {
+        while (!listener.isReady() && !listener.isCancelled()) {
+          try {
+            lock.wait(timeout);
+            if (System.currentTimeMillis() > startTime + timeout) {
+              return WaitResult.TIMEOUT;
+            }
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            return WaitResult.CANCELLED;
+          }
+        }
+
+        if (listener.isReady()) {
+          return WaitResult.READY;
+        } else if (listener.isCancelled()) {
+          return WaitResult.CANCELLED;
+        } else if (System.currentTimeMillis() > startTime + timeout) {
+          return WaitResult.TIMEOUT;
+        }
+        throw new RuntimeException("Invalid state when waiting for listener.");

Review comment:
       According to the gRPC docs, this isn't an invalid state - gRPC can wake you up spuriously.
   
   https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/CallStreamObserver.html#setOnReadyHandler-java.lang.Runnable-

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java
##########
@@ -147,11 +149,23 @@ private void onCancel() {
       }
     }
 
+    private void onReady() {
+      logger.debug("Stream is ready for new messages.");

Review comment:
       This is going to spam logs a lot - best not to add it unless it's really valuable.

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java
##########
@@ -88,12 +87,15 @@ public void listFlights(Flight.Criteria criteria, StreamObserver<Flight.FlightIn
   public void doGetCustom(Flight.Ticket ticket, StreamObserver<ArrowMessage> responseObserverSimple) {
     final ServerCallStreamObserver<ArrowMessage> responseObserver =
         (ServerCallStreamObserver<ArrowMessage>) responseObserverSimple;
+
     final GetListener listener = new GetListener(responseObserver, this::handleExceptionWithMiddleware);
-    try {
-      producer.getStream(makeContext(responseObserver), new Ticket(ticket), listener);
-    } catch (Exception ex) {
-      listener.error(ex);
-    }
+    executors.submit(() -> {

Review comment:
       I think you'll have to change setOnReadyHandler below to call responseObserver.setOnReadyHandler itself

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/BackpressureStrategy.java
##########
@@ -0,0 +1,113 @@
+    @Override
+    public WaitResult waitForListener(long timeout) {
+      Preconditions.checkNotNull(listener);
+      final long startTime = System.currentTimeMillis();
+      synchronized (lock) {
+        while (!listener.isReady() && !listener.isCancelled()) {
+          try {
+            lock.wait(timeout);
+            if (System.currentTimeMillis() > startTime + timeout) {
+              return WaitResult.TIMEOUT;
+            }
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            return WaitResult.CANCELLED;
+          }
+        }
+
+        if (listener.isReady()) {
+          return WaitResult.READY;
+        } else if (listener.isCancelled()) {
+          return WaitResult.CANCELLED;
+        } else if (System.currentTimeMillis() > startTime + timeout) {
+          return WaitResult.TIMEOUT;
+        }
+        throw new RuntimeException("Invalid state when waiting for listener.");

Review comment:
       I'd say we can probably just recurse into waitForListener or wrap the method body in a loop.

##########
File path: java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBackPressure.java
##########
@@ -158,4 +189,64 @@ private static void consume(FlightStream stream, int batches) {
       batches--;
     }
   }
+
+  private interface SleepTimeRecordingBackpressureStrategy extends BackpressureStrategy {
+    /**
+     * Returns the total time spent waiting on the listener to be ready.
+     * @return the total time spent waiting on the listener to be ready.
+     */
+    long getSleepTime();
+  }
+
+  /**
+   * Implementation of a backpressure strategy that polls on isReady and records amount of time spent in Thread.sleep().
+   */
+  private static class PollingBackpressureStrategy implements SleepTimeRecordingBackpressureStrategy {
+    private final AtomicLong sleepTime = new AtomicLong(0);
+    private FlightProducer.ServerStreamListener listener;
+
+    @Override
+    public long getSleepTime() {
+      return sleepTime.get();
+    }
+
+    @Override
+    public void register(FlightProducer.ServerStreamListener listener) {
+      this.listener = listener;
+    }
+
+    @Override
+    public WaitResult waitForListener(long timeout) {
+      while (!listener.isReady()) {
+        try {
+          Thread.sleep(1);
+          sleepTime.addAndGet(1L);
+        } catch (InterruptedException ignore) {
+        }
+      }
+      return WaitResult.READY;
+    }
+  }
+
+  /**
+   * Implementation of a backpressure strategy that polls on uses callbacks to detect changes in client readiness state

Review comment:
       ```suggestion
      * Implementation of a backpressure strategy that uses callbacks to detect changes in client readiness state
   ```

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java
##########
@@ -88,12 +87,15 @@ public void listFlights(Flight.Criteria criteria, StreamObserver<Flight.FlightIn
   public void doGetCustom(Flight.Ticket ticket, StreamObserver<ArrowMessage> responseObserverSimple) {
     final ServerCallStreamObserver<ArrowMessage> responseObserver =
         (ServerCallStreamObserver<ArrowMessage>) responseObserverSimple;
+
     final GetListener listener = new GetListener(responseObserver, this::handleExceptionWithMiddleware);
-    try {
-      producer.getStream(makeContext(responseObserver), new Ticket(ticket), listener);
-    } catch (Exception ex) {
-      listener.error(ex);
-    }
+    executors.submit(() -> {

Review comment:
       I'd rather we not use the executor here unconditionally. A fully asynchronous implementation would set the callbacks and not need to execute in a separate thread at all; an implementation that wants to appear blocking but use the callbacks (as with BackpressureStrategy) should manage its own thread pool. We've had production issues when it turned out that the internal executor in FlightService was queueing requests without bound, defeating the rate limit/concurrency controls that had been placed on the gRPC executor.
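
To illustrate the queueing concern, here is a minimal sketch using only `java.util.concurrent`. It assumes nothing about how FlightService configures its executor; the helper names `newBoundedPool` and `submitBlocking` are hypothetical. A pool backed by a bounded queue rejects excess work instead of accumulating it without limit:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Sketch: a pool that rejects excess work rather than queueing it without bound. */
class BoundedExecutorSketch {
  /** Fixed-size pool with a bounded queue; submissions beyond capacity are rejected. */
  static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
    return new ThreadPoolExecutor(
        threads, threads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(queueCapacity),
        new ThreadPoolExecutor.AbortPolicy());
  }

  /** Submits `count` tasks that block on `gate`; returns how many were rejected. */
  static int submitBlocking(ExecutorService pool, CountDownLatch gate, int count) {
    int rejected = 0;
    for (int i = 0; i < count; i++) {
      try {
        pool.execute(() -> {
          try {
            gate.await(); // simulate a long-running request
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
        });
      } catch (RejectedExecutionException ex) {
        rejected++; // the caller sees the overload instead of it piling up silently
      }
    }
    return rejected;
  }
}
```

With one thread and a queue of two, five blocking submissions leave one task running, two queued, and two rejected. By contrast, `Executors.newFixedThreadPool` uses an unbounded queue, so it would accept all five.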

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java
##########
@@ -36,6 +36,18 @@
    */
   boolean isReady();
 
+  /**
+   * Set a callback for when the listener is ready for new calls to putNext(), i.e. {@link #isReady()} ()}

Review comment:
       The formatting here and below seems off.







[GitHub] [arrow] jduo commented on a change in pull request #8476: ARROW-10106: [FlightRPC][Java] Expose onIsReady() callback

Posted by GitBox <gi...@apache.org>.
jduo commented on a change in pull request #8476:
URL: https://github.com/apache/arrow/pull/8476#discussion_r506973728



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/BackpressureStrategy.java
##########
@@ -0,0 +1,113 @@
+    @Override
+    public WaitResult waitForListener(long timeout) {
+      Preconditions.checkNotNull(listener);
+      final long startTime = System.currentTimeMillis();
+      synchronized (lock) {
+        while (!listener.isReady() && !listener.isCancelled()) {
+          try {
+            lock.wait(timeout);
+            if (System.currentTimeMillis() > startTime + timeout) {
+              return WaitResult.TIMEOUT;
+            }
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            return WaitResult.CANCELLED;
+          }
+        }
+
+        if (listener.isReady()) {
+          return WaitResult.READY;
+        } else if (listener.isCancelled()) {
+          return WaitResult.CANCELLED;
+        } else if (System.currentTimeMillis() > startTime + timeout) {
+          return WaitResult.TIMEOUT;
+        }
+        throw new RuntimeException("Invalid state when waiting for listener.");

Review comment:
       I was looking at this and I think this is covered by the upper loop: if the onReady callback ran, we'd wake up from the Object.wait() call, then re-check whether the listener is really ready as part of the loop condition and, if not, go back into the wait until the callback fires again. (Pretty much the standard use of wait()/notify(), really.)
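
A minimal sketch of that guarded-wait pattern, assuming a hypothetical `GuardedWaitSketch` class whose readiness and cancellation checks are passed in as `BooleanSupplier`s (this is not the code in the PR). The state is re-checked after every wakeup, so a spurious wakeup simply loops again, and the remaining time is recomputed against a fixed deadline so repeated wakeups cannot extend the total wait:

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Sketch: guarded wait that tolerates spurious wakeups and honors a deadline. */
class GuardedWaitSketch {
  enum WaitResult { READY, CANCELLED, TIMEOUT }

  private final Object lock = new Object();
  private final BooleanSupplier ready;
  private final BooleanSupplier cancelled;

  GuardedWaitSketch(BooleanSupplier ready, BooleanSupplier cancelled) {
    this.ready = ready;
    this.cancelled = cancelled;
  }

  /** Called from the ready/cancel callbacks to wake any waiter. */
  void signal() {
    synchronized (lock) {
      lock.notifyAll();
    }
  }

  /** Waits until ready or cancelled; a timeout <= 0 means wait indefinitely. */
  WaitResult waitForReady(long timeoutMillis) {
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    synchronized (lock) {
      while (true) {
        // Re-check the real state after every wakeup, spurious or not.
        if (ready.getAsBoolean()) {
          return WaitResult.READY;
        }
        if (cancelled.getAsBoolean()) {
          return WaitResult.CANCELLED;
        }
        try {
          if (timeoutMillis <= 0) {
            lock.wait();
          } else {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
              return WaitResult.TIMEOUT;
            }
            // Wait only for the time left, not for the full timeout again.
            lock.wait(remaining / 1_000_000L, (int) (remaining % 1_000_000L));
          }
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
          return WaitResult.CANCELLED;
        }
      }
    }
  }
}
```

Treating an interrupt as CANCELLED mirrors the diff above; propagating InterruptedException to the caller would be an equally reasonable design.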







[GitHub] [arrow] lidavidm closed pull request #8476: ARROW-10106: [FlightRPC][Java] Expose onIsReady() callback

Posted by GitBox <gi...@apache.org>.
lidavidm closed pull request #8476:
URL: https://github.com/apache/arrow/pull/8476


   





[GitHub] [arrow] jduo commented on pull request #8476: ARROW-10106: [FlightRPC][Java] Expose onIsReady() callback

Posted by GitBox <gi...@apache.org>.
jduo commented on pull request #8476:
URL: https://github.com/apache/arrow/pull/8476#issuecomment-709648406


   FYI @lidavidm 





[GitHub] [arrow] jduo commented on pull request #8476: ARROW-10106: [FlightRPC][Java] Expose onIsReady() callback

Posted by GitBox <gi...@apache.org>.
jduo commented on pull request #8476:
URL: https://github.com/apache/arrow/pull/8476#issuecomment-710777211


   > > LGTM, one minor typo - and is it worth adding a test? (Though this is a very thin wrapper around gRPC.)
   > 
   > Thanks. I'm looking to repurpose some existing tests that loop on isReady.
   
   - I've created a BackpressureStrategy interface and a simple callback-based implementation.
   - I've used this to repurpose the tests in TestBackPressure to test both polling (existing) and callback-based solutions.
   - New tests in TestBackPressure are marked as ignored, since the tests they were based on were also ignored due to flakiness. I ran them locally, though, and they passed with similar performance.
   - I needed to change getStream() to run in a background thread. Currently it runs in the onHalfClosed callback handler in gRPC. In the test implementations, getStream() is synchronous, which prevented the new callback handler from running. In practice, I'd expect real-world getStream() implementations to be written asynchronously, so perhaps we should change our test implementations of getStream() to be asynchronous instead.
   
   This line of code eventually calls doGet, which must complete before onReady is called. Thus, if doGet blocks until it receives a notification from onReady, it never completes, whereas polling on isReady does: https://github.com/grpc/grpc-java/blob/0b6f29371bd96614fdbdcd3638d4bb6312258da3/stub/src/main/java/io/grpc/stub/ServerCalls.java#L182
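
The ordering problem can be reproduced with a toy model that does not use gRPC at all. The sketch below assumes, as described above, that the handler and the readiness callback for a call are dispatched one after another on the same thread; the class and method names are hypothetical. A handler that blocks waiting for the callback never sees it unless the blocking work is handed off to another thread:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

/** Toy model: handler and callback share one serialized callback thread. */
class SerializedCallbackSketch {
  /**
   * Returns whether blocking work started by the handler observes the "ready"
   * callback within waitMillis. If offload is true, the handler hands the
   * blocking work to a background thread and returns immediately.
   */
  static boolean handlerSeesCallback(boolean offload, long waitMillis) {
    ExecutorService callbacks = Executors.newSingleThreadExecutor();
    ExecutorService background = Executors.newSingleThreadExecutor();
    try {
      CountDownLatch ready = new CountDownLatch(1);
      Callable<Boolean> blockingWork = () -> ready.await(waitMillis, TimeUnit.MILLISECONDS);
      FutureTask<Boolean> task = new FutureTask<>(blockingWork);
      if (offload) {
        // Handler returns right away; the wait happens on another thread.
        callbacks.execute(() -> background.execute(task));
      } else {
        // Handler blocks the callback thread itself.
        callbacks.execute(task);
      }
      // The "onReady" callback is queued behind the handler on the same thread.
      callbacks.execute(ready::countDown);
      return task.get();
    } catch (InterruptedException | ExecutionException ex) {
      throw new IllegalStateException(ex);
    } finally {
      callbacks.shutdownNow();
      background.shutdownNow();
    }
  }
}
```

In this model, `handlerSeesCallback(false, 200)` times out because the callback cannot run until the handler returns, while `handlerSeesCallback(true, 2000)` succeeds.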





[GitHub] [arrow] lidavidm commented on a change in pull request #8476: ARROW-10106: [FlightRPC][Java] Expose onIsReady() callback

Posted by GitBox <gi...@apache.org>.
lidavidm commented on a change in pull request #8476:
URL: https://github.com/apache/arrow/pull/8476#discussion_r507710515



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/BackpressureStrategy.java
##########
@@ -0,0 +1,113 @@
+    @Override
+    public WaitResult waitForListener(long timeout) {
+      Preconditions.checkNotNull(listener);
+      final long startTime = System.currentTimeMillis();
+      synchronized (lock) {
+        while (!listener.isReady() && !listener.isCancelled()) {
+          try {
+            lock.wait(timeout);
+            if (System.currentTimeMillis() > startTime + timeout) {
+              return WaitResult.TIMEOUT;
+            }
+          } catch (InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            return WaitResult.CANCELLED;
+          }
+        }
+
+        if (listener.isReady()) {
+          return WaitResult.READY;
+        } else if (listener.isCancelled()) {
+          return WaitResult.CANCELLED;
+        } else if (System.currentTimeMillis() > startTime + timeout) {
+          return WaitResult.TIMEOUT;
+        }
+        throw new RuntimeException("Invalid state when waiting for listener.");

Review comment:
       Ah yes, you're right.







[GitHub] [arrow] github-actions[bot] commented on pull request #8476: ARROW-10106: [FlightRPC][Java] Expose onIsReady() callback

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8476:
URL: https://github.com/apache/arrow/pull/8476#issuecomment-709655899


   https://issues.apache.org/jira/browse/ARROW-10106





[GitHub] [arrow] jduo commented on a change in pull request #8476: ARROW-10106: [FlightRPC][Java] Expose onIsReady() callback

Posted by GitBox <gi...@apache.org>.
jduo commented on a change in pull request #8476:
URL: https://github.com/apache/arrow/pull/8476#discussion_r506961358



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java
##########
@@ -88,12 +87,15 @@ public void listFlights(Flight.Criteria criteria, StreamObserver<Flight.FlightIn
   public void doGetCustom(Flight.Ticket ticket, StreamObserver<ArrowMessage> responseObserverSimple) {
     final ServerCallStreamObserver<ArrowMessage> responseObserver =
         (ServerCallStreamObserver<ArrowMessage>) responseObserverSimple;
+
     final GetListener listener = new GetListener(responseObserver, this::handleExceptionWithMiddleware);
-    try {
-      producer.getStream(makeContext(responseObserver), new Ticket(ticket), listener);
-    } catch (Exception ex) {
-      listener.error(ex);
-    }
+    executors.submit(() -> {

Review comment:
       Do you mean within GetListener? We register #onReady() on the responseObserver during GetListener's constructor, which will delegate to the onReadyHandler if it's been set.



