You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/02/24 06:43:08 UTC

[GitHub] [arrow] cyb70289 commented on a change in pull request #12465: ARROW-15282: [C++][FlightRPC] Split data methods from the underlying transport

cyb70289 commented on a change in pull request #12465:
URL: https://github.com/apache/arrow/pull/12465#discussion_r813537446



##########
File path: cpp/src/arrow/flight/test_util.h
##########
@@ -82,24 +82,43 @@ std::unique_ptr<FlightServerBase> ExampleTestServer();
 // Helper to initialize a server and matching client with callbacks to
 // populate options.
 template <typename T, typename... Args>
-Status MakeServer(std::unique_ptr<FlightServerBase>* server,
+Status MakeServer(const Location& location, std::unique_ptr<FlightServerBase>* server,
                   std::unique_ptr<FlightClient>* client,
                   std::function<Status(FlightServerOptions*)> make_server_options,
                   std::function<Status(FlightClientOptions*)> make_client_options,
                   Args&&... server_args) {
-  Location location;
-  RETURN_NOT_OK(Location::ForGrpcTcp("localhost", 0, &location));
   *server = arrow::internal::make_unique<T>(std::forward<Args>(server_args)...);
   FlightServerOptions server_options(location);
   RETURN_NOT_OK(make_server_options(&server_options));
   RETURN_NOT_OK((*server)->Init(server_options));
   Location real_location;
-  RETURN_NOT_OK(Location::ForGrpcTcp("localhost", (*server)->port(), &real_location));
+  if ((*server)->port() > 0) {

Review comment:
       Could you add a comment explaining what is different when `server->port() == 0`?

##########
File path: cpp/src/arrow/flight/transport_server_impl.cc
##########
@@ -0,0 +1,327 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/transport_impl.h"
+
+#include <unordered_map>
+
+#include "arrow/buffer.h"
+#include "arrow/flight/serialization_internal.h"
+#include "arrow/flight/server.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace flight {
+namespace internal {
+
+namespace {
+class TransportIpcMessageReader : public ipc::MessageReader {
+ public:
+  explicit TransportIpcMessageReader(
+      std::shared_ptr<internal::PeekableFlightDataReader> peekable_reader,
+      std::shared_ptr<MemoryManager> memory_manager,
+      std::shared_ptr<Buffer>* app_metadata)
+      : peekable_reader_(peekable_reader),
+        memory_manager_(std::move(memory_manager)),
+        app_metadata_(app_metadata) {}
+
+  ::arrow::Result<std::unique_ptr<ipc::Message>> ReadNextMessage() override {
+    if (stream_finished_) {
+      return nullptr;
+    }
+    internal::FlightData* data;
+    peekable_reader_->Next(&data);
+    if (!data) {
+      stream_finished_ = true;
+      if (first_message_) {
+        return Status::Invalid(
+            "Client provided malformed message or did not provide message");
+      }
+      return nullptr;
+    }
+    if (data->body &&
+        ARROW_PREDICT_FALSE(!data->body->device()->Equals(*memory_manager_->device()))) {
+      ARROW_ASSIGN_OR_RAISE(data->body, Buffer::ViewOrCopy(data->body, memory_manager_));
+    }
+    *app_metadata_ = std::move(data->app_metadata);
+    return data->OpenMessage();
+  }
+
+ protected:
+  std::shared_ptr<internal::PeekableFlightDataReader> peekable_reader_;
+  std::shared_ptr<MemoryManager> memory_manager_;
+  // A reference to TransportDataStream.app_metadata_. That class
+  // can't access the app metadata because when it Peek()s the stream,
+  // it may be looking at a dictionary batch, not the record
+  // batch. Updating it here ensures the reader is always updated with
+  // the last metadata message read.
+  std::shared_ptr<Buffer>* app_metadata_;
+  bool first_message_ = true;
+  bool stream_finished_ = false;
+};
+
+/// \brief Adapt TransportDataStream to the FlightMessageReader
+///   interface for DoPut.
+class TransportMessageReader final : public FlightMessageReader {
+ public:
+  explicit TransportMessageReader(TransportDataStream* stream,
+                                  std::shared_ptr<MemoryManager> memory_manager)
+      : peekable_reader_(new internal::PeekableFlightDataReader(stream)),
+        memory_manager_(std::move(memory_manager)) {}
+
+  Status Init() {
+    // Peek the first message to get the descriptor.
+    internal::FlightData* data;
+    peekable_reader_->Peek(&data);
+    if (!data) {
+      return Status::IOError("Stream finished before first message sent");
+    }
+    if (!data->descriptor) {
+      return Status::IOError("Descriptor missing on first message");
+    }
+    descriptor_ = *data->descriptor;
+    // If there's a schema (=DoPut), also Open().
+    if (data->metadata) {
+      return EnsureDataStarted();
+    }
+    peekable_reader_->Next(&data);
+    return Status::OK();
+  }
+
+  const FlightDescriptor& descriptor() const override { return descriptor_; }
+
+  arrow::Result<std::shared_ptr<Schema>> GetSchema() override {
+    RETURN_NOT_OK(EnsureDataStarted());
+    return batch_reader_->schema();
+  }
+
+  Status Next(FlightStreamChunk* out) override {
+    internal::FlightData* data;
+    peekable_reader_->Peek(&data);
+    if (!data) {
+      out->app_metadata = nullptr;
+      out->data = nullptr;
+      return Status::OK();
+    }
+
+    if (!data->metadata) {
+      // Metadata-only (data->metadata is the IPC header)
+      out->app_metadata = data->app_metadata;
+      out->data = nullptr;
+      peekable_reader_->Next(&data);
+      return Status::OK();
+    }
+
+    if (!batch_reader_) {
+      RETURN_NOT_OK(EnsureDataStarted());
+      // re-peek here since EnsureDataStarted() advances the stream
+      return Next(out);
+    }
+    RETURN_NOT_OK(batch_reader_->ReadNext(&out->data));
+    out->app_metadata = std::move(app_metadata_);
+    return Status::OK();
+  }
+
+ private:
+  /// Ensure we are set up to read data.
+  Status EnsureDataStarted() {
+    if (!batch_reader_) {
+      // peek() until we find the first data message; discard metadata
+      if (!peekable_reader_->SkipToData()) {
+        return Status::IOError("Client never sent a data message");
+      }
+      auto message_reader =
+          std::unique_ptr<ipc::MessageReader>(new TransportIpcMessageReader(
+              peekable_reader_, memory_manager_, &app_metadata_));
+      ARROW_ASSIGN_OR_RAISE(
+          batch_reader_, ipc::RecordBatchStreamReader::Open(std::move(message_reader)));
+    }
+    return Status::OK();
+  }
+
+  FlightDescriptor descriptor_;
+  std::shared_ptr<internal::PeekableFlightDataReader> peekable_reader_;
+  std::shared_ptr<MemoryManager> memory_manager_;
+  std::shared_ptr<RecordBatchReader> batch_reader_;
+  std::shared_ptr<Buffer> app_metadata_;
+};
+
+// TODO(ARROW-10787): this should use the same writer/ipc trick as client
+class TransportMessageWriter final : public FlightMessageWriter {
+ public:
+  explicit TransportMessageWriter(TransportDataStream* stream)
+      : stream_(stream), ipc_options_(::arrow::ipc::IpcWriteOptions::Defaults()) {}
+
+  Status Begin(const std::shared_ptr<Schema>& schema,
+               const ipc::IpcWriteOptions& options) override {
+    if (started_) {
+      return Status::Invalid("This writer has already been started.");
+    }
+    started_ = true;
+    ipc_options_ = options;
+
+    RETURN_NOT_OK(mapper_.AddSchemaFields(*schema));
+    FlightPayload schema_payload;
+    RETURN_NOT_OK(ipc::GetSchemaPayload(*schema, ipc_options_, mapper_,
+                                        &schema_payload.ipc_message));
+    return WritePayload(schema_payload);
+  }
+
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    return WriteWithMetadata(batch, nullptr);
+  }
+
+  Status WriteMetadata(std::shared_ptr<Buffer> app_metadata) override {
+    FlightPayload payload{};
+    payload.app_metadata = app_metadata;
+    return WritePayload(payload);
+  }
+
+  Status WriteWithMetadata(const RecordBatch& batch,
+                           std::shared_ptr<Buffer> app_metadata) override {
+    RETURN_NOT_OK(CheckStarted());
+    RETURN_NOT_OK(EnsureDictionariesWritten(batch));
+    FlightPayload payload{};
+    if (app_metadata) {
+      payload.app_metadata = app_metadata;
+    }
+    RETURN_NOT_OK(ipc::GetRecordBatchPayload(batch, ipc_options_, &payload.ipc_message));
+    RETURN_NOT_OK(WritePayload(payload));
+    ++stats_.num_record_batches;
+    return Status::OK();
+  }
+
+  Status Close() override {
+    // It's fine to Close() without writing data
+    return Status::OK();
+  }
+
+  ipc::WriteStats stats() const override { return stats_; }
+
+ private:
+  Status WritePayload(const FlightPayload& payload) {
+    RETURN_NOT_OK(stream_->WriteData(payload));
+    ++stats_.num_messages;
+    return Status::OK();
+  }
+
+  Status CheckStarted() {
+    if (!started_) {
+      return Status::Invalid("This writer is not started. Call Begin() with a schema");
+    }
+    return Status::OK();
+  }
+
+  Status EnsureDictionariesWritten(const RecordBatch& batch) {
+    if (dictionaries_written_) {
+      return Status::OK();
+    }
+    dictionaries_written_ = true;
+    ARROW_ASSIGN_OR_RAISE(const auto dictionaries,
+                          ipc::CollectDictionaries(batch, mapper_));
+    for (const auto& pair : dictionaries) {
+      FlightPayload payload{};
+      RETURN_NOT_OK(ipc::GetDictionaryPayload(pair.first, pair.second, ipc_options_,
+                                              &payload.ipc_message));
+      RETURN_NOT_OK(WritePayload(payload));
+      ++stats_.num_dictionary_batches;
+    }
+    return Status::OK();
+  }
+
+  TransportDataStream* stream_;
+  ::arrow::ipc::IpcWriteOptions ipc_options_;
+  ipc::DictionaryFieldMapper mapper_;
+  ipc::WriteStats stats_;
+  bool started_ = false;
+  bool dictionaries_written_ = false;
+};
+
+/// \brief Adapt TransportDataStream to the FlightMetadataWriter
+///   interface for DoPut.
+class TransportMetadataWriter final : public FlightMetadataWriter {
+ public:
+  explicit TransportMetadataWriter(TransportDataStream* stream) : stream_(stream) {}
+
+  Status WriteMetadata(const Buffer& buffer) override {
+    return stream_->WritePutMetadata(buffer);
+  }
+
+ private:
+  TransportDataStream* stream_;
+};
+}  // namespace
+
+Status FlightServiceImpl::DoGet(const ServerCallContext& context, const Ticket& ticket,
+                                TransportDataStream* stream) {
+  std::unique_ptr<FlightDataStream> data_stream;
+  RETURN_NOT_OK(service_->DoGet(context, ticket, &data_stream));
+
+  if (!data_stream) return Status::KeyError("No data in this flight");
+
+  // Write the schema as the first message in the stream
+  FlightPayload schema_payload;
+  RETURN_NOT_OK(data_stream->GetSchemaPayload(&schema_payload));
+  auto status = stream->WriteData(schema_payload);
+  // Connection terminated
+  if (status.IsIOError()) return Status::OK();
+  RETURN_NOT_OK(status);
+
+  // Consume data stream and write out payloads
+  while (true) {
+    FlightPayload payload;
+    RETURN_NOT_OK(data_stream->Next(&payload));
+    // End of stream
+    if (payload.ipc_message.metadata == nullptr) break;
+    auto status = stream->WriteData(payload);
+    // Ignore IOError (used to signal that client disconnected; there's nothing
+    // we can do - e.g. see WritePayload in serialization_internal.cc)
+    if (status.IsIOError()) return Status::OK();
+    RETURN_NOT_OK(status);
+  }
+  RETURN_NOT_OK(stream->WritesDone());
+  return Status::OK();
+}
+
+Status FlightServiceImpl::DoPut(const ServerCallContext& context,
+                                TransportDataStream* stream) {
+  std::unique_ptr<FlightMessageReader> reader(
+      new TransportMessageReader(stream, memory_manager_));
+  std::unique_ptr<FlightMetadataWriter> writer(new TransportMetadataWriter(stream));
+  RETURN_NOT_OK(reinterpret_cast<TransportMessageReader*>(reader.get())->Init());

Review comment:
       This cast looks a bit strange. What about declaring `reader` as a pointer to `TransportMessageReader` (using `make_unique`) and calling `reader->Init()` here? Would that work?

##########
File path: cpp/src/arrow/flight/transport_server_impl.cc
##########
@@ -0,0 +1,327 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/transport_impl.h"
+
+#include <unordered_map>
+
+#include "arrow/buffer.h"
+#include "arrow/flight/serialization_internal.h"
+#include "arrow/flight/server.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace flight {
+namespace internal {
+
+namespace {
+class TransportIpcMessageReader : public ipc::MessageReader {
+ public:
+  explicit TransportIpcMessageReader(
+      std::shared_ptr<internal::PeekableFlightDataReader> peekable_reader,
+      std::shared_ptr<MemoryManager> memory_manager,
+      std::shared_ptr<Buffer>* app_metadata)
+      : peekable_reader_(peekable_reader),
+        memory_manager_(std::move(memory_manager)),
+        app_metadata_(app_metadata) {}
+
+  ::arrow::Result<std::unique_ptr<ipc::Message>> ReadNextMessage() override {
+    if (stream_finished_) {
+      return nullptr;
+    }
+    internal::FlightData* data;
+    peekable_reader_->Next(&data);
+    if (!data) {
+      stream_finished_ = true;
+      if (first_message_) {

Review comment:
       This is probably carried over from legacy code, but it looks like `first_message_` is always true. Should we set it to false somewhere?

##########
File path: cpp/src/arrow/flight/transport_server_impl.cc
##########
@@ -0,0 +1,327 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/transport_impl.h"
+
+#include <unordered_map>
+
+#include "arrow/buffer.h"
+#include "arrow/flight/serialization_internal.h"
+#include "arrow/flight/server.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace flight {
+namespace internal {
+
+namespace {
+class TransportIpcMessageReader : public ipc::MessageReader {
+ public:
+  explicit TransportIpcMessageReader(
+      std::shared_ptr<internal::PeekableFlightDataReader> peekable_reader,
+      std::shared_ptr<MemoryManager> memory_manager,
+      std::shared_ptr<Buffer>* app_metadata)
+      : peekable_reader_(peekable_reader),
+        memory_manager_(std::move(memory_manager)),
+        app_metadata_(app_metadata) {}
+
+  ::arrow::Result<std::unique_ptr<ipc::Message>> ReadNextMessage() override {
+    if (stream_finished_) {
+      return nullptr;
+    }
+    internal::FlightData* data;
+    peekable_reader_->Next(&data);
+    if (!data) {
+      stream_finished_ = true;
+      if (first_message_) {
+        return Status::Invalid(
+            "Client provided malformed message or did not provide message");
+      }
+      return nullptr;
+    }
+    if (data->body &&
+        ARROW_PREDICT_FALSE(!data->body->device()->Equals(*memory_manager_->device()))) {
+      ARROW_ASSIGN_OR_RAISE(data->body, Buffer::ViewOrCopy(data->body, memory_manager_));
+    }
+    *app_metadata_ = std::move(data->app_metadata);
+    return data->OpenMessage();
+  }
+
+ protected:
+  std::shared_ptr<internal::PeekableFlightDataReader> peekable_reader_;
+  std::shared_ptr<MemoryManager> memory_manager_;
+  // A reference to TransportDataStream.app_metadata_. That class
+  // can't access the app metadata because when it Peek()s the stream,
+  // it may be looking at a dictionary batch, not the record
+  // batch. Updating it here ensures the reader is always updated with
+  // the last metadata message read.
+  std::shared_ptr<Buffer>* app_metadata_;
+  bool first_message_ = true;
+  bool stream_finished_ = false;
+};
+
+/// \brief Adapt TransportDataStream to the FlightMessageReader
+///   interface for DoPut.
+class TransportMessageReader final : public FlightMessageReader {
+ public:
+  explicit TransportMessageReader(TransportDataStream* stream,

Review comment:
       `explicit` can be removed

##########
File path: cpp/src/arrow/flight/client.cc
##########
@@ -596,198 +427,257 @@ class GrpcStreamReader : public FlightStreamReader {
     return ReadAll(table, stop_token_);
   }
   using FlightStreamReader::ReadAll;
-  void Cancel() override { rpc_->context.TryCancel(); }
+  void Cancel() override { stream_->TryCancel(); }
 
  private:
-  std::unique_lock<std::mutex> TakeGuard() {
-    return read_mutex_ ? std::unique_lock<std::mutex>(*read_mutex_)
-                       : std::unique_lock<std::mutex>();
-  }
-
   Status OverrideWithServerError(Status&& st) {
     if (st.ok()) {
       return std::move(st);
     }
     return stream_->Finish(std::move(st));
   }
 
-  friend class GrpcIpcMessageReader<Reader>;
-  std::shared_ptr<ClientRpc> rpc_;
-  std::shared_ptr<MemoryManager> memory_manager_;
-  // Guard reads with a lock to prevent Finish()/Close() from being
-  // called on the writer while the reader has a pending
-  // read. Nullable, as DoGet() doesn't need this.
-  std::shared_ptr<std::mutex> read_mutex_;
+  std::shared_ptr<internal::ClientDataStream> stream_;
   ipc::IpcReadOptions options_;
   StopToken stop_token_;
-  std::shared_ptr<FinishableStream<Reader, internal::FlightData>> stream_;
-  std::shared_ptr<internal::PeekableFlightDataReader<std::shared_ptr<Reader>>>
-      peekable_reader_;
+  std::shared_ptr<internal::PeekableFlightDataReader> peekable_reader_;
   std::shared_ptr<ipc::RecordBatchReader> batch_reader_;
   std::shared_ptr<Buffer> app_metadata_;
 };
 
-// The next two classes implement writing to a FlightData stream.
-// Similarly to the read side, we want to reuse the implementation of
-// RecordBatchWriter. As a result, these two classes are intertwined
-// in order to pass application metadata "through" RecordBatchWriter.
-// In order to get application-specific metadata to the
-// IpcPayloadWriter, DoPutPayloadWriter takes a pointer to
-// GrpcStreamWriter. GrpcStreamWriter updates a metadata field on
-// write; DoPutPayloadWriter reads that metadata field to determine
-// what to write.
-
-template <typename ProtoReadT, typename FlightReadT>
-class DoPutPayloadWriter;
-
-template <typename ProtoReadT, typename FlightReadT>
-class GrpcStreamWriter : public FlightStreamWriter {
+FlightMetadataReader::~FlightMetadataReader() = default;
+
+/// \brief The base of the ClientDataStream implementation for gRPC.
+template <typename Stream, typename ReadPayload>
+class FinishableDataStream : public internal::ClientDataStream {
  public:
-  ~GrpcStreamWriter() override = default;
+  FinishableDataStream(std::shared_ptr<ClientRpc> rpc, std::shared_ptr<Stream> stream,
+                       std::shared_ptr<MemoryManager> memory_manager)
+      : rpc_(std::move(rpc)),
+        stream_(std::move(stream)),
+        memory_manager_(memory_manager ? std::move(memory_manager)
+                                       : CPUDevice::Instance()->default_memory_manager()),
+        finished_(false) {}
 
-  using GrpcStream = grpc::ClientReaderWriter<pb::FlightData, ProtoReadT>;
+  Status Finish() override {
+    if (finished_) {
+      return server_status_;
+    }
 
-  explicit GrpcStreamWriter(
-      const FlightDescriptor& descriptor, std::shared_ptr<ClientRpc> rpc,
-      int64_t write_size_limit_bytes, const ipc::IpcWriteOptions& options,
-      std::shared_ptr<FinishableWritableStream<GrpcStream, FlightReadT>> writer)
-      : app_metadata_(nullptr),
-        batch_writer_(nullptr),
-        writer_(std::move(writer)),
-        rpc_(std::move(rpc)),
-        write_size_limit_bytes_(write_size_limit_bytes),
-        options_(options),
-        descriptor_(descriptor),
-        writer_closed_(false) {}
+    // Drain the read side, as otherwise gRPC Finish() will hang. We
+    // only call Finish() when the client closes the writer or the
+    // reader finishes, so it's OK to assume the client no longer
+    // wants to read and drain the read side. (If the client wants to
+    // indicate that it is done writing, but not done reading, it
+    // should use DoneWriting.
+    ReadPayload message;
+    while (internal::ReadPayload(stream_.get(), &message)) {
+      // Drain the read side to avoid gRPC hanging in Finish()
+    }
 
-  static Status Open(
-      const FlightDescriptor& descriptor, std::shared_ptr<Schema> schema,
-      const ipc::IpcWriteOptions& options, std::shared_ptr<ClientRpc> rpc,
-      int64_t write_size_limit_bytes,
-      std::shared_ptr<FinishableWritableStream<GrpcStream, FlightReadT>> writer,
-      std::unique_ptr<FlightStreamWriter>* out);
+    server_status_ = internal::FromGrpcStatus(stream_->Finish(), &rpc_->context);
+    if (!server_status_.ok()) {
+      server_status_ = Status::FromDetailAndArgs(
+          server_status_.code(), server_status_.detail(), server_status_.message(),
+          ". gRPC client debug context: ", rpc_->context.debug_error_string());
+    }
+    finished_ = true;
 
-  Status CheckStarted() {
-    if (!batch_writer_) {
-      return Status::Invalid("Writer not initialized. Call Begin() with a schema.");
+    return server_status_;
+  }
+  void TryCancel() override { rpc_->context.TryCancel(); }
+
+  std::shared_ptr<ClientRpc> rpc_;
+  std::shared_ptr<Stream> stream_;
+  std::shared_ptr<MemoryManager> memory_manager_;
+  bool finished_;
+  Status server_status_;
+};
+
+/// \brief A ClientDataStream implementation for gRPC that manages a
+///   mutex to protect from concurrent reads/writes, and drains the
+///   read side on finish.
+template <typename Stream, typename ReadPayload>
+class WritableDataStream : public FinishableDataStream<Stream, ReadPayload> {
+ public:
+  using Base = FinishableDataStream<Stream, ReadPayload>;
+  WritableDataStream(std::shared_ptr<ClientRpc> rpc, std::shared_ptr<Stream> stream,
+                     std::shared_ptr<MemoryManager> memory_manager)
+      : Base(std::move(rpc), std::move(stream), std::move(memory_manager)),
+        read_mutex_(),
+        finish_mutex_(),
+        done_writing_(false) {}
+
+  Status WritesDone() override {
+    // This is only used by the writer side of a stream, so it need
+    // not be protected with a lock.
+    if (done_writing_) {
+      return Status::OK();
+    }
+    done_writing_ = true;
+    if (!stream_->WritesDone()) {
+      // Error happened, try to close the stream to get more detailed info
+      return internal::ClientDataStream::Finish(MakeFlightError(
+          FlightStatusCode::Internal, "Could not flush pending record batches"));
     }
     return Status::OK();
   }
 
-  Status Begin(const std::shared_ptr<Schema>& schema,
-               const ipc::IpcWriteOptions& options) override {
-    if (batch_writer_) {
-      return Status::Invalid("This writer has already been started.");
+  Status Finish() override {
+    // This may be used concurrently by reader/writer side of a
+    // stream, so it needs to be protected.
+    std::lock_guard<std::mutex> guard(finish_mutex_);
+
+    // Now that we're shared between a reader and writer, we need to
+    // protect ourselves from being called while there's an
+    // outstanding read.
+    std::unique_lock<std::mutex> read_guard(read_mutex_, std::try_to_lock);
+    if (!read_guard.owns_lock()) {
+      return MakeFlightError(FlightStatusCode::Internal,
+                             "Cannot close stream with pending read operation.");
     }
-    std::unique_ptr<ipc::internal::IpcPayloadWriter> payload_writer(
-        new DoPutPayloadWriter<ProtoReadT, FlightReadT>(
-            descriptor_, std::move(rpc_), write_size_limit_bytes_, writer_, this));
-    // XXX: this does not actually write the message to the stream.
-    // See Close().
-    ARROW_ASSIGN_OR_RAISE(batch_writer_, ipc::internal::OpenRecordBatchWriter(
-                                             std::move(payload_writer), schema, options));
-    return Status::OK();
+
+    // Try to flush pending writes. Don't use our WritesDone() to
+    // avoid recursion.
+    bool finished_writes = done_writing_ || stream_->WritesDone();
+    done_writing_ = true;
+
+    Status st = Base::Finish();
+    if (!finished_writes) {
+      return Status::FromDetailAndArgs(
+          st.code(), st.detail(), st.message(),
+          ". Additionally, could not finish writing record batches before closing");
+    }
+    return st;
   }
 
-  Status Begin(const std::shared_ptr<Schema>& schema) override {
-    return Begin(schema, options_);
+  using Base::stream_;
+  std::mutex read_mutex_;
+  std::mutex finish_mutex_;
+  bool done_writing_;
+};
+
+class GrpcClientGetStream
+    : public FinishableDataStream<grpc::ClientReader<pb::FlightData>,
+                                  internal::FlightData> {
+ public:
+  using FinishableDataStream::FinishableDataStream;
+
+  bool ReadData(internal::FlightData* data) override {
+    bool success = internal::ReadPayload(stream_.get(), data);
+    if (ARROW_PREDICT_FALSE(!success)) return false;
+    if (data->body &&
+        ARROW_PREDICT_FALSE(!data->body->device()->Equals(*memory_manager_->device()))) {
+      auto status = Buffer::ViewOrCopy(data->body, memory_manager_).Value(&data->body);
+      if (!status.ok()) {
+        server_status_ = std::move(status);
+        return false;
+      }
+    }
+    return true;
   }
+  Status WritesDone() override { return Status::NotImplemented("NYI"); }
+};
 
-  Status WriteRecordBatch(const RecordBatch& batch) override {
-    RETURN_NOT_OK(CheckStarted());
-    return WriteWithMetadata(batch, nullptr);
+class GrpcClientPutStream
+    : public WritableDataStream<grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>,
+                                pb::PutResult> {
+ public:
+  using Stream = grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>;
+  GrpcClientPutStream(std::shared_ptr<ClientRpc> rpc, std::shared_ptr<Stream> stream,
+                      std::shared_ptr<MemoryManager> memory_manager)
+      : WritableDataStream(std::move(rpc), std::move(stream), std::move(memory_manager)) {
   }
 
-  Status WriteMetadata(std::shared_ptr<Buffer> app_metadata) override {
-    FlightPayload payload{};
-    payload.app_metadata = app_metadata;
-    auto status = internal::WritePayload(payload, writer_->stream().get());
+  bool ReadPutMetadata(std::shared_ptr<Buffer>* out) override {
+    std::lock_guard<std::mutex> guard(read_mutex_);
+    pb::PutResult message;
+    if (stream_->Read(&message)) {
+      *out = Buffer::FromString(std::move(*message.mutable_app_metadata()));
+    } else {
+      // Stream finished
+      *out = nullptr;
+    }
+    return true;
+  }
+  Status WriteData(const FlightPayload& payload) override {
+    auto status = internal::WritePayload(payload, this->stream_.get());
     if (status.IsIOError()) {
-      return writer_->Finish(MakeFlightError(FlightStatusCode::Internal,
-                                             "Could not write metadata to stream"));
+      return internal::ClientDataStream::Finish(MakeFlightError(
+          FlightStatusCode::Internal, "Could not write record batch to stream"));
     }
     return status;
   }
+};
 
-  Status WriteWithMetadata(const RecordBatch& batch,
-                           std::shared_ptr<Buffer> app_metadata) override {
-    RETURN_NOT_OK(CheckStarted());
-    app_metadata_ = app_metadata;
-    return batch_writer_->WriteRecordBatch(batch);
+class GrpcClientExchangeStream
+    : public WritableDataStream<grpc::ClientReaderWriter<pb::FlightData, pb::FlightData>,
+                                internal::FlightData> {
+ public:
+  using Stream = grpc::ClientReaderWriter<pb::FlightData, pb::FlightData>;
+  GrpcClientExchangeStream(std::shared_ptr<ClientRpc> rpc, std::shared_ptr<Stream> stream,
+                           std::shared_ptr<MemoryManager> memory_manager)
+      : WritableDataStream(std::move(rpc), std::move(stream), std::move(memory_manager)) {
   }
 
-  Status DoneWriting() override {
-    // Do not CheckStarted - DoneWriting applies to data and metadata
-    if (batch_writer_) {
-      // Close the writer if we have one; this will force it to flush any
-      // remaining data, before we close the write side of the stream.
-      writer_closed_ = true;
-      Status st = batch_writer_->Close();
-      if (!st.ok()) {
-        return writer_->Finish(std::move(st));
+  bool ReadData(internal::FlightData* data) override {
+    std::lock_guard<std::mutex> guard(read_mutex_);
+    bool success = internal::ReadPayload(stream_.get(), data);
+    if (ARROW_PREDICT_FALSE(!success)) return false;
+    if (data->body &&
+        ARROW_PREDICT_FALSE(!data->body->device()->Equals(*memory_manager_->device()))) {
+      auto status = Buffer::ViewOrCopy(data->body, memory_manager_).Value(&data->body);
+      if (!status.ok()) {
+        server_status_ = std::move(status);
+        return false;
       }
     }
-    return writer_->DoneWriting();
+    return true;
   }
-
-  Status Close() override {
-    // Do not CheckStarted - Close applies to data and metadata
-    if (batch_writer_ && !writer_closed_) {
-      // This is important! Close() calls
-      // IpcPayloadWriter::CheckStarted() which will force the initial
-      // schema message to be written to the stream. This is required
-      // to unstick the server, else the client and the server end up
-      // waiting for each other. This happens if the client never
-      // wrote anything before calling Close().
-      writer_closed_ = true;
-      return writer_->Finish(batch_writer_->Close());
+  Status WriteData(const FlightPayload& payload) override {
+    auto status = internal::WritePayload(payload, this->stream_.get());
+    if (status.IsIOError()) {
+      return internal::ClientDataStream::Finish(MakeFlightError(
+          FlightStatusCode::Internal, "Could not write record batch to stream"));
     }
-    return writer_->Finish(Status::OK());
+    return status;
   }
+};
 
-  ipc::WriteStats stats() const override {
-    ARROW_CHECK_NE(batch_writer_, nullptr);
-    return batch_writer_->stats();
+class ClientMetadataReader : public FlightMetadataReader {
+ public:
+  explicit ClientMetadataReader(std::shared_ptr<internal::ClientDataStream> stream)
+      : stream_(std::move(stream)) {}
+
+  Status ReadMetadata(std::shared_ptr<Buffer>* out) override {
+    if (!stream_->ReadPutMetadata(out)) {
+      return stream_->Finish(Status::OK());
+    }
+    return Status::OK();
   }
 
  private:
-  friend class DoPutPayloadWriter<ProtoReadT, FlightReadT>;
-  std::shared_ptr<Buffer> app_metadata_;
-  std::unique_ptr<ipc::RecordBatchWriter> batch_writer_;
-  std::shared_ptr<FinishableWritableStream<GrpcStream, FlightReadT>> writer_;
-
-  // Fields used to lazy-initialize the IpcPayloadWriter. They're
-  // invalid once Begin() is called.
-  std::shared_ptr<ClientRpc> rpc_;
-  int64_t write_size_limit_bytes_;
-  ipc::IpcWriteOptions options_;
-  FlightDescriptor descriptor_;
-  bool writer_closed_;
+  std::shared_ptr<internal::ClientDataStream> stream_;
 };
 
-/// A IpcPayloadWriter implementation that writes to a gRPC stream of
-/// FlightData messages.
-template <typename ProtoReadT, typename FlightReadT>
-class DoPutPayloadWriter : public ipc::internal::IpcPayloadWriter {
+/// \brief An IpcPayloadWriter for any ClientDataStream.
+///
+/// To support app_metadata and reuse the existing IPC infrastructure,
+/// this takes a pointer to a buffer to be combined with the IPC
+/// payload when writing a Flight payload.
+class ClientPutPayloadWriter : public ipc::internal::IpcPayloadWriter {
  public:
-  using GrpcStream = grpc::ClientReaderWriter<pb::FlightData, ProtoReadT>;
-
-  DoPutPayloadWriter(
-      const FlightDescriptor& descriptor, std::shared_ptr<ClientRpc> rpc,
-      int64_t write_size_limit_bytes,
-      std::shared_ptr<FinishableWritableStream<GrpcStream, FlightReadT>> writer,
-      GrpcStreamWriter<ProtoReadT, FlightReadT>* stream_writer)
-      : descriptor_(descriptor),
-        rpc_(rpc),
+  explicit ClientPutPayloadWriter(std::shared_ptr<internal::ClientDataStream> stream,

Review comment:
       `explicit` can be removed

##########
File path: cpp/src/arrow/flight/client.cc
##########
@@ -477,15 +334,9 @@ class GrpcIpcMessageReader : public ipc::MessageReader {
   }
 
  private:
-  // The RPC context lifetime must be coupled to the ClientReader
-  std::shared_ptr<ClientRpc> rpc_;
+  std::shared_ptr<internal::ClientDataStream> stream_;
+  std::shared_ptr<internal::PeekableFlightDataReader> peekable_reader_;
   std::shared_ptr<MemoryManager> memory_manager_;
-  // Guard reads with a mutex to prevent concurrent reads if the write
-  // side calls Finish(). Nullable as DoGet doesn't need this.
-  std::shared_ptr<std::mutex> read_mutex_;

Review comment:
       Is the lock no longer necessary?

##########
File path: cpp/src/arrow/flight/transport_impl.h
##########
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Internal (but not private) interface for implementing alternate
+// transports in Flight.
+//
+// EXPERIMENTAL. Subject to change.
+
+#pragma once
+
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/visibility.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow {
+namespace ipc {
+class Message;
+}
+namespace flight {
+namespace internal {
+
+/// Internal, not user-visible type used for memory-efficient reads from gRPC

Review comment:
       This is not only for `gRPC` anymore, is it?

##########
File path: cpp/src/arrow/flight/transport_impl.h
##########
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Internal (but not private) interface for implementing alternate
+// transports in Flight.
+//
+// EXPERIMENTAL. Subject to change.
+
+#pragma once
+
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/flight/type_fwd.h"
+#include "arrow/flight/visibility.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow {
+namespace ipc {
+class Message;
+}
+namespace flight {
+namespace internal {
+
+/// Internal, not user-visible type used for memory-efficient reads from gRPC
+/// stream
+struct FlightData {
+  /// Used only for puts, may be null
+  std::unique_ptr<FlightDescriptor> descriptor;
+
+  /// Non-length-prefixed Message header as described in format/Message.fbs
+  std::shared_ptr<Buffer> metadata;
+
+  /// Application-defined metadata
+  std::shared_ptr<Buffer> app_metadata;
+
+  /// Message body
+  std::shared_ptr<Buffer> body;
+
+  /// Open IPC message from the metadata and body
+  ::arrow::Result<std::unique_ptr<ipc::Message>> OpenMessage();
+};
+
+/// \brief A transport-specific interface for reading/writing Arrow data.
+///
+/// New transports will implement this to read/write IPC payloads to
+/// the underlying stream.
+class ARROW_FLIGHT_EXPORT TransportDataStream {
+ public:
+  virtual ~TransportDataStream() = default;
+  /// \brief Attempt to read the next FlightData message.
+  ///
+  /// \return success true if data was populated, false if there was
+  ///   an error. For clients, the error can be retrieved from Finish.
+  virtual bool ReadData(FlightData* data);

Review comment:
       Can this also return `Status`?
   It looks like it's called by `PeekableFlightDataReader` and the error message is not populated there, so maybe `Status` is not necessary?

##########
File path: cpp/src/arrow/flight/server.cc
##########
@@ -442,20 +196,69 @@ class GrpcAddCallHeaders : public AddCallHeaders {
   grpc::ServerContext* context_;
 };
 
+class GetDataStream : public internal::TransportDataStream {
+ public:
+  explicit GetDataStream(ServerWriter<pb::FlightData>* writer) : writer_(writer) {}
+
+  Status WriteData(const FlightPayload& payload) override {
+    return internal::WritePayload(payload, writer_);
+  }
+
+ private:
+  ServerWriter<pb::FlightData>* writer_;
+};
+
+class PutDataStream final : public internal::TransportDataStream {
+ public:
+  explicit PutDataStream(grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>* stream)
+      : stream_(stream) {}
+
+  bool ReadData(internal::FlightData* data) override {
+    return internal::ReadPayload(&*stream_, data);
+  }
+  Status WritePutMetadata(const Buffer& metadata) override {
+    pb::PutResult message{};
+    message.set_app_metadata(metadata.data(), metadata.size());
+    if (stream_->Write(message)) {
+      return Status::OK();
+    }
+    return Status::IOError("Unknown error writing metadata.");
+  }
+
+ private:
+  grpc::ServerReaderWriter<pb::PutResult, pb::FlightData>* stream_;
+};
+
+class ExchangeDataStream final : public internal::TransportDataStream {
+ public:
+  explicit ExchangeDataStream(
+      grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* stream)
+      : stream_(stream) {}
+
+  bool ReadData(internal::FlightData* data) override {
+    return internal::ReadPayload(&*stream_, data);
+  }
+  Status WriteData(const FlightPayload& payload) override {
+    return internal::WritePayload(payload, stream_);
+  }
+
+ private:
+  grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* stream_;
+};
+
 // This class glues an implementation of FlightServerBase together with the
 // gRPC service definition, so the latter is not exposed in the public API
-class FlightServiceImpl : public FlightService::Service {
+class FlightGrpcServiceImpl : public FlightService::Service {
  public:
-  explicit FlightServiceImpl(
+  explicit FlightGrpcServiceImpl(

Review comment:
       `explicit` can be removed

##########
File path: cpp/src/arrow/flight/transport_server_impl.cc
##########
@@ -0,0 +1,327 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/transport_impl.h"
+
+#include <unordered_map>
+
+#include "arrow/buffer.h"
+#include "arrow/flight/serialization_internal.h"
+#include "arrow/flight/server.h"
+#include "arrow/flight/types.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace flight {
+namespace internal {
+
+namespace {
+class TransportIpcMessageReader : public ipc::MessageReader {
+ public:
+  explicit TransportIpcMessageReader(

Review comment:
       `explicit` is not necessary

##########
File path: cpp/src/arrow/flight/transport_impl.cc
##########
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/transport_impl.h"
+
+#include <unordered_map>
+
+#include "arrow/flight/client_auth.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/util/make_unique.h"
+
+namespace arrow {
+namespace flight {
+namespace internal {
+
+bool TransportDataStream::ReadData(internal::FlightData*) { return false; }
+Status TransportDataStream::WriteData(const FlightPayload&) {
+  return Status::NotImplemented("Writing data for this stream");
+}
+Status TransportDataStream::WritePutMetadata(const Buffer&) {
+  return Status::NotImplemented("Writing put metadata for this stream");
+}
+Status TransportDataStream::WritesDone() { return Status::OK(); }
+bool ClientDataStream::ReadPutMetadata(std::shared_ptr<Buffer>*) { return false; }
+Status ClientDataStream::Finish(Status st) {
+  auto server_status = Finish();
+  if (server_status.ok()) return st;
+
+  return Status::FromDetailAndArgs(server_status.code(), server_status.detail(),
+                                   server_status.message(),
+                                   ". Client context: ", st.ToString());
+}
+
+Status ClientTransportImpl::Authenticate(
+    const FlightCallOptions& options, std::unique_ptr<ClientAuthHandler> auth_handler) {
+  return Status::NotImplemented("Authenticate for this transport");
+}
+arrow::Result<std::pair<std::string, std::string>>
+ClientTransportImpl::AuthenticateBasicToken(const FlightCallOptions& options,
+                                            const std::string& username,
+                                            const std::string& password) {
+  return Status::NotImplemented("AuthenticateBasicToken for this transport");
+}
+Status ClientTransportImpl::DoAction(const FlightCallOptions& options,
+                                     const Action& action,
+                                     std::unique_ptr<ResultStream>* results) {
+  return Status::NotImplemented("DoAction for this transport");
+}
+Status ClientTransportImpl::ListActions(const FlightCallOptions& options,
+                                        std::vector<ActionType>* actions) {
+  return Status::NotImplemented("ListActions for this transport");
+}
+Status ClientTransportImpl::GetFlightInfo(const FlightCallOptions& options,
+                                          const FlightDescriptor& descriptor,
+                                          std::unique_ptr<FlightInfo>* info) {
+  return Status::NotImplemented("GetFlightInfo for this transport");
+}
+Status ClientTransportImpl::GetSchema(const FlightCallOptions& options,
+                                      const FlightDescriptor& descriptor,
+                                      std::unique_ptr<SchemaResult>* schema_result) {
+  return Status::NotImplemented("GetSchema for this transport");
+}
+Status ClientTransportImpl::ListFlights(const FlightCallOptions& options,
+                                        const Criteria& criteria,
+                                        std::unique_ptr<FlightListing>* listing) {
+  return Status::NotImplemented("ListFlights for this transport");
+}
+Status ClientTransportImpl::DoGet(const FlightCallOptions& options, const Ticket& ticket,
+                                  std::unique_ptr<ClientDataStream>* stream) {
+  return Status::NotImplemented("DoGet for this transport");
+}
+Status ClientTransportImpl::DoPut(const FlightCallOptions& options,
+                                  std::unique_ptr<ClientDataStream>* stream) {
+  return Status::NotImplemented("DoPut for this transport");
+}
+Status ClientTransportImpl::DoExchange(const FlightCallOptions& options,
+                                       std::unique_ptr<ClientDataStream>* stream) {
+  return Status::NotImplemented("DoExchange for this transport");
+}
+
+class TransportImplRegistry::Impl {
+ public:
+  arrow::Result<std::unique_ptr<ClientTransportImpl>> MakeClientImpl(
+      const std::string& scheme) {
+    auto it = client_factories_.find(scheme);
+    if (it == client_factories_.end()) {
+      return Status::KeyError("No client transport implementation for ", scheme);
+    }
+    return it->second();
+  }
+  arrow::Result<std::unique_ptr<ServerTransportImpl>> MakeServerImpl(
+      const std::string& scheme) {
+    auto it = server_factories_.find(scheme);
+    if (it == server_factories_.end()) {
+      return Status::KeyError("No server transport implementation for ", scheme);
+    }
+    return it->second();
+  }
+  Status RegisterClient(const std::string& scheme, ClientFactory factory) {
+    auto it = client_factories_.insert({scheme, std::move(factory)});
+    if (!it.second) {
+      return Status::Invalid("Client transport already registered for ", scheme);
+    }
+    return Status::OK();
+  }
+  Status RegisterServer(const std::string& scheme, ServerFactory factory) {
+    auto it = server_factories_.insert({scheme, std::move(factory)});
+    if (!it.second) {
+      return Status::Invalid("Server transport already registered for ", scheme);
+    }
+    return Status::OK();
+  }
+
+ private:
+  std::unordered_map<std::string, TransportImplRegistry::ClientFactory> client_factories_;
+  std::unordered_map<std::string, TransportImplRegistry::ServerFactory> server_factories_;
+};
+
+TransportImplRegistry::TransportImplRegistry() {
+  impl_ = arrow::internal::make_unique<Impl>();
+}
+TransportImplRegistry::~TransportImplRegistry() = default;
+arrow::Result<std::unique_ptr<ClientTransportImpl>> TransportImplRegistry::MakeClientImpl(
+    const std::string& scheme) {
+  return impl_->MakeClientImpl(scheme);
+}
+arrow::Result<std::unique_ptr<ServerTransportImpl>> TransportImplRegistry::MakeServerImpl(
+    const std::string& scheme) {
+  return impl_->MakeServerImpl(scheme);
+}
+Status TransportImplRegistry::RegisterClient(const std::string& scheme,
+                                             ClientFactory factory) {
+  return impl_->RegisterClient(scheme, std::move(factory));
+}
+Status TransportImplRegistry::RegisterServer(const std::string& scheme,
+                                             ServerFactory factory) {
+  return impl_->RegisterServer(scheme, std::move(factory));
+}
+
+TransportImplRegistry* GetDefaultTransportImplRegistry() {
+  static std::unique_ptr<TransportImplRegistry> kRegistry =
+      arrow::internal::make_unique<TransportImplRegistry>();
+  return kRegistry.get();

Review comment:
       `unique_ptr` doesn't look necessary; could this return the address of a static instance instead?

##########
File path: cpp/src/arrow/flight/client.cc
##########
@@ -428,22 +298,18 @@ class GrpcClientAuthReader : public ClientAuthReader {
       stream_;
 };
 
-// An ipc::MessageReader that adapts any readable gRPC stream
-// returning FlightData.
-template <typename Reader>
-class GrpcIpcMessageReader : public ipc::MessageReader {
+/// \brief An ipc::MessageReader adapting the Flight ClientDataStream interface.
+///
+/// In order to support app_metadata and reuse the existing IPC
+/// infrastructure, this takes a pointer to a buffer (provided by the
+/// FlightStreamReader implementation) and upon reading a message,
+/// updates that buffer with the one read from the server.
+class IpcMessageReader : public ipc::MessageReader {

Review comment:
       Is this file gRPC-only, or are some of its classes general?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org