You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2020/05/26 15:33:27 UTC

[arrow] branch master updated: ARROW-8297: [FlightRPC][C++] Implement Flight DoExchange for C++

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 391de22  ARROW-8297: [FlightRPC][C++] Implement Flight DoExchange for C++
391de22 is described below

commit 391de222933d345a8255d2b874c563b13484af42
Author: David Li <li...@gmail.com>
AuthorDate: Tue May 26 10:32:18 2020 -0500

    ARROW-8297: [FlightRPC][C++] Implement Flight DoExchange for C++
    
    This is a draft implementation of DoExchange that I'm submitting as part of the mailing list discussion. A simple usage demo is in flight_test.cc and test_util.cc.
    
    Overall, I've tried to share the implementation with DoGet/DoPut as much as possible, changing some classes to be templated. I've also taken the opportunity to clean up some things (e.g. the DoGet implementation on the client side now uses a peekable iterator instead of the messy circularly-dependent classes of before).
    
    - [x] Fix undefined behavior in zero-copy deserialization due to not handling "inlined" slices from gRPC
    - [x] Clean up the public API
      - [x] FlightDescriptor is not exposed to the server
      - [x] Unnecessary Begin/Open methods in some places
      - [x] API docs in Flight.proto
    - [x] More complete test suite to cover different usage patterns
    - [x] More comprehensive error handling/propagation of errors from the gRPC layer
    - [x] Fix not-yet-implemented parts of API
    - [ ] Look for other opportunities to share implementation between DoGet/DoPut/DoExchange
    
    Closes #6656 from lidavidm/doexchange
    
    Authored-by: David Li <li...@gmail.com>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/arrow/flight/client.cc                  | 629 +++++++++++++++++-------
 cpp/src/arrow/flight/client.h                   |  13 +-
 cpp/src/arrow/flight/flight_test.cc             | 288 ++++++++++-
 cpp/src/arrow/flight/internal.cc                |  12 +
 cpp/src/arrow/flight/internal.h                 |   2 +
 cpp/src/arrow/flight/middleware.h               |   1 +
 cpp/src/arrow/flight/serialization_internal.cc  | 229 ++++++---
 cpp/src/arrow/flight/serialization_internal.h   |  83 +++-
 cpp/src/arrow/flight/server.cc                  | 261 +++++++---
 cpp/src/arrow/flight/server.h                   |  20 +
 cpp/src/arrow/flight/test_integration_client.cc |   9 +-
 cpp/src/arrow/flight/test_integration_server.cc |   2 +-
 cpp/src/arrow/flight/test_util.cc               | 181 +++++++
 cpp/src/arrow/flight/types.cc                   |   3 +-
 cpp/src/arrow/flight/types.h                    |  14 +-
 python/pyarrow/_flight.pyx                      |   7 +-
 python/pyarrow/includes/libarrow_flight.pxd     |   2 +-
 17 files changed, 1421 insertions(+), 335 deletions(-)

diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index 378f3ec..cf3b630 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -74,12 +74,6 @@ struct ClientRpc {
     }
   }
 
