You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:28:06 UTC

[GitHub] [arrow] lidavidm commented on a change in pull request #6656: ARROW-8297: [FlightRPC][C++] Implement Flight DoExchange for C++

lidavidm commented on a change in pull request #6656:
URL: https://github.com/apache/arrow/pull/6656#discussion_r430370738



##########
File path: cpp/src/arrow/flight/server.cc
##########
@@ -240,6 +273,90 @@ class GrpcServerAuthSender : public ServerAuthSender {
   grpc::ServerReaderWriter<pb::HandshakeResponse, pb::HandshakeRequest>* stream_;
 };
 
+class GrpcMessageWriter : public FlightMessageWriter {

Review comment:
       I renamed this to `DoExchangeMessageWriter`, since DoExchange is the only place it's used.

##########
File path: cpp/src/arrow/flight/client.cc
##########
@@ -91,6 +85,134 @@ struct ClientRpc {
   }
 };
 
+/// Helper that manages Finish() of a gRPC stream.
+///
+/// When we encounter an error (e.g. could not decode an IPC message),
+/// we want to provide both the client-side error context and any
+/// available server-side context. This helper helps wrap up that
+/// logic.
+///
+/// This class protects the stream with a flag (so that Finish is
+/// idempotent), and drains the read side (so that Finish won't hang).
+template <typename Stream, typename ReadT>

Review comment:
       Via `FinishableWritableStream`, this gets instantiated with both `internal::FlightData` and `pb::PutResult`, so unfortunately we do need the template parameter.

##########
File path: cpp/src/arrow/flight/server.cc
##########
@@ -240,6 +273,90 @@ class GrpcServerAuthSender : public ServerAuthSender {
   grpc::ServerReaderWriter<pb::HandshakeResponse, pb::HandshakeRequest>* stream_;
 };
 
+class GrpcMessageWriter : public FlightMessageWriter {
+ public:
+  explicit GrpcMessageWriter(
+      grpc::ServerReaderWriter<pb::FlightData, pb::FlightData>* stream)
+      : stream_(stream), ipc_options_(ipc::IpcOptions::Defaults()) {}
+
+  Status Begin(const std::shared_ptr<Schema>& schema) override {
+    if (started_) {
+      return Status::Invalid("This writer has already been started.");
+    }
+    started_ = true;
+
+    FlightPayload schema_payload;
+    RETURN_NOT_OK(ipc::internal::GetSchemaPayload(
+        *schema, ipc_options_, &dictionary_memo_, &schema_payload.ipc_message));
+    return WritePayload(schema_payload);
+  }
+
+  Status WriteRecordBatch(const RecordBatch& batch) override {
+    return WriteWithMetadata(batch, nullptr);
+  }
+
+  Status WriteMetadata(std::shared_ptr<Buffer> app_metadata) override {
+    FlightPayload payload{};
+    payload.app_metadata = app_metadata;
+    return WritePayload(payload);
+  }
+
+  Status WriteWithMetadata(const RecordBatch& batch,
+                           std::shared_ptr<Buffer> app_metadata) override {
+    RETURN_NOT_OK(CheckStarted());
+    RETURN_NOT_OK(EnsureDictionariesWritten(batch));
+    FlightPayload payload{};
+    if (app_metadata) {
+      payload.app_metadata = app_metadata;
+    }
+    RETURN_NOT_OK(
+        ipc::internal::GetRecordBatchPayload(batch, ipc_options_, &payload.ipc_message));
+    return WritePayload(payload);
+  }
+
+  Status Close() override {
+    // It's fine to Close() without writing data
+    return Status::OK();
+  }
+
+ private:
+  Status WritePayload(const FlightPayload& payload) {
+    if (!internal::WritePayload(payload, stream_)) {
+      // gRPC doesn't give us any way to find what the error was (if any).
+      return Status::IOError("Could not write payload to stream");
+    }
+    return Status::OK();
+  }
+
+  Status CheckStarted() {
+    if (!started_) {
+      return Status::Invalid("This writer is not started. Call Begin() with a schema");
+    }
+    return Status::OK();
+  }
+
+  Status EnsureDictionariesWritten(const RecordBatch& batch) {
+    if (dictionaries_written_) {
+      return Status::OK();
+    }
+    dictionaries_written_ = true;
+    RETURN_NOT_OK(ipc::CollectDictionaries(batch, &dictionary_memo_));
+    for (auto& pair : dictionary_memo_.id_to_dictionary()) {

Review comment:
       Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org