You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/25 02:56:00 UTC
[4/7] arrow git commit: ARROW-1219: [C++] Use Google C++ code formatting
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index 49c24c7..20fd280 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -58,8 +58,8 @@ static constexpr flatbuf::MetadataVersion kCurrentMetadataVersion =
static constexpr flatbuf::MetadataVersion kMinMetadataVersion =
flatbuf::MetadataVersion_V3;
-static Status IntFromFlatbuffer(
- const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) {
+static Status IntFromFlatbuffer(const flatbuf::Int* int_data,
+ std::shared_ptr<DataType>* out) {
if (int_data->bitWidth() > 64) {
return Status::NotImplemented("Integers with more than 64 bits not implemented");
}
@@ -86,8 +86,8 @@ static Status IntFromFlatbuffer(
return Status::OK();
}
-static Status FloatFromFlatuffer(
- const flatbuf::FloatingPoint* float_data, std::shared_ptr<DataType>* out) {
+static Status FloatFromFlatuffer(const flatbuf::FloatingPoint* float_data,
+ std::shared_ptr<DataType>* out) {
if (float_data->precision() == flatbuf::Precision_HALF) {
*out = float16();
} else if (float_data->precision() == flatbuf::Precision_SINGLE) {
@@ -100,7 +100,7 @@ static Status FloatFromFlatuffer(
// Forward declaration
static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
- DictionaryMemo* dictionary_memo, FieldOffset* offset);
+ DictionaryMemo* dictionary_memo, FieldOffset* offset);
static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) {
return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union();
@@ -111,7 +111,8 @@ static Offset FloatToFlatbuffer(FBB& fbb, flatbuf::Precision precision) {
}
static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo) {
+ std::vector<FieldOffset>* out_children,
+ DictionaryMemo* dictionary_memo) {
FieldOffset field;
for (int i = 0; i < type->num_children(); ++i) {
RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), dictionary_memo, &field));
@@ -121,16 +122,16 @@ static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type,
}
static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
- Offset* offset) {
+ std::vector<FieldOffset>* out_children,
+ DictionaryMemo* dictionary_memo, Offset* offset) {
RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
*offset = flatbuf::CreateList(fbb).Union();
return Status::OK();
}
static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
- Offset* offset) {
+ std::vector<FieldOffset>* out_children,
+ DictionaryMemo* dictionary_memo, Offset* offset) {
RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
*offset = flatbuf::CreateStruct_(fbb).Union();
return Status::OK();
@@ -140,7 +141,8 @@ static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type
// Union implementation
static Status UnionFromFlatbuffer(const flatbuf::Union* union_data,
- const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) {
+ const std::vector<std::shared_ptr<Field>>& children,
+ std::shared_ptr<DataType>* out) {
UnionMode mode = union_data->mode() == flatbuf::UnionMode_Sparse ? UnionMode::SPARSE
: UnionMode::DENSE;
@@ -163,8 +165,8 @@ static Status UnionFromFlatbuffer(const flatbuf::Union* union_data,
}
static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
- Offset* offset) {
+ std::vector<FieldOffset>* out_children,
+ DictionaryMemo* dictionary_memo, Offset* offset) {
RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
const auto& union_type = static_cast<const UnionType&>(*type);
@@ -224,15 +226,16 @@ static inline TimeUnit::type FromFlatbufferUnit(flatbuf::TimeUnit unit) {
}
static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
- const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) {
+ const std::vector<std::shared_ptr<Field>>& children,
+ std::shared_ptr<DataType>* out) {
switch (type) {
case flatbuf::Type_NONE:
return Status::Invalid("Type metadata cannot be none");
case flatbuf::Type_Int:
return IntFromFlatbuffer(static_cast<const flatbuf::Int*>(type_data), out);
case flatbuf::Type_FloatingPoint:
- return FloatFromFlatuffer(
- static_cast<const flatbuf::FloatingPoint*>(type_data), out);
+ return FloatFromFlatuffer(static_cast<const flatbuf::FloatingPoint*>(type_data),
+ out);
case flatbuf::Type_Binary:
*out = binary();
return Status::OK();
@@ -301,8 +304,8 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
*out = std::make_shared<StructType>(children);
return Status::OK();
case flatbuf::Type_Union:
- return UnionFromFlatbuffer(
- static_cast<const flatbuf::Union*>(type_data), children, out);
+ return UnionFromFlatbuffer(static_cast<const flatbuf::Union*>(type_data), children,
+ out);
default:
return Status::Invalid("Unrecognized type");
}
@@ -310,15 +313,17 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
// TODO(wesm): Convert this to visitor pattern
static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout,
- flatbuf::Type* out_type, DictionaryMemo* dictionary_memo, Offset* offset) {
+ std::vector<FieldOffset>* children,
+ std::vector<VectorLayoutOffset>* layout,
+ flatbuf::Type* out_type, DictionaryMemo* dictionary_memo,
+ Offset* offset) {
if (type->id() == Type::DICTIONARY) {
// In this library, the dictionary "type" is a logical construct. Here we
// pass through to the value type, as we've already captured the index
// type in the DictionaryEncoding metadata in the parent field
const auto& dict_type = static_cast<const DictionaryType&>(*type);
return TypeToFlatbuffer(fbb, dict_type.dictionary()->type(), children, layout,
- out_type, dictionary_memo, offset);
+ out_type, dictionary_memo, offset);
}
std::vector<BufferDescr> buffer_layout = type->GetBufferLayout();
@@ -436,7 +441,7 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
}
static Status TensorTypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
- flatbuf::Type* out_type, Offset* offset) {
+ flatbuf::Type* out_type, Offset* offset) {
switch (type->id()) {
case Type::UINT8:
INT_TO_FB_CASE(8, false);
@@ -475,8 +480,8 @@ static Status TensorTypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>&
return Status::OK();
}
-static DictionaryOffset GetDictionaryEncoding(
- FBB& fbb, const DictionaryType& type, DictionaryMemo* memo) {
+static DictionaryOffset GetDictionaryEncoding(FBB& fbb, const DictionaryType& type,
+ DictionaryMemo* memo) {
int64_t dictionary_id = memo->GetId(type.dictionary());
// We assume that the dictionary index type (as an integer) has already been
@@ -491,7 +496,7 @@ static DictionaryOffset GetDictionaryEncoding(
}
static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
- DictionaryMemo* dictionary_memo, FieldOffset* offset) {
+ DictionaryMemo* dictionary_memo, FieldOffset* offset) {
auto fb_name = fbb.CreateString(field->name());
flatbuf::Type type_enum;
@@ -500,8 +505,8 @@ static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
std::vector<FieldOffset> children;
std::vector<VectorLayoutOffset> layout;
- RETURN_NOT_OK(TypeToFlatbuffer(
- fbb, field->type(), &children, &layout, &type_enum, dictionary_memo, &type_offset));
+ RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type(), &children, &layout, &type_enum,
+ dictionary_memo, &type_offset));
auto fb_children = fbb.CreateVector(children);
auto fb_layout = fbb.CreateVector(layout);
@@ -513,13 +518,14 @@ static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
// TODO: produce the list of VectorTypes
*offset = flatbuf::CreateField(fbb, fb_name, field->nullable(), type_enum, type_offset,
- dictionary, fb_children, fb_layout);
+ dictionary, fb_children, fb_layout);
return Status::OK();
}
static Status FieldFromFlatbuffer(const flatbuf::Field* field,
- const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out) {
+ const DictionaryMemo& dictionary_memo,
+ std::shared_ptr<Field>* out) {
std::shared_ptr<DataType> type;
const flatbuf::DictionaryEncoding* encoding = field->dictionary();
@@ -551,8 +557,8 @@ static Status FieldFromFlatbuffer(const flatbuf::Field* field,
return Status::OK();
}
-static Status FieldFromFlatbufferDictionary(
- const flatbuf::Field* field, std::shared_ptr<Field>* out) {
+static Status FieldFromFlatbufferDictionary(const flatbuf::Field* field,
+ std::shared_ptr<Field>* out) {
// Need an empty memo to pass down for constructing children
DictionaryMemo dummy_memo;
@@ -584,7 +590,8 @@ flatbuf::Endianness endianness() {
}
static Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema,
- DictionaryMemo* dictionary_memo, flatbuffers::Offset<flatbuf::Schema>* out) {
+ DictionaryMemo* dictionary_memo,
+ flatbuffers::Offset<flatbuf::Schema>* out) {
/// Fields
std::vector<FieldOffset> field_offsets;
for (int i = 0; i < schema.num_fields(); ++i) {
@@ -609,8 +616,8 @@ static Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema,
key_value_offsets.push_back(
flatbuf::CreateKeyValue(fbb, fbb.CreateString(key), fbb.CreateString(value)));
}
- *out = flatbuf::CreateSchema(
- fbb, endianness(), fb_offsets, fbb.CreateVector(key_value_offsets));
+ *out = flatbuf::CreateSchema(fbb, endianness(), fb_offsets,
+ fbb.CreateVector(key_value_offsets));
} else {
*out = flatbuf::CreateSchema(fbb, endianness(), fb_offsets);
}
@@ -631,15 +638,16 @@ static Status WriteFlatbufferBuilder(FBB& fbb, std::shared_ptr<Buffer>* out) {
}
static Status WriteFBMessage(FBB& fbb, flatbuf::MessageHeader header_type,
- flatbuffers::Offset<void> header, int64_t body_length, std::shared_ptr<Buffer>* out) {
- auto message = flatbuf::CreateMessage(
- fbb, kCurrentMetadataVersion, header_type, header, body_length);
+ flatbuffers::Offset<void> header, int64_t body_length,
+ std::shared_ptr<Buffer>* out) {
+ auto message = flatbuf::CreateMessage(fbb, kCurrentMetadataVersion, header_type, header,
+ body_length);
fbb.Finish(message);
return WriteFlatbufferBuilder(fbb, out);
}
-Status WriteSchemaMessage(
- const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out) {
+Status WriteSchemaMessage(const Schema& schema, DictionaryMemo* dictionary_memo,
+ std::shared_ptr<Buffer>* out) {
FBB fbb;
flatbuffers::Offset<flatbuf::Schema> fb_schema;
RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema));
@@ -650,8 +658,8 @@ using FieldNodeVector =
flatbuffers::Offset<flatbuffers::Vector<const flatbuf::FieldNode*>>;
using BufferVector = flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Buffer*>>;
-static Status WriteFieldNodes(
- FBB& fbb, const std::vector<FieldMetadata>& nodes, FieldNodeVector* out) {
+static Status WriteFieldNodes(FBB& fbb, const std::vector<FieldMetadata>& nodes,
+ FieldNodeVector* out) {
std::vector<flatbuf::FieldNode> fb_nodes;
fb_nodes.reserve(nodes.size());
@@ -666,8 +674,8 @@ static Status WriteFieldNodes(
return Status::OK();
}
-static Status WriteBuffers(
- FBB& fbb, const std::vector<BufferMetadata>& buffers, BufferVector* out) {
+static Status WriteBuffers(FBB& fbb, const std::vector<BufferMetadata>& buffers,
+ BufferVector* out) {
std::vector<flatbuf::Buffer> fb_buffers;
fb_buffers.reserve(buffers.size());
@@ -680,8 +688,9 @@ static Status WriteBuffers(
}
static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length,
- const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
- RecordBatchOffset* offset) {
+ const std::vector<FieldMetadata>& nodes,
+ const std::vector<BufferMetadata>& buffers,
+ RecordBatchOffset* offset) {
FieldNodeVector fb_nodes;
BufferVector fb_buffers;
@@ -693,17 +702,18 @@ static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length,
}
Status WriteRecordBatchMessage(int64_t length, int64_t body_length,
- const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
- std::shared_ptr<Buffer>* out) {
+ const std::vector<FieldMetadata>& nodes,
+ const std::vector<BufferMetadata>& buffers,
+ std::shared_ptr<Buffer>* out) {
FBB fbb;
RecordBatchOffset record_batch;
RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch));
- return WriteFBMessage(
- fbb, flatbuf::MessageHeader_RecordBatch, record_batch.Union(), body_length, out);
+ return WriteFBMessage(fbb, flatbuf::MessageHeader_RecordBatch, record_batch.Union(),
+ body_length, out);
}
-Status WriteTensorMessage(
- const Tensor& tensor, int64_t buffer_start_offset, std::shared_ptr<Buffer>* out) {
+Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset,
+ std::shared_ptr<Buffer>* out) {
using TensorDimOffset = flatbuffers::Offset<flatbuf::TensorDim>;
using TensorOffset = flatbuffers::Offset<flatbuf::Tensor>;
@@ -727,19 +737,20 @@ Status WriteTensorMessage(
TensorOffset fb_tensor =
flatbuf::CreateTensor(fbb, fb_type_type, fb_type, fb_shape, fb_strides, &buffer);
- return WriteFBMessage(
- fbb, flatbuf::MessageHeader_Tensor, fb_tensor.Union(), body_length, out);
+ return WriteFBMessage(fbb, flatbuf::MessageHeader_Tensor, fb_tensor.Union(),
+ body_length, out);
}
Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length,
- const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
- std::shared_ptr<Buffer>* out) {
+ const std::vector<FieldMetadata>& nodes,
+ const std::vector<BufferMetadata>& buffers,
+ std::shared_ptr<Buffer>* out) {
FBB fbb;
RecordBatchOffset record_batch;
RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &record_batch));
auto dictionary_batch = flatbuf::CreateDictionaryBatch(fbb, id, record_batch).Union();
- return WriteFBMessage(
- fbb, flatbuf::MessageHeader_DictionaryBatch, dictionary_batch, body_length, out);
+ return WriteFBMessage(fbb, flatbuf::MessageHeader_DictionaryBatch, dictionary_batch,
+ body_length, out);
}
static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
@@ -754,8 +765,8 @@ FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
}
Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
- const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
- io::OutputStream* out) {
+ const std::vector<FileBlock>& record_batches,
+ DictionaryMemo* dictionary_memo, io::OutputStream* out) {
FBB fbb;
flatbuffers::Offset<flatbuf::Schema> fb_schema;
@@ -764,8 +775,8 @@ Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dicti
auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
- auto footer = flatbuf::CreateFooter(
- fbb, kCurrentMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches);
+ auto footer = flatbuf::CreateFooter(fbb, kCurrentMetadataVersion, fb_schema,
+ fb_dictionaries, fb_record_batches);
fbb.Finish(footer);
@@ -780,8 +791,8 @@ Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dicti
DictionaryMemo::DictionaryMemo() {}
// Returns KeyError if dictionary not found
-Status DictionaryMemo::GetDictionary(
- int64_t id, std::shared_ptr<Array>* dictionary) const {
+Status DictionaryMemo::GetDictionary(int64_t id,
+ std::shared_ptr<Array>* dictionary) const {
auto it = id_to_dictionary_.find(id);
if (it == id_to_dictionary_.end()) {
std::stringstream ss;
@@ -817,8 +828,8 @@ bool DictionaryMemo::HasDictionaryId(int64_t id) const {
return it != id_to_dictionary_.end();
}
-Status DictionaryMemo::AddDictionary(
- int64_t id, const std::shared_ptr<Array>& dictionary) {
+Status DictionaryMemo::AddDictionary(int64_t id,
+ const std::shared_ptr<Array>& dictionary) {
if (HasDictionaryId(id)) {
std::stringstream ss;
ss << "Dictionary with id " << id << " already exists";
@@ -835,8 +846,8 @@ Status DictionaryMemo::AddDictionary(
class Message::MessageImpl {
public:
- explicit MessageImpl(
- const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body)
+ explicit MessageImpl(const std::shared_ptr<Buffer>& metadata,
+ const std::shared_ptr<Buffer>& body)
: metadata_(metadata), message_(nullptr), body_(body) {}
Status Open() {
@@ -897,43 +908,35 @@ class Message::MessageImpl {
std::shared_ptr<Buffer> body_;
};
-Message::Message(
- const std::shared_ptr<Buffer>& metadata, const std::shared_ptr<Buffer>& body) {
+Message::Message(const std::shared_ptr<Buffer>& metadata,
+ const std::shared_ptr<Buffer>& body) {
impl_.reset(new MessageImpl(metadata, body));
}
Status Message::Open(const std::shared_ptr<Buffer>& metadata,
- const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out) {
+ const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out) {
out->reset(new Message(metadata, body));
return (*out)->impl_->Open();
}
Message::~Message() {}
-std::shared_ptr<Buffer> Message::body() const {
- return impl_->body();
-}
+std::shared_ptr<Buffer> Message::body() const { return impl_->body(); }
-std::shared_ptr<Buffer> Message::metadata() const {
- return impl_->metadata();
-}
+std::shared_ptr<Buffer> Message::metadata() const { return impl_->metadata(); }
-Message::Type Message::type() const {
- return impl_->type();
-}
+Message::Type Message::type() const { return impl_->type(); }
-MetadataVersion Message::metadata_version() const {
- return impl_->version();
-}
+MetadataVersion Message::metadata_version() const { return impl_->version(); }
-const void* Message::header() const {
- return impl_->header();
-}
+const void* Message::header() const { return impl_->header(); }
bool Message::Equals(const Message& other) const {
int64_t metadata_bytes = std::min(metadata()->size(), other.metadata()->size());
- if (!metadata()->Equals(*other.metadata(), metadata_bytes)) { return false; }
+ if (!metadata()->Equals(*other.metadata(), metadata_bytes)) {
+ return false;
+ }
// Compare bodies, if they have them
auto this_body = body();
@@ -1012,7 +1015,7 @@ Status GetDictionaryTypes(const void* opaque_schema, DictionaryTypeMap* id_to_fi
}
Status GetSchema(const void* opaque_schema, const DictionaryMemo& dictionary_memo,
- std::shared_ptr<Schema>* out) {
+ std::shared_ptr<Schema>* out) {
auto schema = static_cast<const flatbuf::Schema*>(opaque_schema);
int num_fields = static_cast<int>(schema->fields()->size());
@@ -1036,8 +1039,8 @@ Status GetSchema(const void* opaque_schema, const DictionaryMemo& dictionary_mem
}
Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type,
- std::vector<int64_t>* shape, std::vector<int64_t>* strides,
- std::vector<std::string>* dim_names) {
+ std::vector<int64_t>* shape, std::vector<int64_t>* strides,
+ std::vector<std::string>* dim_names) {
auto message = flatbuf::GetMessage(metadata.data());
auto tensor = reinterpret_cast<const flatbuf::Tensor*>(message->header());
@@ -1068,7 +1071,8 @@ Status GetTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>* type
// Read and write messages
static Status ReadFullMessage(const std::shared_ptr<Buffer>& metadata,
- io::InputStream* stream, std::unique_ptr<Message>* message) {
+ io::InputStream* stream,
+ std::unique_ptr<Message>* message) {
auto fb_message = flatbuf::GetMessage(metadata->data());
int64_t body_length = fb_message->bodyLength();
@@ -1087,7 +1091,7 @@ static Status ReadFullMessage(const std::shared_ptr<Buffer>& metadata,
}
Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
- std::unique_ptr<Message>* message) {
+ std::unique_ptr<Message>* message) {
std::shared_ptr<Buffer> buffer;
RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));
@@ -1141,8 +1145,8 @@ InputStreamMessageReader::~InputStreamMessageReader() {}
// ----------------------------------------------------------------------
// Implement message writing
-Status WriteMessage(
- const Buffer& message, io::OutputStream* file, int32_t* message_length) {
+Status WriteMessage(const Buffer& message, io::OutputStream* file,
+ int32_t* message_length) {
// Need to write 4 bytes (message size), the message, plus padding to
// end on an 8-byte offset
int64_t start_offset;
@@ -1151,7 +1155,9 @@ Status WriteMessage(
int32_t padded_message_length = static_cast<int32_t>(message.size()) + 4;
const int32_t remainder =
(padded_message_length + static_cast<int32_t>(start_offset)) % 8;
- if (remainder != 0) { padded_message_length += 8 - remainder; }
+ if (remainder != 0) {
+ padded_message_length += 8 - remainder;
+ }
// The returned message size includes the length prefix, the flatbuffer,
// plus padding
@@ -1167,7 +1173,9 @@ Status WriteMessage(
// Write any padding
int32_t padding = padded_message_length - static_cast<int32_t>(message.size()) - 4;
- if (padding > 0) { RETURN_NOT_OK(file->Write(kPaddingBytes, padding)); }
+ if (padding > 0) {
+ RETURN_NOT_OK(file->Write(kPaddingBytes, padding));
+ }
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 614f7a6..90e4def 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -133,11 +133,14 @@ Status GetDictionaryTypes(const void* opaque_schema, DictionaryTypeMap* id_to_fi
// Construct a complete Schema from the message. May be expensive for very
// large schemas if you are only interested in a few fields
Status ARROW_EXPORT GetSchema(const void* opaque_schema,
- const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out);
+ const DictionaryMemo& dictionary_memo,
+ std::shared_ptr<Schema>* out);
Status ARROW_EXPORT GetTensorMetadata(const Buffer& metadata,
- std::shared_ptr<DataType>* type, std::vector<int64_t>* shape,
- std::vector<int64_t>* strides, std::vector<std::string>* dim_names);
+ std::shared_ptr<DataType>* type,
+ std::vector<int64_t>* shape,
+ std::vector<int64_t>* strides,
+ std::vector<std::string>* dim_names);
/// \brief An IPC message including metadata and body
class ARROW_EXPORT Message {
@@ -157,7 +160,7 @@ class ARROW_EXPORT Message {
/// \param[in] body a buffer containing the message body, which may be nullptr
/// \param[out] out the created message
static Status Open(const std::shared_ptr<Buffer>& metadata,
- const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out);
+ const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out);
/// \brief Write length-prefixed metadata and body to output stream
///
@@ -242,22 +245,23 @@ class ARROW_EXPORT InputStreamMessageReader : public MessageReader {
/// \param[out] message the message read
/// \return Status success or failure
Status ARROW_EXPORT ReadMessage(int64_t offset, int32_t metadata_length,
- io::RandomAccessFile* file, std::unique_ptr<Message>* message);
+ io::RandomAccessFile* file,
+ std::unique_ptr<Message>* message);
/// \brief Read encapulated RPC message (metadata and body) from InputStream
///
/// Read length-prefixed message with as-yet unknown length. Returns nullptr if
/// there are not enough bytes available or the message length is 0 (e.g. EOS
/// in a stream)
-Status ARROW_EXPORT ReadMessage(
- io::InputStream* stream, std::unique_ptr<Message>* message);
+Status ARROW_EXPORT ReadMessage(io::InputStream* stream,
+ std::unique_ptr<Message>* message);
/// Write a serialized message metadata with a length-prefix and padding to an
/// 8-byte offset
///
/// <message_size: int32><message: const void*><padding>
-Status ARROW_EXPORT WriteMessage(
- const Buffer& message, io::OutputStream* file, int32_t* message_length);
+Status ARROW_EXPORT WriteMessage(const Buffer& message, io::OutputStream* file,
+ int32_t* message_length);
// Serialize arrow::Schema as a Flatbuffer
//
@@ -266,23 +270,26 @@ Status ARROW_EXPORT WriteMessage(
// dictionary ids
// \param[out] out the serialized arrow::Buffer
// \return Status outcome
-Status ARROW_EXPORT WriteSchemaMessage(
- const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out);
+Status ARROW_EXPORT WriteSchemaMessage(const Schema& schema,
+ DictionaryMemo* dictionary_memo,
+ std::shared_ptr<Buffer>* out);
Status ARROW_EXPORT WriteRecordBatchMessage(int64_t length, int64_t body_length,
- const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
- std::shared_ptr<Buffer>* out);
+ const std::vector<FieldMetadata>& nodes,
+ const std::vector<BufferMetadata>& buffers,
+ std::shared_ptr<Buffer>* out);
-Status ARROW_EXPORT WriteTensorMessage(
- const Tensor& tensor, int64_t buffer_start_offset, std::shared_ptr<Buffer>* out);
+Status ARROW_EXPORT WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset,
+ std::shared_ptr<Buffer>* out);
Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length,
- const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
- std::shared_ptr<Buffer>* out);
+ const std::vector<FieldMetadata>& nodes,
+ const std::vector<BufferMetadata>& buffers,
+ std::shared_ptr<Buffer>* out);
Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
- const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
- io::OutputStream* out);
+ const std::vector<FileBlock>& record_batches,
+ DictionaryMemo* dictionary_memo, io::OutputStream* out);
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 88ab330..8ae8280 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -95,12 +95,12 @@ struct ArrayLoaderContext {
};
static Status LoadArray(const std::shared_ptr<DataType>& type,
- ArrayLoaderContext* context, internal::ArrayData* out);
+ ArrayLoaderContext* context, internal::ArrayData* out);
class ArrayLoader {
public:
ArrayLoader(const std::shared_ptr<DataType>& type, internal::ArrayData* out,
- ArrayLoaderContext* context)
+ ArrayLoaderContext* context)
: type_(type), context_(context), out_(out) {}
Status Load() {
@@ -184,7 +184,7 @@ class ArrayLoader {
typename std::enable_if<std::is_base_of<FixedWidthType, T>::value &&
!std::is_base_of<FixedSizeBinaryType, T>::value &&
!std::is_base_of<DictionaryType, T>::value,
- Status>::type
+ Status>::type
Visit(const T& type) {
return LoadPrimitive<T>();
}
@@ -252,18 +252,18 @@ class ArrayLoader {
};
static Status LoadArray(const std::shared_ptr<DataType>& type,
- ArrayLoaderContext* context, internal::ArrayData* out) {
+ ArrayLoaderContext* context, internal::ArrayData* out) {
ArrayLoader loader(type, out, context);
return loader.Load();
}
Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
- io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
+ io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out);
}
Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& schema,
- std::shared_ptr<RecordBatch>* out) {
+ std::shared_ptr<RecordBatch>* out) {
io::BufferReader reader(message.body());
DCHECK_EQ(message.type(), Message::RECORD_BATCH);
return ReadRecordBatch(*message.metadata(), schema, kMaxNestingDepth, &reader, out);
@@ -273,8 +273,9 @@ Status ReadRecordBatch(const Message& message, const std::shared_ptr<Schema>& sc
// Array loading
static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema,
- int64_t num_rows, int max_recursion_depth, IpcComponentSource* source,
- std::shared_ptr<RecordBatch>* out) {
+ int64_t num_rows, int max_recursion_depth,
+ IpcComponentSource* source,
+ std::shared_ptr<RecordBatch>* out) {
ArrayLoaderContext context;
context.source = source;
context.field_index = 0;
@@ -294,16 +295,17 @@ static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema,
}
static inline Status ReadRecordBatch(const flatbuf::RecordBatch* metadata,
- const std::shared_ptr<Schema>& schema, int max_recursion_depth,
- io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
+ const std::shared_ptr<Schema>& schema,
+ int max_recursion_depth, io::RandomAccessFile* file,
+ std::shared_ptr<RecordBatch>* out) {
IpcComponentSource source(metadata, file);
- return LoadRecordBatchFromSource(
- schema, metadata->length(), max_recursion_depth, &source, out);
+ return LoadRecordBatchFromSource(schema, metadata->length(), max_recursion_depth,
+ &source, out);
}
Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& schema,
- int max_recursion_depth, io::RandomAccessFile* file,
- std::shared_ptr<RecordBatch>* out) {
+ int max_recursion_depth, io::RandomAccessFile* file,
+ std::shared_ptr<RecordBatch>* out) {
auto message = flatbuf::GetMessage(metadata.data());
if (message->header_type() != flatbuf::MessageHeader_RecordBatch) {
DCHECK_EQ(message->header_type(), flatbuf::MessageHeader_RecordBatch);
@@ -313,7 +315,8 @@ Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& sc
}
Status ReadDictionary(const Buffer& metadata, const DictionaryTypeMap& dictionary_types,
- io::RandomAccessFile* file, int64_t* dictionary_id, std::shared_ptr<Array>* out) {
+ io::RandomAccessFile* file, int64_t* dictionary_id,
+ std::shared_ptr<Array>* out) {
auto message = flatbuf::GetMessage(metadata.data());
auto dictionary_batch =
reinterpret_cast<const flatbuf::DictionaryBatch*>(message->header());
@@ -347,7 +350,7 @@ Status ReadDictionary(const Buffer& metadata, const DictionaryTypeMap& dictionar
}
static Status ReadMessageAndValidate(MessageReader* reader, Message::Type expected_type,
- bool allow_null, std::unique_ptr<Message>* message) {
+ bool allow_null, std::unique_ptr<Message>* message) {
RETURN_NOT_OK(reader->ReadNextMessage(message));
if (!(*message) && !allow_null) {
@@ -357,7 +360,9 @@ static Status ReadMessageAndValidate(MessageReader* reader, Message::Type expect
return Status::Invalid(ss.str());
}
- if ((*message) == nullptr) { return Status::OK(); }
+ if ((*message) == nullptr) {
+ return Status::OK();
+ }
if ((*message)->type() != expected_type) {
std::stringstream ss;
@@ -389,15 +394,15 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
Status ReadNextDictionary() {
std::unique_ptr<Message> message;
- RETURN_NOT_OK(ReadMessageAndValidate(
- message_reader_.get(), Message::DICTIONARY_BATCH, false, &message));
+ RETURN_NOT_OK(ReadMessageAndValidate(message_reader_.get(), Message::DICTIONARY_BATCH,
+ false, &message));
io::BufferReader reader(message->body());
std::shared_ptr<Array> dictionary;
int64_t id;
- RETURN_NOT_OK(ReadDictionary(
- *message->metadata(), dictionary_types_, &reader, &id, &dictionary));
+ RETURN_NOT_OK(ReadDictionary(*message->metadata(), dictionary_types_, &reader, &id,
+ &dictionary));
return dictionary_memo_.AddDictionary(id, dictionary);
}
@@ -420,8 +425,8 @@ class RecordBatchStreamReader::RecordBatchStreamReaderImpl {
Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) {
std::unique_ptr<Message> message;
- RETURN_NOT_OK(ReadMessageAndValidate(
- message_reader_.get(), Message::RECORD_BATCH, true, &message));
+ RETURN_NOT_OK(ReadMessageAndValidate(message_reader_.get(), Message::RECORD_BATCH,
+ true, &message));
if (message == nullptr) {
// End of stream
@@ -451,14 +456,14 @@ RecordBatchStreamReader::RecordBatchStreamReader() {
RecordBatchStreamReader::~RecordBatchStreamReader() {}
Status RecordBatchStreamReader::Open(std::unique_ptr<MessageReader> message_reader,
- std::shared_ptr<RecordBatchStreamReader>* reader) {
+ std::shared_ptr<RecordBatchStreamReader>* reader) {
// Private ctor
*reader = std::shared_ptr<RecordBatchStreamReader>(new RecordBatchStreamReader());
return (*reader)->impl_->Open(std::move(message_reader));
}
Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
- std::shared_ptr<RecordBatchStreamReader>* out) {
+ std::shared_ptr<RecordBatchStreamReader>* out) {
std::unique_ptr<MessageReader> message_reader(new InputStreamMessageReader(stream));
return Open(std::move(message_reader), out);
}
@@ -502,8 +507,8 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
}
// Now read the footer
- RETURN_NOT_OK(file_->ReadAt(
- footer_offset_ - footer_length - file_end_size, footer_length, &footer_buffer_));
+ RETURN_NOT_OK(file_->ReadAt(footer_offset_ - footer_length - file_end_size,
+ footer_length, &footer_buffer_));
// TODO(wesm): Verify the footer
footer_ = flatbuf::GetFooter(footer_buffer_->data());
@@ -568,7 +573,7 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl {
std::shared_ptr<Array> dictionary;
int64_t dictionary_id;
RETURN_NOT_OK(ReadDictionary(*message->metadata(), dictionary_fields_, &reader,
- &dictionary_id, &dictionary));
+ &dictionary_id, &dictionary));
RETURN_NOT_OK(dictionary_memo_->AddDictionary(dictionary_id, dictionary));
}
@@ -610,37 +615,34 @@ RecordBatchFileReader::RecordBatchFileReader() {
RecordBatchFileReader::~RecordBatchFileReader() {}
Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
- std::shared_ptr<RecordBatchFileReader>* reader) {
+ std::shared_ptr<RecordBatchFileReader>* reader) {
int64_t footer_offset;
RETURN_NOT_OK(file->GetSize(&footer_offset));
return Open(file, footer_offset, reader);
}
Status RecordBatchFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
- int64_t footer_offset, std::shared_ptr<RecordBatchFileReader>* reader) {
+ int64_t footer_offset,
+ std::shared_ptr<RecordBatchFileReader>* reader) {
*reader = std::shared_ptr<RecordBatchFileReader>(new RecordBatchFileReader());
return (*reader)->impl_->Open(file, footer_offset);
}
-std::shared_ptr<Schema> RecordBatchFileReader::schema() const {
- return impl_->schema();
-}
+std::shared_ptr<Schema> RecordBatchFileReader::schema() const { return impl_->schema(); }
int RecordBatchFileReader::num_record_batches() const {
return impl_->num_record_batches();
}
-MetadataVersion RecordBatchFileReader::version() const {
- return impl_->version();
-}
+MetadataVersion RecordBatchFileReader::version() const { return impl_->version(); }
-Status RecordBatchFileReader::ReadRecordBatch(
- int i, std::shared_ptr<RecordBatch>* batch) {
+Status RecordBatchFileReader::ReadRecordBatch(int i,
+ std::shared_ptr<RecordBatch>* batch) {
return impl_->ReadRecordBatch(i, batch);
}
-static Status ReadContiguousPayload(
- int64_t offset, io::RandomAccessFile* file, std::unique_ptr<Message>* message) {
+static Status ReadContiguousPayload(int64_t offset, io::RandomAccessFile* file,
+ std::unique_ptr<Message>* message) {
std::shared_ptr<Buffer> buffer;
RETURN_NOT_OK(file->Seek(offset));
RETURN_NOT_OK(ReadMessage(file, message));
@@ -652,16 +654,16 @@ static Status ReadContiguousPayload(
}
Status ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset,
- io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
+ io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
std::unique_ptr<Message> message;
RETURN_NOT_OK(ReadContiguousPayload(offset, file, &message));
io::BufferReader buffer_reader(message->body());
- return ReadRecordBatch(
- *message->metadata(), schema, kMaxNestingDepth, &buffer_reader, out);
+ return ReadRecordBatch(*message->metadata(), schema, kMaxNestingDepth, &buffer_reader,
+ out);
}
-Status ReadTensor(
- int64_t offset, io::RandomAccessFile* file, std::shared_ptr<Tensor>* out) {
+Status ReadTensor(int64_t offset, io::RandomAccessFile* file,
+ std::shared_ptr<Tensor>* out) {
// Respect alignment of Tensor messages (see WriteTensor)
offset = PaddedLength(offset);
std::unique_ptr<Message> message;
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index d6c2614..c0d3fb1 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -72,7 +72,7 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
/// \param(out) out the created RecordBatchStreamReader object
/// \return Status
static Status Open(std::unique_ptr<MessageReader> message_reader,
- std::shared_ptr<RecordBatchStreamReader>* out);
+ std::shared_ptr<RecordBatchStreamReader>* out);
/// \Create Record batch stream reader from InputStream
///
@@ -80,7 +80,7 @@ class ARROW_EXPORT RecordBatchStreamReader : public RecordBatchReader {
/// \param(out) out the created RecordBatchStreamReader object
/// \return Status
static Status Open(const std::shared_ptr<io::InputStream>& stream,
- std::shared_ptr<RecordBatchStreamReader>* out);
+ std::shared_ptr<RecordBatchStreamReader>* out);
std::shared_ptr<Schema> schema() const override;
Status ReadNextRecordBatch(std::shared_ptr<RecordBatch>* batch) override;
@@ -103,7 +103,7 @@ class ARROW_EXPORT RecordBatchFileReader {
// need only locate the end of the Arrow file stream to discover the metadata
// and then proceed to read the data into memory.
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file,
- std::shared_ptr<RecordBatchFileReader>* reader);
+ std::shared_ptr<RecordBatchFileReader>* reader);
// If the file is embedded within some larger file or memory region, you can
// pass the absolute memory offset to the end of the file (which contains the
@@ -113,7 +113,8 @@ class ARROW_EXPORT RecordBatchFileReader {
// @param file: the data source
// @param footer_offset: the position of the end of the Arrow "file"
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file,
- int64_t footer_offset, std::shared_ptr<RecordBatchFileReader>* reader);
+ int64_t footer_offset,
+ std::shared_ptr<RecordBatchFileReader>* reader);
/// The schema includes any dictionaries
std::shared_ptr<Schema> schema() const;
@@ -148,8 +149,9 @@ class ARROW_EXPORT RecordBatchFileReader {
/// \param(in) file a random access file
/// \param(out) out the read record batch
Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata,
- const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
- std::shared_ptr<RecordBatch>* out);
+ const std::shared_ptr<Schema>& schema,
+ io::RandomAccessFile* file,
+ std::shared_ptr<RecordBatch>* out);
/// \brief Read record batch from fully encapulated Message
///
@@ -158,7 +160,8 @@ Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata,
/// \param[out] out the resulting RecordBatch
/// \return Status
Status ARROW_EXPORT ReadRecordBatch(const Message& message,
- const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out);
+ const std::shared_ptr<Schema>& schema,
+ std::shared_ptr<RecordBatch>* out);
/// Read record batch from file given metadata and schema
///
@@ -168,8 +171,9 @@ Status ARROW_EXPORT ReadRecordBatch(const Message& message,
/// \param(in) max_recursion_depth the maximum permitted nesting depth
/// \param(out) out the read record batch
Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata,
- const std::shared_ptr<Schema>& schema, int max_recursion_depth,
- io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
+ const std::shared_ptr<Schema>& schema,
+ int max_recursion_depth, io::RandomAccessFile* file,
+ std::shared_ptr<RecordBatch>* out);
/// Read record batch as encapsulated IPC message with metadata size prefix and
/// header
@@ -179,15 +183,16 @@ Status ARROW_EXPORT ReadRecordBatch(const Buffer& metadata,
/// \param(in) file the file where the batch is located
/// \param(out) out the read record batch
Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset,
- io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
+ io::RandomAccessFile* file,
+ std::shared_ptr<RecordBatch>* out);
/// EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file
///
/// \param(in) offset the file location of the start of the message
/// \param(in) file the file where the batch is located
/// \param(out) out the read tensor
-Status ARROW_EXPORT ReadTensor(
- int64_t offset, io::RandomAccessFile* file, std::shared_ptr<Tensor>* out);
+Status ARROW_EXPORT ReadTensor(int64_t offset, io::RandomAccessFile* file,
+ std::shared_ptr<Tensor>* out);
/// Backwards-compatibility for Arrow < 0.4.0
///
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/stream-to-file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream-to-file.cc b/cpp/src/arrow/ipc/stream-to-file.cc
index de65883..33719b3 100644
--- a/cpp/src/arrow/ipc/stream-to-file.cc
+++ b/cpp/src/arrow/ipc/stream-to-file.cc
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+#include <iostream>
#include "arrow/io/file.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/status.h"
-#include <iostream>
#include "arrow/util/io-util.h"
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 67a41ba..a876792 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -69,8 +69,8 @@ static inline void CompareBatch(const RecordBatch& left, const RecordBatch& righ
}
}
-static inline void CompareArraysDetailed(
- int index, const Array& result, const Array& expected) {
+static inline void CompareArraysDetailed(int index, const Array& result,
+ const Array& expected) {
if (!expected.Equals(result)) {
std::stringstream pp_result;
std::stringstream pp_expected;
@@ -83,8 +83,8 @@ static inline void CompareArraysDetailed(
}
}
-static inline void CompareBatchColumnsDetailed(
- const RecordBatch& result, const RecordBatch& expected) {
+static inline void CompareBatchColumnsDetailed(const RecordBatch& result,
+ const RecordBatch& expected) {
for (int i = 0; i < expected.num_columns(); ++i) {
auto left = result.column(i);
auto right = expected.column(i);
@@ -95,16 +95,16 @@ static inline void CompareBatchColumnsDetailed(
const auto kListInt32 = list(int32());
const auto kListListInt32 = list(kListInt32);
-Status MakeRandomInt32Array(
- int64_t length, bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* out) {
+Status MakeRandomInt32Array(int64_t length, bool include_nulls, MemoryPool* pool,
+ std::shared_ptr<Array>* out) {
std::shared_ptr<PoolBuffer> data;
RETURN_NOT_OK(test::MakeRandomInt32PoolBuffer(length, pool, &data));
Int32Builder builder(pool, int32());
if (include_nulls) {
std::shared_ptr<PoolBuffer> valid_bytes;
RETURN_NOT_OK(test::MakeRandomBytePoolBuffer(length, pool, &valid_bytes));
- RETURN_NOT_OK(builder.Append(
- reinterpret_cast<const int32_t*>(data->data()), length, valid_bytes->data()));
+ RETURN_NOT_OK(builder.Append(reinterpret_cast<const int32_t*>(data->data()), length,
+ valid_bytes->data()));
return builder.Finish(out);
}
RETURN_NOT_OK(builder.Append(reinterpret_cast<const int32_t*>(data->data()), length));
@@ -112,7 +112,8 @@ Status MakeRandomInt32Array(
}
Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_lists,
- bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* out) {
+ bool include_nulls, MemoryPool* pool,
+ std::shared_ptr<Array>* out) {
// Create the null list values
std::vector<uint8_t> valid_lists(num_lists);
const double null_percent = include_nulls ? 0.1 : 0;
@@ -129,15 +130,16 @@ Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_li
test::rand_uniform_int(num_lists, seed, 0, max_list_size, list_sizes.data());
// make sure sizes are consistent with null
std::transform(list_sizes.begin(), list_sizes.end(), valid_lists.begin(),
- list_sizes.begin(),
- [](int32_t size, int32_t valid) { return valid == 0 ? 0 : size; });
+ list_sizes.begin(),
+ [](int32_t size, int32_t valid) { return valid == 0 ? 0 : size; });
std::partial_sum(list_sizes.begin(), list_sizes.end(), ++offsets.begin());
// Force invariants
const int32_t child_length = static_cast<int32_t>(child_array->length());
offsets[0] = 0;
std::replace_if(offsets.begin(), offsets.end(),
- [child_length](int32_t offset) { return offset > child_length; }, child_length);
+ [child_length](int32_t offset) { return offset > child_length; },
+ child_length);
}
offsets[num_lists] = static_cast<int32_t>(child_array->length());
@@ -148,14 +150,14 @@ Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_li
RETURN_NOT_OK(test::CopyBufferFromVector(offsets, pool, &offsets_buffer));
*out = std::make_shared<ListArray>(list(child_array->type()), num_lists, offsets_buffer,
- child_array, null_bitmap, kUnknownNullCount);
+ child_array, null_bitmap, kUnknownNullCount);
return ValidateArray(**out);
}
typedef Status MakeRecordBatch(std::shared_ptr<RecordBatch>* out);
-Status MakeRandomBooleanArray(
- const int length, bool include_nulls, std::shared_ptr<Array>* out) {
+Status MakeRandomBooleanArray(const int length, bool include_nulls,
+ std::shared_ptr<Array>* out) {
std::vector<uint8_t> values(length);
test::random_null_bytes(length, 0.5, values.data());
std::shared_ptr<Buffer> data;
@@ -210,10 +212,10 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
}
template <class Builder, class RawType>
-Status MakeRandomBinaryArray(
- int64_t length, bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* out) {
- const std::vector<std::string> values = {
- "", "", "abc", "123", "efg", "456!@#!@#", "12312"};
+Status MakeRandomBinaryArray(int64_t length, bool include_nulls, MemoryPool* pool,
+ std::shared_ptr<Array>* out) {
+ const std::vector<std::string> values = {"", "", "abc", "123",
+ "efg", "456!@#!@#", "12312"};
Builder builder(pool);
const size_t values_len = values.size();
for (int64_t i = 0; i < length; ++i) {
@@ -223,7 +225,7 @@ Status MakeRandomBinaryArray(
} else {
const std::string& value = values[values_index];
RETURN_NOT_OK(builder.Append(reinterpret_cast<const RawType*>(value.data()),
- static_cast<int32_t>(value.size())));
+ static_cast<int32_t>(value.size())));
}
}
return builder.Finish(out);
@@ -434,11 +436,12 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
// construct individual nullable/non-nullable struct arrays
auto sparse_no_nulls =
std::make_shared<UnionArray>(sparse_type, length, sparse_children, type_ids_buffer);
- auto sparse = std::make_shared<UnionArray>(
- sparse_type, length, sparse_children, type_ids_buffer, nullptr, null_bitmask, 1);
+ auto sparse = std::make_shared<UnionArray>(sparse_type, length, sparse_children,
+ type_ids_buffer, nullptr, null_bitmask, 1);
- auto dense = std::make_shared<UnionArray>(dense_type, length, dense_children,
- type_ids_buffer, offsets_buffer, null_bitmask, 1);
+ auto dense =
+ std::make_shared<UnionArray>(dense_type, length, dense_children, type_ids_buffer,
+ offsets_buffer, null_bitmask, 1);
// construct batch
std::vector<std::shared_ptr<Array>> arrays = {sparse_no_nulls, sparse, dense};
@@ -480,8 +483,8 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) {
std::vector<int32_t> list_offsets = {0, 0, 2, 2, 5, 6, 9};
std::shared_ptr<Array> offsets, indices3;
- ArrayFromVector<Int32Type, int32_t>(
- std::vector<bool>(list_offsets.size(), true), list_offsets, &offsets);
+ ArrayFromVector<Int32Type, int32_t>(std::vector<bool>(list_offsets.size(), true),
+ list_offsets, &offsets);
std::vector<int8_t> indices3_values = {0, 1, 2, 0, 1, 2, 0, 1, 2};
std::vector<bool> is_valid3(9, true);
@@ -490,8 +493,8 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) {
std::shared_ptr<Buffer> null_bitmap;
RETURN_NOT_OK(test::GetBitmapFromVector(is_valid, &null_bitmap));
- std::shared_ptr<Array> a3 = std::make_shared<ListArray>(f3_type, length,
- std::static_pointer_cast<PrimitiveArray>(offsets)->values(),
+ std::shared_ptr<Array> a3 = std::make_shared<ListArray>(
+ f3_type, length, std::static_pointer_cast<PrimitiveArray>(offsets)->values(),
std::make_shared<DictionaryArray>(f1_type, indices3), null_bitmap, 1);
// Dictionary-encoded list of integer
@@ -500,14 +503,15 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) {
std::shared_ptr<Array> offsets4, values4, indices4;
std::vector<int32_t> list_offsets4 = {0, 2, 2, 3};
- ArrayFromVector<Int32Type, int32_t>(
- std::vector<bool>(4, true), list_offsets4, &offsets4);
+ ArrayFromVector<Int32Type, int32_t>(std::vector<bool>(4, true), list_offsets4,
+ &offsets4);
std::vector<int8_t> list_values4 = {0, 1, 2};
ArrayFromVector<Int8Type, int8_t>(std::vector<bool>(3, true), list_values4, &values4);
- auto dict3 = std::make_shared<ListArray>(f4_value_type, 3,
- std::static_pointer_cast<PrimitiveArray>(offsets4)->values(), values4);
+ auto dict3 = std::make_shared<ListArray>(
+ f4_value_type, 3, std::static_pointer_cast<PrimitiveArray>(offsets4)->values(),
+ values4);
std::vector<int8_t> indices4_values = {0, 1, 2, 0, 1, 2};
ArrayFromVector<Int8Type, int8_t>(is_valid, indices4_values, &indices4);
@@ -516,9 +520,9 @@ Status MakeDictionary(std::shared_ptr<RecordBatch>* out) {
auto a4 = std::make_shared<DictionaryArray>(f4_type, indices4);
// construct batch
- std::shared_ptr<Schema> schema(new Schema({field("dict1", f0_type),
- field("sparse", f1_type), field("dense", f2_type),
- field("list of encoded string", f3_type), field("encoded list<int8>", f4_type)}));
+ std::shared_ptr<Schema> schema(new Schema(
+ {field("dict1", f0_type), field("sparse", f1_type), field("dense", f2_type),
+ field("list of encoded string", f3_type), field("encoded list<int8>", f4_type)}));
std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2, a3, a4};
@@ -575,7 +579,8 @@ Status MakeDates(std::shared_ptr<RecordBatch>* out) {
ArrayFromVector<Date32Type, int32_t>(is_valid, date32_values, &date32_array);
std::vector<int64_t> date64_values = {1489269000000, 1489270000000, 1489271000000,
- 1489272000000, 1489272000000, 1489273000000, 1489274000000};
+ 1489272000000, 1489272000000, 1489273000000,
+ 1489274000000};
std::shared_ptr<Array> date64_array;
ArrayFromVector<Date64Type, int64_t>(is_valid, date64_values, &date64_array);
@@ -592,7 +597,7 @@ Status MakeTimestamps(std::shared_ptr<RecordBatch>* out) {
std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
std::vector<int64_t> ts_values = {1489269000000, 1489270000000, 1489271000000,
- 1489272000000, 1489272000000, 1489273000000};
+ 1489272000000, 1489272000000, 1489273000000};
std::shared_ptr<Array> a0, a1, a2;
ArrayFromVector<TimestampType, int64_t>(f0->type(), is_valid, ts_values, &a0);
@@ -612,10 +617,10 @@ Status MakeTimes(std::shared_ptr<RecordBatch>* out) {
auto f3 = field("f3", time64(TimeUnit::NANO));
std::shared_ptr<Schema> schema(new Schema({f0, f1, f2, f3}));
- std::vector<int32_t> t32_values = {
- 1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000};
+ std::vector<int32_t> t32_values = {1489269000, 1489270000, 1489271000,
+ 1489272000, 1489272000, 1489273000};
std::vector<int64_t> t64_values = {1489269000000, 1489270000000, 1489271000000,
- 1489272000000, 1489272000000, 1489273000000};
+ 1489272000000, 1489272000000, 1489273000000};
std::shared_ptr<Array> a0, a1, a2, a3;
ArrayFromVector<Time32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
@@ -630,7 +635,7 @@ Status MakeTimes(std::shared_ptr<RecordBatch>* out) {
template <typename BuilderType, typename T>
void AppendValues(const std::vector<bool>& is_valid, const std::vector<T>& values,
- BuilderType* builder) {
+ BuilderType* builder) {
for (size_t i = 0; i < values.size(); ++i) {
if (is_valid[i]) {
ASSERT_OK(builder->Append(values[i]));
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 14708a1..163b27b 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -45,8 +45,9 @@ namespace ipc {
// Record batch write path
static inline Status GetTruncatedBitmap(int64_t offset, int64_t length,
- const std::shared_ptr<Buffer> input, MemoryPool* pool,
- std::shared_ptr<Buffer>* buffer) {
+ const std::shared_ptr<Buffer> input,
+ MemoryPool* pool,
+ std::shared_ptr<Buffer>* buffer) {
if (!input) {
*buffer = input;
return Status::OK();
@@ -63,8 +64,8 @@ static inline Status GetTruncatedBitmap(int64_t offset, int64_t length,
template <typename T>
inline Status GetTruncatedBuffer(int64_t offset, int64_t length,
- const std::shared_ptr<Buffer> input, MemoryPool* pool,
- std::shared_ptr<Buffer>* buffer) {
+ const std::shared_ptr<Buffer> input, MemoryPool* pool,
+ std::shared_ptr<Buffer>* buffer) {
if (!input) {
*buffer = input;
return Status::OK();
@@ -80,17 +81,19 @@ inline Status GetTruncatedBuffer(int64_t offset, int64_t length,
return Status::OK();
}
-static inline bool NeedTruncate(
- int64_t offset, const Buffer* buffer, int64_t min_length) {
+static inline bool NeedTruncate(int64_t offset, const Buffer* buffer,
+ int64_t min_length) {
// buffer can be NULL
- if (buffer == nullptr) { return false; }
+ if (buffer == nullptr) {
+ return false;
+ }
return offset != 0 || min_length < buffer->size();
}
class RecordBatchSerializer : public ArrayVisitor {
public:
RecordBatchSerializer(MemoryPool* pool, int64_t buffer_start_offset,
- int max_recursion_depth, bool allow_64bit)
+ int max_recursion_depth, bool allow_64bit)
: pool_(pool),
max_recursion_depth_(max_recursion_depth),
buffer_start_offset_(buffer_start_offset),
@@ -114,8 +117,8 @@ class RecordBatchSerializer : public ArrayVisitor {
if (arr.null_count() > 0) {
std::shared_ptr<Buffer> bitmap;
- RETURN_NOT_OK(GetTruncatedBitmap(
- arr.offset(), arr.length(), arr.null_bitmap(), pool_, &bitmap));
+ RETURN_NOT_OK(GetTruncatedBitmap(arr.offset(), arr.length(), arr.null_bitmap(),
+ pool_, &bitmap));
buffers_.push_back(bitmap);
} else {
// Push a dummy zero-length buffer, not to be copied
@@ -175,14 +178,14 @@ class RecordBatchSerializer : public ArrayVisitor {
}
// Override this for writing dictionary metadata
- virtual Status WriteMetadataMessage(
- int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) {
- return WriteRecordBatchMessage(
- num_rows, body_length, field_nodes_, buffer_meta_, out);
+ virtual Status WriteMetadataMessage(int64_t num_rows, int64_t body_length,
+ std::shared_ptr<Buffer>* out) {
+ return WriteRecordBatchMessage(num_rows, body_length, field_nodes_, buffer_meta_,
+ out);
}
Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length) {
+ int64_t* body_length) {
RETURN_NOT_OK(Assemble(batch, body_length));
#ifndef NDEBUG
@@ -216,9 +219,13 @@ class RecordBatchSerializer : public ArrayVisitor {
padding = BitUtil::RoundUpToMultipleOf64(size) - size;
}
- if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); }
+ if (size > 0) {
+ RETURN_NOT_OK(dst->Write(buffer->data(), size));
+ }
- if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
+ if (padding > 0) {
+ RETURN_NOT_OK(dst->Write(kPaddingBytes, padding));
+ }
}
#ifndef NDEBUG
@@ -245,7 +252,7 @@ class RecordBatchSerializer : public ArrayVisitor {
// Send padding if it's available
const int64_t buffer_length =
std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width),
- data->size() - byte_offset);
+ data->size() - byte_offset);
data = SliceBuffer(data, byte_offset, buffer_length);
}
buffers_.push_back(data);
@@ -253,8 +260,8 @@ class RecordBatchSerializer : public ArrayVisitor {
}
template <typename ArrayType>
- Status GetZeroBasedValueOffsets(
- const ArrayType& array, std::shared_ptr<Buffer>* value_offsets) {
+ Status GetZeroBasedValueOffsets(const ArrayType& array,
+ std::shared_ptr<Buffer>* value_offsets) {
// Share slicing logic between ListArray and BinaryArray
auto offsets = array.value_offsets();
@@ -265,8 +272,8 @@ class RecordBatchSerializer : public ArrayVisitor {
// b) slice the values array accordingly
std::shared_ptr<MutableBuffer> shifted_offsets;
- RETURN_NOT_OK(AllocateBuffer(
- pool_, sizeof(int32_t) * (array.length() + 1), &shifted_offsets));
+ RETURN_NOT_OK(AllocateBuffer(pool_, sizeof(int32_t) * (array.length() + 1),
+ &shifted_offsets));
int32_t* dest_offsets = reinterpret_cast<int32_t*>(shifted_offsets->mutable_data());
const int32_t start_offset = array.value_offset(0);
@@ -392,13 +399,15 @@ class RecordBatchSerializer : public ArrayVisitor {
const auto& type = static_cast<const UnionType&>(*array.type());
std::shared_ptr<Buffer> value_offsets;
- RETURN_NOT_OK(GetTruncatedBuffer<int32_t>(
- offset, length, array.value_offsets(), pool_, &value_offsets));
+ RETURN_NOT_OK(GetTruncatedBuffer<int32_t>(offset, length, array.value_offsets(),
+ pool_, &value_offsets));
// The Union type codes are not necessary 0-indexed
uint8_t max_code = 0;
for (uint8_t code : type.type_codes()) {
- if (code > max_code) { max_code = code; }
+ if (code > max_code) {
+ max_code = code;
+ }
}
// Allocate an array of child offsets. Set all to -1 to indicate that we
@@ -424,7 +433,9 @@ class RecordBatchSerializer : public ArrayVisitor {
for (int64_t i = 0; i < length; ++i) {
const uint8_t code = type_ids[i];
int32_t shift = child_offsets[code];
- if (shift == -1) { child_offsets[code] = shift = unshifted_offsets[i]; }
+ if (shift == -1) {
+ child_offsets[code] = shift = unshifted_offsets[i];
+ }
shifted_offsets[i] = unshifted_offsets[i] - shift;
// Update the child length to account for observed value
@@ -486,14 +497,14 @@ class DictionaryWriter : public RecordBatchSerializer {
public:
using RecordBatchSerializer::RecordBatchSerializer;
- Status WriteMetadataMessage(
- int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override {
- return WriteDictionaryMessage(
- dictionary_id_, num_rows, body_length, field_nodes_, buffer_meta_, out);
+ Status WriteMetadataMessage(int64_t num_rows, int64_t body_length,
+ std::shared_ptr<Buffer>* out) override {
+ return WriteDictionaryMessage(dictionary_id_, num_rows, body_length, field_nodes_,
+ buffer_meta_, out);
}
Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
- io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
+ io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
dictionary_id_ = dictionary_id;
// Make a dummy record batch. A bit tedious as we have to make a schema
@@ -516,27 +527,30 @@ Status AlignStreamPosition(io::OutputStream* stream) {
int64_t position;
RETURN_NOT_OK(stream->Tell(&position));
int64_t remainder = PaddedLength(position) - position;
- if (remainder > 0) { return stream->Write(kPaddingBytes, remainder); }
+ if (remainder > 0) {
+ return stream->Write(kPaddingBytes, remainder);
+ }
return Status::OK();
}
Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
- io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
- MemoryPool* pool, int max_recursion_depth, bool allow_64bit) {
- RecordBatchSerializer writer(
- pool, buffer_start_offset, max_recursion_depth, allow_64bit);
+ io::OutputStream* dst, int32_t* metadata_length,
+ int64_t* body_length, MemoryPool* pool, int max_recursion_depth,
+ bool allow_64bit) {
+ RecordBatchSerializer writer(pool, buffer_start_offset, max_recursion_depth,
+ allow_64bit);
return writer.Write(batch, dst, metadata_length, body_length);
}
Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
- io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
- MemoryPool* pool) {
+ io::OutputStream* dst, int32_t* metadata_length,
+ int64_t* body_length, MemoryPool* pool) {
return WriteRecordBatch(batch, buffer_start_offset, dst, metadata_length, body_length,
- pool, kMaxNestingDepth, true);
+ pool, kMaxNestingDepth, true);
}
Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length) {
+ int64_t* body_length) {
if (!tensor.is_contiguous()) {
return Status::Invalid("No support yet for writing non-contiguous tensors");
}
@@ -556,8 +570,8 @@ Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadat
}
Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
- int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length, MemoryPool* pool) {
+ int64_t buffer_start_offset, io::OutputStream* dst,
+ int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) {
DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth, false);
return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length);
}
@@ -568,7 +582,7 @@ Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
int64_t body_length = 0;
io::MockOutputStream dst;
RETURN_NOT_OK(WriteRecordBatch(batch, 0, &dst, &metadata_length, &body_length,
- default_memory_pool(), kMaxNestingDepth, true));
+ default_memory_pool(), kMaxNestingDepth, true));
*size = dst.GetExtentBytesWritten();
return Status::OK();
}
@@ -632,7 +646,9 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
}
Status CheckStarted() {
- if (!started_) { return Start(); }
+ if (!started_) {
+ return Start();
+ }
return Status::OK();
}
@@ -653,7 +669,7 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
// Frame of reference in file format is 0, see ARROW-384
const int64_t buffer_start_offset = 0;
RETURN_NOT_OK(WriteDictionary(entry.first, entry.second, buffer_start_offset, sink_,
- &block->metadata_length, &block->body_length, pool_));
+ &block->metadata_length, &block->body_length, pool_));
RETURN_NOT_OK(UpdatePosition());
DCHECK(position_ % 8 == 0) << "WriteDictionary did not perform aligned writes";
}
@@ -668,9 +684,9 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
// Frame of reference in file format is 0, see ARROW-384
const int64_t buffer_start_offset = 0;
- RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, buffer_start_offset, sink_,
- &block->metadata_length, &block->body_length, pool_, kMaxNestingDepth,
- allow_64bit));
+ RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
+ batch, buffer_start_offset, sink_, &block->metadata_length, &block->body_length,
+ pool_, kMaxNestingDepth, allow_64bit));
RETURN_NOT_OK(UpdatePosition());
DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes";
@@ -681,15 +697,17 @@ class RecordBatchStreamWriter::RecordBatchStreamWriterImpl {
Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit) {
// Push an empty FileBlock. Can be written in the footer later
record_batches_.push_back({0, 0, 0});
- return WriteRecordBatch(
- batch, allow_64bit, &record_batches_[record_batches_.size() - 1]);
+ return WriteRecordBatch(batch, allow_64bit,
+ &record_batches_[record_batches_.size() - 1]);
}
// Adds padding bytes if necessary to ensure all memory blocks are written on
// 64-byte (or other alignment) boundaries.
Status Align(int64_t alignment = kArrowAlignment) {
int64_t remainder = PaddedLength(position_, alignment) - position_;
- if (remainder > 0) { return Write(kPaddingBytes, remainder); }
+ if (remainder > 0) {
+ return Write(kPaddingBytes, remainder);
+ }
return Status::OK();
}
@@ -725,8 +743,8 @@ RecordBatchStreamWriter::RecordBatchStreamWriter() {
RecordBatchStreamWriter::~RecordBatchStreamWriter() {}
-Status RecordBatchStreamWriter::WriteRecordBatch(
- const RecordBatch& batch, bool allow_64bit) {
+Status RecordBatchStreamWriter::WriteRecordBatch(const RecordBatch& batch,
+ bool allow_64bit) {
return impl_->WriteRecordBatch(batch, allow_64bit);
}
@@ -735,16 +753,14 @@ void RecordBatchStreamWriter::set_memory_pool(MemoryPool* pool) {
}
Status RecordBatchStreamWriter::Open(io::OutputStream* sink,
- const std::shared_ptr<Schema>& schema,
- std::shared_ptr<RecordBatchStreamWriter>* out) {
+ const std::shared_ptr<Schema>& schema,
+ std::shared_ptr<RecordBatchStreamWriter>* out) {
// ctor is private
*out = std::shared_ptr<RecordBatchStreamWriter>(new RecordBatchStreamWriter());
return (*out)->impl_->Open(sink, schema);
}
-Status RecordBatchStreamWriter::Close() {
- return impl_->Close();
-}
+Status RecordBatchStreamWriter::Close() { return impl_->Close(); }
// ----------------------------------------------------------------------
// File writer implementation
@@ -756,8 +772,8 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl
Status Start() override {
// It is only necessary to align to 8-byte boundary at the start of the file
- RETURN_NOT_OK(Write(
- reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)));
+ RETURN_NOT_OK(Write(reinterpret_cast<const uint8_t*>(kArrowMagicBytes),
+ strlen(kArrowMagicBytes)));
RETURN_NOT_OK(Align(8));
// We write the schema at the start of the file (and the end). This also
@@ -768,21 +784,23 @@ class RecordBatchFileWriter::RecordBatchFileWriterImpl
Status Close() override {
// Write metadata
int64_t initial_position = position_;
- RETURN_NOT_OK(WriteFileFooter(
- *schema_, dictionaries_, record_batches_, &dictionary_memo_, sink_));
+ RETURN_NOT_OK(WriteFileFooter(*schema_, dictionaries_, record_batches_,
+ &dictionary_memo_, sink_));
RETURN_NOT_OK(UpdatePosition());
// Write footer length
int32_t footer_length = static_cast<int32_t>(position_ - initial_position);
- if (footer_length <= 0) { return Status::Invalid("Invalid file footer"); }
+ if (footer_length <= 0) {
+ return Status::Invalid("Invalid file footer");
+ }
RETURN_NOT_OK(
Write(reinterpret_cast<const uint8_t*>(&footer_length), sizeof(int32_t)));
// Write magic bytes to end file
- return Write(
- reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes));
+ return Write(reinterpret_cast<const uint8_t*>(kArrowMagicBytes),
+ strlen(kArrowMagicBytes));
}
};
@@ -793,20 +811,19 @@ RecordBatchFileWriter::RecordBatchFileWriter() {
RecordBatchFileWriter::~RecordBatchFileWriter() {}
Status RecordBatchFileWriter::Open(io::OutputStream* sink,
- const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatchFileWriter>* out) {
+ const std::shared_ptr<Schema>& schema,
+ std::shared_ptr<RecordBatchFileWriter>* out) {
*out = std::shared_ptr<RecordBatchFileWriter>(
new RecordBatchFileWriter()); // ctor is private
return (*out)->impl_->Open(sink, schema);
}
-Status RecordBatchFileWriter::WriteRecordBatch(
- const RecordBatch& batch, bool allow_64bit) {
+Status RecordBatchFileWriter::WriteRecordBatch(const RecordBatch& batch,
+ bool allow_64bit) {
return impl_->WriteRecordBatch(batch, allow_64bit);
}
-Status RecordBatchFileWriter::Close() {
- return impl_->Close();
-}
+Status RecordBatchFileWriter::Close() { return impl_->Close(); }
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/ipc/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index 899a1b2..c28dfe0 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -85,7 +85,7 @@ class ARROW_EXPORT RecordBatchStreamWriter : public RecordBatchWriter {
/// \param(out) out the created stream writer
/// \return Status indicating success or failure
static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
- std::shared_ptr<RecordBatchStreamWriter>* out);
+ std::shared_ptr<RecordBatchStreamWriter>* out);
Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
Status Close() override;
@@ -113,7 +113,7 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
/// \param(out) out the created stream writer
/// \return Status indicating success or failure
static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
- std::shared_ptr<RecordBatchFileWriter>* out);
+ std::shared_ptr<RecordBatchFileWriter>* out);
Status WriteRecordBatch(const RecordBatch& batch, bool allow_64bit = false) override;
Status Close() override;
@@ -145,14 +145,16 @@ class ARROW_EXPORT RecordBatchFileWriter : public RecordBatchStreamWriter {
/// \param(out) body_length: the size of the contiguous buffer block plus
/// padding bytes
Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch,
- int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length, MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth,
- bool allow_64bit = false);
+ int64_t buffer_start_offset, io::OutputStream* dst,
+ int32_t* metadata_length, int64_t* body_length,
+ MemoryPool* pool,
+ int max_recursion_depth = kMaxNestingDepth,
+ bool allow_64bit = false);
// Write Array as a DictionaryBatch message
Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
- int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length, MemoryPool* pool);
+ int64_t buffer_start_offset, io::OutputStream* dst,
+ int32_t* metadata_length, int64_t* body_length, MemoryPool* pool);
// Compute the precise number of bytes needed in a contiguous memory segment to
// write the record batch. This involves generating the complete serialized
@@ -166,13 +168,14 @@ Status ARROW_EXPORT GetTensorSize(const Tensor& tensor, int64_t* size);
/// EXPERIMENTAL: Write RecordBatch allowing lengths over INT32_MAX. This data
/// may not be readable by all Arrow implementations
Status ARROW_EXPORT WriteLargeRecordBatch(const RecordBatch& batch,
- int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length, MemoryPool* pool);
+ int64_t buffer_start_offset,
+ io::OutputStream* dst, int32_t* metadata_length,
+ int64_t* body_length, MemoryPool* pool);
/// EXPERIMENTAL: Write arrow::Tensor as a contiguous message
/// <metadata size><metadata><tensor data>
Status ARROW_EXPORT WriteTensor(const Tensor& tensor, io::OutputStream* dst,
- int32_t* metadata_length, int64_t* body_length);
+ int32_t* metadata_length, int64_t* body_length);
/// Backwards-compatibility for Arrow < 0.4.0
///
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/memory_pool-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/memory_pool-test.cc b/cpp/src/arrow/memory_pool-test.cc
index 8a185ab..52e48db 100644
--- a/cpp/src/arrow/memory_pool-test.cc
+++ b/cpp/src/arrow/memory_pool-test.cc
@@ -27,9 +27,7 @@ class TestDefaultMemoryPool : public ::arrow::test::TestMemoryPoolBase {
::arrow::MemoryPool* memory_pool() override { return ::arrow::default_memory_pool(); }
};
-TEST_F(TestDefaultMemoryPool, MemoryTracking) {
- this->TestMemoryTracking();
-}
+TEST_F(TestDefaultMemoryPool, MemoryTracking) { this->TestMemoryTracking(); }
TEST_F(TestDefaultMemoryPool, OOM) {
#ifndef ADDRESS_SANITIZER
@@ -37,9 +35,7 @@ TEST_F(TestDefaultMemoryPool, OOM) {
#endif
}
-TEST_F(TestDefaultMemoryPool, Reallocate) {
- this->TestReallocate();
-}
+TEST_F(TestDefaultMemoryPool, Reallocate) { this->TestReallocate(); }
// Death tests and valgrind are known to not play well 100% of the time. See
// googletest documentation
@@ -53,7 +49,7 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) {
#ifndef NDEBUG
EXPECT_EXIT(pool->Free(data, 120), ::testing::ExitedWithCode(1),
- ".*Check failed: \\(bytes_allocated_\\) >= \\(size\\)");
+ ".*Check failed: \\(bytes_allocated_\\) >= \\(size\\)");
#endif
pool->Free(data, 100);
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/memory_pool.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index e7de5c4..769fc10 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -17,12 +17,12 @@
#include "arrow/memory_pool.h"
+#include <stdlib.h>
#include <algorithm>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <sstream>
-#include <stdlib.h>
#include "arrow/status.h"
#include "arrow/util/logging.h"
@@ -60,8 +60,8 @@ Status AllocateAligned(int64_t size, uint8_t** out) {
return Status::OutOfMemory(ss.str());
}
#else
- const int result = posix_memalign(
- reinterpret_cast<void**>(out), kAlignment, static_cast<size_t>(size));
+ const int result = posix_memalign(reinterpret_cast<void**>(out), kAlignment,
+ static_cast<size_t>(size));
if (result == ENOMEM) {
std::stringstream ss;
ss << "malloc of size " << size << " failed";
@@ -82,13 +82,9 @@ MemoryPool::MemoryPool() {}
MemoryPool::~MemoryPool() {}
-int64_t MemoryPool::max_memory() const {
- return -1;
-}
+int64_t MemoryPool::max_memory() const { return -1; }
-DefaultMemoryPool::DefaultMemoryPool() : bytes_allocated_(0) {
- max_memory_ = 0;
-}
+DefaultMemoryPool::DefaultMemoryPool() : bytes_allocated_(0) { max_memory_ = 0; }
Status DefaultMemoryPool::Allocate(int64_t size, uint8_t** out) {
RETURN_NOT_OK(AllocateAligned(size, out));
@@ -96,7 +92,9 @@ Status DefaultMemoryPool::Allocate(int64_t size, uint8_t** out) {
{
std::lock_guard<std::mutex> guard(lock_);
- if (bytes_allocated_ > max_memory_) { max_memory_ = bytes_allocated_.load(); }
+ if (bytes_allocated_ > max_memory_) {
+ max_memory_ = bytes_allocated_.load();
+ }
}
return Status::OK();
}
@@ -128,15 +126,15 @@ Status DefaultMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t
bytes_allocated_ += new_size - old_size;
{
std::lock_guard<std::mutex> guard(lock_);
- if (bytes_allocated_ > max_memory_) { max_memory_ = bytes_allocated_.load(); }
+ if (bytes_allocated_ > max_memory_) {
+ max_memory_ = bytes_allocated_.load();
+ }
}
return Status::OK();
}
-int64_t DefaultMemoryPool::bytes_allocated() const {
- return bytes_allocated_.load();
-}
+int64_t DefaultMemoryPool::bytes_allocated() const { return bytes_allocated_.load(); }
void DefaultMemoryPool::Free(uint8_t* buffer, int64_t size) {
DCHECK_GE(bytes_allocated_, size);
@@ -150,9 +148,7 @@ void DefaultMemoryPool::Free(uint8_t* buffer, int64_t size) {
bytes_allocated_ -= size;
}
-int64_t DefaultMemoryPool::max_memory() const {
- return max_memory_.load();
-}
+int64_t DefaultMemoryPool::max_memory() const { return max_memory_.load(); }
DefaultMemoryPool::~DefaultMemoryPool() {}
http://git-wip-us.apache.org/repos/asf/arrow/blob/07b89bf3/cpp/src/arrow/pretty_print-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/pretty_print-test.cc b/cpp/src/arrow/pretty_print-test.cc
index 10a91f5..049f5a5 100644
--- a/cpp/src/arrow/pretty_print-test.cc
+++ b/cpp/src/arrow/pretty_print-test.cc
@@ -57,7 +57,7 @@ void CheckArray(const Array& arr, int indent, const char* expected) {
template <typename TYPE, typename C_TYPE>
void CheckPrimitive(int indent, const std::vector<bool>& is_valid,
- const std::vector<C_TYPE>& values, const char* expected) {
+ const std::vector<C_TYPE>& values, const char* expected) {
std::shared_ptr<Array> array;
ArrayFromVector<TYPE, C_TYPE>(is_valid, values, &array);
CheckArray(*array, indent, expected);