You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2020/05/15 03:48:44 UTC

[arrow] branch master updated: ARROW-8555: [FlightRPC][Java] implement DoExchange

This is an automated email from the ASF dual-hosted git repository.

emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d8b8cc7  ARROW-8555: [FlightRPC][Java] implement DoExchange
d8b8cc7 is described below

commit d8b8cc7d714a6e7eae4eb4ebc4edc3649de4bef2
Author: David Li <li...@gmail.com>
AuthorDate: Thu May 14 20:48:23 2020 -0700

    ARROW-8555: [FlightRPC][Java] implement DoExchange
    
    This is a complete implementation of DoExchange for Java. It has not yet been tested against the C++ implementation; however, it still passes the integration tests, so the internal refactoring should not have broken compatibility with existing clients/servers.
    
    In this PR, I've refactored DoGet/DoPut/DoExchange on the client and server to share their implementation as much as possible. DoGet/DoPut retain their behavior of "eagerly" reading/writing schemas, but DoExchange allows the client/server to delay writing the schema until ready. This is checked in the unit tests.
    
    I also ran into some test flakes and tried to address them by making sure we clean things up in the right order and by adding missing `close()` calls in some existing tests.
    
    Closes #7012 from lidavidm/doexchange-java
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: Micah Kornfield <em...@gmail.com>
---
 java/flight/flight-core/pom.xml                    |   8 +
 .../java/org/apache/arrow/flight/ArrowMessage.java |  35 +-
 .../java/org/apache/arrow/flight/CallStatus.java   |   2 +-
 .../apache/arrow/flight/FlightBindingService.java  |  35 +-
 .../java/org/apache/arrow/flight/FlightClient.java | 197 ++++++----
 .../java/org/apache/arrow/flight/FlightMethod.java |   3 +
 .../org/apache/arrow/flight/FlightProducer.java    |  50 +--
 .../java/org/apache/arrow/flight/FlightServer.java |   6 +-
 .../org/apache/arrow/flight/FlightService.java     | 175 +++++----
 .../java/org/apache/arrow/flight/FlightStream.java | 105 ++++--
 .../arrow/flight/OutboundStreamListener.java       |  82 +++++
 .../arrow/flight/OutboundStreamListenerImpl.java   | 119 +++++++
 .../apache/arrow/flight/TestBasicOperation.java    |  19 +-
 .../org/apache/arrow/flight/TestDoExchange.java    | 395 +++++++++++++++++++++
 .../org/apache/arrow/flight/TestErrorMetadata.java |  10 +-
 .../org/apache/arrow/flight/TestServerOptions.java |  17 +-
 16 files changed, 1018 insertions(+), 240 deletions(-)

diff --git a/java/flight/flight-core/pom.xml b/java/flight/flight-core/pom.xml
index 8301c71..43ac6cc 100644
--- a/java/flight/flight-core/pom.xml
+++ b/java/flight/flight-core/pom.xml
@@ -132,6 +132,14 @@
       <version>1.12.0</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <extensions>
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
index fd59dd5..1758215 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
@@ -154,6 +154,24 @@ class ArrowMessage implements AutoCloseable {
     this.appMetadata = null;
   }
 
