You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2019/02/20 09:54:22 UTC
[arrow] branch master updated: ARROW-4562: [C++] Avoid copies when serializing Flight data
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 6c4118b ARROW-4562: [C++] Avoid copies when serializing Flight data
6c4118b is described below
commit 6c4118b274cadf044b2d0581401a018f0a438205
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Wed Feb 20 10:54:12 2019 +0100
ARROW-4562: [C++] Avoid copies when serializing Flight data
Also massage the Flight headers to avoid unnecessary includes and declarations.
Author: Antoine Pitrou <an...@python.org>
Closes #3705 from pitrou/ARROW-4562-no-copy-grpc-serialization and squashes the following commits:
b24194baa <Antoine Pitrou> ARROW-4562: Avoid copies when serializing Flight data
---
cpp/src/arrow/flight/client.cc | 2 +-
cpp/src/arrow/flight/customize_protobuf.h | 39 ++++--
cpp/src/arrow/flight/flight-test.cc | 6 -
cpp/src/arrow/flight/serialization-internal.cc | 169 ++++++++++++++-----------
cpp/src/arrow/flight/serialization-internal.h | 62 +--------
cpp/src/arrow/flight/server.cc | 4 +-
6 files changed, 129 insertions(+), 153 deletions(-)
diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc
index 9925c25..2563126 100644
--- a/cpp/src/arrow/flight/client.cc
+++ b/cpp/src/arrow/flight/client.cc
@@ -68,7 +68,7 @@ class FlightStreamReader : public RecordBatchReader {
std::shared_ptr<Schema> schema() const override { return schema_; }
Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
- FlightData data;
+ internal::FlightData data;
if (stream_finished_) {
*out = nullptr;
diff --git a/cpp/src/arrow/flight/customize_protobuf.h b/cpp/src/arrow/flight/customize_protobuf.h
index 1f69253..9f5dfab 100644
--- a/cpp/src/arrow/flight/customize_protobuf.h
+++ b/cpp/src/arrow/flight/customize_protobuf.h
@@ -31,12 +31,33 @@
#include <grpcpp/impl/codegen/proto_utils.h>
+namespace grpc {
+
+class ByteBuffer;
+
+} // namespace grpc
+
namespace arrow {
namespace flight {
-struct FlightData;
struct FlightPayload;
+namespace internal {
+
+struct FlightData;
+
+// Those two functions are defined in serialization-internal.cc
+
+// Write FlightData to a grpc::ByteBuffer without extra copying
+grpc::Status FlightDataSerialize(const FlightPayload& msg, grpc::ByteBuffer* out,
+ bool* own_buffer);
+
+// Read internal::FlightData from grpc::ByteBuffer containing FlightData
+// protobuf without copying
+grpc::Status FlightDataDeserialize(grpc::ByteBuffer* buffer, FlightData* out);
+
+} // namespace internal
+
namespace protocol {
class FlightData;
@@ -47,15 +68,6 @@ class FlightData;
namespace grpc {
-using arrow::flight::FlightData;
-using arrow::flight::FlightPayload;
-
-class ByteBuffer;
-class Status;
-
-Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out, bool* own_buffer);
-Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out);
-
// This class provides a protobuf serializer. It translates between protobuf
// objects and grpc_byte_buffers. More information about SerializationTraits can
// be found in include/grpcpp/impl/codegen/serialization_traits.h.
@@ -81,12 +93,13 @@ class SerializationTraits<T, typename std::enable_if<std::is_same<
public:
static Status Serialize(const grpc::protobuf::Message& msg, ByteBuffer* bb,
bool* own_buffer) {
- return FlightDataSerialize(*reinterpret_cast<const FlightPayload*>(&msg), bb,
- own_buffer);
+ return arrow::flight::internal::FlightDataSerialize(
+ *reinterpret_cast<const arrow::flight::FlightPayload*>(&msg), bb, own_buffer);
}
static Status Deserialize(ByteBuffer* buffer, grpc::protobuf::Message* msg) {
- return FlightDataDeserialize(buffer, reinterpret_cast<FlightData*>(msg));
+ return arrow::flight::internal::FlightDataDeserialize(
+ buffer, reinterpret_cast<arrow::flight::internal::FlightData*>(msg));
}
};
diff --git a/cpp/src/arrow/flight/flight-test.cc b/cpp/src/arrow/flight/flight-test.cc
index 9268aec..d1ab0aa 100644
--- a/cpp/src/arrow/flight/flight-test.cc
+++ b/cpp/src/arrow/flight/flight-test.cc
@@ -15,12 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef _WIN32
-#include <sys/stat.h>
-#include <sys/wait.h>
-#include <unistd.h>
-#endif
-
#include <chrono>
#include <cstdint>
#include <cstdio>
diff --git a/cpp/src/arrow/flight/serialization-internal.cc b/cpp/src/arrow/flight/serialization-internal.cc
index 0c031e0..d80c0c7 100644
--- a/cpp/src/arrow/flight/serialization-internal.cc
+++ b/cpp/src/arrow/flight/serialization-internal.cc
@@ -17,33 +17,53 @@
#include "arrow/flight/serialization-internal.h"
+#include <cstdint>
+#include <limits>
#include <string>
+#include <vector>
+
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <google/protobuf/wire_format_lite.h>
+#include <grpc/byte_buffer_reader.h>
+#include <grpcpp/grpcpp.h>
+#include <grpcpp/impl/codegen/proto_utils.h>
#include "arrow/buffer.h"
#include "arrow/flight/server.h"
#include "arrow/ipc/writer.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/logging.h"
+
+namespace pb = arrow::flight::protocol;
+
+static constexpr int64_t kInt32Max = std::numeric_limits<int32_t>::max();
namespace arrow {
namespace flight {
namespace internal {
-bool ReadBytesZeroCopy(const std::shared_ptr<arrow::Buffer>& source_data,
- CodedInputStream* input, std::shared_ptr<arrow::Buffer>* out) {
+using arrow::ipc::internal::IpcPayload;
+
+using google::protobuf::internal::WireFormatLite;
+using google::protobuf::io::ArrayOutputStream;
+using google::protobuf::io::CodedInputStream;
+using google::protobuf::io::CodedOutputStream;
+
+using grpc::ByteBuffer;
+
+bool ReadBytesZeroCopy(const std::shared_ptr<Buffer>& source_data,
+ CodedInputStream* input, std::shared_ptr<Buffer>* out) {
uint32_t length;
if (!input->ReadVarint32(&length)) {
return false;
}
- *out = arrow::SliceBuffer(source_data, input->CurrentPosition(),
- static_cast<int64_t>(length));
+ *out = SliceBuffer(source_data, input->CurrentPosition(), static_cast<int64_t>(length));
return input->Skip(static_cast<int>(length));
}
-using google::protobuf::io::CodedInputStream;
-using google::protobuf::io::CodedOutputStream;
-
// Internal wrapper for gRPC ByteBuffer so its memory can be exposed to Arrow
// consumers with zero-copy
-class GrpcBuffer : public arrow::MutableBuffer {
+class GrpcBuffer : public MutableBuffer {
public:
GrpcBuffer(grpc_slice slice, bool incref)
: MutableBuffer(GRPC_SLICE_START_PTR(slice),
@@ -55,8 +75,7 @@ class GrpcBuffer : public arrow::MutableBuffer {
grpc_slice_unref(slice_);
}
- static arrow::Status Wrap(grpc::ByteBuffer* cpp_buf,
- std::shared_ptr<arrow::Buffer>* out) {
+ static Status Wrap(ByteBuffer* cpp_buf, std::shared_ptr<Buffer>* out) {
// These types are guaranteed by static assertions in gRPC to have the same
// in-memory representation
@@ -80,7 +99,7 @@ class GrpcBuffer : public arrow::MutableBuffer {
// us back a new slice with the refcount already incremented.
grpc_byte_buffer_reader reader;
if (!grpc_byte_buffer_reader_init(&reader, buffer)) {
- return arrow::Status::IOError("Internal gRPC error reading from ByteBuffer");
+ return Status::IOError("Internal gRPC error reading from ByteBuffer");
}
grpc_slice slice = grpc_byte_buffer_reader_readall(&reader);
grpc_byte_buffer_reader_destroy(&reader);
@@ -89,37 +108,42 @@ class GrpcBuffer : public arrow::MutableBuffer {
*out = std::make_shared<GrpcBuffer>(slice, false);
}
- return arrow::Status::OK();
+ return Status::OK();
}
private:
grpc_slice slice_;
};
-} // namespace internal
-} // namespace flight
-} // namespace arrow
-
-namespace grpc {
+// Destructor callback for grpc::Slice
+static void ReleaseBuffer(void* buf_ptr) {
+ delete reinterpret_cast<std::shared_ptr<Buffer>*>(buf_ptr);
+}
-using arrow::flight::FlightData;
-using arrow::flight::internal::GrpcBuffer;
-using arrow::flight::internal::ReadBytesZeroCopy;
+// Initialize gRPC Slice from arrow Buffer
+grpc::Slice SliceFromBuffer(const std::shared_ptr<Buffer>& buf) {
+ // Allocate persistent shared_ptr to control Buffer lifetime
+ auto ptr = new std::shared_ptr<Buffer>(buf);
+ grpc::Slice slice(const_cast<uint8_t*>(buf->data()), static_cast<size_t>(buf->size()),
+ &ReleaseBuffer, ptr);
+ // Make sure no copy was done (some grpc::Slice() constructors do an implicit memcpy)
+ DCHECK_EQ(slice.begin(), buf->data());
+ return slice;
+}
-using google::protobuf::internal::WireFormatLite;
-using google::protobuf::io::ArrayOutputStream;
-using google::protobuf::io::CodedInputStream;
-using google::protobuf::io::CodedOutputStream;
+static const uint8_t kPaddingBytes[8] = {0, 0, 0, 0, 0, 0, 0, 0};
-Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out, bool* own_buffer) {
- size_t total_size = 0;
+grpc::Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out,
+ bool* own_buffer) {
+ size_t body_size = 0;
+ size_t header_size = 0;
// Write the descriptor if present
int32_t descriptor_size = 0;
if (msg.descriptor != nullptr) {
DCHECK_LT(msg.descriptor->size(), kInt32Max);
descriptor_size = static_cast<int32_t>(msg.descriptor->size());
- total_size += 1 + WireFormatLite::LengthDelimitedSize(descriptor_size);
+ header_size += 1 + WireFormatLite::LengthDelimitedSize(descriptor_size);
}
const arrow::ipc::internal::IpcPayload& ipc_msg = msg.ipc_message;
@@ -128,98 +152,94 @@ Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out, bool* own_
const int32_t metadata_size = static_cast<int32_t>(ipc_msg.metadata->size());
// 1 byte for metadata tag
- total_size += 1 + WireFormatLite::LengthDelimitedSize(metadata_size);
+ header_size += 1 + WireFormatLite::LengthDelimitedSize(metadata_size);
- int64_t body_size = 0;
for (const auto& buffer : ipc_msg.body_buffers) {
// Buffer may be null when the row length is zero, or when all
// entries are invalid.
if (!buffer) continue;
- body_size += buffer->size();
-
- const int64_t remainder = buffer->size() % 8;
- if (remainder) {
- body_size += 8 - remainder;
- }
+ body_size += static_cast<size_t>(BitUtil::RoundUpToMultipleOf8(buffer->size()));
}
// 2 bytes for body tag
// Only written when there are body buffers
if (ipc_msg.body_length > 0) {
- total_size += 2 + WireFormatLite::LengthDelimitedSize(static_cast<size_t>(body_size));
+ // We write the body tag in the header but not the actual body data
+ header_size += 2 + WireFormatLite::LengthDelimitedSize(body_size) - body_size;
}
// TODO(wesm): messages over 2GB unlikely to be yet supported
- if (total_size > kInt32Max) {
+ if (body_size > kInt32Max) {
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"Cannot send record batches exceeding 2GB yet");
}
- // Allocate slice, assign to output buffer
- grpc::Slice slice(total_size);
+ // Allocate and initialize slices
+ std::vector<grpc::Slice> slices;
+ grpc::Slice header_slice(header_size);
+ slices.push_back(header_slice);
// XXX(wesm): for debugging
// std::cout << "Writing record batch with total size " << total_size << std::endl;
- ArrayOutputStream writer(const_cast<uint8_t*>(slice.begin()),
- static_cast<int>(slice.size()));
- CodedOutputStream pb_stream(&writer);
+ ArrayOutputStream header_writer(const_cast<uint8_t*>(header_slice.begin()),
+ static_cast<int>(header_slice.size()));
+ CodedOutputStream header_stream(&header_writer);
// Write descriptor
if (msg.descriptor != nullptr) {
WireFormatLite::WriteTag(pb::FlightData::kFlightDescriptorFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &pb_stream);
- pb_stream.WriteVarint32(descriptor_size);
- pb_stream.WriteRawMaybeAliased(msg.descriptor->data(),
- static_cast<int>(msg.descriptor->size()));
+ WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
+ header_stream.WriteVarint32(descriptor_size);
+ header_stream.WriteRawMaybeAliased(msg.descriptor->data(),
+ static_cast<int>(msg.descriptor->size()));
}
// Write header
WireFormatLite::WriteTag(pb::FlightData::kDataHeaderFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &pb_stream);
- pb_stream.WriteVarint32(metadata_size);
- pb_stream.WriteRawMaybeAliased(ipc_msg.metadata->data(),
- static_cast<int>(ipc_msg.metadata->size()));
+ WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
+ header_stream.WriteVarint32(metadata_size);
+ header_stream.WriteRawMaybeAliased(ipc_msg.metadata->data(),
+ static_cast<int>(ipc_msg.metadata->size()));
- // Don't write tag if there are no body buffers
+ // Don't write body tag if there are no body buffers
if (ipc_msg.body_length > 0) {
- // Write body
+ // Write body tag
WireFormatLite::WriteTag(pb::FlightData::kDataBodyFieldNumber,
- WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &pb_stream);
- pb_stream.WriteVarint32(static_cast<uint32_t>(body_size));
-
- constexpr uint8_t kPaddingBytes[8] = {0};
+ WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &header_stream);
+ header_stream.WriteVarint32(static_cast<uint32_t>(body_size));
+ // Enqueue body buffers for writing, without copying
for (const auto& buffer : ipc_msg.body_buffers) {
// Buffer may be null when the row length is zero, or when all
// entries are invalid.
if (!buffer) continue;
- pb_stream.WriteRawMaybeAliased(buffer->data(), static_cast<int>(buffer->size()));
+ slices.push_back(SliceFromBuffer(buffer));
// Write padding if not multiple of 8
- const int remainder = static_cast<int>(buffer->size() % 8);
+ const auto remainder = static_cast<int>(
+ BitUtil::RoundUpToMultipleOf8(buffer->size()) - buffer->size());
if (remainder) {
- pb_stream.WriteRawMaybeAliased(kPaddingBytes, 8 - remainder);
+ slices.push_back(grpc::Slice(kPaddingBytes, remainder));
}
}
}
- DCHECK_EQ(static_cast<int>(total_size), pb_stream.ByteCount());
+ DCHECK_EQ(static_cast<int>(header_size), header_stream.ByteCount());
- // Hand off the slice to the returned ByteBuffer
- grpc::ByteBuffer tmp(&slice, 1);
- out->Swap(&tmp);
+ // Hand off the slices to the returned ByteBuffer
+ *out = grpc::ByteBuffer(slices.data(), slices.size());
*own_buffer = true;
return grpc::Status::OK;
}
// Read internal::FlightData from grpc::ByteBuffer containing FlightData
// protobuf without copying
-Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out) {
+grpc::Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out) {
if (!buffer) {
- return Status(StatusCode::INTERNAL, "No payload");
+ return grpc::Status(grpc::StatusCode::INTERNAL, "No payload");
}
std::shared_ptr<arrow::Buffer> wrapped_buffer;
@@ -240,15 +260,16 @@ Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out) {
pb::FlightDescriptor pb_descriptor;
uint32_t length;
if (!pb_stream.ReadVarint32(&length)) {
- return Status(StatusCode::INTERNAL,
- "Unable to parse length of FlightDescriptor");
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ "Unable to parse length of FlightDescriptor");
}
// Can't use ParseFromCodedStream as this reads the entire
// rest of the stream into the descriptor command field.
std::string buffer;
pb_stream.ReadString(&buffer, length);
if (!pb_descriptor.ParseFromString(buffer)) {
- return Status(StatusCode::INTERNAL, "Unable to parse FlightDescriptor");
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ "Unable to parse FlightDescriptor");
}
arrow::flight::FlightDescriptor descriptor;
GRPC_RETURN_NOT_OK(
@@ -257,12 +278,14 @@ Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out) {
} break;
case pb::FlightData::kDataHeaderFieldNumber: {
if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->metadata)) {
- return Status(StatusCode::INTERNAL, "Unable to read FlightData metadata");
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ "Unable to read FlightData metadata");
}
} break;
case pb::FlightData::kDataBodyFieldNumber: {
if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->body)) {
- return Status(StatusCode::INTERNAL, "Unable to read FlightData body");
+ return grpc::Status(grpc::StatusCode::INTERNAL,
+ "Unable to read FlightData body");
}
} break;
default:
@@ -274,7 +297,9 @@ Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out) {
// TODO(wesm): Where and when should we verify that the FlightData is not
// malformed or missing components?
- return Status::OK;
+ return grpc::Status::OK;
}
-} // namespace grpc
+} // namespace internal
+} // namespace flight
+} // namespace arrow
diff --git a/cpp/src/arrow/flight/serialization-internal.h b/cpp/src/arrow/flight/serialization-internal.h
index d8e7aad..4576290 100644
--- a/cpp/src/arrow/flight/serialization-internal.h
+++ b/cpp/src/arrow/flight/serialization-internal.h
@@ -23,32 +23,19 @@
// Enable gRPC customizations
#include "arrow/flight/protocol-internal.h" // IWYU pragma: keep
-#include <cstdint>
-#include <limits>
#include <memory>
#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-#include <google/protobuf/wire_format_lite.h>
-#include <grpcpp/grpcpp.h>
-#include <grpcpp/impl/codegen/proto_utils.h>
-#include "grpc/byte_buffer_reader.h"
-
-#include "arrow/ipc/writer.h"
-#include "arrow/record_batch.h"
-#include "arrow/status.h"
-#include "arrow/util/logging.h"
#include "arrow/flight/internal.h"
#include "arrow/flight/types.h"
-namespace pb = arrow::flight::protocol;
+namespace arrow {
-constexpr int64_t kInt32Max = std::numeric_limits<int32_t>::max();
+class Buffer;
-namespace arrow {
namespace flight {
+namespace internal {
/// Internal, not user-visible type used for memory-efficient reads from gRPC
/// stream
@@ -63,49 +50,6 @@ struct FlightData {
std::shared_ptr<Buffer> body;
};
-namespace internal {
-
-using google::protobuf::io::CodedInputStream;
-using google::protobuf::io::CodedOutputStream;
-
-bool ReadBytesZeroCopy(const std::shared_ptr<arrow::Buffer>& source_data,
- CodedInputStream* input, std::shared_ptr<arrow::Buffer>* out);
-
} // namespace internal
} // namespace flight
} // namespace arrow
-
-namespace grpc {
-
-using arrow::flight::FlightData;
-
-using google::protobuf::internal::WireFormatLite;
-using google::protobuf::io::ArrayOutputStream;
-using google::protobuf::io::CodedInputStream;
-using google::protobuf::io::CodedOutputStream;
-
-// Helper to log status code, as gRPC doesn't expose why
-// (de)serialization fails
-inline Status FailSerialization(Status status) {
- if (!status.ok()) {
- ARROW_LOG(WARNING) << "Error deserializing Flight message: "
- << status.error_message();
- }
- return status;
-}
-
-inline arrow::Status FailSerialization(arrow::Status status) {
- if (!status.ok()) {
- ARROW_LOG(WARNING) << "Error deserializing Flight message: " << status.ToString();
- }
- return status;
-}
-
-// Write FlightData to a grpc::ByteBuffer without extra copying
-Status FlightDataSerialize(const FlightPayload& msg, ByteBuffer* out, bool* own_buffer);
-
-// Read internal::FlightData from grpc::ByteBuffer containing FlightData
-// protobuf without copying
-Status FlightDataDeserialize(ByteBuffer* buffer, FlightData* out);
-
-} // namespace grpc
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
index 0b95e53..fe9c1bb 100644
--- a/cpp/src/arrow/flight/server.cc
+++ b/cpp/src/arrow/flight/server.cc
@@ -73,7 +73,7 @@ class FlightMessageReaderImpl : public FlightMessageReader {
return Status::OK();
}
- FlightData data;
+ internal::FlightData data;
// Pretend to be pb::FlightData and intercept in SerializationTraits
if (reader_->Read(reinterpret_cast<pb::FlightData*>(&data))) {
std::unique_ptr<ipc::Message> message;
@@ -209,7 +209,7 @@ class FlightServiceImpl : public FlightService::Service {
grpc::Status DoPut(ServerContext* context, grpc::ServerReader<pb::FlightData>* reader,
pb::PutResult* response) {
// Get metadata
- FlightData data;
+ internal::FlightData data;
if (reader->Read(reinterpret_cast<pb::FlightData*>(&data))) {
// Message only lives as long as data
std::unique_ptr<ipc::Message> message;