You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/05/04 03:20:54 UTC

[GitHub] [arrow] emkornfield commented on a change in pull request #7012: ARROW-8555: [FlightRPC][Java] implement DoExchange

emkornfield commented on a change in pull request #7012:
URL: https://github.com/apache/arrow/pull/7012#discussion_r419199838



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java
##########
@@ -155,79 +152,35 @@ public void setOnCancelHandler(Runnable handler) {
       this.onCancelHandler = handler;
     }
 
-    @Override
-    public boolean isReady() {
-      return responseObserver.isReady();
-    }
-
     @Override
     public boolean isCancelled() {
       return responseObserver.isCancelled();
     }
 
     @Override
-    public void start(VectorSchemaRoot root) {
-      start(root, new MapDictionaryProvider());
-    }
-
-    @Override
-    public void start(VectorSchemaRoot root, DictionaryProvider provider) {
-      unloader = new VectorUnloader(root, true, true);
-
-      try {
-        DictionaryUtils.generateSchemaMessages(root.getSchema(), null, provider, responseObserver::onNext);
-      } catch (Exception e) {
-        // Only happens if closing buffers somehow fails - indicates application is an unknown state so propagate
-        // the exception
-        throw new RuntimeException("Could not generate and send all schema messages", e);
-      }
-    }
-
-    @Override
-    public void putNext() {
-      putNext(null);
-    }
-
-    @Override
-    public void putNext(ArrowBuf metadata) {
-      Preconditions.checkNotNull(unloader);
-      // close is a no-op if the message has been written to gRPC, otherwise frees the associated buffers
-      // in some code paths (e.g. if the call is cancelled), gRPC does not write the message, so we need to clean up
-      // ourselves. Normally, writing the ArrowMessage will transfer ownership of the data to gRPC/Netty.
-      try (final ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), metadata)) {
-        responseObserver.onNext(message);
-      } catch (Exception e) {
-        // This exception comes from ArrowMessage#close, not responseObserver#onNext.
-        // Generally this should not happen - ArrowMessage's implementation only closes non-throwing things.
-        // The user can't reasonably do anything about this, but if something does throw, we shouldn't let
-        // execution continue since other state (e.g. allocators) may be in an odd state.
-        throw new RuntimeException("Could not free ArrowMessage", e);
-      }
+    protected void waitUntilStreamReady() {
+      // Don't do anything - service implementations are expected to manage backpressure themselves

Review comment:
       Is that clearly documented someplace?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org