Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/04/29 05:24:46 UTC

[GitHub] [arrow] emkornfield commented on a change in pull request #7012: ARROW-8555: [FlightRPC][Java] implement DoExchange

emkornfield commented on a change in pull request #7012:
URL: https://github.com/apache/arrow/pull/7012#discussion_r417067307



##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -154,6 +154,20 @@ public ArrowMessage(ArrowDictionaryBatch batch) {
     this.appMetadata = null;
   }
 
+  public ArrowMessage(ArrowBuf appMetadata) {

Review comment:
       Please describe the contents of appMetadata.

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -195,31 +196,29 @@ public ClientStreamListener startPut(FlightDescriptor descriptor, VectorSchemaRo
    * @param root VectorSchemaRoot the root containing data
    * @param metadataListener A handler for metadata messages from the server.
    * @param options RPC-layer hints for this call.
-   * @return ClientStreamListener an interface to control uploading data
+   * @return ClientStreamListener an interface to control uploading data.
+   *     {@link ClientStreamListener#start(VectorSchemaRoot, DictionaryProvider)} will already have been called.
    */
   public ClientStreamListener startPut(FlightDescriptor descriptor, VectorSchemaRoot root, DictionaryProvider provider,
       PutListener metadataListener, CallOption... options) {
-    Preconditions.checkNotNull(descriptor);
-    Preconditions.checkNotNull(root);
+    Preconditions.checkNotNull(descriptor, "descriptor must not be null");

Review comment:
       IMO, the additional null-check messages don't add much, but feel free to keep them if you like them.

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -293,6 +292,76 @@ public void onCompleted() {
     return stream;
   }
 
+  /**
+   * Initiate a bidirectional data exchange with the server.
+   *
+   * @param descriptor A descriptor for the data stream.
+   * @param options RPC call options.
+   * @return A pair of a readable stream and a writable stream.
+   */
+  public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption... options) {
+    Preconditions.checkNotNull(descriptor);

Review comment:
       Please be consistent about whether or not you include the message here.
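
As an illustration of the consistency point, here is a minimal sketch in which every null check in a method carries a message in the same style. It uses `java.util.Objects.requireNonNull` rather than Arrow's `Preconditions` purely to stay self-contained; the class and method names are hypothetical and not part of the pull request.

```java
import java.util.Objects;

public class NullCheckStyle {
  /** Hypothetical stand-in for a client method with two required arguments. */
  public static String describe(Object descriptor, Object root) {
    // Both checks follow the same convention: "<name> must not be null".
    Objects.requireNonNull(descriptor, "descriptor must not be null");
    Objects.requireNonNull(root, "root must not be null");
    return descriptor + "/" + root;
  }

  public static void main(String[] args) {
    try {
      describe(null, "root");
    } catch (NullPointerException e) {
      // The message names the offending argument.
      System.out.println(e.getMessage());
    }
  }
}
```

Either convention (with or without messages) works; the sketch only shows one of them applied uniformly.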

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightClient.java
##########
@@ -293,6 +292,76 @@ public void onCompleted() {
     return stream;
   }
 
+  /**
+   * Initiate a bidirectional data exchange with the server.
+   *
+   * @param descriptor A descriptor for the data stream.
+   * @param options RPC call options.
+   * @return A pair of a readable stream and a writable stream.
+   */
+  public ExchangeReaderWriter doExchange(FlightDescriptor descriptor, CallOption... options) {
+    Preconditions.checkNotNull(descriptor);
+    final io.grpc.CallOptions callOptions = CallOptions.wrapStub(asyncStub, options).getCallOptions();
+
+    try {
+      final ClientCall<ArrowMessage, ArrowMessage> call = interceptedChannel.newCall(doExchangeDescriptor, callOptions);
+      final FlightStream stream = new FlightStream(allocator, PENDING_REQUESTS, call::cancel, call::request);
+      final ClientCallStreamObserver<ArrowMessage> observer = (ClientCallStreamObserver<ArrowMessage>)
+              ClientCalls.asyncBidiStreamingCall(call, stream.asObserver());
+      final ClientStreamListener writer = new PutObserver(
+          descriptor, observer, stream.completed::isDone,
+          () -> {
+            try {
+              stream.completed.get();
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw CallStatus.INTERNAL.withDescription("Client error: interrupted").withCause(e).toRuntimeException();
+            } catch (ExecutionException e) {
+              throw CallStatus.INTERNAL.withDescription("Client error: " + e).withCause(e).toRuntimeException();
+            }
+          });
+      // Send the descriptor to start.
+      try (final ArrowMessage message = new ArrowMessage(descriptor.toProtocol())) {
+        observer.onNext(message);
+      } catch (Exception e) {
+        throw CallStatus.INTERNAL
+            .withCause(e)
+            .withDescription("Could not write descriptor message: " + e)

Review comment:
       Doesn't appending the error message to the description become redundant if you also set the exception as the cause? Some additional metadata about this RPC might be more useful.
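
A minimal sketch of the alternative, assuming a plain `RuntimeException` as a stand-in for the Flight `CallStatus` builder: the description carries context about the call, and the underlying error stays reachable through the cause instead of being concatenated into the text. The class name, method name, and the `rpcName` and `descriptor` strings are hypothetical.

```java
public class CauseVsDescription {
  /** Wraps a failure, describing the call rather than repeating the cause's text. */
  public static RuntimeException wrap(String rpcName, String descriptor, Exception cause) {
    String description =
        "Could not write descriptor message for " + rpcName + " (descriptor: " + descriptor + ")";
    // The cause is attached once; its message is not duplicated in the description.
    return new RuntimeException(description, cause);
  }

  public static void main(String[] args) {
    RuntimeException e =
        wrap("DoExchange", "cmd:echo", new IllegalStateException("stream closed"));
    System.out.println(e.getMessage());
    System.out.println(e.getCause().getMessage());
  }
}
```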

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/OutboundStreamListenerImpl.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import org.apache.arrow.flight.grpc.StatusUtils;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.dictionary.DictionaryProvider;
+
+import io.grpc.stub.CallStreamObserver;
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * A base class for writing Arrow data to a Flight stream.
+ */
+abstract class OutboundStreamListenerImpl implements OutboundStreamListener {
+  private final FlightDescriptor descriptor; // nullable
+  protected final CallStreamObserver<ArrowMessage> responseObserver;
+  protected volatile VectorUnloader unloader; // null until stream started
+
+  OutboundStreamListenerImpl(FlightDescriptor descriptor, CallStreamObserver<ArrowMessage> responseObserver) {
+    Preconditions.checkNotNull(responseObserver, "responseObserver must be provided");
+    this.descriptor = descriptor;
+    this.responseObserver = responseObserver;
+    this.unloader = null;
+  }
+
+  @Override
+  public boolean isReady() {
+    return responseObserver.isReady();
+  }
+
+  @Override
+  public void start(VectorSchemaRoot root) {
+    start(root, new DictionaryProvider.MapDictionaryProvider());
+  }
+
+  @Override
+  public void start(VectorSchemaRoot root, DictionaryProvider dictionaries) {
+    try {
+      DictionaryUtils.generateSchemaMessages(root.getSchema(), descriptor, dictionaries, responseObserver::onNext);
+    } catch (Exception e) {
+      // Only happens if closing buffers somehow fails - indicates the application is in an unknown state, so
+      // propagate the exception
+      throw new RuntimeException("Could not generate and send all schema messages", e);
+    }
+    unloader = new VectorUnloader(root, /* include # of nulls in vectors */ true,
+        /* must align buffers to be C++-compatible */ true);

Review comment:
       I'd prefer to see the actual parameter names here and in the comments above. It might be cleaner to define each constant by name, comment on it where it is defined, and pass the named constants through here.
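
The suggestion could look roughly like the sketch below. The constant names and the `configure` method are hypothetical; `configure` merely stands in for a constructor that takes two positional boolean flags, so the example does not depend on Arrow.

```java
public class NamedFlags {
  /** Include the null count of each vector in the unloaded record batch. */
  public static final boolean INCLUDE_NULL_COUNT = true;

  /** Align buffers so that the output is compatible with the C++ implementation. */
  public static final boolean ALIGN_BUFFERS = true;

  /** Hypothetical stand-in for a constructor taking two positional boolean flags. */
  public static String configure(boolean includeNullCount, boolean alignBuffers) {
    return "includeNullCount=" + includeNullCount + ", alignBuffers=" + alignBuffers;
  }

  public static void main(String[] args) {
    // The call site reads without inline comments because each flag is named.
    System.out.println(configure(INCLUDE_NULL_COUNT, ALIGN_BUFFERS));
  }
}
```

The documentation then lives next to each constant's definition, and every call site that passes the constants inherits it.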

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightService.java
##########
@@ -155,79 +152,35 @@ public void setOnCancelHandler(Runnable handler) {
       this.onCancelHandler = handler;
     }
 
-    @Override
-    public boolean isReady() {
-      return responseObserver.isReady();
-    }
-
     @Override
     public boolean isCancelled() {
       return responseObserver.isCancelled();
     }
 
     @Override
-    public void start(VectorSchemaRoot root) {
-      start(root, new MapDictionaryProvider());
-    }
-
-    @Override
-    public void start(VectorSchemaRoot root, DictionaryProvider provider) {
-      unloader = new VectorUnloader(root, true, true);
-
-      try {
-        DictionaryUtils.generateSchemaMessages(root.getSchema(), null, provider, responseObserver::onNext);
-      } catch (Exception e) {
-        // Only happens if closing buffers somehow fails - indicates the application is in an unknown state, so
-        // propagate the exception
-        throw new RuntimeException("Could not generate and send all schema messages", e);
-      }
-    }
-
-    @Override
-    public void putNext() {
-      putNext(null);
-    }
-
-    @Override
-    public void putNext(ArrowBuf metadata) {
-      Preconditions.checkNotNull(unloader);
-      // close is a no-op if the message has been written to gRPC, otherwise frees the associated buffers
-      // in some code paths (e.g. if the call is cancelled), gRPC does not write the message, so we need to clean up
-      // ourselves. Normally, writing the ArrowMessage will transfer ownership of the data to gRPC/Netty.
-      try (final ArrowMessage message = new ArrowMessage(unloader.getRecordBatch(), metadata)) {
-        responseObserver.onNext(message);
-      } catch (Exception e) {
-        // This exception comes from ArrowMessage#close, not responseObserver#onNext.
-        // Generally this should not happen - ArrowMessage's implementation only closes non-throwing things.
-        // The user can't reasonably do anything about this, but if something does throw, we shouldn't let
-        // execution continue since other state (e.g. allocators) may be in an odd state.
-        throw new RuntimeException("Could not free ArrowMessage", e);
-      }
+    protected void waitUntilStreamReady() {
+      // Don't do anything - service implementations are expected to manage backpressure themselves

Review comment:
       Is this generally possible? Do you need to provide a hook here for services to implement?

##########
File path: java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+public class TestDoExchange {
+  static byte[] EXCHANGE_DO_GET = "do-get".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_DO_PUT = "do-put".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_ECHO = "echo".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_METADATA_ONLY = "only-metadata".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_TRANSFORM = "transform".getBytes(StandardCharsets.UTF_8);
+
+  private BufferAllocator allocator;
+  private FlightServer server;
+  private FlightClient client;
+
+  @Before
+  public void setUp() throws Exception {
+    allocator = new RootAllocator(Integer.MAX_VALUE);
+    final Location serverLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 0);
+    server = FlightServer.builder(allocator, serverLocation, new Producer(allocator)).build();
+    server.start();
+    final Location clientLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
+    client = FlightClient.builder(allocator, clientLocation).build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    AutoCloseables.close(client, server, allocator);
+  }
+
+  /** Test a pure-metadata flow. */
+  @Test
+  public void testDoExchangeOnlyMetadata() throws Exception {
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_METADATA_ONLY))) {
+      final FlightStream reader = stream.getReader();
+      Assert.assertTrue(reader.next());

Review comment:
       Please use static imports for the asserts; I believe this is more consistent with our codebase.

##########
File path: java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+public class TestDoExchange {
+  static byte[] EXCHANGE_DO_GET = "do-get".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_DO_PUT = "do-put".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_ECHO = "echo".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_METADATA_ONLY = "only-metadata".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_TRANSFORM = "transform".getBytes(StandardCharsets.UTF_8);
+
+  private BufferAllocator allocator;
+  private FlightServer server;
+  private FlightClient client;
+
+  @Before
+  public void setUp() throws Exception {
+    allocator = new RootAllocator(Integer.MAX_VALUE);
+    final Location serverLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 0);
+    server = FlightServer.builder(allocator, serverLocation, new Producer(allocator)).build();
+    server.start();
+    final Location clientLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
+    client = FlightClient.builder(allocator, clientLocation).build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    AutoCloseables.close(client, server, allocator);
+  }
+
+  /** Test a pure-metadata flow. */
+  @Test
+  public void testDoExchangeOnlyMetadata() throws Exception {
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_METADATA_ONLY))) {
+      final FlightStream reader = stream.getReader();
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(42, reader.getLatestMetadata().getInt(0));
+      ArrowBuf buf = allocator.buffer(4);
+      buf.writeInt(84);
+      stream.getWriter().putMetadata(buf);
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(84, reader.getLatestMetadata().getInt(0));
+      stream.getWriter().completed();
+      Assert.assertFalse(reader.next());
+    }
+  }
+
+  /** Emulate a DoGet with a DoExchange. */
+  @Test
+  public void testDoExchangeDoGet() throws Exception {
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_DO_GET))) {
+      final FlightStream reader = stream.getReader();
+      VectorSchemaRoot root = reader.getRoot();
+      IntVector iv = (IntVector) root.getVector("a");
+      int value = 0;
+      while (reader.next()) {
+        for (int i = 0; i < root.getRowCount(); i++) {
+          Assert.assertFalse(String.format("Row %d should not be null", value), iv.isNull(i));
+          Assert.assertEquals(value, iv.get(i));
+          value++;
+        }
+      }
+      Assert.assertEquals(10, value);
+    }
+  }
+
+  /** Emulate a DoPut with a DoExchange. */
+  @Test
+  public void testDoExchangeDoPut() throws Exception {
+    final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_DO_PUT));
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      IntVector iv = (IntVector) root.getVector("a");
+      iv.allocateNew();
+
+      stream.getWriter().start(root);
+      int counter = 0;
+      for (int i = 0; i < 10; i++) {
+        for (int row = 0; row < i; row++) {
+          iv.setSafe(row, row);
+        }
+        root.setRowCount(i);
+        counter += i;
+        stream.getWriter().putNext();
+
+        Assert.assertTrue(stream.getReader().next());
+        Assert.assertFalse(stream.getReader().hasRoot());
+        final ArrowBuf metadata = stream.getReader().getLatestMetadata();
+        Assert.assertNotNull(metadata);
+        Assert.assertEquals(counter, metadata.getInt(0));
+      }
+      stream.getWriter().completed();
+
+      while (stream.getReader().next()) {
+        // Drain the stream. Otherwise closing the stream sends a CANCEL which seriously screws with the server.
+        // CANCEL -> runs onCancel handler -> closes the FlightStream early
+      }
+    }
+  }
+
+  /** Test a DoExchange that echoes the client message. */
+  @Test
+  public void testDoExchangeEcho() throws Exception {
+    final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
+    try (final FlightClient.ExchangeReaderWriter stream = client.doExchange(FlightDescriptor.command(EXCHANGE_ECHO));
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      final FlightStream reader = stream.getReader();
+
+      // First try writing metadata without starting the Arrow data stream
+      ArrowBuf buf = allocator.buffer(4);
+      buf.writeInt(42);
+      stream.getWriter().putMetadata(buf);
+      buf = allocator.buffer(4);
+      buf.writeInt(84);
+      stream.getWriter().putMetadata(buf);
+
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(42, reader.getLatestMetadata().getInt(0));
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(84, reader.getLatestMetadata().getInt(0));
+
+      IntVector iv = (IntVector) root.getVector("a");
+      iv.allocateNew();
+
+      stream.getWriter().start(root);
+      for (int i = 0; i < 10; i++) {
+        iv.setSafe(0, i);
+        root.setRowCount(1);
+        if (i % 2 == 0) {
+          stream.getWriter().putNext();
+        } else {
+          buf = allocator.buffer(4);
+          buf.writeInt(i);
+          stream.getWriter().putNext(buf);
+        }
+
+        Assert.assertTrue(reader.next());
+        Assert.assertTrue(reader.hasRoot());
+        final ArrowBuf metadata = reader.getLatestMetadata();
+        if (i % 2 == 0) {
+          Assert.assertNull(metadata);
+        } else {
+          Assert.assertNotNull(metadata);
+          Assert.assertEquals(i, metadata.getInt(0));
+        }
+        Assert.assertEquals(root.getSchema(), reader.getSchema());
+        Assert.assertEquals(i, ((IntVector) reader.getRoot().getVector("a")).get(0));
+      }
+
+      buf = allocator.buffer(4);
+      buf.writeInt(126);
+      stream.getWriter().putMetadata(buf);
+      Assert.assertTrue(reader.next());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(126, reader.getLatestMetadata().getInt(0));
+
+      stream.getWriter().completed();
+
+      // Ensure the stream is drained. Else, we race with the server for shutdown: we'll go and shut down the
+      // server/allocator before it can finish cleanup.
+      Assert.assertFalse(reader.next());
+    }
+  }
+
+  /** Write some data, have it transformed, then read it back. */
+  @Test
+  public void testTransform() throws Exception {
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("a", new ArrowType.Int(32, true)),
+        Field.nullable("b", new ArrowType.Int(32, true))));
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_TRANSFORM))) {
+      // Write data to the stream
+      final FlightStream reader = stream.getReader();
+      final FlightClient.ClientStreamListener writer = stream.getWriter();
+      try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        writer.start(root);
+        for (int batchIndex = 0; batchIndex < 10; batchIndex++) {
+          for (final FieldVector rawVec : root.getFieldVectors()) {
+            final IntVector vec = (IntVector) rawVec;
+            for (int row = 0; row < batchIndex; row++) {
+              vec.setSafe(row, row);
+            }
+          }
+          root.setRowCount(batchIndex);
+          writer.putNext();
+        }
+      }
+      // Indicate that we're done writing so that the server does not expect more data.
+      writer.completed();
+
+      // Start reading back data.
+      Assert.assertEquals(schema, reader.getSchema());
+      final VectorSchemaRoot root = reader.getRoot();
+      for (int batchIndex = 0; batchIndex < 10; batchIndex++) {

Review comment:
       Same comment as above on the complexity of the tests.

##########
File path: java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+public class TestDoExchange {
+  static byte[] EXCHANGE_DO_GET = "do-get".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_DO_PUT = "do-put".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_ECHO = "echo".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_METADATA_ONLY = "only-metadata".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_TRANSFORM = "transform".getBytes(StandardCharsets.UTF_8);
+
+  private BufferAllocator allocator;
+  private FlightServer server;
+  private FlightClient client;
+
+  @Before
+  public void setUp() throws Exception {
+    allocator = new RootAllocator(Integer.MAX_VALUE);
+    final Location serverLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 0);
+    server = FlightServer.builder(allocator, serverLocation, new Producer(allocator)).build();
+    server.start();
+    final Location clientLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
+    client = FlightClient.builder(allocator, clientLocation).build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    AutoCloseables.close(client, server, allocator);
+  }
+
+  /** Test a pure-metadata flow. */
+  @Test
+  public void testDoExchangeOnlyMetadata() throws Exception {
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_METADATA_ONLY))) {
+      final FlightStream reader = stream.getReader();

Review comment:
       This test is a little dense. Could you divide it into blocks and add some comments?

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightStream.java
##########
@@ -170,21 +182,17 @@ public void close() throws Exception {
    */
   public boolean next() {
     try {
-      // make sure we have the root
-      root.get().clear();
-
-      if (completed && queue.isEmpty()) {
+      if (completed.isDone() && queue.isEmpty()) {
         return false;
       }
 
-
       pending--;
       requestOutstanding();
 
       Object data = queue.take();
       if (DONE == data) {
         queue.put(DONE);
-        completed = true;
+        completed.complete(null);

Review comment:
       Please document what null means here and, if that doesn't make it clear, why null is being passed.
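
For context, a minimal sketch of the pattern in question, assuming `completed` is a `java.util.concurrent.CompletableFuture<Void>` (the field's declaration is not visible in this hunk, so that type is an assumption). With a `Void` future there is no meaningful result, so `null` is the only value that can be passed to `complete`; the future serves purely as a "done" signal. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

public class VoidFutureSignal {
  // Carries no value; only the completion state matters to observers.
  private final CompletableFuture<Void> completed = new CompletableFuture<>();

  public boolean isDone() {
    return completed.isDone();
  }

  public void markDone() {
    // Void has no instances, so null is the only possible "result".
    completed.complete(null);
  }

  public static void main(String[] args) {
    VoidFutureSignal signal = new VoidFutureSignal();
    System.out.println(signal.isDone());
    signal.markDone();
    System.out.println(signal.isDone());
  }
}
```

A one-line comment at the `complete(null)` call site, along the lines of the one in `markDone`, would likely address the question.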

##########
File path: java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+public class TestDoExchange {
+  static byte[] EXCHANGE_DO_GET = "do-get".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_DO_PUT = "do-put".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_ECHO = "echo".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_METADATA_ONLY = "only-metadata".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_TRANSFORM = "transform".getBytes(StandardCharsets.UTF_8);
+
+  private BufferAllocator allocator;
+  private FlightServer server;
+  private FlightClient client;
+
+  @Before
+  public void setUp() throws Exception {
+    allocator = new RootAllocator(Integer.MAX_VALUE);
+    final Location serverLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 0);
+    server = FlightServer.builder(allocator, serverLocation, new Producer(allocator)).build();
+    server.start();
+    final Location clientLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
+    client = FlightClient.builder(allocator, clientLocation).build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    AutoCloseables.close(client, server, allocator);
+  }
+
+  /** Test a pure-metadata flow. */
+  @Test
+  public void testDoExchangeOnlyMetadata() throws Exception {
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_METADATA_ONLY))) {
+      final FlightStream reader = stream.getReader();
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(42, reader.getLatestMetadata().getInt(0));
+      ArrowBuf buf = allocator.buffer(4);
+      buf.writeInt(84);
+      stream.getWriter().putMetadata(buf);
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(84, reader.getLatestMetadata().getInt(0));
+      stream.getWriter().completed();
+      Assert.assertFalse(reader.next());
+    }
+  }
+
+  /** Emulate a DoGet with a DoExchange. */
+  @Test
+  public void testDoExchangeDoGet() throws Exception {
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_DO_GET))) {
+      final FlightStream reader = stream.getReader();
+      VectorSchemaRoot root = reader.getRoot();
+      IntVector iv = (IntVector) root.getVector("a");
+      int value = 0;
+      while (reader.next()) {
+        for (int i = 0; i < root.getRowCount(); i++) {
+          Assert.assertFalse(String.format("Row %d should not be null", value), iv.isNull(i));
+          Assert.assertEquals(value, iv.get(i));
+          value++;
+        }
+      }
+      Assert.assertEquals(10, value);
+    }
+  }
+
+  /** Emulate a DoPut with a DoExchange. */
+  @Test
+  public void testDoExchangeDoPut() throws Exception {
+    final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_DO_PUT));
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      IntVector iv = (IntVector) root.getVector("a");
+      iv.allocateNew();
+
+      stream.getWriter().start(root);
+      int counter = 0;
+      for (int i = 0; i < 10; i++) {
+        for (int row = 0; row < i; row++) {

Review comment:
       Can you use a vector populator here to make this code clearer?
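
       To illustrate the kind of simplification meant here, the nested loop could move into a small helper so the test body states its intent. The sketch below is plain Java with no Arrow dependency. `BatchFixture`, `ascending`, and `expectedRunningTotal` are hypothetical names, not part of this PR or of Arrow. It assumes, as the assertions in the test imply, that batch i holds the values 0..i-1 and that the metadata returned by the server equals the running total of rows sent.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration; not part of Arrow or of this PR. */
final class BatchFixture {
  private BatchFixture() {}

  /** Values for one batch: 0, 1, ..., rows - 1. */
  static int[] ascending(int rows) {
    final int[] values = new int[rows];
    for (int row = 0; row < rows; row++) {
      values[row] = row;
    }
    return values;
  }

  /** Total rows sent after batches 0..lastBatch, assuming batch i has i rows. */
  static int expectedRunningTotal(int lastBatch) {
    return lastBatch * (lastBatch + 1) / 2;
  }

  public static void main(String[] args) {
    // Print the first few batches alongside the running total the test would expect.
    for (int batch = 0; batch < 4; batch++) {
      System.out.println(Arrays.toString(ascending(batch)) + " total=" + expectedRunningTotal(batch));
    }
  }
}
```

       In the real test, the array from `ascending` would be copied into the IntVector by whatever populator utility the project prefers; that step is left out because it depends on the Arrow test helpers available.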

##########
File path: java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+public class TestDoExchange {
+  static byte[] EXCHANGE_DO_GET = "do-get".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_DO_PUT = "do-put".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_ECHO = "echo".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_METADATA_ONLY = "only-metadata".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_TRANSFORM = "transform".getBytes(StandardCharsets.UTF_8);
+
+  private BufferAllocator allocator;
+  private FlightServer server;
+  private FlightClient client;
+
+  @Before
+  public void setUp() throws Exception {
+    allocator = new RootAllocator(Integer.MAX_VALUE);
+    final Location serverLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 0);
+    server = FlightServer.builder(allocator, serverLocation, new Producer(allocator)).build();
+    server.start();
+    final Location clientLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
+    client = FlightClient.builder(allocator, clientLocation).build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    AutoCloseables.close(client, server, allocator);
+  }
+
+  /** Test a pure-metadata flow. */
+  @Test
+  public void testDoExchangeOnlyMetadata() throws Exception {
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_METADATA_ONLY))) {
+      final FlightStream reader = stream.getReader();
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(42, reader.getLatestMetadata().getInt(0));
+      ArrowBuf buf = allocator.buffer(4);
+      buf.writeInt(84);
+      stream.getWriter().putMetadata(buf);
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(84, reader.getLatestMetadata().getInt(0));
+      stream.getWriter().completed();
+      Assert.assertFalse(reader.next());
+    }
+  }
+
+  /** Emulate a DoGet with a DoExchange. */
+  @Test
+  public void testDoExchangeDoGet() throws Exception {
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_DO_GET))) {
+      final FlightStream reader = stream.getReader();
+      VectorSchemaRoot root = reader.getRoot();
+      IntVector iv = (IntVector) root.getVector("a");
+      int value = 0;
+      while (reader.next()) {
+        for (int i = 0; i < root.getRowCount(); i++) {
+          Assert.assertFalse(String.format("Row %d should not be null", value), iv.isNull(i));
+          Assert.assertEquals(value, iv.get(i));
+          value++;
+        }
+      }
+      Assert.assertEquals(10, value);
+    }
+  }
+
+  /** Emulate a DoPut with a DoExchange. */
+  @Test
+  public void testDoExchangeDoPut() throws Exception {
+    final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_DO_PUT));
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      IntVector iv = (IntVector) root.getVector("a");
+      iv.allocateNew();
+
+      stream.getWriter().start(root);
+      int counter = 0;
+      for (int i = 0; i < 10; i++) {
+        for (int row = 0; row < i; row++) {
+          iv.setSafe(row, row);
+        }
+        root.setRowCount(i);
+        counter += i;
+        stream.getWriter().putNext();
+
+        Assert.assertTrue(stream.getReader().next());
+        Assert.assertFalse(stream.getReader().hasRoot());
+        final ArrowBuf metadata = stream.getReader().getLatestMetadata();
+        Assert.assertNotNull(metadata);
+        Assert.assertEquals(counter, metadata.getInt(0));
+      }
+      stream.getWriter().completed();
+
+      while (stream.getReader().next()) {
+        // Drain the stream. Otherwise closing the stream sends a CANCEL which seriously screws with the server.
+        // CANCEL -> runs onCancel handler -> closes the FlightStream early
+      }
+    }
+  }
+
+  /** Test a DoExchange that echoes the client message. */
+  @Test
+  public void testDoExchangeEcho() throws Exception {
+    final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
+    try (final FlightClient.ExchangeReaderWriter stream = client.doExchange(FlightDescriptor.command(EXCHANGE_ECHO));
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      final FlightStream reader = stream.getReader();
+
+      // First try writing metadata without starting the Arrow data stream
+      ArrowBuf buf = allocator.buffer(4);
+      buf.writeInt(42);
+      stream.getWriter().putMetadata(buf);
+      buf = allocator.buffer(4);
+      buf.writeInt(84);
+      stream.getWriter().putMetadata(buf);
+
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(42, reader.getLatestMetadata().getInt(0));
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(84, reader.getLatestMetadata().getInt(0));
+
+      IntVector iv = (IntVector) root.getVector("a");
+      iv.allocateNew();
+
+      stream.getWriter().start(root);
+      for (int i = 0; i < 10; i++) {
+        iv.setSafe(0, i);
+        root.setRowCount(1);
+        if (i % 2 == 0) {
+          stream.getWriter().putNext();
+        } else {
+          buf = allocator.buffer(4);
+          buf.writeInt(i);
+          stream.getWriter().putNext(buf);
+        }
+
+        Assert.assertTrue(reader.next());
+        Assert.assertTrue(reader.hasRoot());
+        final ArrowBuf metadata = reader.getLatestMetadata();
+        if (i % 2 == 0) {
+          Assert.assertNull(metadata);
+        } else {
+          Assert.assertNotNull(metadata);
+          Assert.assertEquals(i, metadata.getInt(0));
+        }
+        Assert.assertEquals(root.getSchema(), reader.getSchema());
+        Assert.assertEquals(i, ((IntVector) reader.getRoot().getVector("a")).get(0));
+      }
+
+      buf = allocator.buffer(4);
+      buf.writeInt(126);
+      stream.getWriter().putMetadata(buf);
+      Assert.assertTrue(reader.next());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(126, reader.getLatestMetadata().getInt(0));
+
+      stream.getWriter().completed();
+
+      // Ensure the stream is drained. Else, we race with the server for shutdown: we'll go and shut down the
+      // server/allocator before it can finish cleanup.
+      Assert.assertFalse(reader.next());
+    }
+  }
+
+  /** Write some data, have it transformed, then read it back. */
+  @Test
+  public void testTransform() throws Exception {
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("a", new ArrowType.Int(32, true)),
+        Field.nullable("b", new ArrowType.Int(32, true))));
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_TRANSFORM))) {
+      // Write data to the stream
+      final FlightStream reader = stream.getReader();
+      final FlightClient.ClientStreamListener writer = stream.getWriter();
+      try (final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        writer.start(root);
+        for (int batchIndex = 0; batchIndex < 10; batchIndex++) {
+          for (final FieldVector rawVec : root.getFieldVectors()) {
+            final IntVector vec = (IntVector) rawVec;
+            for (int row = 0; row < batchIndex; row++) {
+              vec.setSafe(row, row);
+            }
+          }
+          root.setRowCount(batchIndex);
+          writer.putNext();
+        }
+      }
+      // Indicate that we're done writing so that the server does not expect more data.
+      writer.completed();
+
+      // Start reading back data.
+      Assert.assertEquals(schema, reader.getSchema());
+      final VectorSchemaRoot root = reader.getRoot();
+      for (int batchIndex = 0; batchIndex < 10; batchIndex++) {
+        Assert.assertTrue("We got back batch " + batchIndex, reader.next());

Review comment:
       Should this read "No batch received"? (Is the boolean actually communicated on failure?)
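
       For context: in JUnit 4, `Assert.assertTrue(String message, boolean condition)` uses the message only when the condition is false, as the text of the thrown `AssertionError`. The sketch below is a plain-Java stand-in that mimics that behavior. The `Check` class and its methods are hypothetical and are not JUnit. It shows why a message that describes the failure is easier to read in a test report.

```java
/** Hypothetical stand-in that mimics JUnit 4's assertTrue(message, condition). */
final class Check {
  private Check() {}

  /** Throws an AssertionError carrying the message when the condition is false. */
  static void assertTrue(String message, boolean condition) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  /** Returns the failure text for a check, or null if the check passed. */
  static String failureText(String message, boolean condition) {
    try {
      assertTrue(message, condition);
      return null;
    } catch (AssertionError e) {
      return e.getMessage();
    }
  }

  public static void main(String[] args) {
    // The message surfaces only when the check fails, so it should describe the failure.
    System.out.println(failureText("No batch received for index 3", false));
    System.out.println(failureText("No batch received for index 3", true));
  }
}
```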

##########
File path: java/flight/flight-core/src/test/java/org/apache/arrow/flight/TestDoExchange.java
##########
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ArrowBuf;
+
+public class TestDoExchange {
+  static byte[] EXCHANGE_DO_GET = "do-get".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_DO_PUT = "do-put".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_ECHO = "echo".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_METADATA_ONLY = "only-metadata".getBytes(StandardCharsets.UTF_8);
+  static byte[] EXCHANGE_TRANSFORM = "transform".getBytes(StandardCharsets.UTF_8);
+
+  private BufferAllocator allocator;
+  private FlightServer server;
+  private FlightClient client;
+
+  @Before
+  public void setUp() throws Exception {
+    allocator = new RootAllocator(Integer.MAX_VALUE);
+    final Location serverLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, 0);
+    server = FlightServer.builder(allocator, serverLocation, new Producer(allocator)).build();
+    server.start();
+    final Location clientLocation = Location.forGrpcInsecure(FlightTestUtil.LOCALHOST, server.getPort());
+    client = FlightClient.builder(allocator, clientLocation).build();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    AutoCloseables.close(client, server, allocator);
+  }
+
+  /** Test a pure-metadata flow. */
+  @Test
+  public void testDoExchangeOnlyMetadata() throws Exception {
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_METADATA_ONLY))) {
+      final FlightStream reader = stream.getReader();
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(42, reader.getLatestMetadata().getInt(0));
+      ArrowBuf buf = allocator.buffer(4);
+      buf.writeInt(84);
+      stream.getWriter().putMetadata(buf);
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(84, reader.getLatestMetadata().getInt(0));
+      stream.getWriter().completed();
+      Assert.assertFalse(reader.next());
+    }
+  }
+
+  /** Emulate a DoGet with a DoExchange. */
+  @Test
+  public void testDoExchangeDoGet() throws Exception {
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_DO_GET))) {
+      final FlightStream reader = stream.getReader();
+      VectorSchemaRoot root = reader.getRoot();
+      IntVector iv = (IntVector) root.getVector("a");
+      int value = 0;
+      while (reader.next()) {
+        for (int i = 0; i < root.getRowCount(); i++) {
+          Assert.assertFalse(String.format("Row %d should not be null", value), iv.isNull(i));
+          Assert.assertEquals(value, iv.get(i));
+          value++;
+        }
+      }
+      Assert.assertEquals(10, value);
+    }
+  }
+
+  /** Emulate a DoPut with a DoExchange. */
+  @Test
+  public void testDoExchangeDoPut() throws Exception {
+    final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
+    try (final FlightClient.ExchangeReaderWriter stream =
+             client.doExchange(FlightDescriptor.command(EXCHANGE_DO_PUT));
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      IntVector iv = (IntVector) root.getVector("a");
+      iv.allocateNew();
+
+      stream.getWriter().start(root);
+      int counter = 0;
+      for (int i = 0; i < 10; i++) {
+        for (int row = 0; row < i; row++) {
+          iv.setSafe(row, row);
+        }
+        root.setRowCount(i);
+        counter += i;
+        stream.getWriter().putNext();
+
+        Assert.assertTrue(stream.getReader().next());
+        Assert.assertFalse(stream.getReader().hasRoot());
+        final ArrowBuf metadata = stream.getReader().getLatestMetadata();
+        Assert.assertNotNull(metadata);
+        Assert.assertEquals(counter, metadata.getInt(0));
+      }
+      stream.getWriter().completed();
+
+      while (stream.getReader().next()) {
+        // Drain the stream. Otherwise closing the stream sends a CANCEL which seriously screws with the server.
+        // CANCEL -> runs onCancel handler -> closes the FlightStream early
+      }
+    }
+  }
+
+  /** Test a DoExchange that echoes the client message. */
+  @Test
+  public void testDoExchangeEcho() throws Exception {
+    final Schema schema = new Schema(Collections.singletonList(Field.nullable("a", new ArrowType.Int(32, true))));
+    try (final FlightClient.ExchangeReaderWriter stream = client.doExchange(FlightDescriptor.command(EXCHANGE_ECHO));
+         final VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+      final FlightStream reader = stream.getReader();
+
+      // First try writing metadata without starting the Arrow data stream
+      ArrowBuf buf = allocator.buffer(4);
+      buf.writeInt(42);
+      stream.getWriter().putMetadata(buf);
+      buf = allocator.buffer(4);
+      buf.writeInt(84);
+      stream.getWriter().putMetadata(buf);
+
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(42, reader.getLatestMetadata().getInt(0));
+      Assert.assertTrue(reader.next());
+      Assert.assertFalse(reader.hasRoot());
+      Assert.assertNotNull(reader.getLatestMetadata());
+      Assert.assertEquals(84, reader.getLatestMetadata().getInt(0));
+
+      IntVector iv = (IntVector) root.getVector("a");
+      iv.allocateNew();
+
+      stream.getWriter().start(root);
+      for (int i = 0; i < 10; i++) {

Review comment:
       This seems very complicated for a test. Is there a way you could make it clearer what is being tested?