-  Status IOError(const std::string& error_message) {
-    std::stringstream ss;
-    ss << error_message << context.debug_error_string();
-    return Status::IOError(ss.str());
-  }
-
   /// \brief Add an auth token via an auth handler
   Status SetToken(ClientAuthHandler* auth_handler) {
     if (auth_handler) {
@@ -91,6 +85,137 @@ struct ClientRpc {
   }
 };
 
+/// Helper that manages Finish() of a gRPC stream.
+///
+/// When we encounter an error (e.g. could not decode an IPC message),
+/// we want to provide both the client-side error context and any
+/// available server-side context. This helper helps wrap up that
+/// logic.
+///
+/// This class protects the stream with a flag (so that Finish is
+/// idempotent), and drains the read side (so that Finish won't hang).
+///
+/// The template lets us abstract between DoGet/DoExchange and DoPut,
+/// which respectively read internal::FlightData and pb::PutResult.
+template <typename Stream, typename ReadT>
+class FinishableStream {
+ public:
+  FinishableStream(std::shared_ptr<ClientRpc> rpc, std::shared_ptr<Stream> stream)
+      : rpc_(rpc), stream_(stream), finished_(false), server_status_() {}
+  virtual ~FinishableStream() = default;
+
+  /// \brief Get the underlying stream.
+  std::shared_ptr<Stream> stream() const { return stream_; }
+
+  /// \brief Finish the call, adding server context to the given status.
+  virtual Status Finish(Status&& st) {
+    if (finished_) {
+      return MergeStatus(std::move(st));
+    }
+
+    // Drain the read side, as otherwise gRPC Finish() will hang. We
+    // only call Finish() when the client closes the writer or the
+    // reader finishes, so it's OK to assume the client no longer
+    // wants to read and drain the read side. (If the client wants to
+    // indicate that it is done writing, but not done reading, it
+    // should use DoneWriting.)
+    ReadT message;
+    while (internal::ReadPayload(stream_.get(), &message)) {
+      // Drain the read side to avoid gRPC hanging in Finish()
+    }
+
+    server_status_ = internal::FromGrpcStatus(stream_->Finish(), &rpc_->context);
+    finished_ = true;
+
+    return MergeStatus(std::move(st));
+  }
+
+ private:
+  Status MergeStatus(Status&& st) {
+    if (server_status_.ok()) {
+      return std::move(st);
+    }
+    return Status::FromDetailAndArgs(
+        server_status_.code(), server_status_.detail(), server_status_.message(),
+        ". Client context: ", st.ToString(),
+        ". gRPC client debug context: ", rpc_->context.debug_error_string());
+  }
+
+  std::shared_ptr<ClientRpc> rpc_;
+  std::shared_ptr<Stream> stream_;
+  bool finished_;
+  Status server_status_;
+};
+
+/// Helper that manages \a Finish() of a read-write gRPC stream.
+///
+/// This also calls \a WritesDone() and protects itself with a mutex
+/// to enable sharing between the reader and writer.
+template <typename Stream, typename ReadT>
+class FinishableWritableStream : public FinishableStream<Stream, ReadT> {
+ public:
+  FinishableWritableStream(std::shared_ptr<ClientRpc> rpc,
+                           std::shared_ptr<std::mutex> read_mutex,
+                           std::shared_ptr<Stream> stream)
+      : FinishableStream<Stream, ReadT>(rpc, stream),
+        finish_mutex_(),
+        read_mutex_(read_mutex),
+        done_writing_(false) {}
+  virtual ~FinishableWritableStream() = default;
+
+  /// \brief Indicate to gRPC that the write half of the stream is done.
+  Status DoneWriting() {
+    // This is only used by the writer side of a stream, so it need
+    // not be protected with a lock.
+    if (done_writing_) {
+      return Status::OK();
+    }
+    done_writing_ = true;
+    if (!this->stream()->WritesDone()) {
+      // Error happened, try to close the stream to get more detailed info
+      return Finish(MakeFlightError(FlightStatusCode::Internal,
+                                    "Could not flush pending record batches"));
+    }
+    return Status::OK();
+  }
+
+  Status Finish(Status&& st) override {
+    // Finish the call (flushing writes first), merging server status into
+    // st. Guarded by finish_mutex_: reader and writer may both call this.
+    std::lock_guard<std::mutex> guard(finish_mutex_);
+
+    // The stream is shared with a reader, so refuse to finish while a
+    // read is outstanding. Return the error directly: calling Finish()
+    // again here would re-lock finish_mutex_, which is already held.
+    std::unique_lock<std::mutex> read_guard(*read_mutex_, std::try_to_lock);
+    if (!read_guard.owns_lock()) {
+      return MakeFlightError(
+          FlightStatusCode::Internal,
+          "Cannot close stream with pending read operation. Client context: " +
+              st.ToString());
+    }
+
+    // Try to flush pending writes. Don't use our DoneWriting() here, as
+    // it calls Finish() on failure and would recurse.
+    bool finished_writes = done_writing_ || this->stream()->WritesDone();
+    done_writing_ = true;
+
+    Status result = FinishableStream<Stream, ReadT>::Finish(std::move(st));
+
+    if (!finished_writes) {
+      return Status::FromDetailAndArgs(
+          result.code(), result.detail(), result.message(),
+          ". Additionally, could not finish writing record batches before closing");
+    }
+    return result;
+  }
+
+ private:
+  std::mutex finish_mutex_;
+  std::shared_ptr<std::mutex> read_mutex_;
+  bool done_writing_;
+};
+
 class GrpcAddCallHeaders : public AddCallHeaders {
  public:
   explicit GrpcAddCallHeaders(std::multimap<grpc::string, grpc::string>* metadata)
@@ -176,6 +301,8 @@ class GrpcClientInterceptorAdapterFactory
       flight_method = FlightMethod::DoGet;
     } else if (method.ends_with("/DoPut")) {
       flight_method = FlightMethod::DoPut;
+    } else if (method.ends_with("/DoExchange")) {
+      flight_method = FlightMethod::DoExchange;
     } else if (method.ends_with("/DoAction")) {
       flight_method = FlightMethod::DoAction;
     } else if (method.ends_with("/ListActions")) {
@@ -243,179 +370,295 @@ class GrpcClientAuthReader : public ClientAuthReader {
       stream_;
 };
 
-// The next two classes are intertwined. To get the application
-// metadata while avoiding reimplementing RecordBatchStreamReader, we
-// create an ipc::MessageReader that is tied to the
-// MetadataRecordBatchReader. Every time an IPC message is read, it updates
-// the application metadata field of the MetadataRecordBatchReader. The
-// MetadataRecordBatchReader wraps RecordBatchStreamReader, offering an
-// additional method to get both the record batch and application
-// metadata.
-
-class GrpcIpcMessageReader;
-class GrpcStreamReader : public FlightStreamReader {
- public:
-  GrpcStreamReader();
-
-  static Status Open(std::unique_ptr<ClientRpc> rpc,
-                     std::unique_ptr<grpc::ClientReader<pb::FlightData>> stream,
-                     std::unique_ptr<GrpcStreamReader>* out);
-  std::shared_ptr<Schema> schema() const override;
-  Status Next(FlightStreamChunk* out) override;
-  void Cancel() override;
-
- private:
-  friend class GrpcIpcMessageReader;
-  std::shared_ptr<ipc::RecordBatchReader> batch_reader_;
-  std::shared_ptr<Buffer> last_app_metadata_;
-  std::shared_ptr<ClientRpc> rpc_;
-};
-
+// An ipc::MessageReader that adapts any readable gRPC stream
+// returning FlightData.
+template <typename Reader>
 class GrpcIpcMessageReader : public ipc::MessageReader {
  public:
-  GrpcIpcMessageReader(GrpcStreamReader* reader, std::shared_ptr<ClientRpc> rpc,
-                       std::unique_ptr<grpc::ClientReader<pb::FlightData>> stream)
-      : flight_reader_(reader),
-        rpc_(rpc),
+  GrpcIpcMessageReader(
+      std::shared_ptr<ClientRpc> rpc, std::shared_ptr<std::mutex> read_mutex,
+      std::shared_ptr<FinishableStream<Reader, internal::FlightData>> stream,
+      std::shared_ptr<internal::PeekableFlightDataReader<std::shared_ptr<Reader>>>
+          peekable_reader,
+      std::shared_ptr<Buffer>* app_metadata)
+      : rpc_(rpc),
+        read_mutex_(read_mutex),
         stream_(std::move(stream)),
+        peekable_reader_(peekable_reader),
+        app_metadata_(app_metadata),
         stream_finished_(false) {}
 
   ::arrow::Result<std::unique_ptr<ipc::Message>> ReadNextMessage() override {
-    std::unique_ptr<ipc::Message> out;
-    RETURN_NOT_OK(GetNextMessage(&out));
-    return std::move(out);
-  }
-
- protected:
-  Status GetNextMessage(std::unique_ptr<ipc::Message>* out) {
-    // TODO: Use Result APIs
     if (stream_finished_) {
-      *out = nullptr;
-      flight_reader_->last_app_metadata_ = nullptr;
-      return Status::OK();
+      return nullptr;
+    }
+    internal::FlightData* data;
+    {
+      auto guard = read_mutex_ ? std::unique_lock<std::mutex>(*read_mutex_)
+                               : std::unique_lock<std::mutex>();
+      peekable_reader_->Next(&data);
     }
-    internal::FlightData data;
-    if (!internal::ReadPayload(stream_.get(), &data)) {
-      // Stream is completed
+    if (!data) {
       stream_finished_ = true;
-      *out = nullptr;
-      flight_reader_->last_app_metadata_ = nullptr;
-      return OverrideWithServerError(Status::OK());
+      return stream_->Finish(Status::OK());
     }
     // Validate IPC message
-    auto st = data.OpenMessage(out);
-    if (!st.ok()) {
-      flight_reader_->last_app_metadata_ = nullptr;
-      return OverrideWithServerError(std::move(st));
+    auto result = data->OpenMessage();
+    if (!result.ok()) {
+      return stream_->Finish(result.status());
     }
-    flight_reader_->last_app_metadata_ = data.app_metadata;
-    return Status::OK();
-  }
-
-  Status OverrideWithServerError(Status&& st) {
-    // Get the gRPC status if not OK, to propagate any server error message
-    RETURN_NOT_OK(internal::FromGrpcStatus(stream_->Finish(), &rpc_->context));
-    return std::move(st);
+    *app_metadata_ = std::move(data->app_metadata);
+    return std::move(result);
   }
 
  private:
-  GrpcStreamReader* flight_reader_;
   // The RPC context lifetime must be coupled to the ClientReader
   std::shared_ptr<ClientRpc> rpc_;
-  std::unique_ptr<grpc::ClientReader<pb::FlightData>> stream_;
+  // Guard reads with a mutex to prevent concurrent reads if the write
+  // side calls Finish(). Nullable as DoGet doesn't need this.
+  std::shared_ptr<std::mutex> read_mutex_;
+  std::shared_ptr<FinishableStream<Reader, internal::FlightData>> stream_;
+  std::shared_ptr<internal::PeekableFlightDataReader<std::shared_ptr<Reader>>>
+      peekable_reader_;
+  // A reference to GrpcStreamReader.app_metadata_. That class
+  // can't access the app metadata because when it Peek()s the stream,
+  // it may be looking at a dictionary batch, not the record
+  // batch. Updating it here ensures the reader is always updated with
+  // the last metadata message read.
+  std::shared_ptr<Buffer>* app_metadata_;
   bool stream_finished_;
 };
 
-GrpcStreamReader::GrpcStreamReader() {}
-
-Status GrpcStreamReader::Open(std::unique_ptr<ClientRpc> rpc,
-                              std::unique_ptr<grpc::ClientReader<pb::FlightData>> stream,
-                              std::unique_ptr<GrpcStreamReader>* out) {
-  *out = std::unique_ptr<GrpcStreamReader>(new GrpcStreamReader);
-  out->get()->rpc_ = std::move(rpc);
-  auto reader = std::unique_ptr<ipc::MessageReader>(
-      new GrpcIpcMessageReader(out->get(), out->get()->rpc_, std::move(stream)));
-  ARROW_ASSIGN_OR_RAISE((*out)->batch_reader_,
-                        ipc::RecordBatchStreamReader::Open(std::move(reader)));
-  return Status::OK();
-}
+/// The implementation of the public-facing API for reading from a
+/// FlightData stream
+template <typename Reader>
+class GrpcStreamReader : public FlightStreamReader {
+ public:
+  GrpcStreamReader(std::shared_ptr<ClientRpc> rpc, std::shared_ptr<std::mutex> read_mutex,
+                   std::shared_ptr<FinishableStream<Reader, internal::FlightData>> stream)
+      : rpc_(rpc),
+        read_mutex_(read_mutex),
+        stream_(stream),
+        peekable_reader_(new internal::PeekableFlightDataReader<std::shared_ptr<Reader>>(
+            stream->stream())),
+        app_metadata_(nullptr) {}
+
+  Status EnsureDataStarted() {
+    if (!batch_reader_) {
+      bool skipped_to_data = false;
+      {
+        auto guard = TakeGuard();
+        skipped_to_data = peekable_reader_->SkipToData();
+      }
+      // peek() until we find the first data message; discard metadata
+      if (!skipped_to_data) {
+        return OverrideWithServerError(MakeFlightError(
+            FlightStatusCode::Internal, "Server never sent a data message"));
+      }
 
-std::shared_ptr<Schema> GrpcStreamReader::schema() const {
-  return batch_reader_->schema();
-}
+      auto message_reader =
+          std::unique_ptr<ipc::MessageReader>(new GrpcIpcMessageReader<Reader>(
+              rpc_, read_mutex_, stream_, peekable_reader_, &app_metadata_));
+      auto result = ipc::RecordBatchStreamReader::Open(std::move(message_reader));
+      RETURN_NOT_OK(OverrideWithServerError(std::move(result).Value(&batch_reader_)));
+    }
+    return Status::OK();
+  }
+  arrow::Result<std::shared_ptr<Schema>> GetSchema() override {
+    RETURN_NOT_OK(EnsureDataStarted());
+    return batch_reader_->schema();
+  }
+  Status Next(FlightStreamChunk* out) override {
+    internal::FlightData* data;
+    {
+      auto guard = TakeGuard();
+      peekable_reader_->Peek(&data);
+    }
+    if (!data) {
+      out->app_metadata = nullptr;
+      out->data = nullptr;
+      return stream_->Finish(Status::OK());
+    }
 
-Status GrpcStreamReader::Next(FlightStreamChunk* out) {
-  out->app_metadata = nullptr;
-  RETURN_NOT_OK(batch_reader_->ReadNext(&out->data));
-  out->app_metadata = std::move(last_app_metadata_);
-  return Status::OK();
-}
+    if (!data->metadata) {
+      // Metadata-only (data->metadata is the IPC header)
+      out->app_metadata = data->app_metadata;
+      out->data = nullptr;
+      {
+        auto guard = TakeGuard();
+        peekable_reader_->Next(&data);
+      }
+      return Status::OK();
+    }
 
-void GrpcStreamReader::Cancel() { rpc_->context.TryCancel(); }
+    if (!batch_reader_) {
+      RETURN_NOT_OK(EnsureDataStarted());
+      // Re-peek here since EnsureDataStarted() advances the stream
+      return Next(out);
+    }
+    RETURN_NOT_OK(batch_reader_->ReadNext(&out->data));
+    out->app_metadata = std::move(app_metadata_);
+    return Status::OK();
+  }
+  void Cancel() override { rpc_->context.TryCancel(); }
+
+ private:
+  std::unique_lock<std::mutex> TakeGuard() {
+    return read_mutex_ ? std::unique_lock<std::mutex>(*read_mutex_)
+                       : std::unique_lock<std::mutex>();
+  }
+
+  Status OverrideWithServerError(Status&& st) {
+    if (st.ok()) {
+      return std::move(st);
+    }
+    return stream_->Finish(std::move(st));
+  }
 
-// Similarly, the next two classes are intertwined. In order to get
-// application-specific metadata to the IpcPayloadWriter,
-// DoPutPayloadWriter takes a pointer to
+  friend class GrpcIpcMessageReader<Reader>;
+  std::shared_ptr<ClientRpc> rpc_;
+  // Guard reads with a lock to prevent Finish()/Close() from being
+  // called on the writer while the reader has a pending
+  // read. Nullable, as DoGet() doesn't need this.
+  std::shared_ptr<std::mutex> read_mutex_;
+  std::shared_ptr<FinishableStream<Reader, internal::FlightData>> stream_;
+  std::shared_ptr<internal::PeekableFlightDataReader<std::shared_ptr<Reader>>>
+      peekable_reader_;
+  std::shared_ptr<ipc::RecordBatchReader> batch_reader_;
+  std::shared_ptr<Buffer> app_metadata_;
+};
+
+// The next two classes implement writing to a FlightData stream.
+// Similarly to the read side, we want to reuse the implementation of
+// RecordBatchWriter. As a result, these two classes are intertwined
+// in order to pass application metadata "through" RecordBatchWriter.
+// In order to get application-specific metadata to the
+// IpcPayloadWriter, DoPutPayloadWriter takes a pointer to
 // GrpcStreamWriter. GrpcStreamWriter updates a metadata field on
 // write; DoPutPayloadWriter reads that metadata field to determine
 // what to write.
 
+template <typename ProtoReadT, typename FlightReadT>
 class DoPutPayloadWriter;
+
+template <typename ProtoReadT, typename FlightReadT>
 class GrpcStreamWriter : public FlightStreamWriter {
  public:
   ~GrpcStreamWriter() override = default;
 
+  using GrpcStream = grpc::ClientReaderWriter<pb::FlightData, ProtoReadT>;
+
   explicit GrpcStreamWriter(
-      std::shared_ptr<grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>> writer)
-      : app_metadata_(nullptr), batch_writer_(nullptr), writer_(writer) {}
+      const FlightDescriptor& descriptor, std::shared_ptr<ClientRpc> rpc,
+      std::shared_ptr<FinishableWritableStream<GrpcStream, FlightReadT>> writer)
+      : app_metadata_(nullptr),
+        batch_writer_(nullptr),
+        writer_(std::move(writer)),
+        rpc_(std::move(rpc)),
+        descriptor_(descriptor),
+        writer_closed_(false) {}
 
   static Status Open(
-      const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema,
-      std::unique_ptr<ClientRpc> rpc, std::unique_ptr<pb::PutResult> response,
-      std::shared_ptr<std::mutex> read_mutex,
-      std::shared_ptr<grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>> writer,
+      const FlightDescriptor& descriptor, std::shared_ptr<Schema> schema,
+      std::shared_ptr<ClientRpc> rpc,
+      std::shared_ptr<FinishableWritableStream<GrpcStream, FlightReadT>> writer,
       std::unique_ptr<FlightStreamWriter>* out);
 
+  Status CheckStarted() {
+    if (!batch_writer_) {
+      return Status::Invalid("Writer not initialized. Call Begin() with a schema.");
+    }
+    return Status::OK();
+  }
+
+  Status Begin(const std::shared_ptr<Schema>& schema) override {
+    if (batch_writer_) {
+      return Status::Invalid("This writer has already been started.");
+    }
+    std::unique_ptr<ipc::internal::IpcPayloadWriter> payload_writer(
+        new DoPutPayloadWriter<ProtoReadT, FlightReadT>(descriptor_, std::move(rpc_),
+                                                        writer_, this));
+    // XXX: this does not actually write the message to the stream.
+    // See Close().
+    ARROW_ASSIGN_OR_RAISE(batch_writer_, ipc::internal::OpenRecordBatchWriter(
+                                             std::move(payload_writer), schema));
+    return Status::OK();
+  }
+
   Status WriteRecordBatch(const RecordBatch& batch) override {
+    RETURN_NOT_OK(CheckStarted());
     return WriteWithMetadata(batch, nullptr);
   }
+
+  Status WriteMetadata(std::shared_ptr<Buffer> app_metadata) override {
+    FlightPayload payload{};
+    payload.app_metadata = app_metadata;
+    if (!internal::WritePayload(payload, writer_->stream().get())) {
+      return writer_->Finish(MakeFlightError(FlightStatusCode::Internal,
+                                             "Could not write metadata to stream"));
+    }
+    return Status::OK();
+  }
   Status WriteWithMetadata(const RecordBatch& batch,
                            std::shared_ptr<Buffer> app_metadata) override {
+    RETURN_NOT_OK(CheckStarted());
     app_metadata_ = app_metadata;
     return batch_writer_->WriteRecordBatch(batch);
   }
   Status DoneWriting() override {
-    if (done_writing_) {
-      return Status::OK();
+    // Do not CheckStarted - DoneWriting applies to data and metadata
+    if (batch_writer_) {
+      // Close the writer if we have one; this will force it to flush any
+      // remaining data, before we close the write side of the stream.
+      writer_closed_ = true;
+      Status st = batch_writer_->Close();
+      if (!st.ok()) {
+        return writer_->Finish(std::move(st));
+      }
     }
-    done_writing_ = true;
-    if (!writer_->WritesDone()) {
-      return Status::IOError("Could not flush pending record batches.");
+    return writer_->DoneWriting();
+  }
+  Status Close() override {
+    // Do not CheckStarted - Close applies to data and metadata
+    if (batch_writer_ && !writer_closed_) {
+      // This is important! Close() calls
+      // IpcPayloadWriter::CheckStarted() which will force the initial
+      // schema message to be written to the stream. This is required
+      // to unstick the server, else the client and the server end up
+      // waiting for each other. This happens if the client never
+      // wrote anything before calling Close().
+      writer_closed_ = true;
+      return writer_->Finish(batch_writer_->Close());
     }
-    return Status::OK();
+    return writer_->Finish(Status::OK());
   }
-  Status Close() override { return batch_writer_->Close(); }
 
  private:
-  friend class DoPutPayloadWriter;
+  friend class DoPutPayloadWriter<ProtoReadT, FlightReadT>;
   std::shared_ptr<Buffer> app_metadata_;
   std::unique_ptr<ipc::RecordBatchWriter> batch_writer_;
-  std::shared_ptr<grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>> writer_;
-  bool done_writing_ = false;
+  std::shared_ptr<FinishableWritableStream<GrpcStream, FlightReadT>> writer_;
+
+  // Fields used to lazy-initialize the IpcPayloadWriter. They're
+  // invalid once Begin() is called.
+  std::shared_ptr<ClientRpc> rpc_;
+  FlightDescriptor descriptor_;
+  bool writer_closed_;
 };
 
-/// A IpcPayloadWriter implementation that writes to a DoPut stream
+/// An IpcPayloadWriter implementation that writes to a gRPC stream of
+/// FlightData messages.
+template <typename ProtoReadT, typename FlightReadT>
 class DoPutPayloadWriter : public ipc::internal::IpcPayloadWriter {
  public:
+  using GrpcStream = grpc::ClientReaderWriter<pb::FlightData, ProtoReadT>;
+
   DoPutPayloadWriter(
-      const FlightDescriptor& descriptor, std::unique_ptr<ClientRpc> rpc,
-      std::unique_ptr<pb::PutResult> response, std::shared_ptr<std::mutex> read_mutex,
-      std::shared_ptr<grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>> writer,
-      GrpcStreamWriter* stream_writer)
+      const FlightDescriptor& descriptor, std::shared_ptr<ClientRpc> rpc,
+      std::shared_ptr<FinishableWritableStream<GrpcStream, FlightReadT>> writer,
+      GrpcStreamWriter<ProtoReadT, FlightReadT>* stream_writer)
       : descriptor_(descriptor),
-        rpc_(std::move(rpc)),
-        response_(std::move(response)),
-        read_mutex_(read_mutex),
+        rpc_(rpc),
         writer_(std::move(writer)),
         first_payload_(true),
         stream_writer_(stream_writer) {}
@@ -433,68 +676,59 @@ class DoPutPayloadWriter : public ipc::internal::IpcPayloadWriter {
       if (ipc_payload.type != ipc::Message::SCHEMA) {
         return Status::Invalid("First IPC message should be schema");
       }
-      std::string str_descr;
-      {
-        pb::FlightDescriptor pb_descr;
-        RETURN_NOT_OK(internal::ToProto(descriptor_, &pb_descr));
-        if (!pb_descr.SerializeToString(&str_descr)) {
-          return Status::UnknownError("Failed to serialized Flight descriptor");
-        }
-      }
-      payload.descriptor = Buffer::FromString(std::move(str_descr));
+      // Write the descriptor to begin with
+      RETURN_NOT_OK(internal::ToPayload(descriptor_, &payload.descriptor));
       first_payload_ = false;
     } else if (ipc_payload.type == ipc::Message::RECORD_BATCH &&
                stream_writer_->app_metadata_) {
       payload.app_metadata = std::move(stream_writer_->app_metadata_);
     }
 
-    if (!internal::WritePayload(payload, writer_.get())) {
-      return rpc_->IOError("Could not write record batch to stream: ");
+    if (!internal::WritePayload(payload, writer_->stream().get())) {
+      return writer_->Finish(MakeFlightError(FlightStatusCode::Internal,
+                                             "Could not write record batch to stream"));
     }
     return Status::OK();
   }
 
   Status Close() override {
-    bool finished_writes = stream_writer_->done_writing_ ? true : writer_->WritesDone();
-    // Drain the read side to avoid hanging
-    std::unique_lock<std::mutex> guard(*read_mutex_, std::try_to_lock);
-    if (!guard.owns_lock()) {
-      return Status::IOError("Cannot close stream with pending read operation.");
-    }
-    pb::PutResult message;
-    while (writer_->Read(&message)) {
-    }
-    RETURN_NOT_OK(internal::FromGrpcStatus(writer_->Finish(), &rpc_->context));
-    if (!finished_writes) {
-      return Status::UnknownError(
-          "Could not finish writing record batches before closing");
-    }
+    // Closing is handled one layer up in GrpcStreamWriter::Close
     return Status::OK();
   }
 
  protected:
-  // TODO: there isn't a way to access this as a user.
   const FlightDescriptor descriptor_;
-  std::unique_ptr<ClientRpc> rpc_;
-  std::unique_ptr<pb::PutResult> response_;
-  std::shared_ptr<std::mutex> read_mutex_;
-  std::shared_ptr<grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>> writer_;
+  std::shared_ptr<ClientRpc> rpc_;
+  std::shared_ptr<FinishableWritableStream<GrpcStream, FlightReadT>> writer_;
   bool first_payload_;
-  GrpcStreamWriter* stream_writer_;
+  GrpcStreamWriter<ProtoReadT, FlightReadT>* stream_writer_;
 };
 
-Status GrpcStreamWriter::Open(
-    const FlightDescriptor& descriptor, const std::shared_ptr<Schema>& schema,
-    std::unique_ptr<ClientRpc> rpc, std::unique_ptr<pb::PutResult> response,
-    std::shared_ptr<std::mutex> read_mutex,
-    std::shared_ptr<grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>> writer,
+template <typename ProtoReadT, typename FlightReadT>
+Status GrpcStreamWriter<ProtoReadT, FlightReadT>::Open(
+    const FlightDescriptor& descriptor,
+    std::shared_ptr<Schema> schema,  // this schema is nullable
+    std::shared_ptr<ClientRpc> rpc,
+    std::shared_ptr<FinishableWritableStream<GrpcStream, FlightReadT>> writer,
     std::unique_ptr<FlightStreamWriter>* out) {
-  std::unique_ptr<GrpcStreamWriter> result(new GrpcStreamWriter(writer));
-  std::unique_ptr<ipc::internal::IpcPayloadWriter> payload_writer(new DoPutPayloadWriter(
-      descriptor, std::move(rpc), std::move(response), read_mutex, writer, result.get()));
-  ARROW_ASSIGN_OR_RAISE(result->batch_writer_, ipc::internal::OpenRecordBatchWriter(
-                                                   std::move(payload_writer), schema));
-  *out = std::move(result);
+  std::unique_ptr<GrpcStreamWriter<ProtoReadT, FlightReadT>> instance(
+      new GrpcStreamWriter<ProtoReadT, FlightReadT>(descriptor, std::move(rpc), writer));
+  if (schema) {
+    // The schema was provided (DoPut). Eagerly write the schema and
+    // descriptor together as the first message.
+    RETURN_NOT_OK(instance->Begin(schema));
+  } else {
+    // The schema was not provided (DoExchange). Eagerly write just
+    // the descriptor as the first message. Note that if the client
+    // calls Begin() to send data, we'll send a redundant descriptor.
+    FlightPayload payload{};
+    RETURN_NOT_OK(internal::ToPayload(descriptor, &payload.descriptor));
+    if (!internal::WritePayload(payload, instance->writer_->stream().get())) {
+      return writer->Finish(MakeFlightError(FlightStatusCode::Internal,
+                                            "Could not write descriptor to stream"));
+    }
+  }
+  *out = std::move(instance);
   return Status::OK();
 }
 
@@ -591,7 +825,8 @@ class FlightClient::FlightClientImpl {
     bool finished_writes = stream->WritesDone();
     RETURN_NOT_OK(internal::FromGrpcStatus(stream->Finish(), &rpc.context));
     if (!finished_writes) {
-      return Status::UnknownError("Could not finish writing before closing");
+      return MakeFlightError(FlightStatusCode::Internal,
+                             "Could not finish writing before closing");
     }
     return Status::OK();
   }
@@ -701,35 +936,66 @@ class FlightClient::FlightClientImpl {
 
   Status DoGet(const FlightCallOptions& options, const Ticket& ticket,
                std::unique_ptr<FlightStreamReader>* out) {
+    using StreamReader = GrpcStreamReader<grpc::ClientReader<pb::FlightData>>;
     pb::Ticket pb_ticket;
     internal::ToProto(ticket, &pb_ticket);
 
-    std::unique_ptr<ClientRpc> rpc(new ClientRpc(options));
+    auto rpc = std::make_shared<ClientRpc>(options);
     RETURN_NOT_OK(rpc->SetToken(auth_handler_.get()));
-    std::unique_ptr<grpc::ClientReader<pb::FlightData>> stream(
-        stub_->DoGet(&rpc->context, pb_ticket));
-
-    std::unique_ptr<GrpcStreamReader> reader;
-    RETURN_NOT_OK(GrpcStreamReader::Open(std::move(rpc), std::move(stream), &reader));
-    *out = std::move(reader);
-    return Status::OK();
+    std::shared_ptr<grpc::ClientReader<pb::FlightData>> stream =
+        stub_->DoGet(&rpc->context, pb_ticket);
+    auto finishable_stream = std::make_shared<
+        FinishableStream<grpc::ClientReader<pb::FlightData>, internal::FlightData>>(
+        rpc, stream);
+    *out =
+        std::unique_ptr<StreamReader>(new StreamReader(rpc, nullptr, finishable_stream));
+    // Eagerly read the schema
+    return static_cast<StreamReader*>(out->get())->EnsureDataStarted();
   }
 
   Status DoPut(const FlightCallOptions& options, const FlightDescriptor& descriptor,
                const std::shared_ptr<Schema>& schema,
                std::unique_ptr<FlightStreamWriter>* out,
                std::unique_ptr<FlightMetadataReader>* reader) {
-    std::unique_ptr<ClientRpc> rpc(new ClientRpc(options));
-    RETURN_NOT_OK(rpc->SetToken(auth_handler_.get()));
-    std::unique_ptr<pb::PutResult> response(new pb::PutResult);
-    std::shared_ptr<grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>> writer(
-        stub_->DoPut(&rpc->context));
+    using GrpcStream = grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>;
+    using StreamWriter = GrpcStreamWriter<pb::PutResult, pb::PutResult>;
 
+    auto rpc = std::make_shared<ClientRpc>(options);
+    RETURN_NOT_OK(rpc->SetToken(auth_handler_.get()));
+    std::shared_ptr<GrpcStream> stream = stub_->DoPut(&rpc->context);
+    // The writer drains the reader on close to avoid hanging inside
+    // gRPC. Concurrent reads are unsafe, so a mutex protects this operation.
     std::shared_ptr<std::mutex> read_mutex = std::make_shared<std::mutex>();
+    auto finishable_stream =
+        std::make_shared<FinishableWritableStream<GrpcStream, pb::PutResult>>(
+            rpc, read_mutex, stream);
     *reader =
-        std::unique_ptr<FlightMetadataReader>(new GrpcMetadataReader(writer, read_mutex));
-    return GrpcStreamWriter::Open(descriptor, schema, std::move(rpc), std::move(response),
-                                  read_mutex, writer, out);
+        std::unique_ptr<FlightMetadataReader>(new GrpcMetadataReader(stream, read_mutex));
+    return StreamWriter::Open(descriptor, schema, rpc, finishable_stream, out);
+  }
+
+  Status DoExchange(const FlightCallOptions& options, const FlightDescriptor& descriptor,
+                    std::unique_ptr<FlightStreamWriter>* writer,
+                    std::unique_ptr<FlightStreamReader>* reader) {
+    using GrpcStream = grpc::ClientReaderWriter<pb::FlightData, pb::FlightData>;
+    using StreamReader = GrpcStreamReader<GrpcStream>;
+    using StreamWriter = GrpcStreamWriter<pb::FlightData, internal::FlightData>;
+
+    auto rpc = std::make_shared<ClientRpc>(options);
+    RETURN_NOT_OK(rpc->SetToken(auth_handler_.get()));
+    std::shared_ptr<grpc::ClientReaderWriter<pb::FlightData, pb::FlightData>> stream =
+        stub_->DoExchange(&rpc->context);
+    // The writer drains the reader on close to avoid hanging inside
+    // gRPC. Concurrent reads are unsafe, so a mutex protects this operation.
+    std::shared_ptr<std::mutex> read_mutex = std::make_shared<std::mutex>();
+    auto finishable_stream =
+        std::make_shared<FinishableWritableStream<GrpcStream, internal::FlightData>>(
+            rpc, read_mutex, stream);
+    *reader = std::unique_ptr<StreamReader>(
+        new StreamReader(rpc, read_mutex, finishable_stream));
+    // Do not eagerly read the schema. There may be metadata messages
+    // before any data is sent, or data may not be sent at all.
+    return StreamWriter::Open(descriptor, nullptr, rpc, finishable_stream, writer);
   }
 
  private:
@@ -802,5 +1068,12 @@ Status FlightClient::DoPut(const FlightCallOptions& options,
   return impl_->DoPut(options, descriptor, schema, stream, reader);
 }
 
+Status FlightClient::DoExchange(const FlightCallOptions& options,
+                                const FlightDescriptor& descriptor,
+                                std::unique_ptr<FlightStreamWriter>* writer,
+                                std::unique_ptr<FlightStreamReader>* reader) {
+  return impl_->DoExchange(options, descriptor, writer, reader);
+}
+
 }  // namespace flight
 }  // namespace arrow
diff --git a/cpp/src/arrow/flight/client.h b/cpp/src/arrow/flight/client.h
index 7d6abff..4fd014c 100644
--- a/cpp/src/arrow/flight/client.h
+++ b/cpp/src/arrow/flight/client.h
@@ -92,10 +92,8 @@ class ARROW_FLIGHT_EXPORT FlightStreamReader : public MetadataRecordBatchReader
 
 /// \brief A RecordBatchWriter that also allows sending
 /// application-defined metadata via the Flight protocol.
-class ARROW_FLIGHT_EXPORT FlightStreamWriter : public ipc::RecordBatchWriter {
+class ARROW_FLIGHT_EXPORT FlightStreamWriter : public MetadataRecordBatchWriter {
  public:
-  virtual Status WriteWithMetadata(const RecordBatch& batch,
-                                   std::shared_ptr<Buffer> app_metadata) = 0;
   /// \brief Indicate that the application is done writing to this stream.
   ///
   /// The application may not write to this stream after calling
@@ -245,6 +243,15 @@ class ARROW_FLIGHT_EXPORT FlightClient {
     return DoPut({}, descriptor, schema, stream, reader);
   }
 
+  Status DoExchange(const FlightCallOptions& options, const FlightDescriptor& descriptor,
+                    std::unique_ptr<FlightStreamWriter>* writer,
+                    std::unique_ptr<FlightStreamReader>* reader);
+  Status DoExchange(const FlightDescriptor& descriptor,
+                    std::unique_ptr<FlightStreamWriter>* writer,
+                    std::unique_ptr<FlightStreamReader>* reader) {
+    return DoExchange({}, descriptor, writer, reader);
+  }
+
  private:
   FlightClient();
   class FlightClientImpl;
diff --git a/cpp/src/arrow/flight/flight_test.cc b/cpp/src/arrow/flight/flight_test.cc
index 3342925..fc80051 100644
--- a/cpp/src/arrow/flight/flight_test.cc
+++ b/cpp/src/arrow/flight/flight_test.cc
@@ -425,7 +425,11 @@ class MetadataTestServer : public FlightServerBase {
   Status DoGet(const ServerCallContext& context, const Ticket& request,
                std::unique_ptr<FlightDataStream>* data_stream) override {
     BatchVector batches;
-    RETURN_NOT_OK(ExampleIntBatches(&batches));
+    if (request.ticket == "dicts") {
+      RETURN_NOT_OK(ExampleDictBatches(&batches));
+    } else {
+      RETURN_NOT_OK(ExampleIntBatches(&batches));
+    }
     std::shared_ptr<RecordBatchReader> batch_reader =
         std::make_shared<BatchIterator>(batches[0]->schema(), batches);
 
@@ -985,6 +989,238 @@ TEST_F(TestFlightClient, DoGetDicts) {
   CheckDoGet(descr, expected_batches, check_endpoints);
 }
 
+TEST_F(TestFlightClient, DoExchange) {
+  auto descr = FlightDescriptor::Command("counter");
+  BatchVector batches;
+  auto a1 = ArrayFromJSON(int32(), "[4, 5, 6, null]");
+  auto schema = arrow::schema({field("f1", a1->type())});
+  batches.push_back(RecordBatch::Make(schema, a1->length(), {a1}));
+  std::unique_ptr<FlightStreamReader> reader;
+  std::unique_ptr<FlightStreamWriter> writer;
+  ASSERT_OK(client_->DoExchange(descr, &writer, &reader));
+  ASSERT_OK(writer->Begin(schema));
+  for (const auto& batch : batches) {
+    ASSERT_OK(writer->WriteRecordBatch(*batch));
+  }
+  ASSERT_OK(writer->DoneWriting());
+  FlightStreamChunk chunk;
+  ASSERT_OK(reader->Next(&chunk));
+  ASSERT_NE(nullptr, chunk.app_metadata);
+  ASSERT_EQ(nullptr, chunk.data);
+  ASSERT_EQ("1", chunk.app_metadata->ToString());
+  ASSERT_OK_AND_ASSIGN(auto server_schema, reader->GetSchema());
+  AssertSchemaEqual(schema, server_schema);
+  for (const auto& batch : batches) {
+    ASSERT_OK(reader->Next(&chunk));
+    ASSERT_BATCHES_EQUAL(*batch, *chunk.data);
+  }
+  ASSERT_OK(writer->Close());
+}
+
+// Test pure-metadata DoExchange to ensure nothing blocks waiting for
+// schema messages
+TEST_F(TestFlightClient, DoExchangeNoData) {
+  auto descr = FlightDescriptor::Command("counter");
+  std::unique_ptr<FlightStreamReader> reader;
+  std::unique_ptr<FlightStreamWriter> writer;
+  ASSERT_OK(client_->DoExchange(descr, &writer, &reader));
+  ASSERT_OK(writer->DoneWriting());
+  FlightStreamChunk chunk;
+  ASSERT_OK(reader->Next(&chunk));
+  ASSERT_EQ(nullptr, chunk.data);
+  ASSERT_NE(nullptr, chunk.app_metadata);
+  ASSERT_EQ("0", chunk.app_metadata->ToString());
+  ASSERT_OK(writer->Close());
+}
+
+// Test sending a schema without any data, as this hits an edge case
+// in the client-side writer.
+TEST_F(TestFlightClient, DoExchangeWriteOnlySchema) {
+  auto descr = FlightDescriptor::Command("counter");
+  std::unique_ptr<FlightStreamReader> reader;
+  std::unique_ptr<FlightStreamWriter> writer;
+  ASSERT_OK(client_->DoExchange(descr, &writer, &reader));
+  auto schema = arrow::schema({field("f1", arrow::int32())});
+  ASSERT_OK(writer->Begin(schema));
+  ASSERT_OK(writer->WriteMetadata(Buffer::FromString("foo")));
+  ASSERT_OK(writer->DoneWriting());
+  FlightStreamChunk chunk;
+  ASSERT_OK(reader->Next(&chunk));
+  ASSERT_EQ(nullptr, chunk.data);
+  ASSERT_NE(nullptr, chunk.app_metadata);
+  ASSERT_EQ("0", chunk.app_metadata->ToString());
+  ASSERT_OK(writer->Close());
+}
+
+// Emulate DoGet
+TEST_F(TestFlightClient, DoExchangeGet) {
+  auto descr = FlightDescriptor::Command("get");
+  std::unique_ptr<FlightStreamReader> reader;
+  std::unique_ptr<FlightStreamWriter> writer;
+  ASSERT_OK(client_->DoExchange(descr, &writer, &reader));
+  ASSERT_OK_AND_ASSIGN(auto server_schema, reader->GetSchema());
+  AssertSchemaEqual(*ExampleIntSchema(), *server_schema);
+  BatchVector batches;
+  ASSERT_OK(ExampleIntBatches(&batches));
+  FlightStreamChunk chunk;
+  for (const auto& batch : batches) {
+    ASSERT_OK(reader->Next(&chunk));
+    ASSERT_NE(nullptr, chunk.data);
+    AssertBatchesEqual(*batch, *chunk.data);
+  }
+  ASSERT_OK(reader->Next(&chunk));
+  ASSERT_EQ(nullptr, chunk.data);
+  ASSERT_EQ(nullptr, chunk.app_metadata);
+  ASSERT_OK(writer->Close());
+}
+
+// Emulate DoPut
+TEST_F(TestFlightClient, DoExchangePut) {
+  auto descr = FlightDescriptor::Command("put");
+  std::unique_ptr<FlightStreamReader> reader;
+  std::unique_ptr<FlightStreamWriter> writer;
+  ASSERT_OK(client_->DoExchange(descr, &writer, &reader));
+  ASSERT_OK(writer->Begin(ExampleIntSchema()));
+  BatchVector batches;
+  ASSERT_OK(ExampleIntBatches(&batches));
+  for (const auto& batch : batches) {
+    ASSERT_OK(writer->WriteRecordBatch(*batch));
+  }
+  ASSERT_OK(writer->DoneWriting());
+  FlightStreamChunk chunk;
+  ASSERT_OK(reader->Next(&chunk));
+  ASSERT_NE(nullptr, chunk.app_metadata);
+  AssertBufferEqual(*chunk.app_metadata, "done");
+  ASSERT_OK(reader->Next(&chunk));
+  ASSERT_EQ(nullptr, chunk.data);
+  ASSERT_EQ(nullptr, chunk.app_metadata);
+  ASSERT_OK(writer->Close());
+}
+
+// Test the echo server
+TEST_F(TestFlightClient, DoExchangeEcho) {
+  auto descr = FlightDescriptor::Command("echo");
+  std::unique_ptr<FlightStreamReader> reader;
+  std::unique_ptr<FlightStreamWriter> writer;
+  ASSERT_OK(client_->DoExchange(descr, &writer, &reader));
+  ASSERT_OK(writer->Begin(ExampleIntSchema()));
+  BatchVector batches;
+  FlightStreamChunk chunk;
+  ASSERT_OK(ExampleIntBatches(&batches));
+  for (const auto& batch : batches) {
+    ASSERT_OK(writer->WriteRecordBatch(*batch));
+    ASSERT_OK(reader->Next(&chunk));
+    ASSERT_NE(nullptr, chunk.data);
+    ASSERT_EQ(nullptr, chunk.app_metadata);
+    AssertBatchesEqual(*batch, *chunk.data);
+  }
+  for (int i = 0; i < 10; i++) {
+    const auto buf = Buffer::FromString(std::to_string(i));
+    ASSERT_OK(writer->WriteMetadata(buf));
+    ASSERT_OK(reader->Next(&chunk));
+    ASSERT_EQ(nullptr, chunk.data);
+    ASSERT_NE(nullptr, chunk.app_metadata);
+    AssertBufferEqual(*buf, *chunk.app_metadata);
+  }
+  int index = 0;
+  for (const auto& batch : batches) {
+    const auto buf = Buffer::FromString(std::to_string(index));
+    ASSERT_OK(writer->WriteWithMetadata(*batch, buf));
+    ASSERT_OK(reader->Next(&chunk));
+    ASSERT_NE(nullptr, chunk.data);
+    ASSERT_NE(nullptr, chunk.app_metadata);
+    AssertBatchesEqual(*batch, *chunk.data);
+    AssertBufferEqual(*buf, *chunk.app_metadata);
+    index++;
+  }
+  ASSERT_OK(writer->DoneWriting());
+  ASSERT_OK(reader->Next(&chunk));
+  ASSERT_EQ(nullptr, chunk.data);
+  ASSERT_EQ(nullptr, chunk.app_metadata);
+  ASSERT_OK(writer->Close());
+}
+
+// Test interleaved reading/writing
+TEST_F(TestFlightClient, DoExchangeTotal) {
+  auto descr = FlightDescriptor::Command("total");
+  std::unique_ptr<FlightStreamReader> reader;
+  std::unique_ptr<FlightStreamWriter> writer;
+  {
+    auto a1 = ArrayFromJSON(arrow::int32(), "[4, 5, 6, null]");
+    auto schema = arrow::schema({field("f1", a1->type())});
+    // XXX: as noted in flight/client.cc, Begin() is lazy and the
+    // schema message won't be written until some data is also
+    // written. There are also timing issues; hence we check each
+    // status here.
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        Invalid, ::testing::HasSubstr("Field is not INT64: f1"), ([&]() {
+          RETURN_NOT_OK(client_->DoExchange(descr, &writer, &reader));
+          RETURN_NOT_OK(writer->Begin(schema));
+          auto batch = RecordBatch::Make(schema, /* num_rows */ 4, {a1});
+          RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+          return writer->Close();
+        })());
+  }
+  {
+    auto a1 = ArrayFromJSON(arrow::int64(), "[1, 2, null, 3]");
+    auto a2 = ArrayFromJSON(arrow::int64(), "[null, 4, 5, 6]");
+    auto schema = arrow::schema({field("f1", a1->type()), field("f2", a2->type())});
+    ASSERT_OK(client_->DoExchange(descr, &writer, &reader));
+    ASSERT_OK(writer->Begin(schema));
+    auto batch = RecordBatch::Make(schema, /* num_rows */ 4, {a1, a2});
+    FlightStreamChunk chunk;
+    ASSERT_OK(writer->WriteRecordBatch(*batch));
+    ASSERT_OK_AND_ASSIGN(auto server_schema, reader->GetSchema());
+    AssertSchemaEqual(*schema, *server_schema);
+
+    ASSERT_OK(reader->Next(&chunk));
+    ASSERT_NE(nullptr, chunk.data);
+    auto expected1 = RecordBatch::Make(
+        schema, /* num_rows */ 1,
+        {ArrayFromJSON(arrow::int64(), "[6]"), ArrayFromJSON(arrow::int64(), "[15]")});
+    AssertBatchesEqual(*expected1, *chunk.data);
+
+    ASSERT_OK(writer->WriteRecordBatch(*batch));
+    ASSERT_OK(reader->Next(&chunk));
+    ASSERT_NE(nullptr, chunk.data);
+    auto expected2 = RecordBatch::Make(
+        schema, /* num_rows */ 1,
+        {ArrayFromJSON(arrow::int64(), "[12]"), ArrayFromJSON(arrow::int64(), "[30]")});
+    AssertBatchesEqual(*expected2, *chunk.data);
+
+    ASSERT_OK(writer->Close());
+  }
+}
+
+// Ensure server errors get propagated no matter what we try
+TEST_F(TestFlightClient, DoExchangeError) {
+  auto descr = FlightDescriptor::Command("error");
+  std::unique_ptr<FlightStreamReader> reader;
+  std::unique_ptr<FlightStreamWriter> writer;
+  {
+    ASSERT_OK(client_->DoExchange(descr, &writer, &reader));
+    auto status = writer->Close();
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        NotImplemented, ::testing::HasSubstr("Expected error"), writer->Close());
+  }
+  {
+    ASSERT_OK(client_->DoExchange(descr, &writer, &reader));
+    FlightStreamChunk chunk;
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        NotImplemented, ::testing::HasSubstr("Expected error"), reader->Next(&chunk));
+  }
+  {
+    ASSERT_OK(client_->DoExchange(descr, &writer, &reader));
+    EXPECT_RAISES_WITH_MESSAGE_THAT(
+        NotImplemented, ::testing::HasSubstr("Expected error"), reader->GetSchema());
+  }
+  // writer->Begin isn't tested here because, as noted in client.cc,
+  // OpenRecordBatchWriter lazily writes the initial message - hence
+  // Begin() won't fail. Additionally, it appears gRPC may buffer
+  // writes - a write won't immediately fail even when the server
+  // immediately fails.
+}
+
 TEST_F(TestFlightClient, ListActions) {
   std::vector<ActionType> actions;
   ASSERT_OK(client_->ListActions(&actions));
@@ -1381,6 +1617,31 @@ TEST_F(TestMetadata, DoGet) {
   ASSERT_EQ(nullptr, chunk.data);
 }
 
+// Test dictionaries. This tests a corner case in the reader:
+// dictionary batches come in between the schema and the first record
+// batch, so the reader must take care to read application metadata
+// from the record batch, and not one of the dictionary batches.
+TEST_F(TestMetadata, DoGetDictionaries) {
+  Ticket ticket{"dicts"};
+  std::unique_ptr<FlightStreamReader> stream;
+  ASSERT_OK(client_->DoGet(ticket, &stream));
+
+  BatchVector expected_batches;
+  ASSERT_OK(ExampleDictBatches(&expected_batches));
+
+  FlightStreamChunk chunk;
+  auto num_batches = static_cast<int>(expected_batches.size());
+  for (int i = 0; i < num_batches; ++i) {
+    ASSERT_OK(stream->Next(&chunk));
+    ASSERT_NE(nullptr, chunk.data);
+    ASSERT_NE(nullptr, chunk.app_metadata);
+    ASSERT_BATCHES_EQUAL(*expected_batches[i], *chunk.data);
+    ASSERT_EQ(std::to_string(i), chunk.app_metadata->ToString());
+  }
+  ASSERT_OK(stream->Next(&chunk));
+  ASSERT_EQ(nullptr, chunk.data);
+}
+
 TEST_F(TestMetadata, DoPut) {
   std::unique_ptr<FlightStreamWriter> writer;
   std::unique_ptr<FlightMetadataReader> reader;
@@ -1403,6 +1664,31 @@ TEST_F(TestMetadata, DoPut) {
   ASSERT_OK(writer->Close());
 }
 
+// Test DoPut() with dictionaries. This tests a corner case in the
+// server-side reader; see DoGetDictionaries above.
+TEST_F(TestMetadata, DoPutDictionaries) {
+  std::unique_ptr<FlightStreamWriter> writer;
+  std::unique_ptr<FlightMetadataReader> reader;
+  BatchVector expected_batches;
+  ASSERT_OK(ExampleDictBatches(&expected_batches));
+  // ARROW-8749: don't get the schema via ExampleDictSchema because
+  // DictionaryMemo uses field addresses to determine whether it's
+  // seen a field before. Hence, if we use a schema that is different
+  // (identity-wise) than the schema of the first batch we write,
+  // we'll end up generating a duplicate set of dictionaries that
+  // confuses the reader.
+  ASSERT_OK(client_->DoPut(FlightDescriptor{}, expected_batches[0]->schema(), &writer,
+                           &reader));
+  std::shared_ptr<RecordBatch> chunk;
+  std::shared_ptr<Buffer> metadata;
+  auto num_batches = static_cast<int>(expected_batches.size());
+  for (int i = 0; i < num_batches; ++i) {
+    ASSERT_OK(writer->WriteWithMetadata(*expected_batches[i],
+                                        Buffer::FromString(std::to_string(i))));
+  }
+  ASSERT_OK(writer->Close());
+}
+
 TEST_F(TestMetadata, DoPutReadMetadata) {
   std::unique_ptr<FlightStreamWriter> writer;
   std::unique_ptr<FlightMetadataReader> reader;
diff --git a/cpp/src/arrow/flight/internal.cc b/cpp/src/arrow/flight/internal.cc
index 7958b42..2625204 100644
--- a/cpp/src/arrow/flight/internal.cc
+++ b/cpp/src/arrow/flight/internal.cc
@@ -499,6 +499,18 @@ Status ToProto(const SchemaResult& result, pb::SchemaResult* pb_result) {
   return Status::OK();
 }
 
+Status ToPayload(const FlightDescriptor& descr, std::shared_ptr<Buffer>* out) {
+  // TODO(lidavidm): make these use Result<T>
+  std::string str_descr;
+  pb::FlightDescriptor pb_descr;
+  RETURN_NOT_OK(ToProto(descr, &pb_descr));
+  if (!pb_descr.SerializeToString(&str_descr)) {
+    return Status::UnknownError("Failed to serialize Flight descriptor");
+  }
+  *out = Buffer::FromString(std::move(str_descr));
+  return Status::OK();
+}
+
 }  // namespace internal
 }  // namespace flight
 }  // namespace arrow
diff --git a/cpp/src/arrow/flight/internal.h b/cpp/src/arrow/flight/internal.h
index d41f752..c0964c6 100644
--- a/cpp/src/arrow/flight/internal.h
+++ b/cpp/src/arrow/flight/internal.h
@@ -121,6 +121,8 @@ Status ToProto(const SchemaResult& result, pb::SchemaResult* pb_result);
 void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket);
 Status ToProto(const BasicAuth& basic_auth, pb::BasicAuth* pb_basic_auth);
 
+Status ToPayload(const FlightDescriptor& descr, std::shared_ptr<Buffer>* out);
+
 }  // namespace internal
 }  // namespace flight
 }  // namespace arrow
diff --git a/cpp/src/arrow/flight/middleware.h b/cpp/src/arrow/flight/middleware.h
index b1096e1..dd6585b 100644
--- a/cpp/src/arrow/flight/middleware.h
+++ b/cpp/src/arrow/flight/middleware.h
@@ -55,6 +55,7 @@ enum class FlightMethod : char {
   DoPut = 6,
   DoAction = 7,
   ListActions = 8,
+  DoExchange = 9,
 };
 
 /// \brief Information about an instance of a Flight RPC.
diff --git a/cpp/src/arrow/flight/serialization_internal.cc b/cpp/src/arrow/flight/serialization_internal.cc
index 28ff5663f..bb911b6 100644
--- a/cpp/src/arrow/flight/serialization_internal.cc
+++ b/cpp/src/arrow/flight/serialization_internal.cc
@@ -48,6 +48,7 @@
 
 #include "arrow/buffer.h"
 #include "arrow/flight/server.h"
+#include "arrow/ipc/message.h"
 #include "arrow/ipc/writer.h"
 #include "arrow/util/bit_util.h"
 #include "arrow/util/logging.h"
@@ -75,7 +76,9 @@ bool ReadBytesZeroCopy(const std::shared_ptr<Buffer>& source_data,
   if (!input->ReadVarint32(&length)) {
     return false;
   }
-  *out = SliceBuffer(source_data, input->CurrentPosition(), static_cast<int64_t>(length));
+  auto buf =
+      SliceBuffer(source_data, input->CurrentPosition(), static_cast<int64_t>(length));
+  *out = buf;
   return input->Skip(static_cast<int>(length));
 }
 
@@ -109,8 +112,16 @@ class GrpcBuffer : public MutableBuffer {
       // If it is, then we can reference the `grpc_slice` directly.
       grpc_slice slice = buffer->data.raw.slice_buffer.slices[0];
 
-      // Increment reference count so this memory remains valid
-      *out = std::make_shared<GrpcBuffer>(slice, true);
+      if (slice.refcount) {
+        // Increment reference count so this memory remains valid
+        *out = std::make_shared<GrpcBuffer>(slice, true);
+      } else {
+        // Small slices (less than GRPC_SLICE_INLINED_SIZE bytes) are
+        // inlined into the structure and must be copied.
+        const uint8_t length = slice.data.inlined.length;
+        ARROW_ASSIGN_OR_RAISE(*out, arrow::AllocateBuffer(length));
+        std::memcpy((*out)->mutable_data(), slice.data.inlined.bytes, length);
+      }
     } else {
       // Otherwise, we need to use `grpc_byte_buffer_reader_readall` to read
       // `buffer` into a single contiguous `grpc_slice`. The gRPC reader gives
@@ -151,10 +162,41 @@ grpc::Slice SliceFromBuffer(const std::shared_ptr<Buffer>& buf) {
 
 static const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
 
+// Update the sizes of our Protobuf fields based on the given IPC payload.
+grpc::Status IpcMessageHeaderSize(const arrow::ipc::internal::IpcPayload& ipc_msg,
+                                  bool has_body, size_t* body_size, size_t* header_size,
+                                  int32_t* metadata_size) {
+  DCHECK_LT(ipc_msg.metadata->size(), kInt32Max);
+  *metadata_size = static_cast<int32_t>(ipc_msg.metadata->size());
+
+  // 1 byte for metadata tag
+  *header_size += 1 + WireFormatLite::LengthDelimitedSize(*metadata_size);
+
+  for (const auto& buffer : ipc_msg.body_buffers) {
+    // Buffer may be null when the row length is zero, or when all
+    // entries are invalid.
+    if (!buffer) continue;
+
+    *body_size += static_cast<size_t>(BitUtil::RoundUpToMultipleOf8(buffer->size()));
+  }
+
+  // 2 bytes for body tag
+  if (has_body) {
+    // We write the body tag in the header but not the actual body data
+    *header_size += 2 + WireFormatLite::LengthDelimitedSize(*body_size) - *body_size;
+  }
+
+  return grpc::Status::OK;
+}
+
 grpc::Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out,
                                  bool* own_buffer) {
+  // Size of the IPC body (protobuf: data_body)
   size_t body_size = 0;
+  // Size of the Protobuf "header" (everything except for the body)
   size_t header_size = 0;
+  // Size of IPC header metadata (protobuf: data_header)
+  int32_t metadata_size = 0;
 
   // Write the descriptor if present
   int32_t descriptor_size = 0;
@@ -166,14 +208,6 @@ grpc::Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out,
     header_size += 1 + WireFormatLite::LengthDelimitedSize(descriptor_size);
   }
 
-  const arrow::ipc::internal::IpcPayload& ipc_msg = msg.ipc_message;
-
-  DCHECK_LT(ipc_msg.metadata->size(), kInt32Max);
-  const int32_t metadata_size = static_cast<int32_t>(ipc_msg.metadata->size());
-
-  // 1 byte for metadata tag
-  header_size += 1 + WireFormatLite::LengthDelimitedSize(metadata_size);
-
   // App metadata tag if appropriate
   int32_t app_metadata_size = 0;
   if (msg.app_metadata && msg.app_metadata->size() > 0) {
@@ -182,21 +216,15 @@ grpc::Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out,
     header_size += 1 + WireFormatLite::LengthDelimitedSize(app_metadata_size);
   }
 
-  for (const auto& buffer : ipc_msg.body_buffers) {
-    // Buffer may be null when the row length is zero, or when all
-    // entries are invalid.
-    if (!buffer) continue;
-
-    body_size += static_cast<size_t>(BitUtil::RoundUpToMultipleOf8(buffer->size()));
-  }
-
-  bool has_body = ipc::Message::HasBody(ipc_msg.type);
-  DCHECK(has_body || ipc_msg.body_length == 0);
-
-  // 2 bytes for body tag
-  if (has_body) {
-    // We write the body tag in the header but not the actual body data
-    header_size += 2 + WireFormatLite::LengthDelimitedSize(body_size) - body_size;
+  const arrow::ipc::internal::IpcPayload& ipc_msg = msg.ipc_message;
+  // A message type of NONE means this payload carries no IPC data (metadata-only).
+  bool has_ipc = ipc_msg.type != ipc::Message::NONE;
+  bool has_body = has_ipc ? ipc::Message::HasBody(ipc_msg.type) : false;
+
+  if (has_ipc) {
+    DCHECK(has_body || ipc_msg.body_length == 0);
+    GRPC_RETURN_NOT_GRPC_OK(IpcMessageHeaderSize(ipc_msg, has_body, &body_size,
+                                                 &header_size, &metadata_size));
   }
 
   // TODO(wesm): messages over 2GB unlikely to be yet supported
@@ -207,65 +235,67 @@ grpc::Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out,
 
   // Allocate and initialize slices
   std::vector<grpc::Slice> slices;
-  grpc::Slice header_slice(header_size);
-  slices.push_back(header_slice);
-
-  // XXX(wesm): for debugging
-  // std::cout << "Writing record batch with total size " << total_size << std::endl;
-
-  ArrayOutputStream header_writer(const_cast<uint8_t*>(header_slice.begin()),
-                                  static_cast<int>(header_slice.size()));
-  CodedOutputStream header_stream(&header_writer);
+  slices.emplace_back(header_size);
+
+  // Force the header_stream to be destructed, which actually flushes
+  // the data into the slice.
+  {
+    ArrayOutputStream header_writer(const_cast<uint8_t*>(slices[0].begin()),
+                                    static_cast<int>(slices[0].size()));
+    CodedOutputStream header_stream(&header_writer);
+
+    // Write descriptor
+    if (msg.descriptor != nullptr) {
+      WireFormatLite::WriteTag(pb::FlightData::kFlightDescriptorFieldNumber,
+                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
+      header_stream.WriteVarint32(descriptor_size);
+      header_stream.WriteRawMaybeAliased(msg.descriptor->data(),
+                                         static_cast<int>(msg.descriptor->size()));
+    }
 
-  // Write descriptor
-  if (msg.descriptor != nullptr) {
-    WireFormatLite::WriteTag(pb::FlightData::kFlightDescriptorFieldNumber,
-                             WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
-    header_stream.WriteVarint32(descriptor_size);
-    header_stream.WriteRawMaybeAliased(msg.descriptor->data(),
-                                       static_cast<int>(msg.descriptor->size()));
-  }
+    // Write header
+    if (has_ipc) {
+      WireFormatLite::WriteTag(pb::FlightData::kDataHeaderFieldNumber,
+                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
+      header_stream.WriteVarint32(metadata_size);
+      header_stream.WriteRawMaybeAliased(ipc_msg.metadata->data(),
+                                         static_cast<int>(ipc_msg.metadata->size()));
+    }
 
-  // Write header
-  WireFormatLite::WriteTag(pb::FlightData::kDataHeaderFieldNumber,
-                           WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
-  header_stream.WriteVarint32(metadata_size);
-  header_stream.WriteRawMaybeAliased(ipc_msg.metadata->data(),
-                                     static_cast<int>(ipc_msg.metadata->size()));
-
-  // Write app metadata
-  if (app_metadata_size > 0) {
-    WireFormatLite::WriteTag(pb::FlightData::kAppMetadataFieldNumber,
-                             WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
-    header_stream.WriteVarint32(app_metadata_size);
-    header_stream.WriteRawMaybeAliased(msg.app_metadata->data(),
-                                       static_cast<int>(msg.app_metadata->size()));
-  }
+    // Write app metadata
+    if (app_metadata_size > 0) {
+      WireFormatLite::WriteTag(pb::FlightData::kAppMetadataFieldNumber,
+                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
+      header_stream.WriteVarint32(app_metadata_size);
+      header_stream.WriteRawMaybeAliased(msg.app_metadata->data(),
+                                         static_cast<int>(msg.app_metadata->size()));
+    }
 
-  if (has_body) {
-    // Write body tag
-    WireFormatLite::WriteTag(pb::FlightData::kDataBodyFieldNumber,
-                             WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
-    header_stream.WriteVarint32(static_cast<uint32_t>(body_size));
-
-    // Enqueue body buffers for writing, without copying
-    for (const auto& buffer : ipc_msg.body_buffers) {
-      // Buffer may be null when the row length is zero, or when all
-      // entries are invalid.
-      if (!buffer) continue;
-
-      slices.push_back(SliceFromBuffer(buffer));
-
-      // Write padding if not multiple of 8
-      const auto remainder = static_cast<int>(
-          BitUtil::RoundUpToMultipleOf8(buffer->size()) - buffer->size());
-      if (remainder) {
-        slices.push_back(grpc::Slice(kPaddingBytes, remainder));
+    if (has_body) {
+      // Write body tag
+      WireFormatLite::WriteTag(pb::FlightData::kDataBodyFieldNumber,
+                               WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
+      header_stream.WriteVarint32(static_cast<uint32_t>(body_size));
+
+      // Enqueue body buffers for writing, without copying
+      for (const auto& buffer : ipc_msg.body_buffers) {
+        // Buffer may be null when the row length is zero, or when all
+        // entries are invalid.
+        if (!buffer) continue;
+
+        slices.push_back(SliceFromBuffer(buffer));
+
+        // Write padding if not multiple of 8
+        const auto remainder = static_cast<int>(
+            BitUtil::RoundUpToMultipleOf8(buffer->size()) - buffer->size());
+        if (remainder) {
+          slices.push_back(grpc::Slice(kPaddingBytes, remainder));
+        }
       }
     }
-  }
 
-  DCHECK_EQ(static_cast<int>(header_size), header_stream.ByteCount());
+    DCHECK_EQ(static_cast<int>(header_size), header_stream.ByteCount());
+  }
 
   // Hand off the slices to the returned ByteBuffer
   *out = grpc::ByteBuffer(slices.data(), slices.size());
@@ -280,6 +310,12 @@ grpc::Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out) {
     return grpc::Status(grpc::StatusCode::INTERNAL, "No payload");
   }
 
+  // Reset fields in case the caller reuses a single allocation
+  out->descriptor = nullptr;
+  out->app_metadata = nullptr;
+  out->metadata = nullptr;
+  out->body = nullptr;
+
   std::shared_ptr<arrow::Buffer> wrapped_buffer;
   GRPC_RETURN_NOT_OK(GrpcBuffer::Wrap(buffer, &wrapped_buffer));
 
@@ -343,8 +379,8 @@ grpc::Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out) {
   return grpc::Status::OK;
 }
 
-Status FlightData::OpenMessage(std::unique_ptr<ipc::Message>* message) {
-  return ipc::Message::Open(metadata, body).Value(message);
+::arrow::Result<std::unique_ptr<ipc::Message>> FlightData::OpenMessage() {
+  return ipc::Message::Open(metadata, body);
 }
 
 // The pointer bitcast hack below causes legitimate warnings, silence them.
@@ -370,6 +406,20 @@ bool WritePayload(const FlightPayload& payload,
 }
 
 bool WritePayload(const FlightPayload& payload,
+                  grpc::ClientReaderWriter<pb::FlightData, pb::FlightData>* writer) {
+  // Pretend to be pb::FlightData and intercept in SerializationTraits
+  return writer->Write(*reinterpret_cast<const pb::FlightData*>(&payload),
+                       grpc::WriteOptions());
+}
+
+bool WritePayload(const FlightPayload& payload,
+                  grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* writer) {
+  // Pretend to be pb::FlightData and intercept in SerializationTraits
+  return writer->Write(*reinterpret_cast<const pb::FlightData*>(&payload),
+                       grpc::WriteOptions());
+}
+
+bool WritePayload(const FlightPayload& payload,
                   grpc::ServerWriter<pb::FlightData>* writer) {
   // Pretend to be pb::FlightData and intercept in SerializationTraits
   return writer->Write(*reinterpret_cast<const pb::FlightData*>(&payload),
@@ -381,12 +431,29 @@ bool ReadPayload(grpc::ClientReader<pb::FlightData>* reader, FlightData* data) {
   return reader->Read(reinterpret_cast<pb::FlightData*>(data));
 }
 
+bool ReadPayload(grpc::ClientReaderWriter<pb::FlightData, pb::FlightData>* reader,
+                 FlightData* data) {
+  // Pretend to be pb::FlightData and intercept in SerializationTraits
+  return reader->Read(reinterpret_cast<pb::FlightData*>(data));
+}
+
 bool ReadPayload(grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>* reader,
                  FlightData* data) {
   // Pretend to be pb::FlightData and intercept in SerializationTraits
   return reader->Read(reinterpret_cast<pb::FlightData*>(data));
 }
 
+bool ReadPayload(grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* reader,
+                 FlightData* data) {
+  // Pretend to be pb::FlightData and intercept in SerializationTraits
+  return reader->Read(reinterpret_cast<pb::FlightData*>(data));
+}
+
+bool ReadPayload(grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>* reader,
+                 pb::PutResult* data) {
+  return reader->Read(data);
+}
+
 #ifndef _WIN32
 #pragma GCC diagnostic pop
 #endif
diff --git a/cpp/src/arrow/flight/serialization_internal.h b/cpp/src/arrow/flight/serialization_internal.h
index cfb8b8a..2a75d69 100644
--- a/cpp/src/arrow/flight/serialization_internal.h
+++ b/cpp/src/arrow/flight/serialization_internal.h
@@ -25,7 +25,7 @@
 #include "arrow/flight/internal.h"
 #include "arrow/flight/types.h"
 #include "arrow/ipc/message.h"
-#include "arrow/status.h"
+#include "arrow/result.h"
 
 namespace arrow {
 
@@ -50,7 +50,7 @@ struct FlightData {
   std::shared_ptr<Buffer> body;
 
   /// Open IPC message from the metadata and body
-  Status OpenMessage(std::unique_ptr<ipc::Message>* message);
+  ::arrow::Result<std::unique_ptr<ipc::Message>> OpenMessage();
 };
 
 /// Write Flight message on gRPC stream with zero-copy optimizations.
@@ -58,13 +58,92 @@ struct FlightData {
 bool WritePayload(const FlightPayload& payload,
                   grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>* writer);
 bool WritePayload(const FlightPayload& payload,
+                  grpc::ClientReaderWriter<pb::FlightData, pb::FlightData>* writer);
+bool WritePayload(const FlightPayload& payload,
+                  grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* writer);
+bool WritePayload(const FlightPayload& payload,
                   grpc::ServerWriter<pb::FlightData>* writer);
 
 /// Read Flight message from gRPC stream with zero-copy optimizations.
 /// True is returned on success, false if stream ended.
 bool ReadPayload(grpc::ClientReader<pb::FlightData>* reader, FlightData* data);
+bool ReadPayload(grpc::ClientReaderWriter<pb::FlightData, pb::FlightData>* reader,
+                 FlightData* data);
 bool ReadPayload(grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>* reader,
                  FlightData* data);
+bool ReadPayload(grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* reader,
+                 FlightData* data);
+// Overload to make genericity easier in DoPutPayloadWriter
+bool ReadPayload(grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>* reader,
+                 pb::PutResult* data);
+
+// We want to reuse RecordBatchStreamReader's implementation while
+// (1) Adapting it to the Flight message format
+// (2) Allowing pure-metadata messages before data is sent
+// (3) Reusing the reader implementation between DoGet and DoExchange.
+// To do this, we wrap the gRPC reader in a peekable iterator.
+// The Flight reader can then peek at the message to determine whether
+// it has application metadata or not, and pass the message to
+// RecordBatchStreamReader as appropriate.
+template <typename ReaderPtr>
+class PeekableFlightDataReader {
+ public:
+  explicit PeekableFlightDataReader(ReaderPtr stream)
+      : stream_(stream), peek_(), finished_(false), valid_(false) {}
+
+  void Peek(internal::FlightData** out) {
+    *out = nullptr;
+    if (finished_) {
+      return;
+    }
+    if (EnsurePeek()) {
+      *out = &peek_;
+    }
+  }
+
+  void Next(internal::FlightData** out) {
+    Peek(out);
+    valid_ = false;
+  }
+
+  /// \brief Peek() until the first data message.
+  ///
+  /// After this is called, either this will return \a false, or the
+  /// next result of \a Peek and \a Next will contain Arrow data.
+  bool SkipToData() {
+    FlightData* data;
+    while (true) {
+      Peek(&data);
+      if (!data) {
+        return false;
+      }
+      if (data->metadata) {
+        return true;
+      }
+      Next(&data);
+    }
+  }
+
+ private:
+  bool EnsurePeek() {
+    if (finished_ || valid_) {
+      return valid_;
+    }
+
+    if (!internal::ReadPayload(&*stream_, &peek_)) {
+      finished_ = true;
+      valid_ = false;
+    } else {
+      valid_ = true;
+    }
+    return valid_;
+  }
+
+  ReaderPtr stream_;
+  internal::FlightData peek_;
+  bool finished_;
+  bool valid_;
+};
 
 }  // namespace internal
 }  // namespace flight
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index 5ad8bba..a544aec 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -90,96 +90,129 @@ namespace flight {
 
 namespace {
 
-// A MessageReader implementation that reads from a gRPC ServerReader
+// A MessageReader implementation that reads from a gRPC ServerReader.
+// Templated to be generic over DoPut/DoExchange.
+template <typename Reader>
 class FlightIpcMessageReader : public ipc::MessageReader {
  public:
   explicit FlightIpcMessageReader(
-      grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>* reader,
-      std::shared_ptr<Buffer>* last_metadata)
-      : reader_(reader), app_metadata_(last_metadata) {}
-
-  const FlightDescriptor& descriptor() const { return descriptor_; }
+      std::shared_ptr<internal::PeekableFlightDataReader<Reader*>> peekable_reader,
+      std::shared_ptr<Buffer>* app_metadata)
+      : peekable_reader_(peekable_reader), app_metadata_(app_metadata) {}
 
   ::arrow::Result<std::unique_ptr<ipc::Message>> ReadNextMessage() override {
-    std::unique_ptr<ipc::Message> out;
-    RETURN_NOT_OK(GetNextMessage(&out));
-    return std::move(out);
-  }
-
- protected:
-  Status GetNextMessage(std::unique_ptr<ipc::Message>* out) {
-    // TODO: Migrate to Result APIs
     if (stream_finished_) {
-      *out = nullptr;
-      *app_metadata_ = nullptr;
-      return Status::OK();
+      return nullptr;
     }
-    internal::FlightData data;
-    if (!internal::ReadPayload(reader_, &data)) {
-      // Stream is finished
+    internal::FlightData* data;
+    peekable_reader_->Next(&data);
+    if (!data) {
       stream_finished_ = true;
       if (first_message_) {
         return Status::Invalid(
             "Client provided malformed message or did not provide message");
       }
-      *out = nullptr;
-      *app_metadata_ = nullptr;
-      return Status::OK();
-    }
-
-    if (first_message_) {
-      if (!data.descriptor) {
-        return Status::Invalid("DoPut must start with non-null descriptor");
-      }
-      descriptor_ = *data.descriptor;
-      first_message_ = false;
+      return nullptr;
     }
-
-    RETURN_NOT_OK(data.OpenMessage(out));
-    *app_metadata_ = std::move(data.app_metadata);
-    return Status::OK();
+    *app_metadata_ = std::move(data->app_metadata);
+    return data->OpenMessage();
   }
 
-  grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>* reader_;
-  bool stream_finished_ = false;
-  bool first_message_ = true;
-  FlightDescriptor descriptor_;
+ protected:
+  std::shared_ptr<internal::PeekableFlightDataReader<Reader*>> peekable_reader_;
+  // A reference to FlightMessageReaderImpl.app_metadata_. That class
+  // can't access the app metadata because when it Peek()s the stream,
+  // it may be looking at a dictionary batch, not the record
+  // batch. Updating it here ensures the reader is always updated with
+  // the last metadata message read.
   std::shared_ptr<Buffer>* app_metadata_;
+  bool first_message_ = true;
+  bool stream_finished_ = false;
 };
 
+template <typename WritePayload>
 class FlightMessageReaderImpl : public FlightMessageReader {
  public:
-  explicit FlightMessageReaderImpl(
-      grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>* reader)
-      : reader_(reader) {}
+  using GrpcStream = grpc::ServerReaderWriter<WritePayload, pb::FlightData>;
+
+  explicit FlightMessageReaderImpl(GrpcStream* reader)
+      : reader_(reader),
+        peekable_reader_(new internal::PeekableFlightDataReader<GrpcStream*>(reader)) {}
 
   Status Init() {
-    message_reader_ = new FlightIpcMessageReader(reader_, &last_metadata_);
-    return ipc::RecordBatchStreamReader::Open(
-               std::unique_ptr<ipc::MessageReader>(message_reader_))
-        .Value(&batch_reader_);
+    // Peek the first message to get the descriptor.
+    internal::FlightData* data;
+    peekable_reader_->Peek(&data);
+    if (!data) {
+      return Status::IOError("Stream finished before first message sent");
+    }
+    if (!data->descriptor) {
+      return Status::IOError("Descriptor missing on first message");
+    }
+    descriptor_ = *data->descriptor.get();  // Copy
+    // If there's a schema (=DoPut), also Open().
+    if (data->metadata) {
+      return EnsureDataStarted();
+    }
+    peekable_reader_->Next(&data);
+    return Status::OK();
   }
 
-  const FlightDescriptor& descriptor() const override {
-    return message_reader_->descriptor();
-  }
+  const FlightDescriptor& descriptor() const override { return descriptor_; }
 
-  std::shared_ptr<Schema> schema() const override { return batch_reader_->schema(); }
+  arrow::Result<std::shared_ptr<Schema>> GetSchema() override {
+    RETURN_NOT_OK(EnsureDataStarted());
+    return batch_reader_->schema();
+  }
 
   Status Next(FlightStreamChunk* out) override {
-    out->app_metadata = nullptr;
+    internal::FlightData* data;
+    peekable_reader_->Peek(&data);
+    if (!data) {
+      out->app_metadata = nullptr;
+      out->data = nullptr;
+      return Status::OK();
+    }
+
+    if (!data->metadata) {
+      // Metadata-only (data->metadata is the IPC header)
+      out->app_metadata = data->app_metadata;
+      out->data = nullptr;
+      peekable_reader_->Next(&data);
+      return Status::OK();
+    }
+
+    if (!batch_reader_) {
+      RETURN_NOT_OK(EnsureDataStarted());
+      // re-peek here since EnsureDataStarted() advances the stream
+      return Next(out);
+    }
     RETURN_NOT_OK(batch_reader_->ReadNext(&out->data));
-    out->app_metadata = std::move(last_metadata_);
+    out->app_metadata = std::move(app_metadata_);
     return Status::OK();
   }
 
  private:
-  std::shared_ptr<Schema> schema_;
-  std::unique_ptr<ipc::DictionaryMemo> dictionary_memo_;
-  grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>* reader_;
-  FlightIpcMessageReader* message_reader_;
-  std::shared_ptr<Buffer> last_metadata_;
+  /// Ensure we are set up to read data.
+  Status EnsureDataStarted() {
+    if (!batch_reader_) {
+      // peek() until we find the first data message; discard metadata
+      if (!peekable_reader_->SkipToData()) {
+        return Status::IOError("Client never sent a data message");
+      }
+      auto message_reader = std::unique_ptr<ipc::MessageReader>(
+          new FlightIpcMessageReader<GrpcStream>(peekable_reader_, &app_metadata_));
+      ARROW_ASSIGN_OR_RAISE(
+          batch_reader_, ipc::RecordBatchStreamReader::Open(std::move(message_reader)));
+    }
+    return Status::OK();
+  }
+
+  FlightDescriptor descriptor_;
+  GrpcStream* reader_;
+  std::shared_ptr<internal::PeekableFlightDataReader<GrpcStream*>> peekable_reader_;
   std::shared_ptr<RecordBatchReader> batch_reader_;
+  std::shared_ptr<Buffer> app_metadata_;
 };
 
 class GrpcMetadataWriter : public FlightMetadataWriter {
@@ -239,6 +272,92 @@ class GrpcServerAuthSender : public ServerAuthSender {
   grpc::ServerReaderWriter<pb::HandshakeResponse, pb::HandshakeRequest>* stream_;
 };
 
+/// The implementation of the write side of a bidirectional FlightData
+/// stream for DoExchange.
+class DoExchangeMessageWriter : public FlightMessageWriter {
+ public:
+  explicit DoExchangeMessageWriter(
+      grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* stream)
+      : stream_(stream), ipc_options_(ipc::IpcOptions::Defaults()) {}
+
+  Status Begin(const std::shared_ptr<Schema>& schema) override {
+    if (started_) {
+      return Status::Invalid("This writer has already been started.");
+    }
+    started_ = true;
+
+    FlightPayload schema_payload;
+    RETURN_NOT_OK(ipc::internal::GetSchemaPayload(
+        *schema, ipc_options_, &dictionary_memo_, &schema_payload.ipc_message));
+    return WritePayload(schema_payload);
+  }
+
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    return WriteWithMetadata(batch, nullptr);
+  }
+
+  Status WriteMetadata(std::shared_ptr<Buffer> app_metadata) override {
+    FlightPayload payload{};
+    payload.app_metadata = app_metadata;
+    return WritePayload(payload);
+  }
+
+  Status WriteWithMetadata(const RecordBatch& batch,
+                           std::shared_ptr<Buffer> app_metadata) override {
+    RETURN_NOT_OK(CheckStarted());
+    RETURN_NOT_OK(EnsureDictionariesWritten(batch));
+    FlightPayload payload{};
+    if (app_metadata) {
+      payload.app_metadata = app_metadata;
+    }
+    RETURN_NOT_OK(
+        ipc::internal::GetRecordBatchPayload(batch, ipc_options_, &payload.ipc_message));
+    return WritePayload(payload);
+  }
+
+  Status Close() override {
+    // It's fine to Close() without writing data
+    return Status::OK();
+  }
+
+ private:
+  Status WritePayload(const FlightPayload& payload) {
+    if (!internal::WritePayload(payload, stream_)) {
+      // gRPC doesn't give us any way to find what the error was (if any).
+      return Status::IOError("Could not write payload to stream");
+    }
+    return Status::OK();
+  }
+
+  Status CheckStarted() {
+    if (!started_) {
+      return Status::Invalid("This writer is not started. Call Begin() with a schema");
+    }
+    return Status::OK();
+  }
+
+  Status EnsureDictionariesWritten(const RecordBatch& batch) {
+    if (dictionaries_written_) {
+      return Status::OK();
+    }
+    dictionaries_written_ = true;
+    RETURN_NOT_OK(ipc::CollectDictionaries(batch, &dictionary_memo_));
+    for (auto& pair : dictionary_memo_.dictionaries()) {
+      FlightPayload payload{};
+      RETURN_NOT_OK(ipc::internal::GetDictionaryPayload(
+          pair.first, pair.second, ipc_options_, &payload.ipc_message));
+      RETURN_NOT_OK(WritePayload(payload));
+    }
+    return Status::OK();
+  }
+
+  grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* stream_;
+  ipc::IpcOptions ipc_options_;
+  ipc::DictionaryMemo dictionary_memo_;
+  bool started_ = false;
+  bool dictionaries_written_ = false;
+};
+
 class FlightServiceImpl;
 class GrpcServerCallContext : public ServerCallContext {
   explicit GrpcServerCallContext(grpc::ServerContext* context) : context_(context) {}
@@ -509,7 +628,8 @@ class FlightServiceImpl : public FlightService::Service {
     FlightPayload schema_payload;
     SERVICE_RETURN_NOT_OK(flight_context, data_stream->GetSchemaPayload(&schema_payload));
     if (!internal::WritePayload(schema_payload, writer)) {
-      // Connection terminated?  XXX return error code?
+      // gRPC doesn't give any way for us to know why the message
+      // could not be written.
       RETURN_WITH_MIDDLEWARE(flight_context, grpc::Status::OK);
     }
 
@@ -531,8 +651,8 @@ class FlightServiceImpl : public FlightService::Service {
     GrpcServerCallContext flight_context(context);
     GRPC_RETURN_NOT_GRPC_OK(CheckAuth(FlightMethod::DoPut, context, flight_context));
 
-    auto message_reader =
-        std::unique_ptr<FlightMessageReaderImpl>(new FlightMessageReaderImpl(reader));
+    auto message_reader = std::unique_ptr<FlightMessageReaderImpl<pb::PutResult>>(
+        new FlightMessageReaderImpl<pb::PutResult>(reader));
     SERVICE_RETURN_NOT_OK(flight_context, message_reader->Init());
     auto metadata_writer =
         std::unique_ptr<FlightMetadataWriter>(new GrpcMetadataWriter(reader));
@@ -541,6 +661,21 @@ class FlightServiceImpl : public FlightService::Service {
                                           std::move(metadata_writer)));
   }
 
+  grpc::Status DoExchange(
+      ServerContext* context,
+      grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* stream) {
+    GrpcServerCallContext flight_context(context);
+    GRPC_RETURN_NOT_GRPC_OK(CheckAuth(FlightMethod::DoExchange, context, flight_context));
+    auto message_reader = std::unique_ptr<FlightMessageReaderImpl<pb::FlightData>>(
+        new FlightMessageReaderImpl<pb::FlightData>(stream));
+    SERVICE_RETURN_NOT_OK(flight_context, message_reader->Init());
+    auto writer =
+        std::unique_ptr<DoExchangeMessageWriter>(new DoExchangeMessageWriter(stream));
+    RETURN_WITH_MIDDLEWARE(flight_context,
+                           server_->DoExchange(flight_context, std::move(message_reader),
+                                               std::move(writer)));
+  }
+
   grpc::Status ListActions(ServerContext* context, const pb::Empty* request,
                            ServerWriter<pb::ActionType>* writer) {
     GrpcServerCallContext flight_context(context);
@@ -794,6 +929,12 @@ Status FlightServerBase::DoPut(const ServerCallContext& context,
   return Status::NotImplemented("NYI");
 }
 
+Status FlightServerBase::DoExchange(const ServerCallContext& context,
+                                    std::unique_ptr<FlightMessageReader> reader,
+                                    std::unique_ptr<FlightMessageWriter> writer) {
+  return Status::NotImplemented("NYI");
+}
+
 Status FlightServerBase::DoAction(const ServerCallContext& context, const Action& action,
                                   std::unique_ptr<ResultStream>* result) {
   return Status::NotImplemented("NYI");
diff --git a/cpp/src/arrow/flight/server.h b/cpp/src/arrow/flight/server.h
index b03b361..f1206bd 100644
--- a/cpp/src/arrow/flight/server.h
+++ b/cpp/src/arrow/flight/server.h
@@ -96,6 +96,17 @@ class ARROW_FLIGHT_EXPORT FlightMetadataWriter {
   virtual Status WriteMetadata(const Buffer& app_metadata) = 0;
 };
 
+/// \brief A writer for IPC payloads to a client. Also allows sending
+/// application-defined metadata via the Flight protocol.
+///
+/// This class offers more control compared to FlightDataStream,
+/// including the option to write metadata without data and the
+/// ability to interleave reading and writing.
+class ARROW_FLIGHT_EXPORT FlightMessageWriter : public MetadataRecordBatchWriter {
+ public:
+  virtual ~FlightMessageWriter() = default;
+};
+
 /// \brief Call state/contextual data.
 class ARROW_FLIGHT_EXPORT ServerCallContext {
  public:
@@ -236,6 +247,15 @@ class ARROW_FLIGHT_EXPORT FlightServerBase {
                        std::unique_ptr<FlightMessageReader> reader,
                        std::unique_ptr<FlightMetadataWriter> writer);
 
+  /// \brief Process a bidirectional stream of IPC payloads
+  /// \param[in] context The call context.
+  /// \param[in] reader a sequence of uploaded record batches
+  /// \param[in] writer send data back to the client
+  /// \return Status
+  virtual Status DoExchange(const ServerCallContext& context,
+                            std::unique_ptr<FlightMessageReader> reader,
+                            std::unique_ptr<FlightMessageWriter> writer);
+
   /// \brief Execute an action, return stream of zero or more results
   /// \param[in] context The call context.
   /// \param[in] action the action to execute, with type and body
diff --git a/cpp/src/arrow/flight/test_integration_client.cc b/cpp/src/arrow/flight/test_integration_client.cc
index 2405d94..f73f0fe 100644
--- a/cpp/src/arrow/flight/test_integration_client.cc
+++ b/cpp/src/arrow/flight/test_integration_client.cc
@@ -74,9 +74,12 @@ Status UploadBatchesToFlight(const std::vector<std::shared_ptr<RecordBatch>>& ch
     // Wait for the server to ack the result
     std::shared_ptr<Buffer> ack_metadata;
     RETURN_NOT_OK(metadata_reader.ReadMetadata(&ack_metadata));
-    if (!ack_metadata->Equals(*metadata)) {
-      return Status::Invalid("Expected metadata value: " + metadata->ToString() +
-                             " but got: " + ack_metadata->ToString());
+    if (!ack_metadata) {
+      return Status::Invalid("Expected metadata value: ", metadata->ToString(),
+                             " but got nothing.");
+    } else if (!ack_metadata->Equals(*metadata)) {
+      return Status::Invalid("Expected metadata value: ", metadata->ToString(),
+                             " but got: ", ack_metadata->ToString());
     }
     counter++;
   }
diff --git a/cpp/src/arrow/flight/test_integration_server.cc b/cpp/src/arrow/flight/test_integration_server.cc
index 9da42ae..a98f057 100644
--- a/cpp/src/arrow/flight/test_integration_server.cc
+++ b/cpp/src/arrow/flight/test_integration_server.cc
@@ -138,7 +138,7 @@ class FlightIntegrationTestServer : public FlightServerBase {
     std::string key = descriptor.path[0];
 
     IntegrationDataset dataset;
-    dataset.schema = reader->schema();
+    ARROW_ASSIGN_OR_RAISE(dataset.schema, reader->GetSchema());
     arrow::flight::FlightStreamChunk chunk;
     while (true) {
       RETURN_NOT_OK(reader->Next(&chunk));
diff --git a/cpp/src/arrow/flight/test_util.cc b/cpp/src/arrow/flight/test_util.cc
index 9ed1015..b4996ea 100644
--- a/cpp/src/arrow/flight/test_util.cc
+++ b/cpp/src/arrow/flight/test_util.cc
@@ -203,6 +203,187 @@ class FlightTestServer : public FlightServerBase {
     return Status::OK();
   }
 
+  Status DoExchange(const ServerCallContext& context,
+                    std::unique_ptr<FlightMessageReader> reader,
+                    std::unique_ptr<FlightMessageWriter> writer) override {
+    // Test various scenarios for a DoExchange
+    if (reader->descriptor().type != FlightDescriptor::DescriptorType::CMD) {
+      return Status::Invalid("Must provide a command descriptor");
+    }
+
+    const std::string& cmd = reader->descriptor().cmd;
+    if (cmd == "error") {
+      // Immediately return an error to the client.
+      return Status::NotImplemented("Expected error");
+    } else if (cmd == "get") {
+      return RunExchangeGet(std::move(reader), std::move(writer));
+    } else if (cmd == "put") {
+      return RunExchangePut(std::move(reader), std::move(writer));
+    } else if (cmd == "counter") {
+      return RunExchangeCounter(std::move(reader), std::move(writer));
+    } else if (cmd == "total") {
+      return RunExchangeTotal(std::move(reader), std::move(writer));
+    } else if (cmd == "echo") {
+      return RunExchangeEcho(std::move(reader), std::move(writer));
+    } else {
+      return Status::NotImplemented("Scenario not implemented: ", cmd);
+    }
+  }
+
+  // A simple example - act like DoGet.
+  Status RunExchangeGet(std::unique_ptr<FlightMessageReader> reader,
+                        std::unique_ptr<FlightMessageWriter> writer) {
+    RETURN_NOT_OK(writer->Begin(ExampleIntSchema()));
+    BatchVector batches;
+    RETURN_NOT_OK(ExampleIntBatches(&batches));
+    for (const auto& batch : batches) {
+      RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+    }
+    return Status::OK();
+  }
+
+  // A simple example - act like DoPut
+  Status RunExchangePut(std::unique_ptr<FlightMessageReader> reader,
+                        std::unique_ptr<FlightMessageWriter> writer) {
+    ARROW_ASSIGN_OR_RAISE(auto schema, reader->GetSchema());
+    if (!schema->Equals(ExampleIntSchema(), false)) {
+      return Status::Invalid("Schema is not as expected");
+    }
+    BatchVector batches;
+    RETURN_NOT_OK(ExampleIntBatches(&batches));
+    FlightStreamChunk chunk;
+    for (const auto& batch : batches) {
+      RETURN_NOT_OK(reader->Next(&chunk));
+      if (!chunk.data) {
+        return Status::Invalid("Expected another batch");
+      }
+      if (!batch->Equals(*chunk.data)) {
+        return Status::Invalid("Batch does not match");
+      }
+    }
+    RETURN_NOT_OK(reader->Next(&chunk));
+    if (chunk.data || chunk.app_metadata) {
+      return Status::Invalid("Too many batches");
+    }
+
+    RETURN_NOT_OK(writer->WriteMetadata(Buffer::FromString("done")));
+    return Status::OK();
+  }
+
+  // Read some number of record batches from the client, send a
+  // metadata message back with the count, then echo the batches back.
+  Status RunExchangeCounter(std::unique_ptr<FlightMessageReader> reader,
+                            std::unique_ptr<FlightMessageWriter> writer) {
+    std::vector<std::shared_ptr<RecordBatch>> batches;
+    FlightStreamChunk chunk;
+    int chunks = 0;
+    while (true) {
+      RETURN_NOT_OK(reader->Next(&chunk));
+      if (!chunk.data && !chunk.app_metadata) {
+        break;
+      }
+      if (chunk.data) {
+        batches.push_back(chunk.data);
+        chunks++;
+      }
+    }
+
+    // Echo back the number of record batches read.
+    std::shared_ptr<Buffer> buf = Buffer::FromString(std::to_string(chunks));
+    RETURN_NOT_OK(writer->WriteMetadata(buf));
+    // Echo the record batches themselves.
+    if (chunks > 0) {
+      ARROW_ASSIGN_OR_RAISE(auto schema, reader->GetSchema());
+      RETURN_NOT_OK(writer->Begin(schema));
+
+      for (const auto& batch : batches) {
+        RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+      }
+    }
+
+    return Status::OK();
+  }
+
+  // Read int64 batches from the client, each time sending back a
+  // batch with a running sum of columns.
+  Status RunExchangeTotal(std::unique_ptr<FlightMessageReader> reader,
+                          std::unique_ptr<FlightMessageWriter> writer) {
+    FlightStreamChunk chunk{};
+    ARROW_ASSIGN_OR_RAISE(auto schema, reader->GetSchema());
+    // Ensure the schema contains only int64 columns
+    for (const auto& field : schema->fields()) {
+      if (field->type()->id() != Type::type::INT64) {
+        return Status::Invalid("Field is not INT64: ", field->name());
+      }
+    }
+    std::vector<int64_t> sums(schema->num_fields());
+    std::vector<std::shared_ptr<Array>> columns(schema->num_fields());
+    RETURN_NOT_OK(writer->Begin(schema));
+    while (true) {
+      RETURN_NOT_OK(reader->Next(&chunk));
+      if (!chunk.data && !chunk.app_metadata) {
+        break;
+      }
+      if (chunk.data) {
+        if (!chunk.data->schema()->Equals(schema, false)) {
+          // A compliant client implementation would make this impossible
+          return Status::Invalid("Schemas are incompatible");
+        }
+
+        // Update the running totals
+        auto builder = std::make_shared<Int64Builder>();
+        int col_index = 0;
+        for (const auto& column : chunk.data->columns()) {
+          auto arr = std::dynamic_pointer_cast<Int64Array>(column);
+          if (!arr) {
+            return MakeFlightError(FlightStatusCode::Internal, "Could not cast array");
+          }
+          for (int row = 0; row < column->length(); row++) {
+            if (!arr->IsNull(row)) {
+              sums[col_index] += arr->Value(row);
+            }
+          }
+
+          builder->Reset();
+          RETURN_NOT_OK(builder->Append(sums[col_index]));
+          RETURN_NOT_OK(builder->Finish(&columns[col_index]));
+
+          col_index++;
+        }
+
+        // Echo the totals to the client
+        auto response = RecordBatch::Make(schema, /* num_rows */ 1, columns);
+        RETURN_NOT_OK(writer->WriteRecordBatch(*response));
+      }
+    }
+    return Status::OK();
+  }
+
+  // Echo the client's messages back.
+  Status RunExchangeEcho(std::unique_ptr<FlightMessageReader> reader,
+                         std::unique_ptr<FlightMessageWriter> writer) {
+    FlightStreamChunk chunk;
+    bool begun = false;
+    while (true) {
+      RETURN_NOT_OK(reader->Next(&chunk));
+      if (!chunk.data && !chunk.app_metadata) {
+        break;
+      }
+      if (!begun && chunk.data) {
+        begun = true;
+        RETURN_NOT_OK(writer->Begin(chunk.data->schema()));
+      }
+      if (chunk.data && chunk.app_metadata) {
+        RETURN_NOT_OK(writer->WriteWithMetadata(*chunk.data, chunk.app_metadata));
+      } else if (chunk.data) {
+        RETURN_NOT_OK(writer->WriteRecordBatch(*chunk.data));
+      } else if (chunk.app_metadata) {
+        RETURN_NOT_OK(writer->WriteMetadata(chunk.app_metadata));
+      }
+    }
+    return Status::OK();
+  }
+
   Status RunAction1(const Action& action, std::unique_ptr<ResultStream>* out) {
     std::vector<Result> results;
     for (int i = 0; i < 3; ++i) {
diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc
index 5365a79..1ba65ac 100644
--- a/cpp/src/arrow/flight/types.cc
+++ b/cpp/src/arrow/flight/types.cc
@@ -268,7 +268,8 @@ Status MetadataRecordBatchReader::ReadAll(
 Status MetadataRecordBatchReader::ReadAll(std::shared_ptr<Table>* table) {
   std::vector<std::shared_ptr<RecordBatch>> batches;
   RETURN_NOT_OK(ReadAll(&batches));
-  return Table::FromRecordBatches(schema(), std::move(batches)).Value(table);
+  ARROW_ASSIGN_OR_RAISE(auto schema, GetSchema());
+  return Table::FromRecordBatches(schema, std::move(batches)).Value(table);
 }
 
 SimpleFlightListing::SimpleFlightListing(const std::vector<FlightInfo>& flights)
diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h
index 8a01d99..7c9658d 100644
--- a/cpp/src/arrow/flight/types.h
+++ b/cpp/src/arrow/flight/types.h
@@ -28,6 +28,7 @@
 
 #include "arrow/flight/visibility.h"
 #include "arrow/ipc/writer.h"
+#include "arrow/result.h"
 
 namespace arrow {
 
@@ -463,7 +464,7 @@ class ARROW_FLIGHT_EXPORT MetadataRecordBatchReader {
   virtual ~MetadataRecordBatchReader() = default;
 
   /// \brief Get the schema for this stream.
-  virtual std::shared_ptr<Schema> schema() const = 0;
+  virtual arrow::Result<std::shared_ptr<Schema>> GetSchema() = 0;
   /// \brief Get the next message from Flight. If the stream is
   /// finished, then the members of \a FlightStreamChunk will be
   /// nullptr.
@@ -474,6 +475,17 @@ class ARROW_FLIGHT_EXPORT MetadataRecordBatchReader {
   virtual Status ReadAll(std::shared_ptr<Table>* table);
 };
 
+/// \brief An interface to write IPC payloads with metadata.
+class ARROW_FLIGHT_EXPORT MetadataRecordBatchWriter : public ipc::RecordBatchWriter {
+ public:
+  virtual ~MetadataRecordBatchWriter() = default;
+  /// \brief Begin writing data with the given schema. Only used with \a DoExchange.
+  virtual Status Begin(const std::shared_ptr<Schema>& schema) = 0;
+  virtual Status WriteMetadata(std::shared_ptr<Buffer> app_metadata) = 0;
+  virtual Status WriteWithMetadata(const RecordBatch& batch,
+                                   std::shared_ptr<Buffer> app_metadata) = 0;
+};
+
 /// \brief A FlightListing implementation based on a vector of
 /// FlightInfo objects.
 ///
diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx
index 88599f7..2041a68 100644
--- a/python/pyarrow/_flight.pyx
+++ b/python/pyarrow/_flight.pyx
@@ -1167,7 +1167,8 @@ cdef class FlightClient:
                     deref(c_options), ticket.ticket, &reader))
         result = FlightStreamReader()
         result.reader.reset(reader.release())
-        result.schema = pyarrow_wrap_schema(result.reader.get().schema())
+        schema = GetResultValue(result.reader.get().GetSchema())
+        result.schema = pyarrow_wrap_schema(schema)
         return result
 
     def do_put(self, descriptor: FlightDescriptor, schema: Schema,
@@ -1584,8 +1585,8 @@ cdef CStatus _do_put(void* self, const CServerCallContext& context,
 
     descriptor.descriptor = reader.get().descriptor()
     py_reader.reader.reset(reader.release())
-    py_reader.schema = pyarrow_wrap_schema(
-        py_reader.reader.get().schema())
+    schema = GetResultValue(py_reader.reader.get().GetSchema())
+    py_reader.schema = pyarrow_wrap_schema(schema)
     py_writer.writer.reset(writer.release())
     try:
         (<object> self).do_put(ServerCallContext.wrap(context), descriptor,
diff --git a/python/pyarrow/includes/libarrow_flight.pxd b/python/pyarrow/includes/libarrow_flight.pxd
index cd5f9d0..0f61d52 100644
--- a/python/pyarrow/includes/libarrow_flight.pxd
+++ b/python/pyarrow/includes/libarrow_flight.pxd
@@ -149,7 +149,7 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
 
     cdef cppclass CMetadataRecordBatchReader \
             " arrow::flight::MetadataRecordBatchReader":
-        shared_ptr[CSchema] schema()
+        CResult[shared_ptr[CSchema]] GetSchema()
         CStatus Next(CFlightStreamChunk* out)
         CStatus ReadAll(shared_ptr[CTable]* table)