+  /**
+   * Create an ArrowMessage containing only application metadata.
+   * @param appMetadata The application-provided metadata buffer.
+   */
+  public ArrowMessage(ArrowBuf appMetadata) {
+    this.message = null;
+    this.bufs = ImmutableList.of();
+    this.descriptor = null;
+    this.appMetadata = appMetadata;
+  }
+
+  public ArrowMessage(FlightDescriptor descriptor) {
+    this.message = null;
+    this.bufs = ImmutableList.of();
+    this.descriptor = descriptor;
+    this.appMetadata = null;
+  }
+
   private ArrowMessage(FlightDescriptor descriptor, MessageMetadataResult message, ArrowBuf appMetadata,
                        ArrowBuf buf) {
     this.message = message;
@@ -171,6 +189,10 @@ class ArrowMessage implements AutoCloseable {
   }
 
   public HeaderType getMessageType() {
+    if (message == null) {
+      // Null message occurs for metadata-only messages (in DoExchange)
+      return HeaderType.NONE;
+    }
     return HeaderType.getHeader(message.headerType());
   }
 
@@ -271,8 +293,19 @@ class ArrowMessage implements AutoCloseable {
    * @return InputStream
    */
   private InputStream asInputStream(BufferAllocator allocator) {
-    try {
+    if (message == null) {
+      // If we have no IPC message, it's a pure-metadata message
+      final FlightData.Builder builder = FlightData.newBuilder();
+      if (descriptor != null) {
+        builder.setFlightDescriptor(descriptor);
+      }
+      if (appMetadata != null) {
+        builder.setAppMetadata(ByteString.copyFrom(appMetadata.nioBuffer()));
+      }
+      return NO_BODY_MARSHALLER.stream(builder.build());
+    }
 
+    try {
       final ByteString bytes = ByteString.copyFrom(message.getMessageBuffer(),
           message.bytesAfterMessage());
 
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallStatus.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallStatus.java
index a43b824..991d0ed 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallStatus.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/CallStatus.java
@@ -137,7 +137,7 @@ public class CallStatus {
         "code=" + code +
         ", cause=" + cause +
         ", description='" + description +
-        ", metadata='" + metadata + '\'' +
+        "', metadata='" + metadata + '\'' +
         '}';
   }
 }
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightBindingService.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightBindingService.java
index 13051e7..ba5249b 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightBindingService.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightBindingService.java
@@ -45,7 +45,9 @@ class FlightBindingService implements BindableService {
 
   private static final String DO_GET = MethodDescriptor.generateFullMethodName(FlightConstants.SERVICE, "DoGet");
   private static final String DO_PUT = MethodDescriptor.generateFullMethodName(FlightConstants.SERVICE, "DoPut");
-  private static final Set<String> OVERRIDE_METHODS = ImmutableSet.of(DO_GET, DO_PUT);
+  private static final String DO_EXCHANGE = MethodDescriptor.generateFullMethodName(
+      FlightConstants.SERVICE, "DoExchange");
+  private static final Set<String> OVERRIDE_METHODS = ImmutableSet.of(DO_GET, DO_PUT, DO_EXCHANGE);
 
   private final FlightService delegate;
   private final BufferAllocator allocator;
@@ -78,19 +80,31 @@ class FlightBindingService implements BindableService {
         .build();
   }
 
+  public static MethodDescriptor<ArrowMessage, ArrowMessage> getDoExchangeDescriptor(BufferAllocator allocator) {
+    return MethodDescriptor.<ArrowMessage, ArrowMessage>newBuilder()
+            .setType(MethodType.BIDI_STREAMING)
+            .setFullMethodName(DO_EXCHANGE)
+            .setSampledToLocalTracing(false)
+            .setRequestMarshaller(ArrowMessage.createMarshaller(allocator))
+            .setResponseMarshaller(ArrowMessage.createMarshaller(allocator))
+            .setSchemaDescriptor(FlightServiceGrpc.getDoExchangeMethod().getSchemaDescriptor())
+            .build();
+  }
+
   @Override
   public ServerServiceDefinition bindService() {
     final ServerServiceDefinition baseDefinition = delegate.bindService();
 
     final MethodDescriptor<Flight.Ticket, ArrowMessage> doGetDescriptor = getDoGetDescriptor(allocator);
-
     final MethodDescriptor<ArrowMessage, Flight.PutResult> doPutDescriptor = getDoPutDescriptor(allocator);
+    final MethodDescriptor<ArrowMessage, ArrowMessage> doExchangeDescriptor = getDoExchangeDescriptor(allocator);
 
     // Make sure we preserve SchemaDescriptor fields on methods so that gRPC reflection still works.
     final ServiceDescriptor.Builder serviceDescriptorBuilder = ServiceDescriptor.newBuilder(FlightConstants.SERVICE)
         .setSchemaDescriptor(baseDefinition.getServiceDescriptor().getSchemaDescriptor());
     serviceDescriptorBuilder.addMethod(doGetDescriptor);
     serviceDescriptorBuilder.addMethod(doPutDescriptor);
+    serviceDescriptorBuilder.addMethod(doExchangeDescriptor);
     for (MethodDescriptor<?, ?> definition : baseDefinition.getServiceDescriptor().getMethods()) {
       if (OVERRIDE_METHODS.contains(definition.getFullMethodName())) {
         continue;
@@ -103,6 +117,7 @@ class FlightBindingService implements BindableService {
     ServerServiceDefinition.Builder serviceBuilder = ServerServiceDefinition.builder(serviceDescriptor);
     serviceBuilder.addMethod(doGetDescriptor, ServerCalls.asyncServerStreamingCall(new DoGetMethod(delegate)));
     serviceBuilder.addMethod(doPutDescriptor, ServerCalls.asyncBidiStreamingCall(new DoPutMethod(delegate)));
+    serviceBuilder.addMethod(doExchangeDescriptor, ServerCalls.asyncBidiStreamingCall(new DoExchangeMethod(delegate)));
 
     // copy over not-overridden methods.
     for (ServerMethodDefinition<?, ?> definition : baseDefinition.getMethods()) {
@@ -116,7 +131,7 @@ class FlightBindingService implements BindableService {
     return serviceBuilder.build();
   }
 
-  private class DoGetMethod implements ServerCalls.ServerStreamingMethod<Flight.Ticket, ArrowMessage> {
+  private static class DoGetMethod implements ServerCalls.ServerStreamingMethod<Flight.Ticket, ArrowMessage> {
 
     private final FlightService delegate;
 
@@ -130,7 +145,7 @@ class FlightBindingService implements BindableService {
     }
   }
 
-  private class DoPutMethod implements ServerCalls.BidiStreamingMethod<ArrowMessage, PutResult> {
+  private static class DoPutMethod implements ServerCalls.BidiStreamingMethod<ArrowMessage, PutResult> {
     private final FlightService delegate;
 
     public DoPutMethod(FlightService delegate) {
@@ -141,7 +156,19 @@ class FlightBindingService implements BindableService {
     public StreamObserver<ArrowMessage> invoke(StreamObserver<PutResult> responseObserver) {
       return delegate.doPutCustom(responseObserver);
     }
+  }
+
+  private static class DoExchangeMethod implements ServerCalls.BidiStreamingMethod<ArrowMessage, ArrowMessage> {
+    private final FlightService delegate;
 
+    public DoExchangeMethod(FlightService delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public StreamObserver<ArrowMessage> invoke(StreamObserver<ArrowMessage> responseObserver) {
+      return delegate.doExchangeCustom(responseObserver);
+    }
   }
 
 }
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
index 93f89f9..fe9cfe2 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
@@ -22,7 +22,9 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
 
 import javax.net.ssl.SSLException;
 
@@ -38,14 +40,11 @@ import org.apache.arrow.flight.impl.Flight.Empty;
 import org.apache.arrow.flight.impl.FlightServiceGrpc;
 import org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceBlockingStub;
 import org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceStub;
-import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
-import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 
 import io.grpc.Channel;
 import io.grpc.ClientCall;
@@ -79,6 +78,7 @@ public class FlightClient implements AutoCloseable {
   private final ClientAuthInterceptor authInterceptor = new ClientAuthInterceptor();
   private final MethodDescriptor<Flight.Ticket, ArrowMessage> doGetDescriptor;
   private final MethodDescriptor<ArrowMessage, Flight.PutResult> doPutDescriptor;
+  private final MethodDescriptor<ArrowMessage, ArrowMessage> doExchangeDescriptor;
 
   /**
    * Create a Flight client from an allocator and a gRPC channel.
@@ -98,6 +98,7 @@ public class FlightClient implements AutoCloseable {
     asyncStub = FlightServiceGrpc.newStub(interceptedChannel);
     doGetDescriptor = FlightBindingService.getDoGetDescriptor(allocator);
     doPutDescriptor = FlightBindingService.getDoPutDescriptor(allocator);
+    doExchangeDescriptor = FlightBindingService.getDoExchangeDescriptor(allocator);
   }
 
   /**
@@ -195,31 +196,29 @@ public class FlightClient implements AutoCloseable {
    * @param root VectorSchemaRoot the root containing data
    * @param metadataListener A handler for metadata messages from the server.
    * @param options RPC-layer hints for this call.
-   * @return ClientStreamListener an interface to control uploading data
+   * @return ClientStreamListener an interface to control uploading data.
+   *     {@link ClientStreamListener#start(VectorSchemaRoot, DictionaryProvider)} will already have been called.
    */
   public ClientStreamListener startPut(FlightDescriptor descriptor, VectorSchemaRoot root, DictionaryProvider provider,
       PutListener metadataListener, CallOption... options) {
-    Preconditions.checkNotNull(descriptor);
-    Preconditions.checkNotNull(root);
+    Preconditions.checkNotNull(descriptor, "descriptor must not be null");
+    Preconditions.checkNotNull(root, "root must not be null");
+    Preconditions.checkNotNull(provider, "provider must not be null");
+    Preconditions.checkNotNull(metadataListener, "metadataListener must not be null");
+    final io.grpc.CallOptions callOptions = CallOptions.wrapStub(asyncStub, options).getCallOptions();
 
     try {
-      SetStreamObserver resultObserver = new SetStreamObserver(allocator, metadataListener);
-      final io.grpc.CallOptions callOptions = CallOptions.wrapStub(asyncStub, options).getCallOptions();
+      final SetStreamObserver resultObserver = new SetStreamObserver(allocator, metadataListener);
       ClientCallStreamObserver<ArrowMessage> observer = (ClientCallStreamObserver<ArrowMessage>)
           ClientCalls.asyncBidiStreamingCall(
               interceptedChannel.newCall(doPutDescriptor, callOptions), resultObserver);
-      // send the schema to start.
-      DictionaryUtils.generateSchemaMessages(root.getSchema(), descriptor, provider, observer::onNext);
-      return new PutObserver(new VectorUnloader(
-          root, true /* include # of nulls in vectors */, true /* must align buffers to be C++-compatible */),
-          observer, metadataListener);
+      final ClientStreamListener writer = new PutObserver(
+          descriptor, observer, metadataListener::isCancelled, metadataListener::getResult);
+      // Send the schema to start.
+      writer.start(root, provider);
+      return writer;
     } catch (StatusRuntimeException sre) {
       throw StatusUtils.fromGrpcRuntimeException(sre);
-    } catch (Exception e) {
-      // Only happens if DictionaryUtils#generateSchemaMessages fails. This should only happen if closing buffers fails,
-      // which means the application is in an unknown state, so propagate the exception.
-      throw CallStatus.INTERNAL.withDescription("Could not send all schema messages: " + e.toString()).withCause(e)
-          .toRuntimeException();
     }
   }
 
@@ -293,6 +292,82 @@ public class FlightClient implements AutoCloseable {
     return stream;
   }
 
+  /**
+   * Initiate a bidirectional data exchange with the server.
+   *
+   * @param descriptor A descriptor for the data stream.
+   * @param options RPC call options.
+   * @return A pair of a readable stream and a writable stream.
+   */
+  public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption... options) {
+    Preconditions.checkNotNull(descriptor, "descriptor must not be null");
+    final io.grpc.CallOptions callOptions = CallOptions.wrapStub(asyncStub, options).getCallOptions();
+
+    try {
+      final ClientCall<ArrowMessage, ArrowMessage> call = interceptedChannel.newCall(doExchangeDescriptor, callOptions);
+      final FlightStream stream = new FlightStream(allocator, PENDING_REQUESTS, call::cancel, call::request);
+      final ClientCallStreamObserver<ArrowMessage> observer = (ClientCallStreamObserver<ArrowMessage>)
+              ClientCalls.asyncBidiStreamingCall(call, stream.asObserver());
+      final ClientStreamListener writer = new PutObserver(
+          descriptor, observer, stream.completed::isDone,
+          () -> {
+            try {
+              stream.completed.get();
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw CallStatus.INTERNAL
+                  .withDescription("Client error: interrupted while completing call")
+                  .withCause(e)
+                  .toRuntimeException();
+            } catch (ExecutionException e) {
+              throw CallStatus.INTERNAL
+                  .withDescription("Client error: internal while completing call")
+                  .withCause(e)
+                  .toRuntimeException();
+            }
+          });
+      // Send the descriptor to start.
+      try (final ArrowMessage message = new ArrowMessage(descriptor.toProtocol())) {
+        observer.onNext(message);
+      } catch (Exception e) {
+        throw CallStatus.INTERNAL
+            .withCause(e)
+            .withDescription("Could not write descriptor " + descriptor)
+            .toRuntimeException();
+      }
+      return new ExchangeReaderWriter(stream, writer);
+    } catch (StatusRuntimeException sre) {
+      throw StatusUtils.fromGrpcRuntimeException(sre);
+    }
+  }
+
+  /** A pair of a reader and a writer for a DoExchange call. */
+  public static class ExchangeReaderWriter implements AutoCloseable {
+    private final FlightStream reader;
+    private final ClientStreamListener writer;
+
+    ExchangeReaderWriter(FlightStream reader, ClientStreamListener writer) {
+      this.reader = reader;
+      this.writer = writer;
+    }
+
+    /** Get the reader for the call. */
+    public FlightStream getReader() {
+      return reader;
+    }
+
+    /** Get the writer for the call. */
+    public ClientStreamListener getWriter() {
+      return writer;
+    }
+
+    /** Shut down the streams in this call. */
+    @Override
+    public void close() throws Exception {
+      reader.close();
+    }
+  }
+
   private static class SetStreamObserver implements StreamObserver<Flight.PutResult> {
     private final BufferAllocator allocator;
     private final StreamListener<PutResult> listener;
@@ -321,81 +396,51 @@ public class FlightClient implements AutoCloseable {
     }
   }
 
-  private static class PutObserver implements ClientStreamListener {
-
-    private final ClientCallStreamObserver<ArrowMessage> observer;
-    private final VectorUnloader unloader;
-    private final PutListener listener;
-
-    public PutObserver(VectorUnloader unloader, ClientCallStreamObserver<ArrowMessage> observer,
-        PutListener listener) {
-      this.observer = observer;
-      this.unloader = unloader;
-      this.listener = listener;
-    }
+  /**
+   * The implementation of a {@link ClientStreamListener} for writing data to a Flight server.
+   */
+  static class PutObserver extends OutboundStreamListenerImpl implements ClientStreamListener {
+    private final BooleanSupplier isCancelled;
+    private final Runnable getResult;
 
-    @Override
-    public void putNext() {
-      putNext(null);
+    /**
+     * Create a new client stream listener.
+     *
+     * @param descriptor The descriptor for the stream.
+     * @param observer The write-side gRPC StreamObserver.
+     * @param isCancelled A flag to check if the call has been cancelled.
+     * @param getResult A flag that blocks until the overall call completes.
+     */
+    PutObserver(FlightDescriptor descriptor, ClientCallStreamObserver<ArrowMessage> observer,
+                BooleanSupplier isCancelled, Runnable getResult) {
+      super(descriptor, observer);
+      Preconditions.checkNotNull(descriptor, "descriptor must be provided");
+      Preconditions.checkNotNull(isCancelled, "isCancelled must be provided");
+      Preconditions.checkNotNull(getResult, "getResult must be provided");
+      this.isCancelled = isCancelled;
+      this.getResult = getResult;
+      this.unloader = null;
     }
 
     @Override
-    public void putNext(ArrowBuf appMetadata) {
-      ArrowRecordBatch batch = unloader.getRecordBatch();
+    protected void waitUntilStreamReady() {
       // Check isCancelled as well to avoid inadvertently blocking forever
       // (so long as PutListener properly implements it)
-      while (!observer.isReady() && !listener.isCancelled()) {
+      while (!responseObserver.isReady() && !isCancelled.getAsBoolean()) {
         /* busy wait */
       }
-      // ArrowMessage takes ownership of appMetadata and batch
-      // gRPC should take ownership of ArrowMessage, but in some cases it doesn't, so guard against it
-      // ArrowMessage#close is a no-op if gRPC did its job
-      try (final ArrowMessage message = new ArrowMessage(batch, appMetadata)) {
-        observer.onNext(message);
-      } catch (Exception e) {
-        throw StatusUtils.fromThrowable(e);
-      }
-    }
-
-    @Override
-    public void error(Throwable ex) {
-      observer.onError(StatusUtils.toGrpcException(ex));
-    }
-
-    @Override
-    public void completed() {
-      observer.onCompleted();
     }
 
     @Override
     public void getResult() {
-      listener.getResult();
+      getResult.run();
     }
   }
 
   /**
-   * Interface for subscribers to a stream returned by the server.
+   * Interface for writers to an Arrow data stream.
    */
-  public interface ClientStreamListener {
-
-    /**
-     * Send the current data in the corresponding {@link VectorSchemaRoot} to the server.
-     */
-    void putNext();
-
-    /**
-     * Send the current data in the corresponding {@link VectorSchemaRoot} to the server, along with
-     * application-specific metadata. This takes ownership of the buffer.
-     */
-    void putNext(ArrowBuf appMetadata);
-
-    /**
-     * Indicate an error to the server. Terminates the stream; do not call {@link #completed()}.
-     */
-    void error(Throwable ex);
-
-    /** Indicate the stream is finished on the client side. */
-    void completed();
+  public interface ClientStreamListener extends OutboundStreamListener {
 
     /**
      * Wait for the stream to finish on the server side. You must call this to be notified of any errors that may have
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightMethod.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightMethod.java
index 13d72db..5d2915b 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightMethod.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightMethod.java
@@ -31,6 +31,7 @@ public enum FlightMethod {
   DO_PUT,
   DO_ACTION,
   LIST_ACTIONS,
+  DO_EXCHANGE,
   ;
 
   /**
@@ -55,6 +56,8 @@ public enum FlightMethod {
       return DO_ACTION;
     } else if (FlightServiceGrpc.getListActionsMethod().getFullMethodName().equals(methodName)) {
       return LIST_ACTIONS;
+    } else if (FlightServiceGrpc.getDoExchangeMethod().getFullMethodName().equals(methodName)) {
+      return DO_EXCHANGE;
     }
     throw new IllegalArgumentException("Not a Flight method name in gRPC: " + methodName);
   }
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightProducer.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightProducer.java
index ee064ad..5e5b265 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightProducer.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightProducer.java
@@ -19,10 +19,6 @@ package org.apache.arrow.flight;
 
 import java.util.Map;
 
-import org.apache.arrow.memory.ArrowBuf;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.dictionary.DictionaryProvider;
-
 /**
  * API to Implement an Arrow Flight producer.
  */
@@ -78,6 +74,10 @@ public interface FlightProducer {
   Runnable acceptPut(CallContext context,
       FlightStream flightStream, StreamListener<PutResult> ackStream);
 
+  default void doExchange(CallContext context, FlightStream reader, ServerStreamListener writer) {
+    throw CallStatus.UNIMPLEMENTED.withDescription("DoExchange is unimplemented").toRuntimeException();
+  }
+
   /**
    * Generic handler for application-defined RPCs.
    *
@@ -98,7 +98,7 @@ public interface FlightProducer {
   /**
    * An interface for sending Arrow data back to a client.
    */
-  interface ServerStreamListener {
+  interface ServerStreamListener extends OutboundStreamListener {
 
     /**
      * Check whether the call has been cancelled. If so, stop sending data.
@@ -106,46 +106,6 @@ public interface FlightProducer {
     boolean isCancelled();
 
     /**
-     * A hint indicating whether the client is ready to receive data without excessive buffering.
-     */
-    boolean isReady();
-
-    /**
-     * Start sending data, using the schema of the given {@link VectorSchemaRoot}.
-     *
-     * <p>This method must be called before all others.
-     */
-    void start(VectorSchemaRoot root);
-
-    /**
-     * Start sending data, using the schema of the given {@link VectorSchemaRoot}.
-     *
-     * <p>This method must be called before all others.
-     */
-    void start(VectorSchemaRoot root, DictionaryProvider dictionaries);
-
-    /**
-     * Send the current contents of the associated {@link VectorSchemaRoot}.
-     */
-    void putNext();
-
-    /**
-     * Send the current contents of the associated {@link VectorSchemaRoot} alongside application-defined metadata.
-     * @param metadata The metadata to send. Ownership of the buffer is transferred to the Flight implementation.
-     */
-    void putNext(ArrowBuf metadata);
-
-    /**
-     * Indicate an error to the client. Terminates the stream; do not call {@link #completed()} afterwards.
-     */
-    void error(Throwable ex);
-
-    /**
-     * Indicate that transmission is finished.
-     */
-    void completed();
-
-    /**
      * Set a callback for when the client cancels a call, i.e. {@link #isCancelled()} has become true.
      *
      * <p>Note that this callback may only be called some time after {@link #isCancelled()} becomes true, and may never
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
index 8523416..3c8b7ae 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java
@@ -42,6 +42,8 @@ import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.util.Preconditions;
 import org.apache.arrow.util.VisibleForTesting;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import io.grpc.Server;
 import io.grpc.ServerInterceptors;
 import io.grpc.netty.NettyServerBuilder;
@@ -243,7 +245,9 @@ public class FlightServer implements AutoCloseable {
         exec = executor;
         grpcExecutor = null;
       } else {
-        exec = Executors.newCachedThreadPool();
+        exec = Executors.newCachedThreadPool(
+            // Name threads for better debuggability
+            new ThreadFactoryBuilder().setNameFormat("flight-server-default-executor-%d").build());
         grpcExecutor = exec;
       }
       final FlightBindingService flightService = new FlightBindingService(allocator, producer, authHandler, exec);
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java
index 955d51f..30c7d30 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java
@@ -33,13 +33,8 @@ import org.apache.arrow.flight.grpc.ServerInterceptorAdapter;
 import org.apache.arrow.flight.grpc.StatusUtils;
 import org.apache.arrow.flight.impl.Flight;
 import org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceImplBase;
-import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.util.Preconditions;
-import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.VectorUnloader;
-import org.apache.arrow.vector.dictionary.DictionaryProvider;
-import org.apache.arrow.vector.dictionary.DictionaryProvider.MapDictionaryProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,10 +85,12 @@ class FlightService extends FlightServiceImplBase {
     // Do NOT call StreamPipe#onCompleted, as the FlightProducer implementation may be asynchronous
   }
 
-  public void doGetCustom(Flight.Ticket ticket, StreamObserver<ArrowMessage> responseObserver) {
+  public void doGetCustom(Flight.Ticket ticket, StreamObserver<ArrowMessage> responseObserverSimple) {
+    final ServerCallStreamObserver<ArrowMessage> responseObserver =
+        (ServerCallStreamObserver<ArrowMessage>) responseObserverSimple;
     final GetListener listener = new GetListener(responseObserver, this::handleExceptionWithMiddleware);
     try {
-      producer.getStream(makeContext((ServerCallStreamObserver<?>) responseObserver), new Ticket(ticket), listener);
+      producer.getStream(makeContext(responseObserver), new Ticket(ticket), listener);
     } catch (Exception ex) {
       listener.error(ex);
     }
@@ -126,7 +123,7 @@ class FlightService extends FlightServiceImplBase {
     // Do NOT call StreamPipe#onCompleted, as the FlightProducer implementation may be asynchronous
   }
 
-  private static class GetListener implements ServerStreamListener {
+  private static class GetListener extends OutboundStreamListenerImpl implements ServerStreamListener {
     private ServerCallStreamObserver<ArrowMessage> responseObserver;
     private final Consumer<Throwable> errorHandler;
     private Runnable onCancelHandler = null;
@@ -134,11 +131,11 @@ class FlightService extends FlightServiceImplBase {
     private volatile VectorUnloader unloader;
     private boolean completed;
 
-    public GetListener(StreamObserver<ArrowMessage> responseObserver, Consumer<Throwable> errorHandler) {
-      super();
+    public GetListener(ServerCallStreamObserver<ArrowMessage> responseObserver, Consumer<Throwable> errorHandler) {
+      super(null, responseObserver);
       this.errorHandler = errorHandler;
       this.completed = false;
-      this.responseObserver = (ServerCallStreamObserver<ArrowMessage>) responseObserver;
+      this.responseObserver = responseObserver;
       this.responseObserver.setOnCancelHandler(this::onCancel);
       this.responseObserver.disableAutoInboundFlowControl();
     }
@@ -156,60 +153,20 @@ class FlightService extends FlightServiceImplBase {
     }
 
     @Override
-    public boolean isReady() {
-      return responseObserver.isReady();
-    }
-
-    @Override
     public boolean isCancelled() {
       return responseObserver.isCancelled();
     }
 
     @Override
-    public void start(VectorSchemaRoot root) {
-      start(root, new MapDictionaryProvider());
-    }
-
-    @Override
-    public void start(VectorSchemaRoot root, DictionaryProvider provider) {
-      unloader = new VectorUnloader(root, true, true);
-
-      try {
-        DictionaryUtils.generateSchemaMessages(root.getSchema(), null, provider, responseObserver::onNext);
-      } catch (Exception e) {
-        // Only happens if closing buffers somehow fails - indicates application is an unknown state so propagate
-        // the exception
-        throw new RuntimeException("Could not generate and send all schema messages", e);
-      }
-    }
-
-    @Override
-    public void putNext() {
-      putNext(null);
-    }
-
-    @Override
-    public void putNext(ArrowBuf metadata) {
-      Preconditions.checkNotNull(unloader);
-      // close is a no-op if the message has been written to gRPC, otherwise frees the associated buffers
-      // in some code paths (e.g. if the call is cancelled), gRPC does not write the message, so we need to clean up
-      // ourselves. Normally, writing the ArrowMessage will transfer ownership of the data to gRPC/Netty.
-      try (final ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), metadata)) {
-        responseObserver.onNext(message);
-      } catch (Exception e) {
-        // This exception comes from ArrowMessage#close, not responseObserver#onNext.
-        // Generally this should not happen - ArrowMessage's implementation only closes non-throwing things.
-        // The user can't reasonably do anything about this, but if something does throw, we shouldn't let
-        // execution continue since other state (e.g. allocators) may be in an odd state.
-        throw new RuntimeException("Could not free ArrowMessage", e);
-      }
+    protected void waitUntilStreamReady() {
+      // Don't do anything - service implementations are expected to manage backpressure themselves
     }
 
     @Override
     public void error(Throwable ex) {
       if (!completed) {
         completed = true;
-        responseObserver.onError(StatusUtils.toGrpcException(ex));
+        super.error(ex);
       } else {
         errorHandler.accept(ex);
       }
@@ -217,17 +174,13 @@ class FlightService extends FlightServiceImplBase {
 
     @Override
     public void completed() {
-      if (unloader == null) {
-        throw new IllegalStateException("Can't complete stream before starting it");
-      }
       if (!completed) {
         completed = true;
-        responseObserver.onCompleted();
+        super.completed();
       } else {
         errorHandler.accept(new IllegalStateException("Tried to complete already-completed call"));
       }
     }
-
   }
 
   public StreamObserver<ArrowMessage> doPutCustom(final StreamObserver<Flight.PutResult> responseObserverSimple) {
@@ -248,14 +201,15 @@ class FlightService extends FlightServiceImplBase {
       } catch (Exception ex) {
         ackStream.onError(ex);
       } finally {
-        // ARROW-6136: Close the stream if and only if acceptPut hasn't closed it itself
-        // We don't do this for other streams since the implementation may be asynchronous
-        ackStream.ensureCompleted();
+        // Close this stream before telling gRPC that the call is complete. That way we don't race with server shutdown.
         try {
           fs.close();
         } catch (Exception e) {
           handleExceptionWithMiddleware(e);
         }
+        // ARROW-6136: Close the stream if and only if acceptPut hasn't closed it itself
+        // We don't do this for other streams since the implementation may be asynchronous
+        ackStream.ensureCompleted();
       }
     });
 
@@ -302,6 +256,103 @@ class FlightService extends FlightServiceImplBase {
     }
   }
 
+  /** Ensures that other resources are cleaned up when the service finishes its call.  */
+  private static class ExchangeListener extends GetListener {
+    private final AutoCloseable resource;
+    private boolean closed = false;
+    private Runnable onCancelHandler = null;
+
+    public ExchangeListener(ServerCallStreamObserver<ArrowMessage> responseObserver, Consumer<Throwable> errorHandler,
+                            AutoCloseable resource) {
+      super(responseObserver, errorHandler);
+      this.resource = resource;
+      super.setOnCancelHandler(() -> {
+        try {
+          if (onCancelHandler != null) {
+            onCancelHandler.run();
+          }
+        } finally {
+          cleanup();
+        }
+      });
+    }
+
+    private void cleanup() {
+      if (closed) {
+        // Prevent double-free. gRPC will call the OnCancelHandler even on a normal call end, which means that
+        // we'll double-free without this guard.
+        return;
+      }
+      closed = true;
+      try {
+        this.resource.close();
+      } catch (Exception e) {
+        throw CallStatus.INTERNAL
+            .withCause(e)
+            .withDescription("Server internal error cleaning up resources")
+            .toRuntimeException();
+      }
+    }
+
+    @Override
+    public void error(Throwable ex) {
+      try {
+        this.cleanup();
+      } finally {
+        super.error(ex);
+      }
+    }
+
+    @Override
+    public void completed() {
+      try {
+        this.cleanup();
+      } finally {
+        super.completed();
+      }
+    }
+
+    @Override
+    public void setOnCancelHandler(Runnable handler) {
+      onCancelHandler = handler;
+    }
+  }
+
+  public StreamObserver<ArrowMessage> doExchangeCustom(StreamObserver<ArrowMessage> responseObserverSimple) {
+    final ServerCallStreamObserver<ArrowMessage> responseObserver =
+        (ServerCallStreamObserver<ArrowMessage>) responseObserverSimple;
+    final FlightStream fs = new FlightStream(allocator, PENDING_REQUESTS, (String message, Throwable cause) -> {
+      responseObserver.onError(Status.CANCELLED.withCause(cause).withDescription(message).asException());
+    }, responseObserver::request);
+    // When service completes the call, this cleans up the FlightStream
+    final ExchangeListener listener = new ExchangeListener(
+        responseObserver,
+        this::handleExceptionWithMiddleware,
+        () -> {
+          // Force the stream to "complete" so it will close without incident. At this point, we don't care since
+          // we are about to end the call. (Normally it will raise an error.)
+          fs.completed.complete(null);
+          fs.close();
+        });
+    responseObserver.disableAutoInboundFlowControl();
+    responseObserver.request(1);
+    final StreamObserver<ArrowMessage> observer = fs.asObserver();
+    try {
+      executors.submit(() -> {
+        try {
+          producer.doExchange(makeContext(responseObserver), fs, listener);
+        } catch (Exception ex) {
+          listener.error(ex);
+        }
+        // We do not clean up or close anything here, to allow long-running asynchronous implementations.
+        // It is the service's responsibility to call completed() or error(), which will then clean up the FlightStream.
+      });
+    } catch (Exception ex) {
+      listener.error(ex);
+    }
+    return observer;
+  }
+
   /**
    * Call context for the service.
    */
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
index 2302230..cbdbf05 100644
--- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -60,16 +61,16 @@ public class FlightStream implements AutoCloseable {
   private final Cancellable cancellable;
   private final LinkedBlockingQueue<AutoCloseable> queue = new LinkedBlockingQueue<>();
   private final SettableFuture<VectorSchemaRoot> root = SettableFuture.create();
+  private final SettableFuture<FlightDescriptor> descriptor = SettableFuture.create();
   private final int pendingTarget;
   private final Requestor requestor;
+  final CompletableFuture<Void> completed;
 
   private volatile int pending = 1;
-  private boolean completed = false;
   private volatile VectorSchemaRoot fulfilledRoot;
   private DictionaryProvider.MapDictionaryProvider dictionaries;
   private volatile VectorLoader loader;
   private volatile Throwable ex;
-  private volatile FlightDescriptor descriptor;
   private volatile ArrowBuf applicationMetadata = null;
 
   /**
@@ -86,6 +87,7 @@ public class FlightStream implements AutoCloseable {
     this.cancellable = cancellable;
     this.requestor = requestor;
     this.dictionaries = new DictionaryProvider.MapDictionaryProvider();
+    this.completed = new CompletableFuture<>();
   }
 
   /**
@@ -136,9 +138,15 @@ public class FlightStream implements AutoCloseable {
    * client sends the descriptor.
    */
   public FlightDescriptor getDescriptor() {
-    // This blocks until the schema message (with the descriptor) is sent.
-    getRoot();
-    return descriptor;
+    // This blocks until the first message from the client is received.
+    try {
+      return descriptor.get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw CallStatus.INTERNAL.withCause(e).withDescription("Interrupted").toRuntimeException();
+    } catch (ExecutionException e) {
+      throw CallStatus.INTERNAL.withCause(e).withDescription("Error getting descriptor").toRuntimeException();
+    }
   }
 
   /**
@@ -150,11 +158,13 @@ public class FlightStream implements AutoCloseable {
     final List<AutoCloseable> closeables = new ArrayList<>();
     // cancellation can throw, but we still want to clean up resources, so make it an AutoCloseable too
     closeables.add(() -> {
-      if (!completed && cancellable != null) {
+      if (!completed.isDone() && cancellable != null) {
         cancel("Stream closed before end.", /* no exception to report */ null);
       }
     });
-    closeables.add(root.get());
+    if (fulfilledRoot != null) {
+      closeables.add(fulfilledRoot);
+    }
     closeables.add(applicationMetadata);
     closeables.addAll(queue);
     if (dictionaries != null) {
@@ -162,6 +172,9 @@ public class FlightStream implements AutoCloseable {
     }
 
     AutoCloseables.close(closeables);
+    // Other code ignores the value of this CompletableFuture, only whether it's completed (or has an exception)
+    // No-op if already complete; do this after the check in the AutoCloseable lambda above
+    completed.complete(null);
   }
 
   /**
@@ -170,21 +183,18 @@ public class FlightStream implements AutoCloseable {
    */
   public boolean next() {
     try {
-      // make sure we have the root
-      root.get().clear();
-
-      if (completed && queue.isEmpty()) {
+      if (completed.isDone() && queue.isEmpty()) {
         return false;
       }
 
-
       pending--;
       requestOutstanding();
 
       Object data = queue.take();
       if (DONE == data) {
         queue.put(DONE);
-        completed = true;
+        // Other code ignores the value of this CompletableFuture, only whether it's completed (or has an exception)
+        completed.complete(null);
         return false;
       } else if (DONE_EX == data) {
         queue.put(DONE_EX);
@@ -195,18 +205,22 @@ public class FlightStream implements AutoCloseable {
         }
       } else {
         try (ArrowMessage msg = ((ArrowMessage) data)) {
-          if (msg.getMessageType() == HeaderType.RECORD_BATCH) {
+          if (msg.getMessageType() == HeaderType.NONE) {
+            updateMetadata(msg);
+            // We received a message without data, so erase any leftover data
+            if (fulfilledRoot != null) {
+              fulfilledRoot.clear();
+            }
+          } else if (msg.getMessageType() == HeaderType.RECORD_BATCH) {
+            // Ensure we have the root
+            root.get().clear();
             try (ArrowRecordBatch arb = msg.asRecordBatch()) {
               loader.load(arb);
             }
-            if (this.applicationMetadata != null) {
-              this.applicationMetadata.close();
-            }
-            this.applicationMetadata = msg.getApplicationMetadata();
-            if (this.applicationMetadata != null) {
-              this.applicationMetadata.getReferenceManager().retain();
-            }
+            updateMetadata(msg);
           } else if (msg.getMessageType() == HeaderType.DICTIONARY_BATCH) {
+            // Ensure we have the root
+            root.get().clear();
             try (ArrowDictionaryBatch arb = msg.asDictionaryBatch()) {
               final long id = arb.getDictionaryId();
               if (dictionaries == null) {
@@ -239,6 +253,17 @@ public class FlightStream implements AutoCloseable {
     }
   }
 
+  /** Update our metadata reference with a new one from this message. */
+  private void updateMetadata(ArrowMessage msg) {
+    if (this.applicationMetadata != null) {
+      this.applicationMetadata.close();
+    }
+    this.applicationMetadata = msg.getApplicationMetadata();
+    if (this.applicationMetadata != null) {
+      this.applicationMetadata.getReferenceManager().retain();
+    }
+  }
+
   /**
    * Get the current vector data from the stream.
    *
@@ -258,6 +283,17 @@ public class FlightStream implements AutoCloseable {
   }
 
   /**
+   * Check if there is a root (i.e. whether the other end has started sending data).
+   *
+   * Updated by calls to {@link #next()}.
+   *
+   * @return true if and only if the other end has started sending data.
+   */
+  public boolean hasRoot() {
+    return root.isDone();
+  }
+
+  /**
    * Get the most recent metadata sent from the server. This may be cleared by calls to {@link #next()} if the server
    * sends a message without metadata. This does NOT take ownership of the buffer - call retain() to create a reference
    * if you need the buffer after a call to {@link #next()}.
@@ -285,6 +321,16 @@ public class FlightStream implements AutoCloseable {
     public void onNext(ArrowMessage msg) {
       requestOutstanding();
       switch (msg.getMessageType()) {
+        case NONE: {
+          // No IPC message - pure metadata or descriptor
+          if (msg.getDescriptor() != null) {
+            descriptor.set(new FlightDescriptor(msg.getDescriptor()));
+          }
+          if (msg.getApplicationMetadata() != null) {
+            queue.add(msg);
+          }
+          break;
+        }
         case SCHEMA: {
           Schema schema = msg.asSchema();
           final List<Field> fields = new ArrayList<>();
@@ -299,7 +345,9 @@ public class FlightStream implements AutoCloseable {
           schema = new Schema(fields, schema.getCustomMetadata());
           fulfilledRoot = VectorSchemaRoot.create(schema, allocator);
           loader = new VectorLoader(fulfilledRoot);
-          descriptor = msg.getDescriptor() != null ? new FlightDescriptor(msg.getDescriptor()) : null;
+          if (msg.getDescriptor() != null) {
+            descriptor.set(new FlightDescriptor(msg.getDescriptor()));
+          }
           root.set(fulfilledRoot);
 
           break;
@@ -310,7 +358,6 @@ public class FlightStream implements AutoCloseable {
         case DICTIONARY_BATCH:
           queue.add(msg);
           break;
-        case NONE:
         case TENSOR:
         default:
           queue.add(DONE_EX);
@@ -320,18 +367,14 @@ public class FlightStream implements AutoCloseable {
 
     @Override
     public void onError(Throwable t) {
-      ex = t;
+      ex = StatusUtils.fromThrowable(t);
       queue.add(DONE_EX);
-      root.setException(t);
+      root.setException(ex);
     }
 
     @Override
     public void onCompleted() {
       // Depends on gRPC calling onNext and onCompleted non-concurrently
-      if (!root.isDone()) {
-        root.setException(
-            CallStatus.INTERNAL.withDescription("Stream completed without receiving schema.").toRuntimeException());
-      }
       queue.add(DONE);
     }
   }
@@ -342,6 +385,8 @@ public class FlightStream implements AutoCloseable {
    * @throws UnsupportedOperationException on a stream being uploaded from the client.
    */
   public void cancel(String message, Throwable exception) {
+    completed.completeExceptionally(
+        CallStatus.CANCELLED.withDescription(message).withCause(exception).toRuntimeException());
     if (cancellable != null) {
       cancellable.cancel(message, exception);
     } else {
@@ -357,6 +402,7 @@ public class FlightStream implements AutoCloseable {
   /**
    * Provides a callback to cancel a process that is in progress.
    */
+  @FunctionalInterface
   public interface Cancellable {
     void cancel(String message, Throwable exception);
   }
@@ -364,6 +410,7 @@ public class FlightStream implements AutoCloseable {
   /**
    * Provides a interface to request more items from a stream producer.
    */
+  @FunctionalInterface
   public interface Requestor {
     /**
      * Requests <code>count</code> more messages from the instance of this object.
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java
new file mode 100644
index 0000000..194003c
--- /dev/null
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListener.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+
+/**
+ * An interface for writing data to a peer, client or server.
+ */
+public interface OutboundStreamListener {
+
+  /**
+   * A hint indicating whether the peer is ready to receive data without excessive buffering.
+   *
+   * <p>Writers should poll this flag before sending data to respect backpressure from the peer and
+   * avoid sending data faster than the peer can handle. Ignoring this flag may mean that the sender
+   * will start consuming excessive amounts of memory, as it may buffer messages in memory.
+   */
+  boolean isReady();
+
+  /**
+   * Start sending data, using the schema of the given {@link VectorSchemaRoot}.
+   *
+   * <p>This method must be called before all others, except {@link #putMetadata(ArrowBuf)}.
+   */
+  void start(VectorSchemaRoot root);
+
+  /**
+   * Start sending data, using the schema of the given {@link VectorSchemaRoot}.
+   *
+   * <p>This method must be called before all others, except {@link #putMetadata(ArrowBuf)}.
+   */
+  void start(VectorSchemaRoot root, DictionaryProvider dictionaries);
+
+  /**
+   * Send the current contents of the associated {@link VectorSchemaRoot}.
+   *
+   * <p>This will not necessarily block until the message is actually sent; it may buffer messages
+   * in memory. Use {@link #isReady()} to check if there is backpressure and avoid excessive buffering.
+   */
+  void putNext();
+
+  /**
+   * Send the current contents of the associated {@link VectorSchemaRoot} alongside application-defined metadata.
+   * @param metadata The metadata to send. Ownership of the buffer is transferred to the Flight implementation.
+   */
+  void putNext(ArrowBuf metadata);
+
+  /**
+   * Send a pure metadata message without any associated data.
+   *
+   * <p>This may be called without starting the stream.
+   */
+  void putMetadata(ArrowBuf metadata);
+
+  /**
+   * Indicate an error to the peer. Terminates the stream; do not call {@link #completed()} afterwards.
+   */
+  void error(Throwable ex);
+
+  /**
+   * Indicate that transmission is finished.
+   */
+  void completed();
+}
diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListenerImpl.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListenerImpl.java
new file mode 100644
index 0000000..c826c85
--- /dev/null
+++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListenerImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import org.apache.arrow.flight.grpc.StatusUtils;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+
+import io.grpc.stub.CallStreamObserver;
+
+/**
+ * A base class for writing Arrow data to a Flight stream.
+ */
+abstract class OutboundStreamListenerImpl implements OutboundStreamListener {
+  private final FlightDescriptor descriptor; // nullable
+  protected final CallStreamObserver<ArrowMessage> responseObserver;
+  protected volatile VectorUnloader unloader; // null until stream started
+
+  OutboundStreamListenerImpl(FlightDescriptor descriptor, CallStreamObserver<ArrowMessage> responseObserver) {
+    Preconditions.checkNotNull(responseObserver, "responseObserver must be provided");
+    this.descriptor = descriptor;
+    this.responseObserver = responseObserver;
+    this.unloader = null;
+  }
+
+  @Override
+  public boolean isReady() {
+    return responseObserver.isReady();
+  }
+
+  @Override
+  public void start(VectorSchemaRoot root) {
+    start(root, new DictionaryProvider.MapDictionaryProvider());
+  }
+
+  @Override
+  public void start(VectorSchemaRoot root, DictionaryProvider dictionaries) {
+    try {
+      DictionaryUtils.generateSchemaMessages(root.getSchema(), descriptor, dictionaries, responseObserver::onNext);
+    } catch (Exception e) {
+      // Only happens if closing buffers somehow fails - indicates application is in an unknown state so propagate
+      // the exception
+      throw new RuntimeException("Could not generate and send all schema messages", e);
+    }
+    // We include the null count and align buffers to be compatible with Flight/C++
+    unloader = new VectorUnloader(root, /* includeNullCount */ true, /* alignBuffers */ true);
+  }
+
+  @Override
+  public void putNext() {
+    putNext(null);
+  }
+
+  /**
+   * Busy-wait until the stream is ready.
+   *
+   * <p>This is overridable as client/server have different behavior.
+   */
+  protected abstract void waitUntilStreamReady();
+
+  @Override
+  public void putNext(ArrowBuf metadata) {
+    if (unloader == null) {
+      throw CallStatus.INTERNAL.withDescription("Stream was not started, call start()").toRuntimeException();
+    }
+
+    waitUntilStreamReady();
+    // close is a no-op if the message has been written to gRPC, otherwise frees the associated buffers
+    // in some code paths (e.g. if the call is cancelled), gRPC does not write the message, so we need to clean up
+    // ourselves. Normally, writing the ArrowMessage will transfer ownership of the data to gRPC/Netty.
+    try (final ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), metadata)) {
+      responseObserver.onNext(message);
+    } catch (Exception e) {
+      // This exception comes from ArrowMessage#close, not responseObserver#onNext.
+      // Generally this should not happen - ArrowMessage's implementation only closes non-throwing things.
+      // The user can't reasonably do anything about this, but if something does throw, we shouldn't let
+      // execution continue since other state (e.g. allocators) may be in an odd state.
+      throw new RuntimeException("Could not free ArrowMessage", e);
+    }
+  }
+
+  @Override
+  public void putMetadata(ArrowBuf metadata) {
+    waitUntilStreamReady();
+    try (final ArrowMessage message = new ArrowMessage(metadata)) {
+      responseObserver.onNext(message);
+    } catch (Exception e) {
+      throw StatusUtils.fromThrowable(e);
+    }
+  }
+
+  @Override
+  public void error(Throwable ex) {
+    responseObserver.onError(StatusUtils.toGrpcException(ex));
+  }
+
+  @Override
+  public void completed() {
+    responseObserver.onCompleted();
+  }
+}
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
index 3a6a676..8242bc0 100644
--- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestBasicOperation.java
@@ -228,15 +228,18 @@ public class TestBasicOperation {
   @Test
   public void getStream() throws Exception {
     test(c -> {
-      FlightStream stream = c.getStream(new Ticket(new byte[0]));
-      VectorSchemaRoot root = stream.getRoot();
-      IntVector iv = (IntVector) root.getVector("c1");
-      int value = 0;
-      while (stream.next()) {
-        for (int i = 0; i < root.getRowCount(); i++) {
-          Assert.assertEquals(value, iv.get(i));
-          value++;
+      try (final FlightStream stream = c.getStream(new Ticket(new byte[0]))) {
+        VectorSchemaRoot root = stream.getRoot();
+        IntVector iv = (IntVector) root.getVector("c1");
+        int value = 0;
+        while (stream.next()) {
+          for (int i = 0; i < root.getRowCount(); i++) {
+            Assert.assertEquals(value, iv.get(i));
+            value++;
+          }
         }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
     });
   }
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
new file mode 100644
index 0000000..7aa95f7
--- /dev/null
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.IntStream;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.testing.ValueVectorDataPopulator;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDoExchange {
+  static byte[] EXCHANGE_DO_GET = "do-get".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_DO_PUT = "do-put".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_ECHO = "echo".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_METADATA_ONLY = "only-metadata".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_TRANSFORM = "transform".getBytes(StandardCharsets.UTF_8);
+
+  private BufferAllocator allocator;
+  private FlightServer server;
+  private FlightClient client;
+
+  @Before
+  public void setUp() throws Exception {
+    allocator = new RootAllocator(Integer.MAX_VALUE); // shared by the server, the client, and the Producer
+    final Location serverLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 0);
+    server = FlightServer.builder(allocator, serverLocation, new Producer(allocator)).build();
+    server.start(); // requested port 0 above; the port actually in use is read back via getPort() below
+    final Location clientLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
+    client = FlightClient.builder(allocator, clientLocation).build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    AutoCloseables.close(client, server, allocator); // allocator goes last, after its users
+  }
+
+  /** Test a pure-metadata flow: server sends 42, client sends 84, server echoes it; no Arrow data is exchanged. */
+  @Test
+  public void testDoExchangeOnlyMetadata() throws Exception {
+    // Send a particular descriptor to the server and check for a particular response pattern.
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_METADATA_ONLY))) {
+      final FlightStream reader = stream.getReader();
+
+      // Server starts by sending a message without data (hence no VectorSchemaRoot should be present)
+      assertTrue(reader.next());
+      assertFalse(reader.hasRoot());
+      assertEquals(42, reader.getLatestMetadata().getInt(0));
+
+      // Write a metadata message to the server (without sending any data)
+      ArrowBuf buf = allocator.buffer(4);
+      buf.writeInt(84);
+      stream.getWriter().putMetadata(buf);
+
+      // Check that the server echoed the metadata back to us
+      assertTrue(reader.next());
+      assertFalse(reader.hasRoot());
+      assertEquals(84, reader.getLatestMetadata().getInt(0));
+
+      // Close our write channel and ensure the server also closes theirs
+      stream.getWriter().completed();
+      assertFalse(reader.next());
+    }
+  }
+
+  /** Emulate a DoGet with a DoExchange: only read, and expect the values 0 through 9 in order. */
+  @Test
+  public void testDoExchangeDoGet() throws Exception {
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_DO_GET))) {
+      final FlightStream reader = stream.getReader();
+      VectorSchemaRoot root = reader.getRoot();
+      IntVector iv = (IntVector) root.getVector("a");
+      int value = 0;
+      while (reader.next()) {
+        for (int i = 0; i < root.getRowCount(); i++) {
+          assertFalse(String.format("Row %d should not be null", value), iv.isNull(i));
+          assertEquals(value, iv.get(i));
+          value++;
+        }
+      }
+      assertEquals(10, value); // Producer.doGet writes 5 batches of 2 rows each
+    }
+  }
+
+  /** Emulate a DoPut with a DoExchange: write ten batches, checking the server's metadata reply to each. */
+  @Test
+  public void testDoExchangeDoPut() throws Exception {
+    final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_DO_PUT));
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      IntVector iv = (IntVector) root.getVector("a");
+      iv.allocateNew();
+
+      stream.getWriter().start(root);
+      int counter = 0;
+      for (int i = 0; i < 10; i++) {
+        ValueVectorDataPopulator.setVector(iv, IntStream.range(0, i).boxed().toArray(Integer[]::new));
+        root.setRowCount(i);
+        counter += i; // batch i has i rows, so this is the total row count sent so far
+        stream.getWriter().putNext();
+
+        assertTrue(stream.getReader().next());
+        assertFalse(stream.getReader().hasRoot());
+        // For each write, the server sends back a metadata message containing the running total of rows received
+        final ArrowBuf metadata = stream.getReader().getLatestMetadata();
+        assertEquals(counter, metadata.getInt(0));
+      }
+      stream.getWriter().completed();
+
+      while (stream.getReader().next()) {
+        // Drain the stream. Otherwise closing the stream sends a CANCEL, which disrupts the server:
+        // CANCEL -> runs onCancel handler -> closes the FlightStream early
+      }
+    }
+  }
+
+  /** Test a DoExchange that echoes the client's messages: first metadata-only messages, then data batches. */
+  @Test
+  public void testDoExchangeEcho() throws Exception {
+    final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
+    try (final FlightClient.ExchangeReaderWriter stream = client.doExchange(FlightDescriptor.command(EXCHANGE_ECHO));
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      final FlightStream reader = stream.getReader();
+
+      // First try writing metadata without starting the Arrow data stream
+      ArrowBuf buf = allocator.buffer(4);
+      buf.writeInt(42);
+      stream.getWriter().putMetadata(buf);
+      buf = allocator.buffer(4);
+      buf.writeInt(84);
+      stream.getWriter().putMetadata(buf);
+
+      // Ensure that the server echoes the metadata back, also without starting its data stream
+      assertTrue(reader.next());
+      assertFalse(reader.hasRoot());
+      assertEquals(42, reader.getLatestMetadata().getInt(0));
+      assertTrue(reader.next());
+      assertFalse(reader.hasRoot());
+      assertEquals(84, reader.getLatestMetadata().getInt(0));
+
+      // Write data and check that it gets echoed back.
+      IntVector iv = (IntVector) root.getVector("a");
+      iv.allocateNew();
+      stream.getWriter().start(root);
+      for (int i = 0; i < 10; i++) {
+        iv.setSafe(0, i);
+        root.setRowCount(1);
+        stream.getWriter().putNext();
+
+        assertTrue(reader.next());
+        assertNull(reader.getLatestMetadata()); // putNext() above was called without metadata
+        assertEquals(root.getSchema(), reader.getSchema());
+        assertEquals(i, ((IntVector) reader.getRoot().getVector("a")).get(0));
+      }
+
+      // Complete the stream so that the server knows not to expect any more messages from us.
+      stream.getWriter().completed();
+      // The server will end its side of the call, so this shouldn't block or indicate that
+      // there is more data.
+      assertFalse("We should not be waiting for any messages", reader.next());
+    }
+  }
+
+  /** Write ten batches, have the server double every value, then read the batches and the batch count back. */
+  @Test
+  public void testTransform() throws Exception {
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("a", new ArrowType.Int(32, true)),
+        Field.nullable("b", new ArrowType.Int(32, true))));
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_TRANSFORM))) {
+      // Write ten batches of data to the stream, where batch N contains N rows of data (N in [0, 10))
+      final FlightStream reader = stream.getReader();
+      final FlightClient.ClientStreamListener writer = stream.getWriter();
+      try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        writer.start(root);
+        for (int batchIndex = 0; batchIndex < 10; batchIndex++) {
+          for (final FieldVector rawVec : root.getFieldVectors()) {
+            final IntVector vec = (IntVector) rawVec;
+            ValueVectorDataPopulator.setVector(vec, IntStream.range(0, batchIndex).boxed().toArray(Integer[]::new));
+          }
+          root.setRowCount(batchIndex);
+          writer.putNext();
+        }
+      }
+      // Indicate that we're done writing so that the server does not expect more data.
+      writer.completed();
+
+      // Read back data. We expect the server to double each value in each row of each batch.
+      assertEquals(schema, reader.getSchema());
+      final VectorSchemaRoot root = reader.getRoot();
+      for (int batchIndex = 0; batchIndex < 10; batchIndex++) {
+        assertTrue("Didn't receive batch #" + batchIndex, reader.next());
+        assertEquals(batchIndex, root.getRowCount());
+        for (final FieldVector rawVec : root.getFieldVectors()) {
+          final IntVector vec = (IntVector) rawVec;
+          for (int row = 0; row < batchIndex; row++) {
+            assertEquals(2 * row, vec.get(row));
+          }
+        }
+      }
+
+      // The server also sends back a metadata-only message containing the message count
+      assertTrue("There should be one extra message", reader.next());
+      assertEquals(10, reader.getLatestMetadata().getInt(0));
+      assertFalse("There should be no more data", reader.next());
+    }
+  }
+
+  static class Producer extends NoOpFlightProducer {
+    private final BufferAllocator allocator;
+
+    Producer(BufferAllocator allocator) { // allocator backs the roots and metadata buffers the handlers create
+      this.allocator = allocator;
+    }
+
+    @Override
+    public void doExchange(CallContext context, FlightStream reader, ServerStreamListener writer) {
+      if (Arrays.equals(reader.getDescriptor().getCommand(), EXCHANGE_METADATA_ONLY)) { // dispatch on command bytes
+        metadataOnly(context, reader, writer);
+      } else if (Arrays.equals(reader.getDescriptor().getCommand(), EXCHANGE_DO_GET)) {
+        doGet(context, reader, writer);
+      } else if (Arrays.equals(reader.getDescriptor().getCommand(), EXCHANGE_DO_PUT)) {
+        doPut(context, reader, writer);
+      } else if (Arrays.equals(reader.getDescriptor().getCommand(), EXCHANGE_ECHO)) {
+        echo(context, reader, writer);
+      } else if (Arrays.equals(reader.getDescriptor().getCommand(), EXCHANGE_TRANSFORM)) {
+        transform(context, reader, writer);
+      } else { // unknown command: fail the call with UNIMPLEMENTED
+        writer.error(CallStatus.UNIMPLEMENTED.withDescription("Command not implemented").toRuntimeException());
+      }
+    }
+
+    /** Emulate DoGet: ignore the reader and write the values 0..9 as five two-row batches. */
+    private void doGet(CallContext context, FlightStream reader, ServerStreamListener writer) {
+      final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        writer.start(root);
+        root.allocateNew();
+        IntVector iv = (IntVector) root.getVector("a");
+
+        for (int i = 0; i < 10; i += 2) {
+          iv.set(0, i);
+          iv.set(1, i + 1);
+          root.setRowCount(2);
+          writer.putNext();
+        }
+      }
+      writer.completed();
+    }
+
+    /** Emulate DoPut: after each data message, reply with the running total of rows received as metadata. */
+    private void doPut(CallContext context, FlightStream reader, ServerStreamListener writer) {
+      int counter = 0;
+      while (reader.next()) {
+        if (!reader.hasRoot()) { // a message without data is rejected and ends the call
+          writer.error(CallStatus.INVALID_ARGUMENT.withDescription("Message has no data").toRuntimeException());
+          return;
+        }
+        counter += reader.getRoot().getRowCount();
+
+        final ArrowBuf pong = allocator.buffer(4);
+        pong.writeInt(counter);
+        writer.putMetadata(pong);
+      }
+      writer.completed();
+    }
+
+    /** Exchange metadata without ever exchanging data: send 42, then echo back the client's next metadata. */
+    private void metadataOnly(CallContext context, FlightStream reader, ServerStreamListener writer) {
+      final ArrowBuf buf = allocator.buffer(4);
+      buf.writeInt(42);
+      writer.putMetadata(buf);
+      assertTrue(reader.next());
+      assertNotNull(reader.getLatestMetadata());
+      reader.getLatestMetadata().getReferenceManager().retain(); // presumably putMetadata() releases one ref; confirm
+      writer.putMetadata(reader.getLatestMetadata());
+      writer.completed();
+    }
+
+    /** Echo the client's messages back to it. */
+    private void echo(CallContext context, FlightStream reader, ServerStreamListener writer) {
+      VectorSchemaRoot root = null;
+      VectorLoader loader = null;
+      while (reader.next()) {
+        if (reader.hasRoot()) {
+          if (root == null) { // first data message: create the outgoing root and start the data stream
+            root = VectorSchemaRoot.create(reader.getSchema(), allocator);
+            loader = new VectorLoader(root);
+            writer.start(root);
+          }
+          VectorUnloader unloader = new VectorUnloader(reader.getRoot());
+          try (final ArrowRecordBatch arb = unloader.getRecordBatch()) {
+            loader.load(arb);
+          }
+          if (reader.getLatestMetadata() != null) {
+            reader.getLatestMetadata().getReferenceManager().retain();
+            writer.putNext(reader.getLatestMetadata());
+          } else {
+            writer.putNext();
+          }
+        } else {
+          // Pure metadata
+          reader.getLatestMetadata().getReferenceManager().retain();
+          writer.putMetadata(reader.getLatestMetadata());
+        }
+      }
+      if (root != null) { // NOTE(review): not reached if the loop above throws, leaving root open
+        root.close();
+      }
+      writer.completed();
+    }
+
+    /** Accept batches of signed 32-bit ints, write each back with its values doubled, then send the batch count. */
+    private void transform(CallContext context, FlightStream reader, ServerStreamListener writer) {
+      final Schema schema = reader.getSchema();
+      for (final Field field : schema.getFields()) { // reject any schema that is not all signed 32-bit ints
+        if (!(field.getType() instanceof ArrowType.Int)) {
+          writer.error(CallStatus.INVALID_ARGUMENT.withDescription("Invalid type: " + field).toRuntimeException());
+          return;
+        }
+        final ArrowType.Int intType = (ArrowType.Int) field.getType();
+        if (!intType.getIsSigned() || intType.getBitWidth() != 32) {
+          writer.error(CallStatus.INVALID_ARGUMENT.withDescription("Must be i32: " + field).toRuntimeException());
+          return;
+        }
+      }
+      int batches = 0;
+      try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        writer.start(root);
+        final VectorLoader loader = new VectorLoader(root);
+        final VectorUnloader unloader = new VectorUnloader(reader.getRoot());
+        while (reader.next()) {
+          try (final ArrowRecordBatch batch = unloader.getRecordBatch()) {
+            loader.load(batch);
+          }
+          batches++;
+          for (final FieldVector rawVec : root.getFieldVectors()) {
+            final IntVector vec = (IntVector) rawVec;
+            for (int i = 0; i < root.getRowCount(); i++) {
+              if (!vec.isNull(i)) {
+                vec.set(i, vec.get(i) * 2);
+              }
+            }
+          }
+          writer.putNext(); // each doubled batch is written back inside the read loop
+        }
+      }
+      final ArrowBuf count = allocator.buffer(4);
+      count.writeInt(batches);
+      writer.putMetadata(count);
+      writer.completed();
+    }
+  }
+}
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestErrorMetadata.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestErrorMetadata.java
index b6d344f..02a21f2 100644
--- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestErrorMetadata.java
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestErrorMetadata.java
@@ -48,12 +48,12 @@ public class TestErrorMetadata {
              FlightTestUtil.getStartedServer(
                (location) -> FlightServer.builder(allocator, location, new TestFlightProducer(perf)).build());
           final FlightClient client = FlightClient.builder(allocator, s.getLocation()).build()) {
-      FlightStream stream = client.getStream(new Ticket("abs".getBytes()));
-      stream.next();
-      Assert.fail();
-    } catch (FlightRuntimeException fre) {
+      final CallStatus flightStatus = FlightTestUtil.assertCode(FlightStatusCode.CANCELLED, () -> {
+        FlightStream stream = client.getStream(new Ticket("abs".getBytes()));
+        stream.next();
+      });
       PerfOuterClass.Perf newPerf = null;
-      ErrorFlightMetadata metadata = fre.status().metadata();
+      ErrorFlightMetadata metadata = flightStatus.metadata();
       Assert.assertNotNull(metadata);
       Assert.assertEquals(2, metadata.keys().size());
       Assert.assertTrue(metadata.containsKey("grpc-status-details-bin"));
diff --git a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java
index 791e0b1..363ad44 100644
--- a/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java
+++ b/java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestServerOptions.java
@@ -124,14 +124,15 @@ public class TestServerOptions {
                 (port) -> FlightServer.builder(a, location, producer).build()
             )) {
       try (FlightClient c = FlightClient.builder(a, location).build()) {
-        FlightStream stream = c.getStream(new Ticket(new byte[0]));
-        VectorSchemaRoot root = stream.getRoot();
-        IntVector iv = (IntVector) root.getVector("c1");
-        int value = 0;
-        while (stream.next()) {
-          for (int i = 0; i < root.getRowCount(); i++) {
-            Assert.assertEquals(value, iv.get(i));
-            value++;
+        try (FlightStream stream = c.getStream(new Ticket(new byte[0]))) {
+          VectorSchemaRoot root = stream.getRoot();
+          IntVector iv = (IntVector) root.getVector("c1");
+          int value = 0;
+          while (stream.next()) {
+            for (int i = 0; i < root.getRowCount(); i++) {
+              Assert.assertEquals(value, iv.get(i));
+              value++;
+            }
           }
         }
       }