You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2017/03/20 09:48:41 UTC
[2/3] arrow git commit: ARROW-661: [C++] Add LargeRecordBatch
metadata type, IPC support, associated refactoring
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
deleted file mode 100644
index be0d282..0000000
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ /dev/null
@@ -1,597 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "arrow/ipc/metadata-internal.h"
-
-#include <cstdint>
-#include <cstring>
-#include <memory>
-#include <sstream>
-#include <string>
-
-#include "flatbuffers/flatbuffers.h"
-
-#include "arrow/array.h"
-#include "arrow/buffer.h"
-#include "arrow/ipc/Message_generated.h"
-#include "arrow/schema.h"
-#include "arrow/status.h"
-#include "arrow/type.h"
-
-namespace arrow {
-
-namespace flatbuf = org::apache::arrow::flatbuf;
-
-namespace ipc {
-
-static Status IntFromFlatbuffer(
- const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) {
- if (int_data->bitWidth() > 64) {
- return Status::NotImplemented("Integers with more than 64 bits not implemented");
- }
- if (int_data->bitWidth() < 8) {
- return Status::NotImplemented("Integers with less than 8 bits not implemented");
- }
-
- switch (int_data->bitWidth()) {
- case 8:
- *out = int_data->is_signed() ? int8() : uint8();
- break;
- case 16:
- *out = int_data->is_signed() ? int16() : uint16();
- break;
- case 32:
- *out = int_data->is_signed() ? int32() : uint32();
- break;
- case 64:
- *out = int_data->is_signed() ? int64() : uint64();
- break;
- default:
- return Status::NotImplemented("Integers not in cstdint are not implemented");
- }
- return Status::OK();
-}
-
-static Status FloatFromFlatuffer(
- const flatbuf::FloatingPoint* float_data, std::shared_ptr<DataType>* out) {
- if (float_data->precision() == flatbuf::Precision_HALF) {
- *out = float16();
- } else if (float_data->precision() == flatbuf::Precision_SINGLE) {
- *out = float32();
- } else {
- *out = float64();
- }
- return Status::OK();
-}
-
-// Forward declaration
-static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
- DictionaryMemo* dictionary_memo, FieldOffset* offset);
-
-static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) {
- return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union();
-}
-
-static Offset FloatToFlatbuffer(FBB& fbb, flatbuf::Precision precision) {
- return flatbuf::CreateFloatingPoint(fbb, precision).Union();
-}
-
-static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo) {
- FieldOffset field;
- for (int i = 0; i < type->num_children(); ++i) {
- RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), dictionary_memo, &field));
- out_children->push_back(field);
- }
- return Status::OK();
-}
-
-static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
- Offset* offset) {
- RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
- *offset = flatbuf::CreateList(fbb).Union();
- return Status::OK();
-}
-
-static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
- Offset* offset) {
- RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
- *offset = flatbuf::CreateStruct_(fbb).Union();
- return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// Union implementation
-
-static Status UnionFromFlatbuffer(const flatbuf::Union* union_data,
- const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) {
- UnionMode mode = union_data->mode() == flatbuf::UnionMode_Sparse ? UnionMode::SPARSE
- : UnionMode::DENSE;
-
- std::vector<uint8_t> type_codes;
-
- const flatbuffers::Vector<int32_t>* fb_type_ids = union_data->typeIds();
- if (fb_type_ids == nullptr) {
- for (uint8_t i = 0; i < children.size(); ++i) {
- type_codes.push_back(i);
- }
- } else {
- for (int32_t id : (*fb_type_ids)) {
- // TODO(wesm): can these values exceed 255?
- type_codes.push_back(static_cast<uint8_t>(id));
- }
- }
-
- *out = union_(children, type_codes, mode);
- return Status::OK();
-}
-
-static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
- Offset* offset) {
- RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
-
- const auto& union_type = static_cast<const UnionType&>(*type);
-
- flatbuf::UnionMode mode = union_type.mode == UnionMode::SPARSE
- ? flatbuf::UnionMode_Sparse
- : flatbuf::UnionMode_Dense;
-
- std::vector<int32_t> type_ids;
- type_ids.reserve(union_type.type_codes.size());
- for (uint8_t code : union_type.type_codes) {
- type_ids.push_back(code);
- }
-
- auto fb_type_ids = fbb.CreateVector(type_ids);
-
- *offset = flatbuf::CreateUnion(fbb, mode, fb_type_ids).Union();
- return Status::OK();
-}
-
-#define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED) \
- *out_type = flatbuf::Type_Int; \
- *offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \
- break;
-
-static inline flatbuf::TimeUnit ToFlatbufferUnit(TimeUnit unit) {
- switch (unit) {
- case TimeUnit::SECOND:
- return flatbuf::TimeUnit_SECOND;
- case TimeUnit::MILLI:
- return flatbuf::TimeUnit_MILLISECOND;
- case TimeUnit::MICRO:
- return flatbuf::TimeUnit_MICROSECOND;
- case TimeUnit::NANO:
- return flatbuf::TimeUnit_NANOSECOND;
- default:
- break;
- }
- return flatbuf::TimeUnit_MIN;
-}
-
-static inline TimeUnit FromFlatbufferUnit(flatbuf::TimeUnit unit) {
- switch (unit) {
- case flatbuf::TimeUnit_SECOND:
- return TimeUnit::SECOND;
- case flatbuf::TimeUnit_MILLISECOND:
- return TimeUnit::MILLI;
- case flatbuf::TimeUnit_MICROSECOND:
- return TimeUnit::MICRO;
- case flatbuf::TimeUnit_NANOSECOND:
- return TimeUnit::NANO;
- default:
- break;
- }
- // cannot reach
- return TimeUnit::SECOND;
-}
-
-static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
- const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) {
- switch (type) {
- case flatbuf::Type_NONE:
- return Status::Invalid("Type metadata cannot be none");
- case flatbuf::Type_Int:
- return IntFromFlatbuffer(static_cast<const flatbuf::Int*>(type_data), out);
- case flatbuf::Type_FloatingPoint:
- return FloatFromFlatuffer(
- static_cast<const flatbuf::FloatingPoint*>(type_data), out);
- case flatbuf::Type_Binary:
- *out = binary();
- return Status::OK();
- case flatbuf::Type_FixedWidthBinary: {
- auto fw_binary = static_cast<const flatbuf::FixedWidthBinary*>(type_data);
- *out = fixed_width_binary(fw_binary->byteWidth());
- return Status::OK();
- }
- case flatbuf::Type_Utf8:
- *out = utf8();
- return Status::OK();
- case flatbuf::Type_Bool:
- *out = boolean();
- return Status::OK();
- case flatbuf::Type_Decimal:
- return Status::NotImplemented("Decimal");
- case flatbuf::Type_Date:
- *out = date();
- return Status::OK();
- case flatbuf::Type_Time: {
- auto time_type = static_cast<const flatbuf::Time*>(type_data);
- *out = time(FromFlatbufferUnit(time_type->unit()));
- return Status::OK();
- }
- case flatbuf::Type_Timestamp: {
- auto ts_type = static_cast<const flatbuf::Timestamp*>(type_data);
- *out = timestamp(FromFlatbufferUnit(ts_type->unit()));
- return Status::OK();
- }
- case flatbuf::Type_Interval:
- return Status::NotImplemented("Interval");
- case flatbuf::Type_List:
- if (children.size() != 1) {
- return Status::Invalid("List must have exactly 1 child field");
- }
- *out = std::make_shared<ListType>(children[0]);
- return Status::OK();
- case flatbuf::Type_Struct_:
- *out = std::make_shared<StructType>(children);
- return Status::OK();
- case flatbuf::Type_Union:
- return UnionFromFlatbuffer(
- static_cast<const flatbuf::Union*>(type_data), children, out);
- default:
- return Status::Invalid("Unrecognized type");
- }
-}
-
-// TODO(wesm): Convert this to visitor pattern
-static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout,
- flatbuf::Type* out_type, DictionaryMemo* dictionary_memo, Offset* offset) {
- if (type->type == Type::DICTIONARY) {
- // In this library, the dictionary "type" is a logical construct. Here we
- // pass through to the value type, as we've already captured the index
- // type in the DictionaryEncoding metadata in the parent field
- const auto& dict_type = static_cast<const DictionaryType&>(*type);
- return TypeToFlatbuffer(fbb, dict_type.dictionary()->type(), children, layout,
- out_type, dictionary_memo, offset);
- }
-
- std::vector<BufferDescr> buffer_layout = type->GetBufferLayout();
- for (const BufferDescr& descr : buffer_layout) {
- flatbuf::VectorType vector_type;
- switch (descr.type()) {
- case BufferType::OFFSET:
- vector_type = flatbuf::VectorType_OFFSET;
- break;
- case BufferType::DATA:
- vector_type = flatbuf::VectorType_DATA;
- break;
- case BufferType::VALIDITY:
- vector_type = flatbuf::VectorType_VALIDITY;
- break;
- case BufferType::TYPE:
- vector_type = flatbuf::VectorType_TYPE;
- break;
- default:
- vector_type = flatbuf::VectorType_DATA;
- break;
- }
- auto offset = flatbuf::CreateVectorLayout(
- fbb, static_cast<int16_t>(descr.bit_width()), vector_type);
- layout->push_back(offset);
- }
-
- switch (type->type) {
- case Type::BOOL:
- *out_type = flatbuf::Type_Bool;
- *offset = flatbuf::CreateBool(fbb).Union();
- break;
- case Type::UINT8:
- INT_TO_FB_CASE(8, false);
- case Type::INT8:
- INT_TO_FB_CASE(8, true);
- case Type::UINT16:
- INT_TO_FB_CASE(16, false);
- case Type::INT16:
- INT_TO_FB_CASE(16, true);
- case Type::UINT32:
- INT_TO_FB_CASE(32, false);
- case Type::INT32:
- INT_TO_FB_CASE(32, true);
- case Type::UINT64:
- INT_TO_FB_CASE(64, false);
- case Type::INT64:
- INT_TO_FB_CASE(64, true);
- case Type::FLOAT:
- *out_type = flatbuf::Type_FloatingPoint;
- *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_SINGLE);
- break;
- case Type::DOUBLE:
- *out_type = flatbuf::Type_FloatingPoint;
- *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_DOUBLE);
- break;
- case Type::FIXED_WIDTH_BINARY: {
- const auto& fw_type = static_cast<const FixedWidthBinaryType&>(*type);
- *out_type = flatbuf::Type_FixedWidthBinary;
- *offset = flatbuf::CreateFixedWidthBinary(fbb, fw_type.byte_width()).Union();
- } break;
- case Type::BINARY:
- *out_type = flatbuf::Type_Binary;
- *offset = flatbuf::CreateBinary(fbb).Union();
- break;
- case Type::STRING:
- *out_type = flatbuf::Type_Utf8;
- *offset = flatbuf::CreateUtf8(fbb).Union();
- break;
- case Type::DATE:
- *out_type = flatbuf::Type_Date;
- *offset = flatbuf::CreateDate(fbb).Union();
- break;
- case Type::TIME: {
- const auto& time_type = static_cast<const TimeType&>(*type);
- *out_type = flatbuf::Type_Time;
- *offset = flatbuf::CreateTime(fbb, ToFlatbufferUnit(time_type.unit)).Union();
- } break;
- case Type::TIMESTAMP: {
- const auto& ts_type = static_cast<const TimestampType&>(*type);
- *out_type = flatbuf::Type_Timestamp;
- *offset = flatbuf::CreateTimestamp(fbb, ToFlatbufferUnit(ts_type.unit)).Union();
- } break;
- case Type::LIST:
- *out_type = flatbuf::Type_List;
- return ListToFlatbuffer(fbb, type, children, dictionary_memo, offset);
- case Type::STRUCT:
- *out_type = flatbuf::Type_Struct_;
- return StructToFlatbuffer(fbb, type, children, dictionary_memo, offset);
- case Type::UNION:
- *out_type = flatbuf::Type_Union;
- return UnionToFlatBuffer(fbb, type, children, dictionary_memo, offset);
- default:
- *out_type = flatbuf::Type_NONE; // Make clang-tidy happy
- std::stringstream ss;
- ss << "Unable to convert type: " << type->ToString() << std::endl;
- return Status::NotImplemented(ss.str());
- }
- return Status::OK();
-}
-
-using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>;
-
-static DictionaryOffset GetDictionaryEncoding(
- FBB& fbb, const DictionaryType& type, DictionaryMemo* memo) {
- int64_t dictionary_id = memo->GetId(type.dictionary());
-
- // We assume that the dictionary index type (as an integer) has already been
- // validated elsewhere, and can safely assume we are dealing with signed
- // integers
- const auto& fw_index_type = static_cast<const FixedWidthType&>(*type.index_type());
-
- auto index_type_offset = flatbuf::CreateInt(fbb, fw_index_type.bit_width(), true);
-
- // TODO(wesm): ordered dictionaries
- return flatbuf::CreateDictionaryEncoding(fbb, dictionary_id, index_type_offset);
-}
-
-static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
- DictionaryMemo* dictionary_memo, FieldOffset* offset) {
- auto fb_name = fbb.CreateString(field->name);
-
- flatbuf::Type type_enum;
- Offset type_offset;
- Offset type_layout;
- std::vector<FieldOffset> children;
- std::vector<VectorLayoutOffset> layout;
-
- RETURN_NOT_OK(TypeToFlatbuffer(
- fbb, field->type, &children, &layout, &type_enum, dictionary_memo, &type_offset));
- auto fb_children = fbb.CreateVector(children);
- auto fb_layout = fbb.CreateVector(layout);
-
- DictionaryOffset dictionary = 0;
- if (field->type->type == Type::DICTIONARY) {
- dictionary = GetDictionaryEncoding(
- fbb, static_cast<const DictionaryType&>(*field->type), dictionary_memo);
- }
-
- // TODO: produce the list of VectorTypes
- *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_offset,
- dictionary, fb_children, fb_layout);
-
- return Status::OK();
-}
-
-Status FieldFromFlatbufferDictionary(
- const flatbuf::Field* field, std::shared_ptr<Field>* out) {
- // Need an empty memo to pass down for constructing children
- DictionaryMemo dummy_memo;
-
- // Any DictionaryEncoding set is ignored here
-
- std::shared_ptr<DataType> type;
- auto children = field->children();
- std::vector<std::shared_ptr<Field>> child_fields(children->size());
- for (int i = 0; i < static_cast<int>(children->size()); ++i) {
- RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), dummy_memo, &child_fields[i]));
- }
-
- RETURN_NOT_OK(
- TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type));
-
- *out = std::make_shared<Field>(field->name()->str(), type, field->nullable());
- return Status::OK();
-}
-
-Status FieldFromFlatbuffer(const flatbuf::Field* field,
- const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out) {
- std::shared_ptr<DataType> type;
-
- const flatbuf::DictionaryEncoding* encoding = field->dictionary();
-
- if (encoding == nullptr) {
- // The field is not dictionary encoded. We must potentially visit its
- // children to fully reconstruct the data type
- auto children = field->children();
- std::vector<std::shared_ptr<Field>> child_fields(children->size());
- for (int i = 0; i < static_cast<int>(children->size()); ++i) {
- RETURN_NOT_OK(
- FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i]));
- }
- RETURN_NOT_OK(
- TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type));
- } else {
- // The field is dictionary encoded. The type of the dictionary values has
- // been determined elsewhere, and is stored in the DictionaryMemo. Here we
- // construct the logical DictionaryType object
-
- std::shared_ptr<Array> dictionary;
- RETURN_NOT_OK(dictionary_memo.GetDictionary(encoding->id(), &dictionary));
-
- std::shared_ptr<DataType> index_type;
- RETURN_NOT_OK(IntFromFlatbuffer(encoding->indexType(), &index_type));
- type = std::make_shared<DictionaryType>(index_type, dictionary);
- }
- *out = std::make_shared<Field>(field->name()->str(), type, field->nullable());
- return Status::OK();
-}
-
-// Implement MessageBuilder
-
-// will return the endianness of the system we are running on
-// based the NUMPY_API function. See NOTICE.txt
-flatbuf::Endianness endianness() {
- union {
- uint32_t i;
- char c[4];
- } bint = {0x01020304};
-
- return bint.c[0] == 1 ? flatbuf::Endianness_Big : flatbuf::Endianness_Little;
-}
-
-Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictionary_memo,
- flatbuffers::Offset<flatbuf::Schema>* out) {
- std::vector<FieldOffset> field_offsets;
- for (int i = 0; i < schema.num_fields(); ++i) {
- std::shared_ptr<Field> field = schema.field(i);
- FieldOffset offset;
- RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, dictionary_memo, &offset));
- field_offsets.push_back(offset);
- }
-
- *out = flatbuf::CreateSchema(fbb, endianness(), fbb.CreateVector(field_offsets));
- return Status::OK();
-}
-
-class MessageBuilder {
- public:
- Status SetSchema(const Schema& schema, DictionaryMemo* dictionary_memo) {
- flatbuffers::Offset<flatbuf::Schema> fb_schema;
- RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, dictionary_memo, &fb_schema));
-
- header_type_ = flatbuf::MessageHeader_Schema;
- header_ = fb_schema.Union();
- body_length_ = 0;
- return Status::OK();
- }
-
- Status SetRecordBatch(int32_t length, int64_t body_length,
- const std::vector<flatbuf::FieldNode>& nodes,
- const std::vector<flatbuf::Buffer>& buffers) {
- header_type_ = flatbuf::MessageHeader_RecordBatch;
- header_ = flatbuf::CreateRecordBatch(fbb_, length, fbb_.CreateVectorOfStructs(nodes),
- fbb_.CreateVectorOfStructs(buffers))
- .Union();
- body_length_ = body_length;
-
- return Status::OK();
- }
-
- Status SetDictionary(int64_t id, int32_t length, int64_t body_length,
- const std::vector<flatbuf::FieldNode>& nodes,
- const std::vector<flatbuf::Buffer>& buffers) {
- header_type_ = flatbuf::MessageHeader_DictionaryBatch;
-
- auto record_batch = flatbuf::CreateRecordBatch(fbb_, length,
- fbb_.CreateVectorOfStructs(nodes), fbb_.CreateVectorOfStructs(buffers));
-
- header_ = flatbuf::CreateDictionaryBatch(fbb_, id, record_batch).Union();
- body_length_ = body_length;
- return Status::OK();
- }
-
- Status Finish();
-
- Status GetBuffer(std::shared_ptr<Buffer>* out);
-
- private:
- flatbuf::MessageHeader header_type_;
- flatbuffers::Offset<void> header_;
- int64_t body_length_;
- flatbuffers::FlatBufferBuilder fbb_;
-};
-
-Status WriteSchemaMessage(
- const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out) {
- MessageBuilder message;
- RETURN_NOT_OK(message.SetSchema(schema, dictionary_memo));
- RETURN_NOT_OK(message.Finish());
- return message.GetBuffer(out);
-}
-
-Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
- const std::vector<flatbuf::FieldNode>& nodes,
- const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
- MessageBuilder builder;
- RETURN_NOT_OK(builder.SetRecordBatch(length, body_length, nodes, buffers));
- RETURN_NOT_OK(builder.Finish());
- return builder.GetBuffer(out);
-}
-
-Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
- const std::vector<flatbuf::FieldNode>& nodes,
- const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
- MessageBuilder builder;
- RETURN_NOT_OK(builder.SetDictionary(id, length, body_length, nodes, buffers));
- RETURN_NOT_OK(builder.Finish());
- return builder.GetBuffer(out);
-}
-
-Status MessageBuilder::Finish() {
- auto message =
- flatbuf::CreateMessage(fbb_, kMetadataVersion, header_type_, header_, body_length_);
- fbb_.Finish(message);
- return Status::OK();
-}
-
-Status MessageBuilder::GetBuffer(std::shared_ptr<Buffer>* out) {
- int32_t size = fbb_.GetSize();
-
- auto result = std::make_shared<PoolBuffer>();
- RETURN_NOT_OK(result->Resize(size));
-
- uint8_t* dst = result->mutable_data();
- memcpy(dst, fbb_.GetBufferPointer(), size);
-
- *out = result;
- return Status::OK();
-}
-
-} // namespace ipc
-} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
deleted file mode 100644
index 59afecb..0000000
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ /dev/null
@@ -1,83 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef ARROW_IPC_METADATA_INTERNAL_H
-#define ARROW_IPC_METADATA_INTERNAL_H
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "flatbuffers/flatbuffers.h"
-
-#include "arrow/ipc/File_generated.h"
-#include "arrow/ipc/Message_generated.h"
-#include "arrow/ipc/metadata.h"
-
-namespace arrow {
-
-namespace flatbuf = org::apache::arrow::flatbuf;
-
-class Buffer;
-struct Field;
-class Schema;
-class Status;
-
-namespace ipc {
-
-using FBB = flatbuffers::FlatBufferBuilder;
-using FieldOffset = flatbuffers::Offset<arrow::flatbuf::Field>;
-using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>;
-using Offset = flatbuffers::Offset<void>;
-
-static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2;
-
-// Construct a field with type for a dictionary-encoded field. None of its
-// children or children's descendents can be dictionary encoded
-Status FieldFromFlatbufferDictionary(
- const flatbuf::Field* field, std::shared_ptr<Field>* out);
-
-// Construct a field for a non-dictionary-encoded field. Its children may be
-// dictionary encoded
-Status FieldFromFlatbuffer(const flatbuf::Field* field,
- const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out);
-
-Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictionary_memo,
- flatbuffers::Offset<flatbuf::Schema>* out);
-
-// Serialize arrow::Schema as a Flatbuffer
-//
-// \param[in] schema a Schema instance
-// \param[inout] dictionary_memo class for tracking dictionaries and assigning
-// dictionary ids
-// \param[out] out the serialized arrow::Buffer
-// \return Status outcome
-Status WriteSchemaMessage(
- const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out);
-
-Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
- const std::vector<flatbuf::FieldNode>& nodes,
- const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out);
-
-Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
- const std::vector<flatbuf::FieldNode>& nodes,
- const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out);
-
-} // namespace ipc
-} // namespace arrow
-
-#endif // ARROW_IPC_METADATA_INTERNAL_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index 71bc5c9..a418d48 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -24,14 +24,14 @@
#include "flatbuffers/flatbuffers.h"
+#include "arrow/array.h"
+#include "arrow/buffer.h"
#include "arrow/io/interfaces.h"
#include "arrow/ipc/File_generated.h"
#include "arrow/ipc/Message_generated.h"
-#include "arrow/ipc/metadata-internal.h"
-
-#include "arrow/buffer.h"
#include "arrow/schema.h"
#include "arrow/status.h"
+#include "arrow/type.h"
namespace arrow {
@@ -39,6 +39,643 @@ namespace flatbuf = org::apache::arrow::flatbuf;
namespace ipc {
+// Shorthand aliases for the flatbuffers builder and the typed offsets its
+// Create* functions return while serializing IPC metadata tables
+using FBB = flatbuffers::FlatBufferBuilder;
+using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>;
+using FieldOffset = flatbuffers::Offset<flatbuf::Field>;
+using LargeRecordBatchOffset = flatbuffers::Offset<flatbuf::LargeRecordBatch>;
+using RecordBatchOffset = flatbuffers::Offset<flatbuf::RecordBatch>;
+using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>;
+// Type-erased offset, used for members of flatbuffers unions (e.g. Type)
+using Offset = flatbuffers::Offset<void>;
+
+// Metadata format version this module stamps on the IPC messages it writes
+static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2;
+
+// Map a flatbuf::Int (bit width + signedness) onto the matching Arrow integer
+// type. Only the 8/16/32/64-bit widths are supported; any other width yields
+// Status::NotImplemented.
+static Status IntFromFlatbuffer(
+    const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) {
+  const int width = int_data->bitWidth();
+  if (width > 64) {
+    return Status::NotImplemented("Integers with more than 64 bits not implemented");
+  }
+  if (width < 8) {
+    return Status::NotImplemented("Integers with less than 8 bits not implemented");
+  }
+
+  const bool is_signed = int_data->is_signed();
+  if (width == 8) {
+    *out = is_signed ? int8() : uint8();
+  } else if (width == 16) {
+    *out = is_signed ? int16() : uint16();
+  } else if (width == 32) {
+    *out = is_signed ? int32() : uint32();
+  } else if (width == 64) {
+    *out = is_signed ? int64() : uint64();
+  } else {
+    // e.g. 12 or 24 bits: in range, but not a standard fixed-width integer
+    return Status::NotImplemented("Integers not in cstdint are not implemented");
+  }
+  return Status::OK();
+}
+
+// Map a flatbuf::FloatingPoint precision onto the matching Arrow float type.
+// (The misspelled function name is kept because callers refer to it.)
+//
+// Returns Status::Invalid for a precision value outside HALF/SINGLE/DOUBLE,
+// which can only come from malformed or newer metadata; previously any such
+// value was silently decoded as double precision.
+static Status FloatFromFlatuffer(
+    const flatbuf::FloatingPoint* float_data, std::shared_ptr<DataType>* out) {
+  switch (float_data->precision()) {
+    case flatbuf::Precision_HALF:
+      *out = float16();
+      break;
+    case flatbuf::Precision_SINGLE:
+      *out = float32();
+      break;
+    case flatbuf::Precision_DOUBLE:
+      *out = float64();
+      break;
+    default:
+      return Status::Invalid("Unrecognized floating point precision");
+  }
+  return Status::OK();
+}
+
+// Forward declaration: field serialization and type serialization recurse
+// into each other through child fields
+static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
+    DictionaryMemo* dictionary_memo, FieldOffset* offset);
+
+// Build a flatbuf::Int table and hand it back as a type-erased union offset
+static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) {
+  const auto int_table = flatbuf::CreateInt(fbb, bitWidth, is_signed);
+  return int_table.Union();
+}
+
+// Build a flatbuf::FloatingPoint table and hand it back as a union offset
+static Offset FloatToFlatbuffer(FBB& fbb, flatbuf::Precision precision) {
+  const auto fp_table = flatbuf::CreateFloatingPoint(fbb, precision);
+  return fp_table.Union();
+}
+
+// Serialize each child field of `type`, in order, appending the resulting
+// offsets to out_children. Returns the first error encountered, if any.
+static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo) {
+  FieldOffset child_offset;
+  for (int child_index = 0; child_index < type->num_children(); ++child_index) {
+    const auto& child = type->child(child_index);
+    RETURN_NOT_OK(FieldToFlatbuffer(fbb, child, dictionary_memo, &child_offset));
+    out_children->push_back(child_offset);
+  }
+  return Status::OK();
+}
+
+// List type: serialize the child (value) field(s) into out_children, then
+// emit an empty flatbuf::List table as the type union member
+static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
+    Offset* offset) {
+  RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
+  const auto list_table = flatbuf::CreateList(fbb);
+  *offset = list_table.Union();
+  return Status::OK();
+}
+
+// Struct type: serialize all member fields into out_children, then emit an
+// empty flatbuf::Struct_ table as the type union member
+static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
+    Offset* offset) {
+  RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
+  const auto struct_table = flatbuf::CreateStruct_(fbb);
+  *offset = struct_table.Union();
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Union implementation
+
+// Reconstruct an Arrow UnionType from flatbuf::Union metadata and the already
+// decoded child fields. When typeIds is absent, child i gets type code i.
+//
+// Arrow stores union type codes as uint8_t. Metadata that cannot be
+// represented is rejected with Status::Invalid instead of being truncated:
+// explicit ids outside [0, 255], or more than 256 implicitly numbered
+// children.
+static Status UnionFromFlatbuffer(const flatbuf::Union* union_data,
+    const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) {
+  UnionMode mode = union_data->mode() == flatbuf::UnionMode_Sparse ? UnionMode::SPARSE
+                                                                   : UnionMode::DENSE;
+
+  std::vector<uint8_t> type_codes;
+
+  const flatbuffers::Vector<int32_t>* fb_type_ids = union_data->typeIds();
+  if (fb_type_ids == nullptr) {
+    if (children.size() > 256) {
+      return Status::Invalid("Union with implicit type ids has more than 256 children");
+    }
+    type_codes.reserve(children.size());
+    // size_t counter: a uint8_t counter wraps at 256 and never terminates
+    for (size_t i = 0; i < children.size(); ++i) {
+      type_codes.push_back(static_cast<uint8_t>(i));
+    }
+  } else {
+    type_codes.reserve(fb_type_ids->size());
+    for (int32_t id : (*fb_type_ids)) {
+      if (id < 0 || id > 255) {
+        return Status::Invalid("Union type id out of range for uint8_t type code");
+      }
+      type_codes.push_back(static_cast<uint8_t>(id));
+    }
+  }
+
+  *out = union_(children, type_codes, mode);
+  return Status::OK();
+}
+
+// Serialize a UnionType: its children go into out_children, then a
+// flatbuf::Union table records the mode and the type codes (widened to int32)
+static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
+    Offset* offset) {
+  RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
+
+  const auto& union_type = static_cast<const UnionType&>(*type);
+
+  flatbuf::UnionMode fb_mode = flatbuf::UnionMode_Dense;
+  if (union_type.mode == UnionMode::SPARSE) { fb_mode = flatbuf::UnionMode_Sparse; }
+
+  // Copy-convert the uint8_t codes into the int32 vector the schema stores
+  const std::vector<int32_t> type_ids(
+      union_type.type_codes.begin(), union_type.type_codes.end());
+  const auto fb_type_ids = fbb.CreateVector(type_ids);
+
+  *offset = flatbuf::CreateUnion(fbb, fb_mode, fb_type_ids).Union();
+  return Status::OK();
+}
+
+// Expands to the body of an integer `case` in a type switch: sets the
+// flatbuf type tag to Type_Int, builds the Int table, and `break`s.
+// The expansion refers to `out_type`, `offset` and `fbb`, which must be in
+// scope at the point of use.
+#define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED) \
+ *out_type = flatbuf::Type_Int; \
+ *offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \
+ break;
+
+// Translate an Arrow TimeUnit into the flatbuffers TimeUnit enum. A value
+// outside the four known units falls back to flatbuf::TimeUnit_MIN.
+static inline flatbuf::TimeUnit ToFlatbufferUnit(TimeUnit unit) {
+  flatbuf::TimeUnit fb_unit = flatbuf::TimeUnit_MIN;
+  switch (unit) {
+    case TimeUnit::SECOND: fb_unit = flatbuf::TimeUnit_SECOND; break;
+    case TimeUnit::MILLI: fb_unit = flatbuf::TimeUnit_MILLISECOND; break;
+    case TimeUnit::MICRO: fb_unit = flatbuf::TimeUnit_MICROSECOND; break;
+    case TimeUnit::NANO: fb_unit = flatbuf::TimeUnit_NANOSECOND; break;
+    default: break;
+  }
+  return fb_unit;
+}
+
+// Translate a flatbuffers TimeUnit into Arrow's TimeUnit. Values other than
+// the four known units are not expected and map to TimeUnit::SECOND.
+static inline TimeUnit FromFlatbufferUnit(flatbuf::TimeUnit unit) {
+  if (unit == flatbuf::TimeUnit_MILLISECOND) { return TimeUnit::MILLI; }
+  if (unit == flatbuf::TimeUnit_MICROSECOND) { return TimeUnit::MICRO; }
+  if (unit == flatbuf::TimeUnit_NANOSECOND) { return TimeUnit::NANO; }
+  // TimeUnit_SECOND, plus the fallback for any unrecognized value
+  return TimeUnit::SECOND;
+}
+
+// Convert a flatbuffers Type union (tag + table pointer) and the already
+// decoded child fields into an Arrow DataType.
+//
+// \param[in] type the union tag read from the Field metadata
+// \param[in] type_data the union's table; may be null in malformed metadata
+// \param[in] children decoded child fields (consumed by List, Struct, Union)
+// \param[out] out the reconstructed type
+// \return Status::Invalid for missing or unrecognized metadata,
+//         Status::NotImplemented for types this reader does not support
+static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
+    const std::vector<std::shared_ptr<Field>>& children, std::shared_ptr<DataType>* out) {
+  // These tags carry parameters in their union table, which is dereferenced
+  // below. Metadata read off the wire can set the tag yet omit the table, so
+  // reject that instead of dereferencing a null pointer.
+  const bool needs_table = type == flatbuf::Type_Int ||
+                           type == flatbuf::Type_FloatingPoint ||
+                           type == flatbuf::Type_FixedWidthBinary ||
+                           type == flatbuf::Type_Time || type == flatbuf::Type_Timestamp ||
+                           type == flatbuf::Type_Union;
+  if (needs_table && type_data == nullptr) {
+    return Status::Invalid("Type metadata is missing its type parameters table");
+  }
+
+  switch (type) {
+    case flatbuf::Type_NONE:
+      return Status::Invalid("Type metadata cannot be none");
+    case flatbuf::Type_Int:
+      return IntFromFlatbuffer(static_cast<const flatbuf::Int*>(type_data), out);
+    case flatbuf::Type_FloatingPoint:
+      return FloatFromFlatuffer(
+          static_cast<const flatbuf::FloatingPoint*>(type_data), out);
+    case flatbuf::Type_Binary:
+      *out = binary();
+      return Status::OK();
+    case flatbuf::Type_FixedWidthBinary: {
+      auto fw_binary = static_cast<const flatbuf::FixedWidthBinary*>(type_data);
+      *out = fixed_width_binary(fw_binary->byteWidth());
+      return Status::OK();
+    }
+    case flatbuf::Type_Utf8:
+      *out = utf8();
+      return Status::OK();
+    case flatbuf::Type_Bool:
+      *out = boolean();
+      return Status::OK();
+    case flatbuf::Type_Decimal:
+      return Status::NotImplemented("Decimal");
+    case flatbuf::Type_Date:
+      *out = date();
+      return Status::OK();
+    case flatbuf::Type_Time: {
+      auto time_type = static_cast<const flatbuf::Time*>(type_data);
+      *out = time(FromFlatbufferUnit(time_type->unit()));
+      return Status::OK();
+    }
+    case flatbuf::Type_Timestamp: {
+      auto ts_type = static_cast<const flatbuf::Timestamp*>(type_data);
+      *out = timestamp(FromFlatbufferUnit(ts_type->unit()));
+      return Status::OK();
+    }
+    case flatbuf::Type_Interval:
+      return Status::NotImplemented("Interval");
+    case flatbuf::Type_List:
+      if (children.size() != 1) {
+        return Status::Invalid("List must have exactly 1 child field");
+      }
+      *out = std::make_shared<ListType>(children[0]);
+      return Status::OK();
+    case flatbuf::Type_Struct_:
+      *out = std::make_shared<StructType>(children);
+      return Status::OK();
+    case flatbuf::Type_Union:
+      return UnionFromFlatbuffer(
+          static_cast<const flatbuf::Union*>(type_data), children, out);
+    default:
+      return Status::Invalid("Unrecognized type");
+  }
+}
+
+// Serialize `type` into the flatbuffer Type union.
+//
+// \param[in,out] fbb the builder receiving the serialized tables
+// \param[in] type the type to serialize; dictionary types are serialized as
+//   their value type (the index type is recorded in the parent field's
+//   DictionaryEncoding)
+// \param[out] children child fields, filled in by the nested-type helpers
+// \param[out] layout one VectorLayout entry appended per buffer of the type
+// \param[out] out_type the union discriminator
+// \param[in,out] dictionary_memo passed through to the nested-type helpers
+// \param[out] offset the serialized union payload
+// \return Status::NotImplemented for types with no IPC representation
+//
+// TODO(wesm): Convert this to visitor pattern
+static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+    std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout,
+    flatbuf::Type* out_type, DictionaryMemo* dictionary_memo, Offset* offset) {
+  if (type->type == Type::DICTIONARY) {
+    // In this library, the dictionary "type" is a logical construct. Here we
+    // pass through to the value type, as we've already captured the index
+    // type in the DictionaryEncoding metadata in the parent field
+    const auto& dict_type = static_cast<const DictionaryType&>(*type);
+    return TypeToFlatbuffer(fbb, dict_type.dictionary()->type(), children, layout,
+        out_type, dictionary_memo, offset);
+  }
+
+  std::vector<BufferDescr> buffer_layout = type->GetBufferLayout();
+  for (const BufferDescr& descr : buffer_layout) {
+    flatbuf::VectorType vector_type;
+    switch (descr.type()) {
+      case BufferType::OFFSET:
+        vector_type = flatbuf::VectorType_OFFSET;
+        break;
+      case BufferType::DATA:
+        vector_type = flatbuf::VectorType_DATA;
+        break;
+      case BufferType::VALIDITY:
+        vector_type = flatbuf::VectorType_VALIDITY;
+        break;
+      case BufferType::TYPE:
+        vector_type = flatbuf::VectorType_TYPE;
+        break;
+      default:
+        vector_type = flatbuf::VectorType_DATA;
+        break;
+    }
+    // Distinct name so the `offset` out-parameter is not shadowed
+    auto layout_offset = flatbuf::CreateVectorLayout(
+        fbb, static_cast<int16_t>(descr.bit_width()), vector_type);
+    layout->push_back(layout_offset);
+  }
+
+  switch (type->type) {
+    case Type::BOOL:
+      *out_type = flatbuf::Type_Bool;
+      *offset = flatbuf::CreateBool(fbb).Union();
+      break;
+    case Type::UINT8:
+      INT_TO_FB_CASE(8, false);
+    case Type::INT8:
+      INT_TO_FB_CASE(8, true);
+    case Type::UINT16:
+      INT_TO_FB_CASE(16, false);
+    case Type::INT16:
+      INT_TO_FB_CASE(16, true);
+    case Type::UINT32:
+      INT_TO_FB_CASE(32, false);
+    case Type::INT32:
+      INT_TO_FB_CASE(32, true);
+    case Type::UINT64:
+      INT_TO_FB_CASE(64, false);
+    case Type::INT64:
+      INT_TO_FB_CASE(64, true);
+    case Type::FLOAT:
+      *out_type = flatbuf::Type_FloatingPoint;
+      *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_SINGLE);
+      break;
+    case Type::DOUBLE:
+      *out_type = flatbuf::Type_FloatingPoint;
+      *offset = FloatToFlatbuffer(fbb, flatbuf::Precision_DOUBLE);
+      break;
+    case Type::FIXED_WIDTH_BINARY: {
+      const auto& fw_type = static_cast<const FixedWidthBinaryType&>(*type);
+      *out_type = flatbuf::Type_FixedWidthBinary;
+      *offset = flatbuf::CreateFixedWidthBinary(fbb, fw_type.byte_width()).Union();
+    } break;
+    case Type::BINARY:
+      *out_type = flatbuf::Type_Binary;
+      *offset = flatbuf::CreateBinary(fbb).Union();
+      break;
+    case Type::STRING:
+      *out_type = flatbuf::Type_Utf8;
+      *offset = flatbuf::CreateUtf8(fbb).Union();
+      break;
+    case Type::DATE:
+      *out_type = flatbuf::Type_Date;
+      *offset = flatbuf::CreateDate(fbb).Union();
+      break;
+    case Type::TIME: {
+      const auto& time_type = static_cast<const TimeType&>(*type);
+      *out_type = flatbuf::Type_Time;
+      *offset = flatbuf::CreateTime(fbb, ToFlatbufferUnit(time_type.unit)).Union();
+    } break;
+    case Type::TIMESTAMP: {
+      const auto& ts_type = static_cast<const TimestampType&>(*type);
+      *out_type = flatbuf::Type_Timestamp;
+      *offset = flatbuf::CreateTimestamp(fbb, ToFlatbufferUnit(ts_type.unit)).Union();
+    } break;
+    case Type::LIST:
+      *out_type = flatbuf::Type_List;
+      return ListToFlatbuffer(fbb, type, children, dictionary_memo, offset);
+    case Type::STRUCT:
+      *out_type = flatbuf::Type_Struct_;
+      return StructToFlatbuffer(fbb, type, children, dictionary_memo, offset);
+    case Type::UNION:
+      *out_type = flatbuf::Type_Union;
+      return UnionToFlatBuffer(fbb, type, children, dictionary_memo, offset);
+    default:
+      *out_type = flatbuf::Type_NONE;  // Make clang-tidy happy
+      std::stringstream ss;
+      // No std::endl: a trailing newline does not belong in a Status message
+      ss << "Unable to convert type: " << type->ToString();
+      return Status::NotImplemented(ss.str());
+  }
+  return Status::OK();
+}
+
+// Build the DictionaryEncoding table for a dictionary-typed field. The
+// dictionary's id is obtained from `memo`, and the index type is recorded as a
+// signed integer of the index type's bit width.
+static DictionaryOffset GetDictionaryEncoding(
+    FBB& fbb, const DictionaryType& type, DictionaryMemo* memo) {
+  const int64_t dictionary_id = memo->GetId(type.dictionary());
+
+  // The index type is assumed to have been validated elsewhere as a signed
+  // integer type, so only its bit width needs to be looked up here
+  const FixedWidthType& index_type =
+      static_cast<const FixedWidthType&>(*type.index_type());
+  const bool index_is_signed = true;
+  auto fb_index_type =
+      flatbuf::CreateInt(fbb, index_type.bit_width(), index_is_signed);
+
+  // TODO(wesm): ordered dictionaries
+  return flatbuf::CreateDictionaryEncoding(fbb, dictionary_id, fb_index_type);
+}
+
+// Serialize `field` into a flatbuffer Field table: its name, nullability,
+// type union, child fields, buffer layout and, for dictionary-typed fields,
+// the DictionaryEncoding.
+//
+// \param[in,out] fbb the builder receiving the serialized tables
+// \param[in] field the field to serialize
+// \param[in,out] dictionary_memo assigns/looks up dictionary ids
+// \param[out] offset the serialized Field table
+static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
+    DictionaryMemo* dictionary_memo, FieldOffset* offset) {
+  auto fb_name = fbb.CreateString(field->name);
+
+  flatbuf::Type type_enum;
+  Offset type_offset;
+  std::vector<FieldOffset> children;
+  std::vector<VectorLayoutOffset> layout;
+
+  RETURN_NOT_OK(TypeToFlatbuffer(
+      fbb, field->type, &children, &layout, &type_enum, dictionary_memo, &type_offset));
+  auto fb_children = fbb.CreateVector(children);
+  auto fb_layout = fbb.CreateVector(layout);
+
+  // Non-dictionary fields pass a null (0) dictionary offset
+  DictionaryOffset dictionary = 0;
+  if (field->type->type == Type::DICTIONARY) {
+    dictionary = GetDictionaryEncoding(
+        fbb, static_cast<const DictionaryType&>(*field->type), dictionary_memo);
+  }
+
+  *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_offset,
+      dictionary, fb_children, fb_layout);
+
+  return Status::OK();
+}
+
+// Reconstruct an arrow::Field from its flatbuffer representation.
+//
+// Dictionary-encoded fields become a DictionaryType whose dictionary array is
+// looked up in `dictionary_memo` by the encoding's id. Other fields have their
+// children reconstructed recursively before the type itself.
+//
+// Optional flatbuffer members (children, name, index type) may be absent in
+// a malformed message; they are checked rather than dereferenced blindly.
+//
+// \return Status::Invalid for missing required metadata; errors from the
+//   memo lookup or type reconstruction are propagated
+static Status FieldFromFlatbuffer(const flatbuf::Field* field,
+    const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out) {
+  if (field == nullptr) {
+    return Status::Invalid("Field metadata is missing");
+  }
+
+  std::shared_ptr<DataType> type;
+
+  const flatbuf::DictionaryEncoding* encoding = field->dictionary();
+
+  if (encoding == nullptr) {
+    // The field is not dictionary encoded. We must potentially visit its
+    // children to fully reconstruct the data type. An absent children vector
+    // is treated as no children.
+    auto children = field->children();
+    const int num_children =
+        children == nullptr ? 0 : static_cast<int>(children->size());
+    std::vector<std::shared_ptr<Field>> child_fields(num_children);
+    for (int i = 0; i < num_children; ++i) {
+      RETURN_NOT_OK(
+          FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i]));
+    }
+    RETURN_NOT_OK(
+        TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type));
+  } else {
+    // The field is dictionary encoded. The type of the dictionary values has
+    // been determined elsewhere, and is stored in the DictionaryMemo. Here we
+    // construct the logical DictionaryType object
+
+    std::shared_ptr<Array> dictionary;
+    RETURN_NOT_OK(dictionary_memo.GetDictionary(encoding->id(), &dictionary));
+
+    if (encoding->indexType() == nullptr) {
+      return Status::Invalid("Dictionary encoding is missing its index type");
+    }
+    std::shared_ptr<DataType> index_type;
+    RETURN_NOT_OK(IntFromFlatbuffer(encoding->indexType(), &index_type));
+    type = std::make_shared<DictionaryType>(index_type, dictionary);
+  }
+
+  // An absent name is read as the empty string
+  const std::string name = field->name() == nullptr ? std::string() : field->name()->str();
+  *out = std::make_shared<Field>(name, type, field->nullable());
+  return Status::OK();
+}
+
+// Reconstruct a dictionary's value field from its flatbuffer Field.
+//
+// Any DictionaryEncoding set on `field` is ignored: the field is decoded as
+// its plain value type. Children are decoded against an empty DictionaryMemo,
+// so a dictionary-encoded child presumably fails the memo lookup.
+//
+// Absent optional members (children, name) are checked rather than
+// dereferenced, since the flatbuffer may be malformed.
+static Status FieldFromFlatbufferDictionary(
+    const flatbuf::Field* field, std::shared_ptr<Field>* out) {
+  if (field == nullptr) {
+    return Status::Invalid("Field metadata is missing");
+  }
+
+  // Need an empty memo to pass down for constructing children
+  DictionaryMemo dummy_memo;
+
+  auto children = field->children();
+  const int num_children = children == nullptr ? 0 : static_cast<int>(children->size());
+  std::vector<std::shared_ptr<Field>> child_fields(num_children);
+  for (int i = 0; i < num_children; ++i) {
+    RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), dummy_memo, &child_fields[i]));
+  }
+
+  std::shared_ptr<DataType> type;
+  RETURN_NOT_OK(
+      TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type));
+
+  // An absent name is read as the empty string
+  const std::string name = field->name() == nullptr ? std::string() : field->name()->str();
+  *out = std::make_shared<Field>(name, type, field->nullable());
+  return Status::OK();
+}
+
+// Return the byte order of the machine we are running on; it is recorded in
+// serialized Schema metadata. Based on the NUMPY_API function. See NOTICE.txt
+flatbuf::Endianness endianness() {
+  // Inspect the lowest-addressed byte of a known multi-byte value. memcpy is
+  // used instead of reading an inactive union member, which is undefined
+  // behavior in C++.
+  const uint32_t probe = 0x01020304;
+  unsigned char first_byte = 0;
+  std::memcpy(&first_byte, &probe, 1);
+
+  return first_byte == 1 ? flatbuf::Endianness_Big : flatbuf::Endianness_Little;
+}
+
+// Serialize every field of `schema`, in order, and wrap them in a flatbuffer
+// Schema table tagged with the host byte order.
+static Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema,
+    DictionaryMemo* dictionary_memo, flatbuffers::Offset<flatbuf::Schema>* out) {
+  const int num_fields = schema.num_fields();
+  std::vector<FieldOffset> fb_fields;
+  fb_fields.reserve(num_fields);
+
+  for (int i = 0; i < num_fields; ++i) {
+    FieldOffset field_offset;
+    RETURN_NOT_OK(
+        FieldToFlatbuffer(fbb, schema.field(i), dictionary_memo, &field_offset));
+    fb_fields.push_back(field_offset);
+  }
+
+  auto fb_field_vector = fbb.CreateVector(fb_fields);
+  *out = flatbuf::CreateSchema(fbb, endianness(), fb_field_vector);
+  return Status::OK();
+}
+
+// Copy the finished bytes of `fbb` into a newly allocated PoolBuffer.
+static Status WriteFlatbufferBuilder(FBB& fbb, std::shared_ptr<Buffer>* out) {
+  const int32_t num_bytes = fbb.GetSize();
+
+  std::shared_ptr<PoolBuffer> buffer = std::make_shared<PoolBuffer>();
+  RETURN_NOT_OK(buffer->Resize(num_bytes));
+  memcpy(buffer->mutable_data(), fbb.GetBufferPointer(), num_bytes);
+
+  *out = buffer;
+  return Status::OK();
+}
+
+// Finish `fbb` with a Message table (current metadata version) that wraps
+// `header`, then copy the resulting bytes into `out`.
+static Status WriteMessage(FBB& fbb, flatbuf::MessageHeader header_type,
+    flatbuffers::Offset<void> header, int64_t body_length, std::shared_ptr<Buffer>* out) {
+  fbb.Finish(
+      flatbuf::CreateMessage(fbb, kMetadataVersion, header_type, header, body_length));
+  return WriteFlatbufferBuilder(fbb, out);
+}
+
+// Serialize `schema` as a standalone Schema message; schema messages carry no
+// body, so the recorded body length is 0.
+Status WriteSchemaMessage(
+    const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out) {
+  FBB fbb;
+  flatbuffers::Offset<flatbuf::Schema> schema_offset;
+  RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &schema_offset));
+
+  const int64_t body_length = 0;
+  return WriteMessage(
+      fbb, flatbuf::MessageHeader_Schema, schema_offset.Union(), body_length, out);
+}
+
+// Builder offsets for the struct vectors embedded in (Large)RecordBatch tables
+
+// Vector of 32-bit FieldNode structs (RecordBatch)
+using FieldNodeVector =
+ flatbuffers::Offset<flatbuffers::Vector<const flatbuf::FieldNode*>>;
+// Vector of 64-bit LargeFieldNode structs (LargeRecordBatch)
+using LargeFieldNodeVector =
+ flatbuffers::Offset<flatbuffers::Vector<const flatbuf::LargeFieldNode*>>;
+// Vector of Buffer structs, shared by both record batch variants
+using BufferVector = flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Buffer*>>;
+
+// Serialize field metadata for the 32-bit RecordBatch message type.
+//
+// Every node must have a zero offset. Lengths and null counts are stored as
+// int32_t. Values that do not fit are rejected rather than silently
+// truncated; such batches need WriteLargeFieldNodes instead.
+//
+// \return Status::Invalid on a non-zero offset or a value exceeding int32_t
+static Status WriteFieldNodes(
+    FBB& fbb, const std::vector<FieldMetadata>& nodes, FieldNodeVector* out) {
+  std::vector<flatbuf::FieldNode> fb_nodes;
+  fb_nodes.reserve(nodes.size());
+
+  for (const FieldMetadata& node : nodes) {
+    if (node.offset != 0) {
+      return Status::Invalid("Field metadata for IPC must have offset 0");
+    }
+    const auto length = static_cast<int32_t>(node.length);
+    const auto null_count = static_cast<int32_t>(node.null_count);
+    // Detect values changed by the narrowing conversion
+    if (length != node.length || null_count != node.null_count) {
+      return Status::Invalid("Field length or null count does not fit in 32 bits");
+    }
+    fb_nodes.emplace_back(length, null_count);
+  }
+  *out = fbb.CreateVectorOfStructs(fb_nodes);
+  return Status::OK();
+}
+
+// Serialize field metadata for the LargeRecordBatch message type, whose nodes
+// hold 64-bit lengths and null counts. Every node must have a zero offset.
+static Status WriteLargeFieldNodes(
+    FBB& fbb, const std::vector<FieldMetadata>& nodes, LargeFieldNodeVector* out) {
+  std::vector<flatbuf::LargeFieldNode> fb_nodes;
+  fb_nodes.reserve(nodes.size());
+
+  for (const FieldMetadata& node : nodes) {
+    if (node.offset != 0) {
+      return Status::Invalid("Field metadata for IPC must have offset 0");
+    }
+    fb_nodes.emplace_back(node.length, node.null_count);
+  }
+
+  *out = fbb.CreateVectorOfStructs(fb_nodes);
+  return Status::OK();
+}
+
+// Serialize buffer locations (page, offset, length) as a vector of Buffer
+// structs. Never fails.
+static Status WriteBuffers(
+    FBB& fbb, const std::vector<BufferMetadata>& buffers, BufferVector* out) {
+  std::vector<flatbuf::Buffer> fb_buffers;
+  fb_buffers.reserve(buffers.size());
+
+  for (const BufferMetadata& meta : buffers) {
+    fb_buffers.emplace_back(meta.page, meta.offset, meta.length);
+  }
+
+  *out = fbb.CreateVectorOfStructs(fb_buffers);
+  return Status::OK();
+}
+
+// Build a RecordBatch table with `length` rows from the given field nodes and
+// buffers. `body_length` is not used here; callers pass it on to the
+// enclosing Message instead.
+static Status MakeRecordBatch(FBB& fbb, int32_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    RecordBatchOffset* offset) {
+  FieldNodeVector node_vector;
+  RETURN_NOT_OK(WriteFieldNodes(fbb, nodes, &node_vector));
+
+  BufferVector buffer_vector;
+  RETURN_NOT_OK(WriteBuffers(fbb, buffers, &buffer_vector));
+
+  *offset = flatbuf::CreateRecordBatch(fbb, length, node_vector, buffer_vector);
+  return Status::OK();
+}
+
+// Build a LargeRecordBatch table (64-bit row count and field node lengths)
+// from the given field nodes and buffers. `body_length` is not used here.
+static Status MakeLargeRecordBatch(FBB& fbb, int64_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    LargeRecordBatchOffset* offset) {
+  LargeFieldNodeVector node_vector;
+  RETURN_NOT_OK(WriteLargeFieldNodes(fbb, nodes, &node_vector));
+
+  BufferVector buffer_vector;
+  RETURN_NOT_OK(WriteBuffers(fbb, buffers, &buffer_vector));
+
+  *offset = flatbuf::CreateLargeRecordBatch(fbb, length, node_vector, buffer_vector);
+  return Status::OK();
+}
+
+// Serialize a complete RecordBatch message (32-bit row count) into `out`,
+// recording `body_length` on the enclosing Message.
+Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    std::shared_ptr<Buffer>* out) {
+  FBB fbb;
+  RecordBatchOffset batch_offset;
+  RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &batch_offset));
+
+  flatbuffers::Offset<void> header = batch_offset.Union();
+  return WriteMessage(fbb, flatbuf::MessageHeader_RecordBatch, header, body_length, out);
+}
+
+// Serialize a complete LargeRecordBatch message (64-bit row count and field
+// node lengths) into `out`, recording `body_length` on the enclosing Message.
+Status WriteLargeRecordBatchMessage(int64_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    std::shared_ptr<Buffer>* out) {
+  FBB fbb;
+  LargeRecordBatchOffset batch_offset;
+  RETURN_NOT_OK(
+      MakeLargeRecordBatch(fbb, length, body_length, nodes, buffers, &batch_offset));
+
+  flatbuffers::Offset<void> header = batch_offset.Union();
+  return WriteMessage(
+      fbb, flatbuf::MessageHeader_LargeRecordBatch, header, body_length, out);
+}
+
+// Serialize a DictionaryBatch message: the dictionary values are carried in an
+// embedded RecordBatch, tagged with dictionary `id`.
+Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
+    const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+    std::shared_ptr<Buffer>* out) {
+  FBB fbb;
+  RecordBatchOffset values_batch;
+  RETURN_NOT_OK(MakeRecordBatch(fbb, length, body_length, nodes, buffers, &values_batch));
+
+  flatbuffers::Offset<void> header =
+      flatbuf::CreateDictionaryBatch(fbb, id, values_batch).Union();
+  return WriteMessage(
+      fbb, flatbuf::MessageHeader_DictionaryBatch, header, body_length, out);
+}
+
+// Convert file block descriptors (offset, metadata length, body length) into a
+// flatbuffer vector of Block structs.
+static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
+FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
+  std::vector<flatbuf::Block> fb_blocks;
+  fb_blocks.reserve(blocks.size());
+
+  for (const FileBlock& entry : blocks) {
+    fb_blocks.emplace_back(entry.offset, entry.metadata_length, entry.body_length);
+  }
+
+  return fbb.CreateVectorOfStructs(fb_blocks);
+}
+
+// Serialize the file Footer (schema plus dictionary and record batch block
+// indexes) and write the finished flatbuffer bytes to `out`. No length
+// prefix is written here.
+Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
+    const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
+    io::OutputStream* out) {
+  FBB fbb;
+
+  flatbuffers::Offset<flatbuf::Schema> fb_schema;
+  RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema));
+
+  auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
+  auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
+
+  fbb.Finish(flatbuf::CreateFooter(
+      fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches));
+
+  const int32_t footer_size = fbb.GetSize();
+  return out->Write(fbb.GetBufferPointer(), footer_size);
+}
+
// ----------------------------------------------------------------------
// Memoization data structure for handling shared dictionaries
@@ -158,7 +795,18 @@ int64_t Message::body_length() const {
// ----------------------------------------------------------------------
// SchemaMetadata
-class SchemaMetadata::SchemaMetadataImpl {
+// Base for metadata impl classes that keep raw pointers into a flatbuffer.
+// It retains whichever object owns those bytes (a Message or a Buffer) so the
+// pointers stay valid for the lifetime of the impl.
+class MessageHolder {
+ public:
+ void set_message(const std::shared_ptr<Message>& message) { message_ = message; }
+ void set_buffer(const std::shared_ptr<Buffer>& buffer) { buffer_ = buffer; }
+
+ protected:
+ // Possible parents; whichever one is set owns the flatbuffer data
+ std::shared_ptr<Message> message_;
+ std::shared_ptr<Buffer> buffer_;
+};
+
+class SchemaMetadata::SchemaMetadataImpl : public MessageHolder {
public:
explicit SchemaMetadataImpl(const void* schema)
: schema_(static_cast<const flatbuf::Schema*>(schema)) {}
@@ -196,15 +844,19 @@ class SchemaMetadata::SchemaMetadataImpl {
const flatbuf::Schema* schema_;
};
-SchemaMetadata::SchemaMetadata(
- const std::shared_ptr<Message>& message, const void* flatbuf) {
- message_ = message;
- impl_.reset(new SchemaMetadataImpl(flatbuf));
+SchemaMetadata::SchemaMetadata(const std::shared_ptr<Message>& message)
+ : SchemaMetadata(message->impl_->header()) {
+ impl_->set_message(message);
}
-SchemaMetadata::SchemaMetadata(const std::shared_ptr<Message>& message) {
- message_ = message;
- impl_.reset(new SchemaMetadataImpl(message->impl_->header()));
+// Wrap an opaque flatbuf::Schema pointer. No owner is recorded by this
+// constructor, so the memory behind `header` must be kept alive elsewhere.
+SchemaMetadata::SchemaMetadata(const void* header)
+    : impl_(new SchemaMetadataImpl(header)) {}
+
+SchemaMetadata::SchemaMetadata(const std::shared_ptr<Buffer>& buffer, int64_t offset)
+ : SchemaMetadata(buffer->data() + offset) {
+ // Preserve ownership
+ impl_->set_buffer(buffer);
}
SchemaMetadata::~SchemaMetadata() {}
@@ -231,7 +883,7 @@ Status SchemaMetadata::GetSchema(
// ----------------------------------------------------------------------
// RecordBatchMetadata
-class RecordBatchMetadata::RecordBatchMetadataImpl {
+class RecordBatchMetadata::RecordBatchMetadataImpl : public MessageHolder {
public:
explicit RecordBatchMetadataImpl(const void* batch)
: batch_(static_cast<const flatbuf::RecordBatch*>(batch)) {
@@ -249,22 +901,14 @@ class RecordBatchMetadata::RecordBatchMetadataImpl {
int num_fields() const { return batch_->nodes()->size(); }
- void set_message(const std::shared_ptr<Message>& message) { message_ = message; }
-
- void set_buffer(const std::shared_ptr<Buffer>& buffer) { buffer_ = buffer; }
-
private:
const flatbuf::RecordBatch* batch_;
const flatbuffers::Vector<const flatbuf::FieldNode*>* nodes_;
const flatbuffers::Vector<const flatbuf::Buffer*>* buffers_;
-
- // Possible parents, owns the flatbuffer data
- std::shared_ptr<Message> message_;
- std::shared_ptr<Buffer> buffer_;
};
-RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message) {
- impl_.reset(new RecordBatchMetadataImpl(message->impl_->header()));
+RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message)
+ : RecordBatchMetadata(message->impl_->header()) {
impl_->set_message(message);
}
@@ -358,8 +1002,8 @@ const RecordBatchMetadata& DictionaryBatchMetadata::record_batch() const {
// ----------------------------------------------------------------------
// Conveniences
-Status ReadMessage(int64_t offset, int32_t metadata_length,
- io::RandomAccessFile* file, std::shared_ptr<Message>* message) {
+Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
+ std::shared_ptr<Message>* message) {
std::shared_ptr<Buffer> buffer;
RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 4eb0186..41e6c5e 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -107,10 +107,9 @@ class Message;
// Container for serialized Schema metadata contained in an IPC message
class ARROW_EXPORT SchemaMetadata {
public:
+ explicit SchemaMetadata(const void* header);
explicit SchemaMetadata(const std::shared_ptr<Message>& message);
-
- // Accepts an opaque flatbuffer pointer
- SchemaMetadata(const std::shared_ptr<Message>& message, const void* schema);
+ SchemaMetadata(const std::shared_ptr<Buffer>& message, int64_t offset);
~SchemaMetadata();
@@ -127,9 +126,6 @@ class ARROW_EXPORT SchemaMetadata {
const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out) const;
private:
- // Parent, owns the flatbuffer data
- std::shared_ptr<Message> message_;
-
class SchemaMetadataImpl;
std::unique_ptr<SchemaMetadataImpl> impl_;
@@ -145,8 +141,6 @@ struct ARROW_EXPORT BufferMetadata {
// Container for serialized record batch metadata contained in an IPC message
class ARROW_EXPORT RecordBatchMetadata {
public:
- // Instantiate from opaque pointer. Memory ownership must be preserved
- // elsewhere (e.g. in a dictionary batch)
explicit RecordBatchMetadata(const void* header);
explicit RecordBatchMetadata(const std::shared_ptr<Message>& message);
RecordBatchMetadata(const std::shared_ptr<Buffer>& message, int64_t offset);
@@ -218,8 +212,34 @@ class ARROW_EXPORT Message {
/// \param[in] file the seekable file interface to read from
/// \param[out] message the message read
/// \return Status success or failure
-Status ReadMessage(int64_t offset, int32_t metadata_length,
- io::RandomAccessFile* file, std::shared_ptr<Message>* message);
+Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
+ std::shared_ptr<Message>* message);
+
+// Serialize arrow::Schema as a Flatbuffer
+//
+// \param[in] schema a Schema instance
+// \param[inout] dictionary_memo class for tracking dictionaries and assigning
+// dictionary ids
+// \param[out] out the serialized arrow::Buffer
+// \return Status outcome
+Status WriteSchemaMessage(
+ const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out);
+
+// Serialize a RecordBatch message with a 32-bit row count
+//
+// \param[in] length number of rows in the batch
+// \param[in] body_length size in bytes of the message body, stored in the Message
+// \param[in] nodes per-field metadata; each must have offset 0
+// \param[in] buffers locations of the body buffers
+// \param[out] out the serialized arrow::Buffer
+// \return Status outcome
+Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
+ const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+ std::shared_ptr<Buffer>* out);
+
+// Serialize a LargeRecordBatch message, which stores 64-bit row counts and
+// field node lengths. Parameters are as for WriteRecordBatchMessage.
+Status WriteLargeRecordBatchMessage(int64_t length, int64_t body_length,
+ const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+ std::shared_ptr<Buffer>* out);
+
+// Serialize a DictionaryBatch message whose values are carried in an embedded
+// record batch
+//
+// \param[in] id the dictionary id
+// Remaining parameters are as for WriteRecordBatchMessage.
+Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
+ const std::vector<FieldMetadata>& nodes, const std::vector<BufferMetadata>& buffers,
+ std::shared_ptr<Buffer>* out);
+
+// Serialize the file Footer (schema, dictionary blocks, record batch blocks)
+// and write the flatbuffer bytes to `out`
+//
+// \param[inout] dictionary_memo used when serializing the schema
+// \return Status outcome
+Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
+ const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
+ io::OutputStream* out);
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 9575364..a2b20a9 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -26,17 +26,115 @@
#include "arrow/buffer.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
-#include "arrow/ipc/adapter.h"
-#include "arrow/ipc/metadata-internal.h"
+#include "arrow/ipc/File_generated.h"
+#include "arrow/ipc/Message_generated.h"
#include "arrow/ipc/metadata.h"
#include "arrow/ipc/util.h"
+#include "arrow/schema.h"
#include "arrow/status.h"
+#include "arrow/table.h"
#include "arrow/util/logging.h"
namespace arrow {
+
+namespace flatbuf = org::apache::arrow::flatbuf;
+
namespace ipc {
// ----------------------------------------------------------------------
+// Record batch read path
+
+// ArrayComponentSource that reads buffers and field nodes described by a
+// RecordBatchMetadata from a RandomAccessFile. Only references are held:
+// `metadata` and `file` must both outlive this object.
+class IpcComponentSource : public ArrayComponentSource {
+ public:
+  IpcComponentSource(const RecordBatchMetadata& metadata, io::RandomAccessFile* file)
+      : metadata_(metadata), file_(file) {}
+
+  // Read buffer `buffer_index` from the file. Zero-length buffers yield null.
+  // Out-of-range indices (malformed metadata) are reported instead of being
+  // used for an unchecked lookup.
+  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
+    if (buffer_index < 0 || buffer_index >= metadata_.num_buffers()) {
+      return Status::Invalid("Ran out of buffer metadata, likely malformed");
+    }
+    BufferMetadata buffer_meta = metadata_.buffer(buffer_index);
+    if (buffer_meta.length == 0) {
+      *out = nullptr;
+      return Status::OK();
+    }
+    return file_->ReadAt(buffer_meta.offset, buffer_meta.length, out);
+  }
+
+  // Return the metadata for field node `field_index`.
+  Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override {
+    // pop off a field
+    if (field_index < 0 || field_index >= metadata_.num_fields()) {
+      return Status::Invalid("Ran out of field metadata, likely malformed");
+    }
+    *metadata = metadata_.field(field_index);
+    return Status::OK();
+  }
+
+ private:
+  const RecordBatchMetadata& metadata_;
+  io::RandomAccessFile* file_;
+};
+
+// Read the record batch described by `metadata` from `file`, allowing nested
+// types up to the default kMaxNestingDepth.
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
+    const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
+    std::shared_ptr<RecordBatch>* out) {
+  return ReadRecordBatch(
+      metadata, schema, /*max_recursion_depth=*/kMaxNestingDepth, file, out);
+}
+
+// Load one array per schema field, in schema order, pulling buffers and field
+// nodes from `source`, and assemble them into a RecordBatch with `num_rows`
+// rows. A single loader context is shared across all columns; it starts at
+// field/buffer index 0 and is presumably advanced by LoadArray.
+static Status LoadRecordBatchFromSource(const std::shared_ptr<Schema>& schema,
+    int64_t num_rows, int max_recursion_depth, ArrayComponentSource* source,
+    std::shared_ptr<RecordBatch>* out) {
+  ArrayLoaderContext context;
+  context.source = source;
+  context.field_index = 0;
+  context.buffer_index = 0;
+  context.max_recursion_depth = max_recursion_depth;
+
+  const int num_fields = schema->num_fields();
+  std::vector<std::shared_ptr<Array>> columns(num_fields);
+  for (int i = 0; i < num_fields; ++i) {
+    RETURN_NOT_OK(LoadArray(schema->field(i)->type, &context, &columns[i]));
+  }
+
+  *out = std::make_shared<RecordBatch>(schema, num_rows, columns);
+  return Status::OK();
+}
+
+// Read a record batch from `file` using the buffer locations and field nodes
+// in `metadata`. `max_recursion_depth` bounds how deeply types may be nested.
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
+    const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+    io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
+  const int64_t num_rows = metadata.length();
+  IpcComponentSource component_source(metadata, file);
+  return LoadRecordBatchFromSource(
+      schema, num_rows, max_recursion_depth, &component_source, out);
+}
+
+// Read a dictionary batch.
+//
+// The dictionary's value field is looked up by id in `dictionary_types`. The
+// single-column record batch carrying the dictionary values is read, and that
+// column is returned.
+//
+// \return Status::KeyError if no type is registered for the id;
+//   Status::Invalid if the embedded batch does not have exactly one column
+Status ReadDictionary(const DictionaryBatchMetadata& metadata,
+    const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file,
+    std::shared_ptr<Array>* out) {
+  int64_t id = metadata.id();
+  auto entry = dictionary_types.find(id);
+  if (entry == dictionary_types.end()) {
+    std::stringstream ss;
+    ss << "Do not have type metadata for dictionary with id: " << id;
+    return Status::KeyError(ss.str());
+  }
+
+  // The dictionary is embedded in a record batch with a single column, so a
+  // one-field schema is needed to read it
+  std::vector<std::shared_ptr<Field>> fields = {entry->second};
+  auto dummy_schema = std::make_shared<Schema>(fields);
+
+  std::shared_ptr<RecordBatch> batch;
+  RETURN_NOT_OK(ReadRecordBatch(metadata.record_batch(), dummy_schema, file, &batch));
+  if (batch->num_columns() != 1) {
+    return Status::Invalid("Dictionary record batch must only contain one field");
+  }
+
+  *out = batch->column(0);
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
// StreamReader implementation
static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
@@ -228,7 +326,7 @@ class FileReader::FileReaderImpl {
// TODO(wesm): Verify the footer
footer_ = flatbuf::GetFooter(footer_buffer_->data());
- schema_metadata_.reset(new SchemaMetadata(nullptr, footer_->schema()));
+ schema_metadata_.reset(new SchemaMetadata(footer_->schema()));
return Status::OK();
}
@@ -307,8 +405,7 @@ class FileReader::FileReaderImpl {
return schema_metadata_->GetSchema(*dictionary_memo_, &schema_);
}
- Status Open(
- const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset) {
+ Status Open(const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset) {
file_ = file;
footer_offset_ = footer_offset;
RETURN_NOT_OK(ReadFooter());
@@ -371,5 +468,69 @@ Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
return impl_->GetRecordBatch(i, batch);
}
+// ----------------------------------------------------------------------
+// Read LargeRecordBatch
+
+// ArrayComponentSource over a LargeRecordBatch flatbuffer, whose field nodes
+// carry 64-bit lengths; buffer offsets are resolved against `file`.
+// Neither `metadata` nor `file` is owned; both must outlive this object.
+class LargeRecordBatchSource : public ArrayComponentSource {
+ public:
+  LargeRecordBatchSource(
+      const flatbuf::LargeRecordBatch* metadata, io::RandomAccessFile* file)
+      : metadata_(metadata), file_(file) {}
+
+  // Read buffer `buffer_index`; zero-length buffers yield null. A malformed
+  // message may omit the buffers vector or reference an out-of-range index.
+  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
+    const auto* buffers = metadata_->buffers();
+    if (buffers == nullptr || buffer_index < 0 ||
+        buffer_index >= static_cast<int>(buffers->size())) {
+      return Status::Invalid("Ran out of buffer metadata, likely malformed");
+    }
+    const flatbuf::Buffer* buffer = buffers->Get(buffer_index);
+
+    if (buffer->length() == 0) {
+      *out = nullptr;
+      return Status::OK();
+    }
+    return file_->ReadAt(buffer->offset(), buffer->length(), out);
+  }
+
+  // Translate field node `field_index` into FieldMetadata.
+  Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override {
+    // pop off a field
+    const auto* nodes = metadata_->nodes();
+    if (nodes == nullptr || field_index < 0 ||
+        field_index >= static_cast<int>(nodes->size())) {
+      return Status::Invalid("Ran out of field metadata, likely malformed");
+    }
+    const flatbuf::LargeFieldNode* node = nodes->Get(field_index);
+
+    metadata->length = node->length();
+    metadata->null_count = node->null_count();
+    // IPC field nodes are only ever written with a zero offset
+    metadata->offset = 0;
+    return Status::OK();
+  }
+
+ private:
+  const flatbuf::LargeRecordBatch* metadata_;
+  io::RandomAccessFile* file_;
+};
+
+// EXPERIMENTAL: Read a length-prefixed LargeRecordBatch message at `offset`,
+// followed by its body, and reconstruct a RecordBatch using `schema`.
+//
+// Layout read starting at `offset`:
+// - an int32 metadata size,
+// - the Message flatbuffer of that size,
+// - `bodyLength` bytes of body.
+//
+// \return Status::IOError on a short read; Status::Invalid if the metadata
+//   size is not positive or the message is not a LargeRecordBatch
+Status ReadLargeRecordBatch(const std::shared_ptr<Schema>& schema, int64_t offset,
+    io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out) {
+  std::shared_ptr<Buffer> buffer;
+  RETURN_NOT_OK(file->Seek(offset));
+
+  RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer));
+  if (buffer->size() < static_cast<int64_t>(sizeof(int32_t))) {
+    return Status::IOError("Unexpected end of file reading metadata length");
+  }
+  int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
+  if (flatbuffer_size <= 0) {
+    return Status::Invalid("Invalid LargeRecordBatch metadata length");
+  }
+
+  RETURN_NOT_OK(file->Read(flatbuffer_size, &buffer));
+  if (buffer->size() < flatbuffer_size) {
+    return Status::IOError("Unexpected end of file reading LargeRecordBatch metadata");
+  }
+  auto message = flatbuf::GetMessage(buffer->data());
+
+  // Only reinterpret the header once its union tag confirms the type
+  if (message->header_type() != flatbuf::MessageHeader_LargeRecordBatch) {
+    return Status::Invalid("Message is not a LargeRecordBatch");
+  }
+  auto batch = static_cast<const flatbuf::LargeRecordBatch*>(message->header());
+  if (batch == nullptr) {
+    return Status::Invalid("LargeRecordBatch message is missing its header");
+  }
+
+  // TODO(ARROW-388): The buffer offsets start at 0, so we must construct a
+  // RandomAccessFile according to that frame of reference
+  std::shared_ptr<Buffer> buffer_payload;
+  RETURN_NOT_OK(file->Read(message->bodyLength(), &buffer_payload));
+  if (buffer_payload->size() < message->bodyLength()) {
+    return Status::IOError("Unexpected end of file reading LargeRecordBatch body");
+  }
+  io::BufferReader buffer_reader(buffer_payload);
+
+  LargeRecordBatchSource source(batch, &buffer_reader);
+  return LoadRecordBatchFromSource(
+      schema, batch->length(), kMaxNestingDepth, &source, out);
+}
+
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index ca91765..1c1314a 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -43,6 +43,20 @@ class RandomAccessFile;
namespace ipc {
+// Generic read functions; they do not copy data if the input supports zero-copy reads
+
+// Read the record batch described by `metadata` from `file`, using the
+// default maximum nesting depth
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
+ const std::shared_ptr<Schema>& schema, io::RandomAccessFile* file,
+ std::shared_ptr<RecordBatch>* out);
+
+// As above, with an explicit bound on how deeply nested types may recurse
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
+ const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+ io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
+
+// Read the dictionary values for a DictionaryBatch. The value type is looked
+// up by dictionary id in `dictionary_types`.
+Status ReadDictionary(const DictionaryBatchMetadata& metadata,
+ const DictionaryTypeMap& dictionary_types, io::RandomAccessFile* file,
+ std::shared_ptr<Array>* out);
+
class ARROW_EXPORT StreamReader {
public:
~StreamReader();
@@ -106,6 +120,14 @@ class ARROW_EXPORT FileReader {
std::unique_ptr<FileReaderImpl> impl_;
};
+// ----------------------------------------------------------------------
+// LargeRecordBatch read path (experimental)
+
+/// EXPERIMENTAL: Read length-prefixed LargeRecordBatch metadata (64-bit array
+/// lengths) at offset and reconstruct RecordBatch
+///
+/// The message is read as an int32 metadata size, the Message flatbuffer,
+/// then the body bytes, starting at `offset` in `file`.
+Status ARROW_EXPORT ReadLargeRecordBatch(const std::shared_ptr<Schema>& schema,
+ int64_t offset, io::RandomAccessFile* file, std::shared_ptr<RecordBatch>* out);
+
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 66a5e09..ba203b0 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -103,7 +103,7 @@ Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_li
typedef Status MakeRecordBatch(std::shared_ptr<RecordBatch>* out);
Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
- const int length = 1000;
+ const int length = 10;
// Make the schema
auto f0 = field("f0", int32());
http://git-wip-us.apache.org/repos/asf/arrow/blob/df2220f3/cpp/src/arrow/ipc/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 58402b5..82c119e 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -17,28 +17,510 @@
#include "arrow/ipc/writer.h"
+#include <algorithm>
#include <cstdint>
#include <cstring>
+#include <limits>
#include <sstream>
#include <vector>
+#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
-#include "arrow/ipc/adapter.h"
-#include "arrow/ipc/metadata-internal.h"
#include "arrow/ipc/metadata.h"
#include "arrow/ipc/util.h"
+#include "arrow/loader.h"
#include "arrow/memory_pool.h"
#include "arrow/schema.h"
#include "arrow/status.h"
#include "arrow/table.h"
+#include "arrow/type.h"
+#include "arrow/util/bit-util.h"
#include "arrow/util/logging.h"
namespace arrow {
namespace ipc {
// ----------------------------------------------------------------------
+// Record batch write path
+
+// Serializes a RecordBatch into the IPC format: a length-prefixed flatbuffer
+// metadata message (padded so that it ends on an 8-byte boundary) followed by
+// the body buffers, each padded to a multiple of 64 bytes.
+//
+// The batch is flattened by a depth-first traversal: every array contributes
+// one FieldMetadata node and a validity buffer, followed by the type-specific
+// buffers appended by the Visit overloads. Sliced arrays (non-zero offset) are
+// rebased so that every written buffer starts at logical position zero.
+//
+// Subclasses customize the metadata message (WriteMetadataMessage) and the
+// per-array validation (CheckArrayMetadata).
+class RecordBatchWriter : public ArrayVisitor {
+ public:
+  // pool: allocates intermediate buffers needed to rebase sliced arrays.
+  // buffer_start_offset: position recorded for the first body buffer in the
+  //   buffer metadata.
+  // max_recursion_depth: maximum nesting depth of list/struct/union arrays;
+  //   must be positive.
+  RecordBatchWriter(
+      MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth)
+      : pool_(pool),
+        max_recursion_depth_(max_recursion_depth),
+        buffer_start_offset_(buffer_start_offset) {
+    DCHECK_GT(max_recursion_depth, 0);
+  }
+
+  virtual ~RecordBatchWriter() = default;
+
+  // Validates an array before it is written. The default rejects lengths that
+  // do not fit in 32 bits, matching the int32 row count emitted by
+  // WriteMetadataMessage.
+  virtual Status CheckArrayMetadata(const Array& arr) {
+    if (arr.length() > std::numeric_limits<int32_t>::max()) {
+      return Status::Invalid("Cannot write arrays larger than 2^31 - 1 in length");
+    }
+    return Status::OK();
+  }
+
+  // Appends the field node and validity buffer for `arr`, then dispatches to
+  // the type-specific Visit overload to append its remaining buffers.
+  Status VisitArray(const Array& arr) {
+    if (max_recursion_depth_ <= 0) {
+      return Status::Invalid("Max recursion depth reached");
+    }
+
+    RETURN_NOT_OK(CheckArrayMetadata(arr));
+
+    // push back all common elements
+    field_nodes_.emplace_back(arr.length(), arr.null_count(), 0);
+
+    if (arr.null_count() > 0) {
+      std::shared_ptr<Buffer> bitmap = arr.null_bitmap();
+
+      if (arr.offset() != 0) {
+        // With a sliced array / non-zero offset, we must copy the bitmap
+        RETURN_NOT_OK(
+            CopyBitmap(pool_, bitmap->data(), arr.offset(), arr.length(), &bitmap));
+      }
+
+      buffers_.push_back(bitmap);
+    } else {
+      // Push a dummy zero-length buffer, not to be copied
+      buffers_.push_back(std::make_shared<Buffer>(nullptr, 0));
+    }
+    return arr.Accept(this);
+  }
+
+  // Collects the buffers of every column and computes their metadata (offset
+  // and padded size). On success *body_length is the total padded body size,
+  // a multiple of 64 bytes.
+  Status Assemble(const RecordBatch& batch, int64_t* body_length) {
+    // Discard state left over from a previous call
+    field_nodes_.clear();
+    buffer_meta_.clear();
+    buffers_.clear();
+
+    // Perform depth-first traversal of the row-batch
+    for (int i = 0; i < batch.num_columns(); ++i) {
+      RETURN_NOT_OK(VisitArray(*batch.column(i)));
+    }
+
+    // The position for the start of a buffer relative to the passed frame of
+    // reference. May be 0 or some other position in an address space
+    int64_t offset = buffer_start_offset_;
+
+    buffer_meta_.reserve(buffers_.size());
+
+    const int32_t kNoPageId = -1;
+
+    // Construct the buffer metadata for the record batch header
+    for (const auto& buffer : buffers_) {
+      int64_t size = 0;
+      int64_t padding = 0;
+
+      // The buffer might be null if we are handling zero row lengths.
+      if (buffer) {
+        size = buffer->size();
+        padding = BitUtil::RoundUpToMultipleOf64(size) - size;
+      }
+
+      // TODO(wesm): We currently have no notion of shared memory page id's,
+      // but we've included it in the metadata IDL for when we have it in the
+      // future. Use page = -1 for now
+      //
+      // Note that page ids are a bespoke notion for Arrow and not a feature we
+      // are using from any OS-level shared memory. The thought is that systems
+      // may (in the future) associate integer page id's with physical memory
+      // pages (according to whatever is the desired shared memory mechanism)
+      buffer_meta_.push_back({kNoPageId, offset, size + padding});
+      offset += size + padding;
+    }
+
+    *body_length = offset - buffer_start_offset_;
+    DCHECK(BitUtil::IsMultipleOf64(*body_length));
+
+    return Status::OK();
+  }
+
+  // Builds the flatbuffer metadata message. Override this for writing
+  // dictionary metadata.
+  virtual Status WriteMetadataMessage(
+      int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) {
+    return WriteRecordBatchMessage(
+        static_cast<int32_t>(num_rows), body_length, field_nodes_, buffer_meta_, out);
+  }
+
+  // Writes the int32 size prefix, the flatbuffer metadata, and padding so that
+  // the stream position ends on an 8-byte boundary. *metadata_length receives
+  // the total bytes written (prefix + flatbuffer + padding).
+  Status WriteMetadata(int64_t num_rows, int64_t body_length, io::OutputStream* dst,
+      int32_t* metadata_length) {
+    // Now that we have computed the locations of all of the buffers in shared
+    // memory, the data header can be converted to a flatbuffer and written out
+    //
+    // Note: The memory written here is prefixed by the size of the flatbuffer
+    // itself as an int32_t.
+    std::shared_ptr<Buffer> metadata_fb;
+    RETURN_NOT_OK(WriteMetadataMessage(num_rows, body_length, &metadata_fb));
+
+    // Need to write 4 bytes (metadata size), the metadata, plus padding to
+    // end on an 8-byte offset
+    int64_t start_offset;
+    RETURN_NOT_OK(dst->Tell(&start_offset));
+
+    int32_t padded_metadata_length = static_cast<int32_t>(metadata_fb->size()) + 4;
+    // Compute the remainder in 64 bits: narrowing start_offset to int32 is
+    // implementation-defined past 2 GiB and can yield a negative remainder,
+    // which over-pads the metadata.
+    const int64_t remainder = (padded_metadata_length + start_offset) % 8;
+    if (remainder != 0) {
+      padded_metadata_length += static_cast<int32_t>(8 - remainder);
+    }
+
+    // The returned metadata size includes the length prefix, the flatbuffer,
+    // plus padding
+    *metadata_length = padded_metadata_length;
+
+    // Write the flatbuffer size prefix including padding
+    int32_t flatbuffer_size = padded_metadata_length - 4;
+    RETURN_NOT_OK(
+        dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));
+
+    // Write the flatbuffer
+    RETURN_NOT_OK(dst->Write(metadata_fb->data(), metadata_fb->size()));
+
+    // Write any padding
+    int32_t padding =
+        padded_metadata_length - static_cast<int32_t>(metadata_fb->size()) - 4;
+    if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
+
+    return Status::OK();
+  }
+
+  // Writes the metadata followed by every body buffer (each padded to a
+  // multiple of 64 bytes) to `dst`.
+  Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t* metadata_length,
+      int64_t* body_length) {
+    RETURN_NOT_OK(Assemble(batch, body_length));
+
+#ifndef NDEBUG
+    int64_t start_position, current_position;
+    RETURN_NOT_OK(dst->Tell(&start_position));
+#endif
+
+    RETURN_NOT_OK(WriteMetadata(batch.num_rows(), *body_length, dst, metadata_length));
+
+#ifndef NDEBUG
+    RETURN_NOT_OK(dst->Tell(&current_position));
+    DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
+    // Now write the buffers
+    for (const auto& buffer : buffers_) {
+      int64_t size = 0;
+      int64_t padding = 0;
+
+      // The buffer might be null if we are handling zero row lengths.
+      if (buffer) {
+        size = buffer->size();
+        padding = BitUtil::RoundUpToMultipleOf64(size) - size;
+      }
+
+      if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); }
+
+      if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
+    }
+
+#ifndef NDEBUG
+    RETURN_NOT_OK(dst->Tell(&current_position));
+    DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
+    return Status::OK();
+  }
+
+  // Computes the number of bytes Write would emit for `batch`, by writing to
+  // a MockOutputStream.
+  Status GetTotalSize(const RecordBatch& batch, int64_t* size) {
+    // emulates the behavior of Write without actually writing
+    int32_t metadata_length = 0;
+    int64_t body_length = 0;
+    MockOutputStream dst;
+    RETURN_NOT_OK(Write(batch, &dst, &metadata_length, &body_length));
+    *size = dst.GetExtentBytesWritten();
+    return Status::OK();
+  }
+
+ protected:
+  template <typename ArrayType>
+  Status VisitFixedWidth(const ArrayType& array) {
+    std::shared_ptr<Buffer> data_buffer = array.data();
+
+    if (array.offset() != 0) {
+      // Non-zero offset, slice the buffer
+      const auto& fw_type = static_cast<const FixedWidthType&>(*array.type());
+      const int type_width = fw_type.bit_width() / 8;
+      const int64_t byte_offset = array.offset() * type_width;
+
+      // Send padding if it's available
+      const int64_t buffer_length =
+          std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width),
+              data_buffer->size() - byte_offset);
+      data_buffer = SliceBuffer(data_buffer, byte_offset, buffer_length);
+    }
+    buffers_.push_back(data_buffer);
+    return Status::OK();
+  }
+
+  // Produces value offsets that start at zero. Shared slicing logic between
+  // ListArray and BinaryArray: for a sliced array a new, shifted offsets
+  // buffer is allocated from pool_.
+  template <typename ArrayType>
+  Status GetZeroBasedValueOffsets(
+      const ArrayType& array, std::shared_ptr<Buffer>* value_offsets) {
+    auto offsets = array.value_offsets();
+
+    if (array.offset() != 0) {
+      // If we have a non-zero offset, then the value offsets do not start at
+      // zero. We must a) create a new offsets array with shifted offsets and
+      // b) slice the values array accordingly
+
+      std::shared_ptr<MutableBuffer> shifted_offsets;
+      RETURN_NOT_OK(AllocateBuffer(
+          pool_, sizeof(int32_t) * (array.length() + 1), &shifted_offsets));
+
+      int32_t* dest_offsets = reinterpret_cast<int32_t*>(shifted_offsets->mutable_data());
+      const int32_t start_offset = array.value_offset(0);
+
+      // int64_t index: array lengths are 64-bit and may exceed INT32_MAX when
+      // the 32-bit length check is disabled (LargeRecordBatch)
+      for (int64_t i = 0; i < array.length(); ++i) {
+        dest_offsets[i] = array.value_offset(i) - start_offset;
+      }
+      // Final offset
+      dest_offsets[array.length()] = array.value_offset(array.length()) - start_offset;
+      offsets = shifted_offsets;
+    }
+
+    *value_offsets = offsets;
+    return Status::OK();
+  }
+
+  Status VisitBinary(const BinaryArray& array) {
+    std::shared_ptr<Buffer> value_offsets;
+    RETURN_NOT_OK(GetZeroBasedValueOffsets<BinaryArray>(array, &value_offsets));
+    auto data = array.data();
+
+    if (array.offset() != 0) {
+      // Slice the data buffer to include only the range we need now.
+      // SliceBuffer takes (offset, length), so the length is the span between
+      // the first and one-past-last value offsets.
+      const int32_t start_offset = array.value_offset(0);
+      const int32_t end_offset = array.value_offset(array.length());
+      data = SliceBuffer(data, start_offset, end_offset - start_offset);
+    }
+
+    buffers_.push_back(value_offsets);
+    buffers_.push_back(data);
+    return Status::OK();
+  }
+
+  Status Visit(const FixedWidthBinaryArray& array) override {
+    auto data = array.data();
+    int32_t width = array.byte_width();
+
+    if (array.offset() != 0) {
+      data = SliceBuffer(data, array.offset() * width, width * array.length());
+    }
+    buffers_.push_back(data);
+    return Status::OK();
+  }
+
+  Status Visit(const BooleanArray& array) override {
+    std::shared_ptr<Buffer> bits = array.data();
+    if (array.offset() != 0) {
+      // Values are bit-packed, so a slice generally does not start on a byte
+      // boundary; copy the bits starting at the offset, as is done for the
+      // validity bitmap in VisitArray
+      RETURN_NOT_OK(
+          CopyBitmap(pool_, bits->data(), array.offset(), array.length(), &bits));
+    }
+    buffers_.push_back(bits);
+    return Status::OK();
+  }
+
+#define VISIT_FIXED_WIDTH(TYPE) \
+  Status Visit(const TYPE& array) override { return VisitFixedWidth<TYPE>(array); }
+
+  VISIT_FIXED_WIDTH(Int8Array);
+  VISIT_FIXED_WIDTH(Int16Array);
+  VISIT_FIXED_WIDTH(Int32Array);
+  VISIT_FIXED_WIDTH(Int64Array);
+  VISIT_FIXED_WIDTH(UInt8Array);
+  VISIT_FIXED_WIDTH(UInt16Array);
+  VISIT_FIXED_WIDTH(UInt32Array);
+  VISIT_FIXED_WIDTH(UInt64Array);
+  VISIT_FIXED_WIDTH(HalfFloatArray);
+  VISIT_FIXED_WIDTH(FloatArray);
+  VISIT_FIXED_WIDTH(DoubleArray);
+  VISIT_FIXED_WIDTH(DateArray);
+  VISIT_FIXED_WIDTH(Date32Array);
+  VISIT_FIXED_WIDTH(TimeArray);
+  VISIT_FIXED_WIDTH(TimestampArray);
+
+#undef VISIT_FIXED_WIDTH
+
+  Status Visit(const StringArray& array) override { return VisitBinary(array); }
+
+  Status Visit(const BinaryArray& array) override { return VisitBinary(array); }
+
+  Status Visit(const ListArray& array) override {
+    std::shared_ptr<Buffer> value_offsets;
+    RETURN_NOT_OK(GetZeroBasedValueOffsets<ListArray>(array, &value_offsets));
+    buffers_.push_back(value_offsets);
+
+    --max_recursion_depth_;
+    std::shared_ptr<Array> values = array.values();
+
+    if (array.offset() != 0) {
+      // For non-zero offset, we slice the values array accordingly
+      const int32_t offset = array.value_offset(0);
+      const int32_t length = array.value_offset(array.length()) - offset;
+      values = values->Slice(offset, length);
+    }
+    RETURN_NOT_OK(VisitArray(*values));
+    ++max_recursion_depth_;
+    return Status::OK();
+  }
+
+  Status Visit(const StructArray& array) override {
+    --max_recursion_depth_;
+    for (std::shared_ptr<Array> field : array.fields()) {
+      if (array.offset() != 0) {
+        // If offset is non-zero, slice the child array
+        field = field->Slice(array.offset(), array.length());
+      }
+      RETURN_NOT_OK(VisitArray(*field));
+    }
+    ++max_recursion_depth_;
+    return Status::OK();
+  }
+
+  Status Visit(const UnionArray& array) override {
+    auto type_ids = array.type_ids();
+    if (array.offset() != 0) {
+      type_ids = SliceBuffer(type_ids, array.offset() * sizeof(UnionArray::type_id_t),
+          array.length() * sizeof(UnionArray::type_id_t));
+    }
+
+    buffers_.push_back(type_ids);
+
+    --max_recursion_depth_;
+    if (array.mode() == UnionMode::DENSE) {
+      const auto& type = static_cast<const UnionType&>(*array.type());
+      auto value_offsets = array.value_offsets();
+
+      // The Union type codes are not necessarily 0-indexed
+      uint8_t max_code = 0;
+      for (uint8_t code : type.type_codes) {
+        if (code > max_code) { max_code = code; }
+      }
+
+      // Per type code: offset of the first child value referenced by the
+      // slice, and the number of values referenced. Offsets start at -1 to
+      // indicate that we haven't observed a first occurrence of that child yet
+      std::vector<int32_t> child_offsets(max_code + 1, -1);
+      std::vector<int32_t> child_lengths(max_code + 1, 0);
+
+      if (array.offset() != 0) {
+        // This is an unpleasant case. Because the offsets are different for
+        // each child array, when we have a sliced array, we need to "rebase"
+        // the value_offsets for each array.
+        // NOTE(review): assumes each child's offsets within the slice are
+        // increasing, so its first occurrence is its smallest offset.
+
+        const int32_t* unshifted_offsets = array.raw_value_offsets();
+        const uint8_t* raw_type_ids = array.raw_type_ids();
+
+        // Allocate the shifted offsets
+        std::shared_ptr<MutableBuffer> shifted_offsets_buffer;
+        RETURN_NOT_OK(AllocateBuffer(
+            pool_, array.length() * sizeof(int32_t), &shifted_offsets_buffer));
+        int32_t* shifted_offsets =
+            reinterpret_cast<int32_t*>(shifted_offsets_buffer->mutable_data());
+
+        for (int64_t i = 0; i < array.length(); ++i) {
+          const uint8_t code = raw_type_ids[i];
+          int32_t shift = child_offsets[code];
+          if (shift == -1) { child_offsets[code] = shift = unshifted_offsets[i]; }
+          shifted_offsets[i] = unshifted_offsets[i] - shift;
+
+          // Update the child length to account for observed value
+          ++child_lengths[code];
+        }
+
+        value_offsets = shifted_offsets_buffer;
+      }
+      buffers_.push_back(value_offsets);
+
+      // Visit children and slice accordingly
+      for (int i = 0; i < type.num_children(); ++i) {
+        std::shared_ptr<Array> child = array.child(i);
+        if (array.offset() != 0) {
+          const uint8_t code = type.type_codes[i];
+          if (child_lengths[code] == 0) {
+            // Child never referenced by the slice (its offset is still the -1
+            // sentinel): write it as an empty array
+            child = child->Slice(0, 0);
+          } else {
+            child = child->Slice(child_offsets[code], child_lengths[code]);
+          }
+        }
+        RETURN_NOT_OK(VisitArray(*child));
+      }
+    } else {
+      for (std::shared_ptr<Array> child : array.children()) {
+        // Sparse union, slicing is simpler
+        if (array.offset() != 0) {
+          // If offset is non-zero, slice the child array
+          child = child->Slice(array.offset(), array.length());
+        }
+        RETURN_NOT_OK(VisitArray(*child));
+      }
+    }
+    ++max_recursion_depth_;
+    return Status::OK();
+  }
+
+  Status Visit(const DictionaryArray& array) override {
+    // Dictionary written out separately. Slice offset contained in the indices
+    return array.indices()->Accept(this);
+  }
+
+  // In some cases, intermediate buffers may need to be allocated (with sliced arrays)
+  MemoryPool* pool_;
+
+  // One node per visited array, in depth-first order
+  std::vector<FieldMetadata> field_nodes_;
+  // Offset/size of each entry of buffers_, filled in by Assemble
+  std::vector<BufferMetadata> buffer_meta_;
+  // Body buffers in the order they are written
+  std::vector<std::shared_ptr<Buffer>> buffers_;
+
+  int64_t max_recursion_depth_;
+  int64_t buffer_start_offset_;
+};
+
+// Writes a dictionary as a dictionary batch message. The dictionary values
+// are wrapped in a single-column RecordBatch so the RecordBatchWriter
+// machinery is reused; only the metadata message differs.
+class DictionaryWriter : public RecordBatchWriter {
+ public:
+  using RecordBatchWriter::RecordBatchWriter;
+
+  Status WriteMetadataMessage(
+      int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override {
+    return WriteDictionaryMessage(dictionary_id_, static_cast<int32_t>(num_rows),
+        body_length, field_nodes_, buffer_meta_, out);
+  }
+
+  // Writes `dictionary` under `dictionary_id` to `dst`; the output parameters
+  // have the same meaning as in RecordBatchWriter::Write.
+  Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
+      io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
+    dictionary_id_ = dictionary_id;
+
+    // Make a dummy record batch. A bit tedious as we have to make a schema
+    std::vector<std::shared_ptr<Field>> fields = {
+        arrow::field("dictionary", dictionary->type())};
+    auto schema = std::make_shared<Schema>(fields);
+    RecordBatch batch(schema, dictionary->length(), {dictionary});
+
+    return RecordBatchWriter::Write(batch, dst, metadata_length, body_length);
+  }
+
+ private:
+  // TODO(wesm): Setting this in Write is a bit unclean, but it works.
+  // Initialized so the metadata path (e.g. via the inherited GetTotalSize)
+  // never reads an indeterminate value.
+  int64_t dictionary_id_ = 0;
+};
+
+// Serializes `batch` (metadata, then body) to `dst` using a RecordBatchWriter
+// configured with the given pool, buffer start offset and nesting limit.
+Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
+    io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
+    MemoryPool* pool, int max_recursion_depth) {
+  return RecordBatchWriter(pool, buffer_start_offset, max_recursion_depth)
+      .Write(batch, dst, metadata_length, body_length);
+}
+
+// Serializes `dictionary` as a dictionary batch tagged with `dictionary_id`,
+// using the default nesting limit (kMaxNestingDepth).
+Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
+    int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
+    int64_t* body_length, MemoryPool* pool) {
+  return DictionaryWriter(pool, buffer_start_offset, kMaxNestingDepth)
+      .Write(dictionary_id, dictionary, dst, metadata_length, body_length);
+}
+
+// Stores in *size the number of bytes WriteRecordBatch would emit for `batch`.
+Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
+  // A zero buffer start offset suffices: only the byte count is of interest
+  RecordBatchWriter sizer(default_memory_pool(), 0, kMaxNestingDepth);
+  return sizer.GetTotalSize(batch, size);
+}
+
+// ----------------------------------------------------------------------
// Stream writer implementation
class StreamWriter::StreamWriterImpl {
@@ -199,38 +681,6 @@ Status StreamWriter::Close() {
// ----------------------------------------------------------------------
// File writer implementation
-static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
-FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
- std::vector<flatbuf::Block> fb_blocks;
-
- for (const FileBlock& block : blocks) {
- fb_blocks.emplace_back(block.offset, block.metadata_length, block.body_length);
- }
-
- return fbb.CreateVectorOfStructs(fb_blocks);
-}
-
-Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
- const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
- io::OutputStream* out) {
- FBB fbb;
-
- flatbuffers::Offset<flatbuf::Schema> fb_schema;
- RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema));
-
- auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
- auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
-
- auto footer = flatbuf::CreateFooter(
- fbb, kMetadataVersion, fb_schema, fb_dictionaries, fb_record_batches);
-
- fbb.Finish(footer);
-
- int32_t size = fbb.GetSize();
-
- return out->Write(fbb.GetBufferPointer(), size);
-}
-
class FileWriter::FileWriterImpl : public StreamWriter::StreamWriterImpl {
public:
using BASE = StreamWriter::StreamWriterImpl;
@@ -283,5 +733,31 @@ Status FileWriter::Close() {
return impl_->Close();
}
+// ----------------------------------------------------------------------
+// Write record batches with 64-bit size metadata
+
+// RecordBatchWriter variant that emits LargeRecordBatch metadata (64-bit
+// lengths), so arrays longer than 2^31 - 1 are accepted.
+class LargeRecordBatchWriter : public RecordBatchWriter {
+ public:
+  using RecordBatchWriter::RecordBatchWriter;
+
+  // Accepts every array: the base class's INT32_MAX length limit does not
+  // apply to 64-bit length metadata.
+  Status CheckArrayMetadata(const Array& /* arr */) override { return Status::OK(); }
+
+  // Builds LargeRecordBatch metadata; num_rows is passed through as 64-bit.
+  Status WriteMetadataMessage(
+      int64_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override {
+    return WriteLargeRecordBatchMessage(
+        num_rows, body_length, field_nodes_, buffer_meta_, out);
+  }
+};
+
+// Like WriteRecordBatch, but emits LargeRecordBatch (64-bit length) metadata
+// and does not reject arrays longer than 2^31 - 1.
+Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
+    io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
+    MemoryPool* pool, int max_recursion_depth) {
+  return LargeRecordBatchWriter(pool, buffer_start_offset, max_recursion_depth)
+      .Write(batch, dst, metadata_length, body_length);
+}
+
} // namespace ipc
} // namespace arrow