You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/04/29 12:53:03 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #7012: ARROW-8555: [FlightRPC][Java] implement DoExchange

lidavidm commented on a change in pull request #7012:
URL: https://github.com/apache/arrow/pull/7012#discussion_r417290242



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -293,6 +292,76 @@ public void onCompleted() {
     return stream;
   }
 
+  /**
+   * Initiate a bidirectional data exchange with the server.
+   *
+   * @param descriptor A descriptor for the data stream.
+   * @param options RPC call options.
+   * @return A pair of a readable stream and a writable stream.
+   */
+  public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption... options) {
+    Preconditions.checkNotNull(descriptor);
+    final io.grpc.CallOptions callOptions = CallOptions.wrapStub(asyncStub, options).getCallOptions();
+
+    try {
+      final ClientCall<ArrowMessage, ArrowMessage> call = interceptedChannel.newCall(doExchangeDescriptor, callOptions);
+      final FlightStream stream = new FlightStream(allocator, PENDING_REQUESTS, call::cancel, call::request);
+      final ClientCallStreamObserver<ArrowMessage> observer = (ClientCallStreamObserver<ArrowMessage>)
+              ClientCalls.asyncBidiStreamingCall(call, stream.asObserver());
+      final ClientStreamListener writer = new PutObserver(
+          descriptor, observer, stream.completed::isDone,
+          () -> {
+            try {
+              stream.completed.get();
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw CallStatus.INTERNAL.withDescription("Client error: interrupted").withCause(e).toRuntimeException();
+            } catch (ExecutionException e) {
+              throw CallStatus.INTERNAL.withDescription("Client error: " + e).withCause(e).toRuntimeException();
+            }
+          });
+      // Send the descriptor to start.
+      try (final ArrowMessage message = new ArrowMessage(descriptor.toProtocol())) {
+        observer.onNext(message);
+      } catch (Exception e) {
+        throw CallStatus.INTERNAL
+            .withCause(e)
+            .withDescription("Could not write descriptor message: " + e)

Review comment:
       Yes, you're right. I changed it to instead include the descriptor that couldn't be written.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org