You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/02/24 14:16:42 UTC
[2/2] arrow git commit: ARROW-459: [C++] Dictionary IPC support in
file and stream formats
ARROW-459: [C++] Dictionary IPC support in file and stream formats
Also fixes ARROW-565
Author: Wes McKinney <we...@twosigma.com>
Closes #347 from wesm/ARROW-459 and squashes the following commits:
6a987b7 [Wes McKinney] Fix clang warning with forward declaration
8e0e6fb [Wes McKinney] Fix bug causing valgrind failure
dee044e [Wes McKinney] Review comments
7ac756e [Wes McKinney] Fix Python build
e5cec27 [Wes McKinney] Add some less trivial dictionary-encoded arrays to test case
acfa994 [Wes McKinney] cpplint
ef9dea8 [Wes McKinney] More dictionary support in FileReader. Simple test passes
cb04a41 [Wes McKinney] Refactoring. Remove FileFooter class in favor of private impl in FileReader
1cee0ff [Wes McKinney] More progress toward file/stream roundtrips with dictionaries
ae389fa [Wes McKinney] WIP progress toward stream/file dictionary roundtrip
6858e12 [Wes McKinney] Add union and dictionary to file/stream tests
d537004 [Wes McKinney] Add support for deconstructing and reconstructing DictionaryArray with known schema
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/d28f1c1e
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/d28f1c1e
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/d28f1c1e
Branch: refs/heads/master
Commit: d28f1c1e0f21bc578b84ab4bed4cf259c333fbc9
Parents: 5e279f0
Author: Wes McKinney <we...@twosigma.com>
Authored: Fri Feb 24 09:16:32 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Feb 24 09:16:32 2017 -0500
----------------------------------------------------------------------
cpp/CMakeLists.txt | 4 +-
cpp/src/arrow/array.h | 2 +
cpp/src/arrow/io/CMakeLists.txt | 9 +-
cpp/src/arrow/ipc/CMakeLists.txt | 15 +-
cpp/src/arrow/ipc/adapter.cc | 189 ++++++++++------
cpp/src/arrow/ipc/adapter.h | 30 +--
cpp/src/arrow/ipc/file.cc | 306 +++++++++++++-------------
cpp/src/arrow/ipc/file.h | 51 +----
cpp/src/arrow/ipc/ipc-adapter-test.cc | 46 ++--
cpp/src/arrow/ipc/ipc-file-test.cc | 78 +++----
cpp/src/arrow/ipc/ipc-metadata-test.cc | 17 +-
cpp/src/arrow/ipc/metadata-internal.cc | 239 ++++++++++++++++----
cpp/src/arrow/ipc/metadata-internal.h | 45 ++--
cpp/src/arrow/ipc/metadata.cc | 185 ++++++++++++++--
cpp/src/arrow/ipc/metadata.h | 97 ++++++--
cpp/src/arrow/ipc/stream.cc | 207 ++++++++++++-----
cpp/src/arrow/ipc/stream.h | 34 ++-
cpp/src/arrow/ipc/test-common.h | 80 +++++++
cpp/src/arrow/type.cc | 6 +-
cpp/src/arrow/type.h | 14 +-
python/pyarrow/includes/libarrow_ipc.pxd | 1 -
python/pyarrow/io.pyx | 5 -
22 files changed, 1093 insertions(+), 567 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 0888a8b..b77f8c7 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -102,7 +102,9 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
ON)
endif()
-if(NOT ARROW_BUILD_TESTS)
+if(ARROW_BUILD_TESTS)
+ set(ARROW_BUILD_STATIC ON)
+else()
set(NO_TESTS 1)
endif()
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 32d156b..9bb06af 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -40,6 +40,8 @@ class Status;
class ArrayVisitor {
public:
+ virtual ~ArrayVisitor() = default;
+
virtual Status Visit(const NullArray& array) = 0;
virtual Status Visit(const BooleanArray& array) = 0;
virtual Status Visit(const Int8Array& array) = 0;
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/io/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt
index b8882e4..ceb7b73 100644
--- a/cpp/src/arrow/io/CMakeLists.txt
+++ b/cpp/src/arrow/io/CMakeLists.txt
@@ -70,13 +70,8 @@ set(ARROW_IO_STATIC_PRIVATE_LINK_LIBS
boost_system_static
boost_filesystem_static)
-if (ARROW_BUILD_STATIC)
- set(ARROW_IO_TEST_LINK_LIBS
- arrow_io_static)
-else()
- set(ARROW_IO_TEST_LINK_LIBS
- arrow_io_shared)
-endif()
+set(ARROW_IO_TEST_LINK_LIBS
+ arrow_io_static)
set(ARROW_IO_SRCS
file.cc
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index c047f53..e7a3fdb 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -24,20 +24,9 @@ set(ARROW_IPC_SHARED_LINK_LIBS
arrow_shared
)
-set(ARROW_IPC_STATIC_LINK_LIBS
- arrow_static
+set(ARROW_IPC_TEST_LINK_LIBS
arrow_io_static
-)
-
-if (ARROW_BUILD_STATIC)
- set(ARROW_IPC_TEST_LINK_LIBS
- arrow_io_static
- arrow_ipc_static)
-else()
- set(ARROW_IPC_TEST_LINK_LIBS
- arrow_io_shared
- arrow_ipc_shared)
-endif()
+ arrow_ipc_static)
set(ARROW_IPC_SRCS
adapter.cc
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index a24c007..08ac983 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -51,12 +51,15 @@ namespace ipc {
class RecordBatchWriter : public ArrayVisitor {
public:
- RecordBatchWriter(MemoryPool* pool, const RecordBatch& batch,
- int64_t buffer_start_offset, int max_recursion_depth)
+ RecordBatchWriter(
+ MemoryPool* pool, int64_t buffer_start_offset, int max_recursion_depth)
: pool_(pool),
- batch_(batch),
max_recursion_depth_(max_recursion_depth),
- buffer_start_offset_(buffer_start_offset) {}
+ buffer_start_offset_(buffer_start_offset) {
+ DCHECK_GT(max_recursion_depth, 0);
+ }
+
+ virtual ~RecordBatchWriter() = default;
Status VisitArray(const Array& arr) {
if (max_recursion_depth_ <= 0) {
@@ -81,7 +84,7 @@ class RecordBatchWriter : public ArrayVisitor {
return arr.Accept(this);
}
- Status Assemble(int64_t* body_length) {
+ Status Assemble(const RecordBatch& batch, int64_t* body_length) {
if (field_nodes_.size() > 0) {
field_nodes_.clear();
buffer_meta_.clear();
@@ -89,8 +92,8 @@ class RecordBatchWriter : public ArrayVisitor {
}
// Perform depth-first traversal of the row-batch
- for (int i = 0; i < batch_.num_columns(); ++i) {
- RETURN_NOT_OK(VisitArray(*batch_.column(i)));
+ for (int i = 0; i < batch.num_columns(); ++i) {
+ RETURN_NOT_OK(VisitArray(*batch.column(i)));
}
// The position for the start of a buffer relative to the passed frame of
@@ -127,16 +130,22 @@ class RecordBatchWriter : public ArrayVisitor {
return Status::OK();
}
- Status WriteMetadata(
- int64_t body_length, io::OutputStream* dst, int32_t* metadata_length) {
+ // Override this for writing dictionary metadata
+ virtual Status WriteMetadataMessage(
+ int32_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) {
+ return WriteRecordBatchMessage(
+ num_rows, body_length, field_nodes_, buffer_meta_, out);
+ }
+
+ Status WriteMetadata(int32_t num_rows, int64_t body_length, io::OutputStream* dst,
+ int32_t* metadata_length) {
// Now that we have computed the locations of all of the buffers in shared
// memory, the data header can be converted to a flatbuffer and written out
//
// Note: The memory written here is prefixed by the size of the flatbuffer
// itself as an int32_t.
std::shared_ptr<Buffer> metadata_fb;
- RETURN_NOT_OK(WriteRecordBatchMetadata(
- batch_.num_rows(), body_length, field_nodes_, buffer_meta_, &metadata_fb));
+ RETURN_NOT_OK(WriteMetadataMessage(num_rows, body_length, &metadata_fb));
// Need to write 4 bytes (metadata size), the metadata, plus padding to
// end on an 8-byte offset
@@ -166,15 +175,16 @@ class RecordBatchWriter : public ArrayVisitor {
return Status::OK();
}
- Status Write(io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
- RETURN_NOT_OK(Assemble(body_length));
+ Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t* metadata_length,
+ int64_t* body_length) {
+ RETURN_NOT_OK(Assemble(batch, body_length));
#ifndef NDEBUG
int64_t start_position, current_position;
RETURN_NOT_OK(dst->Tell(&start_position));
#endif
- RETURN_NOT_OK(WriteMetadata(*body_length, dst, metadata_length));
+ RETURN_NOT_OK(WriteMetadata(batch.num_rows(), *body_length, dst, metadata_length));
#ifndef NDEBUG
RETURN_NOT_OK(dst->Tell(¤t_position));
@@ -206,17 +216,17 @@ class RecordBatchWriter : public ArrayVisitor {
return Status::OK();
}
- Status GetTotalSize(int64_t* size) {
+ Status GetTotalSize(const RecordBatch& batch, int64_t* size) {
// emulates the behavior of Write without actually writing
int32_t metadata_length = 0;
int64_t body_length = 0;
MockOutputStream dst;
- RETURN_NOT_OK(Write(&dst, &metadata_length, &body_length));
+ RETURN_NOT_OK(Write(batch, &dst, &metadata_length, &body_length));
*size = dst.GetExtentBytesWritten();
return Status::OK();
}
- private:
+ protected:
Status Visit(const NullArray& array) override { return Status::NotImplemented("null"); }
template <typename ArrayType>
@@ -468,15 +478,12 @@ class RecordBatchWriter : public ArrayVisitor {
}
Status Visit(const DictionaryArray& array) override {
- // Dictionary written out separately
- const auto& indices = static_cast<const PrimitiveArray&>(*array.indices().get());
- buffers_.push_back(indices.data());
- return Status::OK();
+ // Dictionary written out separately. Slice offset contained in the indices
+ return array.indices()->Accept(this);
}
// In some cases, intermediate buffers may need to be allocated (with sliced arrays)
MemoryPool* pool_;
- const RecordBatch& batch_;
std::vector<flatbuf::FieldNode> field_nodes_;
std::vector<flatbuf::Buffer> buffer_meta_;
@@ -486,17 +493,51 @@ class RecordBatchWriter : public ArrayVisitor {
int64_t buffer_start_offset_;
};
+class DictionaryWriter : public RecordBatchWriter {
+ public:
+ using RecordBatchWriter::RecordBatchWriter;
+
+ Status WriteMetadataMessage(
+ int32_t num_rows, int64_t body_length, std::shared_ptr<Buffer>* out) override {
+ return WriteDictionaryMessage(
+ dictionary_id_, num_rows, body_length, field_nodes_, buffer_meta_, out);
+ }
+
+ Status Write(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
+ io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
+ dictionary_id_ = dictionary_id;
+
+ // Make a dummy record batch. A bit tedious as we have to make a schema
+ std::vector<std::shared_ptr<Field>> fields = {
+ arrow::field("dictionary", dictionary->type())};
+ auto schema = std::make_shared<Schema>(fields);
+ RecordBatch batch(schema, dictionary->length(), {dictionary});
+
+ return RecordBatchWriter::Write(batch, dst, metadata_length, body_length);
+ }
+
+ private:
+ // TODO(wesm): Setting this in Write is a bit unclean, but it works
+ int64_t dictionary_id_;
+};
+
Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
MemoryPool* pool, int max_recursion_depth) {
- DCHECK_GT(max_recursion_depth, 0);
- RecordBatchWriter serializer(pool, batch, buffer_start_offset, max_recursion_depth);
- return serializer.Write(dst, metadata_length, body_length);
+ RecordBatchWriter writer(pool, buffer_start_offset, max_recursion_depth);
+ return writer.Write(batch, dst, metadata_length, body_length);
+}
+
+Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
+ int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
+ int64_t* body_length, MemoryPool* pool) {
+ DictionaryWriter writer(pool, buffer_start_offset, kMaxIpcRecursionDepth);
+ return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length);
}
Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
- RecordBatchWriter serializer(default_memory_pool(), batch, 0, kMaxIpcRecursionDepth);
- RETURN_NOT_OK(serializer.GetTotalSize(size));
+ RecordBatchWriter writer(default_memory_pool(), 0, kMaxIpcRecursionDepth);
+ RETURN_NOT_OK(writer.GetTotalSize(batch, size));
return Status::OK();
}
@@ -580,10 +621,9 @@ class ArrayLoader : public TypeVisitor {
Status LoadPrimitive(const DataType& type) {
FieldMetadata field_meta;
- std::shared_ptr<Buffer> null_bitmap;
- RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+ std::shared_ptr<Buffer> null_bitmap, data;
- std::shared_ptr<Buffer> data;
+ RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
if (field_meta.length > 0) {
RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &data));
} else {
@@ -597,11 +637,9 @@ class ArrayLoader : public TypeVisitor {
template <typename CONTAINER>
Status LoadBinary() {
FieldMetadata field_meta;
- std::shared_ptr<Buffer> null_bitmap;
- RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+ std::shared_ptr<Buffer> null_bitmap, offsets, values;
- std::shared_ptr<Buffer> offsets;
- std::shared_ptr<Buffer> values;
+ RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
if (field_meta.length > 0) {
RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets));
RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values));
@@ -661,11 +699,9 @@ class ArrayLoader : public TypeVisitor {
Status Visit(const ListType& type) override {
FieldMetadata field_meta;
- std::shared_ptr<Buffer> null_bitmap;
+ std::shared_ptr<Buffer> null_bitmap, offsets;
RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-
- std::shared_ptr<Buffer> offsets;
if (field_meta.length > 0) {
RETURN_NOT_OK(GetBuffer(context_->buffer_index, &offsets));
} else {
@@ -715,12 +751,9 @@ class ArrayLoader : public TypeVisitor {
Status Visit(const UnionType& type) override {
FieldMetadata field_meta;
- std::shared_ptr<Buffer> null_bitmap;
- RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-
- std::shared_ptr<Buffer> type_ids = nullptr;
- std::shared_ptr<Buffer> offsets = nullptr;
+ std::shared_ptr<Buffer> null_bitmap, type_ids, offsets;
+ RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
if (field_meta.length > 0) {
RETURN_NOT_OK(GetBuffer(context_->buffer_index, &type_ids));
if (type.mode == UnionMode::DENSE) {
@@ -738,13 +771,23 @@ class ArrayLoader : public TypeVisitor {
}
Status Visit(const DictionaryType& type) override {
- return Status::NotImplemented("dictionary");
+ FieldMetadata field_meta;
+ std::shared_ptr<Buffer> null_bitmap, indices_data;
+ RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+ RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &indices_data));
+
+ std::shared_ptr<Array> indices;
+ RETURN_NOT_OK(MakePrimitiveArray(type.index_type(), field_meta.length, indices_data,
+ null_bitmap, field_meta.null_count, 0, &indices));
+
+ result_ = std::make_shared<DictionaryArray>(field_.type, indices);
+ return Status::OK();
};
};
class RecordBatchReader {
public:
- RecordBatchReader(const std::shared_ptr<RecordBatchMetadata>& metadata,
+ RecordBatchReader(const RecordBatchMetadata& metadata,
const std::shared_ptr<Schema>& schema, int max_recursion_depth,
io::ReadableFileInterface* file)
: metadata_(metadata),
@@ -758,7 +801,7 @@ class RecordBatchReader {
// The field_index and buffer_index are incremented in the ArrayLoader
// based on how much of the batch is "consumed" (through nested data
// reconstruction, for example)
- context_.metadata = metadata_.get();
+ context_.metadata = &metadata_;
context_.field_index = 0;
context_.buffer_index = 0;
context_.max_recursion_depth = max_recursion_depth_;
@@ -768,50 +811,58 @@ class RecordBatchReader {
RETURN_NOT_OK(loader.Load(&arrays[i]));
}
- *out = std::make_shared<RecordBatch>(schema_, metadata_->length(), arrays);
+ *out = std::make_shared<RecordBatch>(schema_, metadata_.length(), arrays);
return Status::OK();
}
private:
RecordBatchContext context_;
- std::shared_ptr<RecordBatchMetadata> metadata_;
+ const RecordBatchMetadata& metadata_;
std::shared_ptr<Schema> schema_;
int max_recursion_depth_;
io::ReadableFileInterface* file_;
};
-Status ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
- io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata) {
- std::shared_ptr<Buffer> buffer;
- RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));
-
- int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
-
- if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) {
- std::stringstream ss;
- ss << "flatbuffer size " << metadata_length << " invalid. File offset: " << offset
- << ", metadata length: " << metadata_length;
- return Status::Invalid(ss.str());
- }
-
- std::shared_ptr<Message> message;
- RETURN_NOT_OK(Message::Open(buffer, 4, &message));
- *metadata = std::make_shared<RecordBatchMetadata>(message);
- return Status::OK();
-}
-
-Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file,
std::shared_ptr<RecordBatch>* out) {
return ReadRecordBatch(metadata, schema, kMaxIpcRecursionDepth, file, out);
}
-Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
const std::shared_ptr<Schema>& schema, int max_recursion_depth,
io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out) {
RecordBatchReader reader(metadata, schema, max_recursion_depth, file);
return reader.Read(out);
}
+Status ReadDictionary(const DictionaryBatchMetadata& metadata,
+ const DictionaryTypeMap& dictionary_types, io::ReadableFileInterface* file,
+ std::shared_ptr<Array>* out) {
+ int64_t id = metadata.id();
+ auto it = dictionary_types.find(id);
+ if (it == dictionary_types.end()) {
+ std::stringstream ss;
+ ss << "Do not have type metadata for dictionary with id: " << id;
+ return Status::KeyError(ss.str());
+ }
+
+ std::vector<std::shared_ptr<Field>> fields = {it->second};
+
+ // We need a schema for the record batch
+ auto dummy_schema = std::make_shared<Schema>(fields);
+
+ // The dictionary is embedded in a record batch with a single column
+ std::shared_ptr<RecordBatch> batch;
+ RETURN_NOT_OK(ReadRecordBatch(metadata.record_batch(), dummy_schema, file, &batch));
+
+ if (batch->num_columns() != 1) {
+ return Status::Invalid("Dictionary record batch must only contain one field");
+ }
+
+ *out = batch->column(0);
+ return Status::OK();
+}
+
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index 83542d0..b7d8fa9 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -25,6 +25,7 @@
#include <memory>
#include <vector>
+#include "arrow/ipc/metadata.h"
#include "arrow/util/visibility.h"
namespace arrow {
@@ -44,8 +45,6 @@ class OutputStream;
namespace ipc {
-class RecordBatchMetadata;
-
// ----------------------------------------------------------------------
// Write path
// We have trouble decoding flatbuffers if the size i > 70, so 64 is a nice round number
@@ -72,34 +71,35 @@ constexpr int kMaxIpcRecursionDepth = 64;
//
// @param(out) body_length: the size of the contiguous buffer block plus
// padding bytes
-Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch,
+Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
+ io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
+ MemoryPool* pool, int max_recursion_depth = kMaxIpcRecursionDepth);
+
+// Write Array as a DictionaryBatch message
+Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length, MemoryPool* pool,
- int max_recursion_depth = kMaxIpcRecursionDepth);
+ int64_t* body_length, MemoryPool* pool);
// Compute the precise number of bytes needed in a contiguous memory segment to
// write the record batch. This involves generating the complete serialized
// Flatbuffers metadata.
-Status ARROW_EXPORT GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
+Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
// ----------------------------------------------------------------------
// "Read" path; does not copy data if the input supports zero copy reads
-// Read the record batch flatbuffer metadata starting at the indicated file offset
-//
-// The flatbuffer is expected to be length-prefixed, so the metadata_length
-// includes at least the length prefix and the flatbuffer
-Status ARROW_EXPORT ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
- io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata);
-
-Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file,
std::shared_ptr<RecordBatch>* out);
-Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+Status ReadRecordBatch(const RecordBatchMetadata& metadata,
const std::shared_ptr<Schema>& schema, int max_recursion_depth,
io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out);
+Status ReadDictionary(const DictionaryBatchMetadata& metadata,
+ const DictionaryTypeMap& dictionary_types, io::ReadableFileInterface* file,
+ std::shared_ptr<Array>* out);
+
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc
index 3b18326..c1d483f 100644
--- a/cpp/src/arrow/ipc/file.cc
+++ b/cpp/src/arrow/ipc/file.cc
@@ -36,8 +36,6 @@ namespace arrow {
namespace ipc {
static constexpr const char* kArrowMagicBytes = "ARROW1";
-// ----------------------------------------------------------------------
-// File footer
static flatbuffers::Offset<flatbuffers::Vector<const flatbuf::Block*>>
FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
@@ -51,11 +49,12 @@ FileBlocksToFlatbuffer(FBB& fbb, const std::vector<FileBlock>& blocks) {
}
Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
- const std::vector<FileBlock>& record_batches, io::OutputStream* out) {
+ const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
+ io::OutputStream* out) {
FBB fbb;
flatbuffers::Offset<flatbuf::Schema> fb_schema;
- RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, &fb_schema));
+ RETURN_NOT_OK(SchemaToFlatbuffer(fbb, schema, dictionary_memo, &fb_schema));
auto fb_dictionaries = FileBlocksToFlatbuffer(fbb, dictionaries);
auto fb_record_batches = FileBlocksToFlatbuffer(fbb, record_batches);
@@ -74,87 +73,6 @@ static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
return FileBlock(block->offset(), block->metaDataLength(), block->bodyLength());
}
-class FileFooter::FileFooterImpl {
- public:
- FileFooterImpl(const std::shared_ptr<Buffer>& buffer, const flatbuf::Footer* footer)
- : buffer_(buffer), footer_(footer) {}
-
- int num_dictionaries() const { return footer_->dictionaries()->size(); }
-
- int num_record_batches() const { return footer_->recordBatches()->size(); }
-
- MetadataVersion::type version() const {
- switch (footer_->version()) {
- case flatbuf::MetadataVersion_V1:
- return MetadataVersion::V1;
- case flatbuf::MetadataVersion_V2:
- return MetadataVersion::V2;
- // Add cases as other versions become available
- default:
- return MetadataVersion::V2;
- }
- }
-
- FileBlock record_batch(int i) const {
- return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
- }
-
- FileBlock dictionary(int i) const {
- return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
- }
-
- Status GetSchema(std::shared_ptr<Schema>* out) const {
- auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema());
- return schema_msg->GetSchema(out);
- }
-
- private:
- // Retain reference to memory
- std::shared_ptr<Buffer> buffer_;
-
- const flatbuf::Footer* footer_;
-};
-
-FileFooter::FileFooter() {}
-
-FileFooter::~FileFooter() {}
-
-Status FileFooter::Open(
- const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out) {
- const flatbuf::Footer* footer = flatbuf::GetFooter(buffer->data());
-
- *out = std::unique_ptr<FileFooter>(new FileFooter());
-
- // TODO(wesm): Verify the footer
- (*out)->impl_.reset(new FileFooterImpl(buffer, footer));
-
- return Status::OK();
-}
-
-int FileFooter::num_dictionaries() const {
- return impl_->num_dictionaries();
-}
-
-int FileFooter::num_record_batches() const {
- return impl_->num_record_batches();
-}
-
-MetadataVersion::type FileFooter::version() const {
- return impl_->version();
-}
-
-FileBlock FileFooter::record_batch(int i) const {
- return impl_->record_batch(i);
-}
-
-FileBlock FileFooter::dictionary(int i) const {
- return impl_->dictionary(i);
-}
-
-Status FileFooter::GetSchema(std::shared_ptr<Schema>* out) const {
- return impl_->GetSchema(out);
-}
-
// ----------------------------------------------------------------------
// File writer implementation
@@ -171,22 +89,17 @@ Status FileWriter::Open(io::OutputStream* sink, const std::shared_ptr<Schema>& s
Status FileWriter::Start() {
RETURN_NOT_OK(WriteAligned(
reinterpret_cast<const uint8_t*>(kArrowMagicBytes), strlen(kArrowMagicBytes)));
- started_ = true;
- return Status::OK();
-}
-Status FileWriter::WriteRecordBatch(const RecordBatch& batch) {
- // Push an empty FileBlock
- // Append metadata, to be written in the footer later
- record_batches_.emplace_back(0, 0, 0);
- return StreamWriter::WriteRecordBatch(
- batch, &record_batches_[record_batches_.size() - 1]);
+ // We write the schema at the start of the file (and the end). This also
+ // writes all the dictionaries at the beginning of the file
+ return StreamWriter::Start();
}
Status FileWriter::Close() {
// Write metadata
int64_t initial_position = position_;
- RETURN_NOT_OK(WriteFileFooter(*schema_, dictionaries_, record_batches_, sink_));
+ RETURN_NOT_OK(WriteFileFooter(
+ *schema_, dictionaries_, record_batches_, dictionary_memo_.get(), sink_));
RETURN_NOT_OK(UpdatePosition());
// Write footer length
@@ -204,89 +117,180 @@ Status FileWriter::Close() {
// ----------------------------------------------------------------------
// Reader implementation
-FileReader::FileReader(
- const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset)
- : file_(file), footer_offset_(footer_offset) {}
+class FileReader::FileReaderImpl {
+ public:
+ FileReaderImpl() { dictionary_memo_ = std::make_shared<DictionaryMemo>(); }
-FileReader::~FileReader() {}
+ Status ReadFooter() {
+ int magic_size = static_cast<int>(strlen(kArrowMagicBytes));
-Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
- std::shared_ptr<FileReader>* reader) {
- int64_t footer_offset;
- RETURN_NOT_OK(file->GetSize(&footer_offset));
- return Open(file, footer_offset, reader);
-}
+ if (footer_offset_ <= magic_size * 2 + 4) {
+ std::stringstream ss;
+ ss << "File is too small: " << footer_offset_;
+ return Status::Invalid(ss.str());
+ }
-Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
- int64_t footer_offset, std::shared_ptr<FileReader>* reader) {
- *reader = std::shared_ptr<FileReader>(new FileReader(file, footer_offset));
- return (*reader)->ReadFooter();
-}
+ std::shared_ptr<Buffer> buffer;
+ int file_end_size = magic_size + sizeof(int32_t);
+ RETURN_NOT_OK(file_->ReadAt(footer_offset_ - file_end_size, file_end_size, &buffer));
+
+ if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
+ return Status::Invalid("Not an Arrow file");
+ }
+
+ int32_t footer_length = *reinterpret_cast<const int32_t*>(buffer->data());
+
+ if (footer_length <= 0 || footer_length + magic_size * 2 + 4 > footer_offset_) {
+ return Status::Invalid("File is smaller than indicated metadata size");
+ }
-Status FileReader::ReadFooter() {
- int magic_size = static_cast<int>(strlen(kArrowMagicBytes));
+ // Now read the footer
+ RETURN_NOT_OK(file_->ReadAt(
+ footer_offset_ - footer_length - file_end_size, footer_length, &footer_buffer_));
- if (footer_offset_ <= magic_size * 2 + 4) {
- std::stringstream ss;
- ss << "File is too small: " << footer_offset_;
- return Status::Invalid(ss.str());
+ // TODO(wesm): Verify the footer
+ footer_ = flatbuf::GetFooter(footer_buffer_->data());
+ schema_metadata_.reset(new SchemaMetadata(nullptr, footer_->schema()));
+
+ return Status::OK();
+ }
+
+ int num_dictionaries() const { return footer_->dictionaries()->size(); }
+
+ int num_record_batches() const { return footer_->recordBatches()->size(); }
+
+ MetadataVersion::type version() const {
+ switch (footer_->version()) {
+ case flatbuf::MetadataVersion_V1:
+ return MetadataVersion::V1;
+ case flatbuf::MetadataVersion_V2:
+ return MetadataVersion::V2;
+ // Add cases as other versions become available
+ default:
+ return MetadataVersion::V2;
+ }
+ }
+
+ FileBlock record_batch(int i) const {
+ return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i));
+ }
+
+ FileBlock dictionary(int i) const {
+ return FileBlockFromFlatbuffer(footer_->dictionaries()->Get(i));
}
- std::shared_ptr<Buffer> buffer;
- int file_end_size = magic_size + sizeof(int32_t);
- RETURN_NOT_OK(file_->ReadAt(footer_offset_ - file_end_size, file_end_size, &buffer));
+ const SchemaMetadata& schema_metadata() const { return *schema_metadata_; }
+
+ Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
+ DCHECK_GE(i, 0);
+ DCHECK_LT(i, num_record_batches());
+ FileBlock block = record_batch(i);
+
+ std::shared_ptr<Message> message;
+ RETURN_NOT_OK(
+ ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
+ auto metadata = std::make_shared<RecordBatchMetadata>(message);
- if (memcmp(buffer->data() + sizeof(int32_t), kArrowMagicBytes, magic_size)) {
- return Status::Invalid("Not an Arrow file");
+ // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
+ // ARROW-384).
+ std::shared_ptr<Buffer> buffer_block;
+ RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
+ io::BufferReader reader(buffer_block);
+
+ return ReadRecordBatch(*metadata, schema_, &reader, batch);
}
- int32_t footer_length = *reinterpret_cast<const int32_t*>(buffer->data());
+ Status ReadSchema() {
+ RETURN_NOT_OK(schema_metadata_->GetDictionaryTypes(&dictionary_fields_));
+
+ // Read all the dictionaries
+ for (int i = 0; i < num_dictionaries(); ++i) {
+ FileBlock block = dictionary(i);
+ std::shared_ptr<Message> message;
+ RETURN_NOT_OK(
+ ReadMessage(block.offset, block.metadata_length, file_.get(), &message));
+
+ // TODO(wesm): ARROW-577: This code is duplicated, can be fixed with a more
+ // invasive refactor
+ DictionaryBatchMetadata metadata(message);
+
+ // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
+ // ARROW-384).
+ std::shared_ptr<Buffer> buffer_block;
+ RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
+ io::BufferReader reader(buffer_block);
+
+ std::shared_ptr<Array> dictionary;
+ RETURN_NOT_OK(ReadDictionary(metadata, dictionary_fields_, &reader, &dictionary));
+ RETURN_NOT_OK(dictionary_memo_->AddDictionary(metadata.id(), dictionary));
+ }
- if (footer_length <= 0 || footer_length + magic_size * 2 + 4 > footer_offset_) {
- return Status::Invalid("File is smaller than indicated metadata size");
+ // Get the schema
+ return schema_metadata_->GetSchema(*dictionary_memo_, &schema_);
}
- // Now read the footer
- RETURN_NOT_OK(file_->ReadAt(
- footer_offset_ - footer_length - file_end_size, footer_length, &buffer));
- RETURN_NOT_OK(FileFooter::Open(buffer, &footer_));
+ Status Open(
+ const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset) {
+ file_ = file;
+ footer_offset_ = footer_offset;
+ RETURN_NOT_OK(ReadFooter());
+ return ReadSchema();
+ }
+
+ std::shared_ptr<Schema> schema() const { return schema_; }
+
+ private:
+ std::shared_ptr<io::ReadableFileInterface> file_;
- // Get the schema
- return footer_->GetSchema(&schema_);
+ // The location where the Arrow file layout ends. May be the end of the file
+ // or some other location if embedded in a larger file.
+ int64_t footer_offset_;
+
+ // Footer metadata
+ std::shared_ptr<Buffer> footer_buffer_;
+ const flatbuf::Footer* footer_;
+ std::unique_ptr<SchemaMetadata> schema_metadata_;
+
+ DictionaryTypeMap dictionary_fields_;
+ std::shared_ptr<DictionaryMemo> dictionary_memo_;
+
+ // Reconstructed schema, including any read dictionaries
+ std::shared_ptr<Schema> schema_;
+};
+
+FileReader::FileReader() {
+ impl_.reset(new FileReaderImpl());
}
-std::shared_ptr<Schema> FileReader::schema() const {
- return schema_;
+FileReader::~FileReader() {}
+
+Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+ std::shared_ptr<FileReader>* reader) {
+ int64_t footer_offset;
+ RETURN_NOT_OK(file->GetSize(&footer_offset));
+ return Open(file, footer_offset, reader);
+}
+
+Status FileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+ int64_t footer_offset, std::shared_ptr<FileReader>* reader) {
+ *reader = std::shared_ptr<FileReader>(new FileReader());
+ return (*reader)->impl_->Open(file, footer_offset);
}
-int FileReader::num_dictionaries() const {
- return footer_->num_dictionaries();
+std::shared_ptr<Schema> FileReader::schema() const {
+ return impl_->schema();
}
int FileReader::num_record_batches() const {
- return footer_->num_record_batches();
+ return impl_->num_record_batches();
}
MetadataVersion::type FileReader::version() const {
- return footer_->version();
+ return impl_->version();
}
Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
- DCHECK_GE(i, 0);
- DCHECK_LT(i, num_record_batches());
- FileBlock block = footer_->record_batch(i);
-
- std::shared_ptr<RecordBatchMetadata> metadata;
- RETURN_NOT_OK(ReadRecordBatchMetadata(
- block.offset, block.metadata_length, file_.get(), &metadata));
-
- // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
- // ARROW-384).
- std::shared_ptr<Buffer> buffer_block;
- RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
- io::BufferReader reader(buffer_block);
-
- return ReadRecordBatch(metadata, schema_, &reader, batch);
+ return impl_->GetRecordBatch(i, batch);
}
} // namespace ipc
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.h b/cpp/src/arrow/ipc/file.h
index cf0baab..524766c 100644
--- a/cpp/src/arrow/ipc/file.h
+++ b/cpp/src/arrow/ipc/file.h
@@ -45,45 +45,21 @@ class ReadableFileInterface;
namespace ipc {
Status WriteFileFooter(const Schema& schema, const std::vector<FileBlock>& dictionaries,
- const std::vector<FileBlock>& record_batches, io::OutputStream* out);
-
-class ARROW_EXPORT FileFooter {
- public:
- ~FileFooter();
-
- static Status Open(
- const std::shared_ptr<Buffer>& buffer, std::unique_ptr<FileFooter>* out);
-
- int num_dictionaries() const;
- int num_record_batches() const;
- MetadataVersion::type version() const;
-
- FileBlock record_batch(int i) const;
- FileBlock dictionary(int i) const;
-
- Status GetSchema(std::shared_ptr<Schema>* out) const;
-
- private:
- FileFooter();
- class FileFooterImpl;
- std::unique_ptr<FileFooterImpl> impl_;
-};
+ const std::vector<FileBlock>& record_batches, DictionaryMemo* dictionary_memo,
+ io::OutputStream* out);
class ARROW_EXPORT FileWriter : public StreamWriter {
public:
static Status Open(io::OutputStream* sink, const std::shared_ptr<Schema>& schema,
std::shared_ptr<FileWriter>* out);
- Status WriteRecordBatch(const RecordBatch& batch) override;
+ using StreamWriter::WriteRecordBatch;
Status Close() override;
private:
FileWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
Status Start() override;
-
- std::vector<FileBlock> dictionaries_;
- std::vector<FileBlock> record_batches_;
};
class ARROW_EXPORT FileReader {
@@ -108,13 +84,9 @@ class ARROW_EXPORT FileReader {
static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
int64_t footer_offset, std::shared_ptr<FileReader>* reader);
+ /// The schema includes any dictionaries
std::shared_ptr<Schema> schema() const;
- // Shared dictionaries for dictionary-encoding cross record batches
- // TODO(wesm): Implement dictionary reading when we also have dictionary
- // encoding
- int num_dictionaries() const;
-
int num_record_batches() const;
MetadataVersion::type version() const;
@@ -127,19 +99,10 @@ class ARROW_EXPORT FileReader {
Status GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch);
private:
- FileReader(
- const std::shared_ptr<io::ReadableFileInterface>& file, int64_t footer_offset);
-
- Status ReadFooter();
-
- std::shared_ptr<io::ReadableFileInterface> file_;
-
- // The location where the Arrow file layout ends. May be the end of the file
- // or some other location if embedded in a larger file.
- int64_t footer_offset_;
+ FileReader();
- std::unique_ptr<FileFooter> footer_;
- std::shared_ptr<Schema> schema_;
+ class ARROW_NO_EXPORT FileReaderImpl;
+ std::unique_ptr<FileReaderImpl> impl_;
};
} // namespace ipc
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index d11b95b..8999363 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -27,6 +27,7 @@
#include "arrow/io/memory.h"
#include "arrow/io/test-common.h"
#include "arrow/ipc/adapter.h"
+#include "arrow/ipc/metadata.h"
#include "arrow/ipc/test-common.h"
#include "arrow/ipc/util.h"
@@ -40,12 +41,8 @@
namespace arrow {
namespace ipc {
-class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
- public io::MemoryMapFixture {
+class IpcTestFixture : public io::MemoryMapFixture {
public:
- void SetUp() { pool_ = default_memory_pool(); }
- void TearDown() { io::MemoryMapFixture::TearDown(); }
-
Status RoundTripHelper(const RecordBatch& batch, int memory_map_size,
std::shared_ptr<RecordBatch>* batch_result) {
std::string path = "test-write-row-batch";
@@ -59,8 +56,9 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
RETURN_NOT_OK(WriteRecordBatch(
batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
- std::shared_ptr<RecordBatchMetadata> metadata;
- RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
+ std::shared_ptr<Message> message;
+ RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
+ auto metadata = std::make_shared<RecordBatchMetadata>(message);
// The buffer offsets start at 0, so we must construct a
// ReadableFileInterface according to that frame of reference
@@ -68,7 +66,7 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload));
io::BufferReader buffer_reader(buffer_payload);
- return ReadRecordBatch(metadata, batch.schema(), &buffer_reader, batch_result);
+ return ReadRecordBatch(*metadata, batch.schema(), &buffer_reader, batch_result);
}
void CheckRoundtrip(const RecordBatch& batch, int64_t buffer_size) {
@@ -112,14 +110,29 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
MemoryPool* pool_;
};
-TEST_P(TestWriteRecordBatch, RoundTrip) {
+class TestWriteRecordBatch : public ::testing::Test, public IpcTestFixture {
+ public:
+ void SetUp() { pool_ = default_memory_pool(); }
+ void TearDown() { io::MemoryMapFixture::TearDown(); }
+};
+
+class TestRecordBatchParam : public ::testing::TestWithParam<MakeRecordBatch*>,
+ public IpcTestFixture {
+ public:
+ void SetUp() { pool_ = default_memory_pool(); }
+ void TearDown() { io::MemoryMapFixture::TearDown(); }
+ using IpcTestFixture::RoundTripHelper;
+ using IpcTestFixture::CheckRoundtrip;
+};
+
+TEST_P(TestRecordBatchParam, RoundTrip) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue
CheckRoundtrip(*batch, 1 << 20);
}
-TEST_P(TestWriteRecordBatch, SliceRoundTrip) {
+TEST_P(TestRecordBatchParam, SliceRoundTrip) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue
@@ -130,7 +143,7 @@ TEST_P(TestWriteRecordBatch, SliceRoundTrip) {
CheckRoundtrip(*sliced_batch, 1 << 20);
}
-TEST_P(TestWriteRecordBatch, ZeroLengthArrays) {
+TEST_P(TestRecordBatchParam, ZeroLengthArrays) {
std::shared_ptr<RecordBatch> batch;
ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue
@@ -159,10 +172,10 @@ TEST_P(TestWriteRecordBatch, ZeroLengthArrays) {
}
INSTANTIATE_TEST_CASE_P(
- RoundTripTests, TestWriteRecordBatch,
+ RoundTripTests, TestRecordBatchParam,
::testing::Values(&MakeIntRecordBatch, &MakeStringTypesRecordBatch,
&MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch, &MakeListRecordBatch,
- &MakeDeeplyNestedList, &MakeStruct, &MakeUnion));
+ &MakeDeeplyNestedList, &MakeStruct, &MakeUnion, &MakeDictionary));
void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
ipc::MockOutputStream mock;
@@ -251,8 +264,9 @@ TEST_F(RecursionLimits, ReadLimit) {
std::shared_ptr<Schema> schema;
ASSERT_OK(WriteToMmap(64, true, &metadata_length, &body_length, &schema));
- std::shared_ptr<RecordBatchMetadata> metadata;
- ASSERT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
+ std::shared_ptr<Message> message;
+ ASSERT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));
+ auto metadata = std::make_shared<RecordBatchMetadata>(message);
std::shared_ptr<Buffer> payload;
ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
@@ -260,7 +274,7 @@ TEST_F(RecursionLimits, ReadLimit) {
io::BufferReader reader(payload);
std::shared_ptr<RecordBatch> batch;
- ASSERT_RAISES(Invalid, ReadRecordBatch(metadata, schema, &reader, &batch));
+ ASSERT_RAISES(Invalid, ReadRecordBatch(*metadata, schema, &reader, &batch));
}
} // namespace ipc
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc
index 7cd8054..4b82aab 100644
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ b/cpp/src/arrow/ipc/ipc-file-test.cc
@@ -180,72 +180,44 @@ TEST_P(TestStreamFormat, RoundTrip) {
#define BATCH_CASES() \
::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch, \
&MakeZeroLengthRecordBatch, &MakeDeeplyNestedList, &MakeStringTypesRecordBatch, \
- &MakeStruct);
+ &MakeStruct, &MakeDictionary);
INSTANTIATE_TEST_CASE_P(FileRoundTripTests, TestFileFormat, BATCH_CASES());
INSTANTIATE_TEST_CASE_P(StreamRoundTripTests, TestStreamFormat, BATCH_CASES());
-class TestFileFooter : public ::testing::Test {
- public:
- void SetUp() {}
-
- void CheckRoundtrip(const Schema& schema, const std::vector<FileBlock>& dictionaries,
- const std::vector<FileBlock>& record_batches) {
- auto buffer = std::make_shared<PoolBuffer>();
- io::BufferOutputStream stream(buffer);
-
- ASSERT_OK(WriteFileFooter(schema, dictionaries, record_batches, &stream));
-
- std::unique_ptr<FileFooter> footer;
- ASSERT_OK(FileFooter::Open(buffer, &footer));
-
- ASSERT_EQ(MetadataVersion::V2, footer->version());
+void CheckBatchDictionaries(const RecordBatch& batch) {
+ // Check that dictionaries that should be the same are the same
+ auto schema = batch.schema();
- // Check schema
- std::shared_ptr<Schema> schema2;
- ASSERT_OK(footer->GetSchema(&schema2));
- AssertSchemaEqual(schema, *schema2);
+ const auto& t0 = static_cast<const DictionaryType&>(*schema->field(0)->type);
+ const auto& t1 = static_cast<const DictionaryType&>(*schema->field(1)->type);
- // Check blocks
- ASSERT_EQ(dictionaries.size(), footer->num_dictionaries());
- ASSERT_EQ(record_batches.size(), footer->num_record_batches());
+ ASSERT_EQ(t0.dictionary().get(), t1.dictionary().get());
- for (int i = 0; i < footer->num_dictionaries(); ++i) {
- CheckBlocks(dictionaries[i], footer->dictionary(i));
- }
-
- for (int i = 0; i < footer->num_record_batches(); ++i) {
- CheckBlocks(record_batches[i], footer->record_batch(i));
- }
- }
+ // Same dictionary used for list values
+ const auto& t3 = static_cast<const ListType&>(*schema->field(3)->type);
+ const auto& t3_value = static_cast<const DictionaryType&>(*t3.value_type());
+ ASSERT_EQ(t0.dictionary().get(), t3_value.dictionary().get());
+}
- void CheckBlocks(const FileBlock& left, const FileBlock& right) {
- ASSERT_EQ(left.offset, right.offset);
- ASSERT_EQ(left.metadata_length, right.metadata_length);
- ASSERT_EQ(left.body_length, right.body_length);
- }
+TEST_F(TestStreamFormat, DictionaryRoundTrip) {
+ std::shared_ptr<RecordBatch> batch;
+ ASSERT_OK(MakeDictionary(&batch));
- private:
- std::shared_ptr<Schema> example_schema_;
-};
+ std::vector<std::shared_ptr<RecordBatch>> out_batches;
+ ASSERT_OK(RoundTripHelper(*batch, &out_batches));
-TEST_F(TestFileFooter, Basics) {
- auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
- auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>());
- Schema schema({f0, f1});
+ CheckBatchDictionaries(*out_batches[0]);
+}
- std::vector<FileBlock> dictionaries;
- dictionaries.emplace_back(8, 92, 900);
- dictionaries.emplace_back(1000, 100, 1900);
- dictionaries.emplace_back(3000, 100, 2900);
+TEST_F(TestFileFormat, DictionaryRoundTrip) {
+ std::shared_ptr<RecordBatch> batch;
+ ASSERT_OK(MakeDictionary(&batch));
- std::vector<FileBlock> record_batches;
- record_batches.emplace_back(6000, 100, 900);
- record_batches.emplace_back(7000, 100, 1900);
- record_batches.emplace_back(9000, 100, 2900);
- record_batches.emplace_back(12000, 100, 3900);
+ std::vector<std::shared_ptr<RecordBatch>> out_batches;
+ ASSERT_OK(RoundTripHelper({batch}, &out_batches));
- CheckRoundtrip(schema, dictionaries, record_batches);
+ CheckBatchDictionaries(*out_batches[0]);
}
} // namespace ipc
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/ipc-metadata-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc
index 098f996..4fb3204 100644
--- a/cpp/src/arrow/ipc/ipc-metadata-test.cc
+++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc
@@ -22,6 +22,7 @@
#include "gtest/gtest.h"
#include "arrow/io/memory.h"
+#include "arrow/ipc/metadata-internal.h"
#include "arrow/ipc/metadata.h"
#include "arrow/ipc/test-common.h"
#include "arrow/schema.h"
@@ -39,9 +40,9 @@ class TestSchemaMetadata : public ::testing::Test {
public:
void SetUp() {}
- void CheckRoundtrip(const Schema& schema) {
+ void CheckRoundtrip(const Schema& schema, DictionaryMemo* memo) {
std::shared_ptr<Buffer> buffer;
- ASSERT_OK(WriteSchema(schema, &buffer));
+ ASSERT_OK(WriteSchemaMessage(schema, memo, &buffer));
std::shared_ptr<Message> message;
ASSERT_OK(Message::Open(buffer, 0, &message));
@@ -51,8 +52,10 @@ class TestSchemaMetadata : public ::testing::Test {
auto schema_msg = std::make_shared<SchemaMetadata>(message);
ASSERT_EQ(schema.num_fields(), schema_msg->num_fields());
+ DictionaryMemo empty_memo;
+
std::shared_ptr<Schema> schema2;
- ASSERT_OK(schema_msg->GetSchema(&schema2));
+ ASSERT_OK(schema_msg->GetSchema(empty_memo, &schema2));
AssertSchemaEqual(schema, *schema2);
}
@@ -74,7 +77,9 @@ TEST_F(TestSchemaMetadata, PrimitiveFields) {
auto f10 = std::make_shared<Field>("f10", std::make_shared<BooleanType>());
Schema schema({f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10});
- CheckRoundtrip(schema);
+ DictionaryMemo memo;
+
+ CheckRoundtrip(schema, &memo);
}
TEST_F(TestSchemaMetadata, NestedFields) {
@@ -86,7 +91,9 @@ TEST_F(TestSchemaMetadata, NestedFields) {
auto f1 = std::make_shared<Field>("f1", type2);
Schema schema({f0, f1});
- CheckRoundtrip(schema);
+ DictionaryMemo memo;
+
+ CheckRoundtrip(schema, &memo);
}
} // namespace ipc
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index cd77220..7c8ddb9 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -25,6 +25,7 @@
#include "flatbuffers/flatbuffers.h"
+#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/ipc/Message_generated.h"
#include "arrow/schema.h"
@@ -115,8 +116,8 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
}
// Forward declaration
-static Status FieldToFlatbuffer(
- FBB& fbb, const std::shared_ptr<Field>& field, FieldOffset* offset);
+static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
+ DictionaryMemo* dictionary_memo, FieldOffset* offset);
static Offset IntToFlatbuffer(FBB& fbb, int bitWidth, bool is_signed) {
return flatbuf::CreateInt(fbb, bitWidth, is_signed).Union();
@@ -126,34 +127,73 @@ static Offset FloatToFlatbuffer(FBB& fbb, flatbuf::Precision precision) {
return flatbuf::CreateFloatingPoint(fbb, precision).Union();
}
-static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* out_children, Offset* offset) {
+static Status AppendChildFields(FBB& fbb, const std::shared_ptr<DataType>& type,
+ std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo) {
FieldOffset field;
- RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(0), &field));
- out_children->push_back(field);
+ for (int i = 0; i < type->num_children(); ++i) {
+ RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), dictionary_memo, &field));
+ out_children->push_back(field);
+ }
+ return Status::OK();
+}
+
+static Status ListToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+ std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
+ Offset* offset) {
+ RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
*offset = flatbuf::CreateList(fbb).Union();
return Status::OK();
}
static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* out_children, Offset* offset) {
- FieldOffset field;
- for (int i = 0; i < type->num_children(); ++i) {
- RETURN_NOT_OK(FieldToFlatbuffer(fbb, type->child(i), &field));
- out_children->push_back(field);
- }
+ std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
+ Offset* offset) {
+ RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
*offset = flatbuf::CreateStruct_(fbb).Union();
return Status::OK();
}
+static Status UnionToFlatBuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
+ std::vector<FieldOffset>* out_children, DictionaryMemo* dictionary_memo,
+ Offset* offset) {
+ RETURN_NOT_OK(AppendChildFields(fbb, type, out_children, dictionary_memo));
+
+ const auto& union_type = static_cast<const UnionType&>(*type);
+
+ flatbuf::UnionMode mode = union_type.mode == UnionMode::SPARSE
+ ? flatbuf::UnionMode_Sparse
+ : flatbuf::UnionMode_Dense;
+
+ std::vector<int32_t> type_ids;
+ type_ids.reserve(union_type.type_codes.size());
+ for (uint8_t code : union_type.type_codes) {
+ type_ids.push_back(code);
+ }
+
+ auto fb_type_ids = fbb.CreateVector(type_ids);
+
+ *offset = flatbuf::CreateUnion(fbb, mode, fb_type_ids).Union();
+ return Status::OK();
+}
+
#define INT_TO_FB_CASE(BIT_WIDTH, IS_SIGNED) \
*out_type = flatbuf::Type_Int; \
*offset = IntToFlatbuffer(fbb, BIT_WIDTH, IS_SIGNED); \
break;
+// TODO(wesm): Convert this to visitor pattern
static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout,
- flatbuf::Type* out_type, Offset* offset) {
+ flatbuf::Type* out_type, DictionaryMemo* dictionary_memo, Offset* offset) {
+ if (type->type == Type::DICTIONARY) {
+ // In this library, the dictionary "type" is a logical construct. Here we
+ // pass through to the value type, as we've already captured the index
+ // type in the DictionaryEncoding metadata in the parent field
+ const auto& dict_type = static_cast<const DictionaryType&>(*type);
+ return TypeToFlatbuffer(fbb, dict_type.dictionary()->type(), children, layout,
+ out_type, dictionary_memo, offset);
+ }
+
std::vector<BufferDescr> buffer_layout = type->GetBufferLayout();
for (const BufferDescr& descr : buffer_layout) {
flatbuf::VectorType vector_type;
@@ -217,10 +257,13 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
break;
case Type::LIST:
*out_type = flatbuf::Type_List;
- return ListToFlatbuffer(fbb, type, children, offset);
+ return ListToFlatbuffer(fbb, type, children, dictionary_memo, offset);
case Type::STRUCT:
*out_type = flatbuf::Type_Struct_;
- return StructToFlatbuffer(fbb, type, children, offset);
+ return StructToFlatbuffer(fbb, type, children, dictionary_memo, offset);
+ case Type::UNION:
+ *out_type = flatbuf::Type_Union;
+ return UnionToFlatBuffer(fbb, type, children, dictionary_memo, offset);
default:
*out_type = flatbuf::Type_NONE; // Make clang-tidy happy
std::stringstream ss;
@@ -230,35 +273,63 @@ static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
return Status::OK();
}
-static Status FieldToFlatbuffer(
- FBB& fbb, const std::shared_ptr<Field>& field, FieldOffset* offset) {
+using DictionaryOffset = flatbuffers::Offset<flatbuf::DictionaryEncoding>;
+
+static DictionaryOffset GetDictionaryEncoding(
+ FBB& fbb, const DictionaryType& type, DictionaryMemo* memo) {
+ int64_t dictionary_id = memo->GetId(type.dictionary());
+
+ // We assume that the dictionary index type (as an integer) has already been
+ // validated elsewhere, and can safely assume we are dealing with signed
+ // integers
+ const auto& fw_index_type = static_cast<const FixedWidthType&>(*type.index_type());
+
+ auto index_type_offset = flatbuf::CreateInt(fbb, fw_index_type.bit_width(), true);
+
+ // TODO(wesm): ordered dictionaries
+ return flatbuf::CreateDictionaryEncoding(fbb, dictionary_id, index_type_offset);
+}
+
+static Status FieldToFlatbuffer(FBB& fbb, const std::shared_ptr<Field>& field,
+ DictionaryMemo* dictionary_memo, FieldOffset* offset) {
auto fb_name = fbb.CreateString(field->name);
flatbuf::Type type_enum;
- Offset type_data;
+ Offset type_offset;
Offset type_layout;
std::vector<FieldOffset> children;
std::vector<VectorLayoutOffset> layout;
- RETURN_NOT_OK(
- TypeToFlatbuffer(fbb, field->type, &children, &layout, &type_enum, &type_data));
+ RETURN_NOT_OK(TypeToFlatbuffer(
+ fbb, field->type, &children, &layout, &type_enum, dictionary_memo, &type_offset));
auto fb_children = fbb.CreateVector(children);
auto fb_layout = fbb.CreateVector(layout);
+ DictionaryOffset dictionary = 0;
+ if (field->type->type == Type::DICTIONARY) {
+ dictionary = GetDictionaryEncoding(
+ fbb, static_cast<const DictionaryType&>(*field->type), dictionary_memo);
+ }
+
// TODO: produce the list of VectorTypes
- *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data,
- field->dictionary, fb_children, fb_layout);
+ *offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_offset,
+ dictionary, fb_children, fb_layout);
return Status::OK();
}
-Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out) {
- std::shared_ptr<DataType> type;
+Status FieldFromFlatbufferDictionary(
+ const flatbuf::Field* field, std::shared_ptr<Field>* out) {
+ // Need an empty memo to pass down for constructing children
+ DictionaryMemo dummy_memo;
+
+ // Any DictionaryEncoding set is ignored here
+ std::shared_ptr<DataType> type;
auto children = field->children();
std::vector<std::shared_ptr<Field>> child_fields(children->size());
for (size_t i = 0; i < children->size(); ++i) {
- RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), &child_fields[i]));
+ RETURN_NOT_OK(FieldFromFlatbuffer(children->Get(i), dummy_memo, &child_fields[i]));
}
RETURN_NOT_OK(
@@ -268,6 +339,39 @@ Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>*
return Status::OK();
}
+Status FieldFromFlatbuffer(const flatbuf::Field* field,
+ const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out) {
+ std::shared_ptr<DataType> type;
+
+ const flatbuf::DictionaryEncoding* encoding = field->dictionary();
+
+ if (encoding == nullptr) {
+ // The field is not dictionary encoded. We must potentially visit its
+ // children to fully reconstruct the data type
+ auto children = field->children();
+ std::vector<std::shared_ptr<Field>> child_fields(children->size());
+ for (size_t i = 0; i < children->size(); ++i) {
+ RETURN_NOT_OK(
+ FieldFromFlatbuffer(children->Get(i), dictionary_memo, &child_fields[i]));
+ }
+ RETURN_NOT_OK(
+ TypeFromFlatbuffer(field->type_type(), field->type(), child_fields, &type));
+ } else {
+ // The field is dictionary encoded. The type of the dictionary values has
+ // been determined elsewhere, and is stored in the DictionaryMemo. Here we
+ // construct the logical DictionaryType object
+
+ std::shared_ptr<Array> dictionary;
+ RETURN_NOT_OK(dictionary_memo.GetDictionary(encoding->id(), &dictionary));
+
+ std::shared_ptr<DataType> index_type;
+ RETURN_NOT_OK(IntFromFlatbuffer(encoding->indexType(), &index_type));
+ type = std::make_shared<DictionaryType>(index_type, dictionary);
+ }
+ *out = std::make_shared<Field>(field->name()->str(), type, field->nullable());
+ return Status::OK();
+}
+
// Implement MessageBuilder
// will return the endianness of the system we are running on
@@ -281,13 +385,13 @@ flatbuf::Endianness endianness() {
return bint.c[0] == 1 ? flatbuf::Endianness_Big : flatbuf::Endianness_Little;
}
-Status SchemaToFlatbuffer(
- FBB& fbb, const Schema& schema, flatbuffers::Offset<flatbuf::Schema>* out) {
+Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictionary_memo,
+ flatbuffers::Offset<flatbuf::Schema>* out) {
std::vector<FieldOffset> field_offsets;
for (int i = 0; i < schema.num_fields(); ++i) {
std::shared_ptr<Field> field = schema.field(i);
FieldOffset offset;
- RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, &offset));
+ RETURN_NOT_OK(FieldToFlatbuffer(fbb, field, dictionary_memo, &offset));
field_offsets.push_back(offset);
}
@@ -295,29 +399,63 @@ Status SchemaToFlatbuffer(
return Status::OK();
}
-Status MessageBuilder::SetSchema(const Schema& schema) {
- flatbuffers::Offset<flatbuf::Schema> fb_schema;
- RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, &fb_schema));
+class MessageBuilder {
+ public:
+ Status SetSchema(const Schema& schema, DictionaryMemo* dictionary_memo) {
+ flatbuffers::Offset<flatbuf::Schema> fb_schema;
+ RETURN_NOT_OK(SchemaToFlatbuffer(fbb_, schema, dictionary_memo, &fb_schema));
- header_type_ = flatbuf::MessageHeader_Schema;
- header_ = fb_schema.Union();
- body_length_ = 0;
- return Status::OK();
-}
+ header_type_ = flatbuf::MessageHeader_Schema;
+ header_ = fb_schema.Union();
+ body_length_ = 0;
+ return Status::OK();
+ }
-Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length,
- const std::vector<flatbuf::FieldNode>& nodes,
- const std::vector<flatbuf::Buffer>& buffers) {
- header_type_ = flatbuf::MessageHeader_RecordBatch;
- header_ = flatbuf::CreateRecordBatch(fbb_, length, fbb_.CreateVectorOfStructs(nodes),
- fbb_.CreateVectorOfStructs(buffers))
- .Union();
- body_length_ = body_length;
+ Status SetRecordBatch(int32_t length, int64_t body_length,
+ const std::vector<flatbuf::FieldNode>& nodes,
+ const std::vector<flatbuf::Buffer>& buffers) {
+ header_type_ = flatbuf::MessageHeader_RecordBatch;
+ header_ = flatbuf::CreateRecordBatch(fbb_, length, fbb_.CreateVectorOfStructs(nodes),
+ fbb_.CreateVectorOfStructs(buffers))
+ .Union();
+ body_length_ = body_length;
- return Status::OK();
+ return Status::OK();
+ }
+
+ Status SetDictionary(int64_t id, int32_t length, int64_t body_length,
+ const std::vector<flatbuf::FieldNode>& nodes,
+ const std::vector<flatbuf::Buffer>& buffers) {
+ header_type_ = flatbuf::MessageHeader_DictionaryBatch;
+
+ auto record_batch = flatbuf::CreateRecordBatch(fbb_, length,
+ fbb_.CreateVectorOfStructs(nodes), fbb_.CreateVectorOfStructs(buffers));
+
+ header_ = flatbuf::CreateDictionaryBatch(fbb_, id, record_batch).Union();
+ body_length_ = body_length;
+ return Status::OK();
+ }
+
+ Status Finish();
+
+ Status GetBuffer(std::shared_ptr<Buffer>* out);
+
+ private:
+ flatbuf::MessageHeader header_type_;
+ flatbuffers::Offset<void> header_;
+ int64_t body_length_;
+ flatbuffers::FlatBufferBuilder fbb_;
+};
+
+Status WriteSchemaMessage(
+ const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out) {
+ MessageBuilder message;
+ RETURN_NOT_OK(message.SetSchema(schema, dictionary_memo));
+ RETURN_NOT_OK(message.Finish());
+ return message.GetBuffer(out);
}
-Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
+Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
const std::vector<flatbuf::FieldNode>& nodes,
const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
MessageBuilder builder;
@@ -326,6 +464,15 @@ Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
return builder.GetBuffer(out);
}
+Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
+ const std::vector<flatbuf::FieldNode>& nodes,
+ const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
+ MessageBuilder builder;
+ RETURN_NOT_OK(builder.SetDictionary(id, length, body_length, nodes, buffers));
+ RETURN_NOT_OK(builder.Finish());
+ return builder.GetBuffer(out);
+}
+
Status MessageBuilder::Finish() {
auto message =
flatbuf::CreateMessage(fbb_, kMetadataVersion, header_type_, header_, body_length_);
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index d94a8ab..59afecb 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -46,31 +46,34 @@ using Offset = flatbuffers::Offset<void>;
static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2;
-Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out);
+// Construct a field with type for a dictionary-encoded field. None of its
+// children or children's descendents can be dictionary encoded
+Status FieldFromFlatbufferDictionary(
+ const flatbuf::Field* field, std::shared_ptr<Field>* out);
-Status SchemaToFlatbuffer(
- FBB& fbb, const Schema& schema, flatbuffers::Offset<flatbuf::Schema>* out);
+// Construct a field for a non-dictionary-encoded field. Its children may be
+// dictionary encoded
+Status FieldFromFlatbuffer(const flatbuf::Field* field,
+ const DictionaryMemo& dictionary_memo, std::shared_ptr<Field>* out);
-class MessageBuilder {
- public:
- Status SetSchema(const Schema& schema);
+Status SchemaToFlatbuffer(FBB& fbb, const Schema& schema, DictionaryMemo* dictionary_memo,
+ flatbuffers::Offset<flatbuf::Schema>* out);
- Status SetRecordBatch(int32_t length, int64_t body_length,
- const std::vector<flatbuf::FieldNode>& nodes,
- const std::vector<flatbuf::Buffer>& buffers);
-
- Status Finish();
-
- Status GetBuffer(std::shared_ptr<Buffer>* out);
-
- private:
- flatbuf::MessageHeader header_type_;
- flatbuffers::Offset<void> header_;
- int64_t body_length_;
- flatbuffers::FlatBufferBuilder fbb_;
-};
+// Serialize arrow::Schema as a Flatbuffer
+//
+// \param[in] schema a Schema instance
+// \param[inout] dictionary_memo class for tracking dictionaries and assigning
+// dictionary ids
+// \param[out] out the serialized arrow::Buffer
+// \return Status outcome
+Status WriteSchemaMessage(
+ const Schema& schema, DictionaryMemo* dictionary_memo, std::shared_ptr<Buffer>* out);
+
+Status WriteRecordBatchMessage(int32_t length, int64_t body_length,
+ const std::vector<flatbuf::FieldNode>& nodes,
+ const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out);
-Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
+Status WriteDictionaryMessage(int64_t id, int32_t length, int64_t body_length,
const std::vector<flatbuf::FieldNode>& nodes,
const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out);
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index a97965c..2ba44ac 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -19,6 +19,7 @@
#include <cstdint>
#include <memory>
+#include <sstream>
#include <vector>
#include "flatbuffers/flatbuffers.h"
@@ -38,11 +39,60 @@ namespace flatbuf = org::apache::arrow::flatbuf;
namespace ipc {
-Status WriteSchema(const Schema& schema, std::shared_ptr<Buffer>* out) {
- MessageBuilder message;
- RETURN_NOT_OK(message.SetSchema(schema));
- RETURN_NOT_OK(message.Finish());
- return message.GetBuffer(out);
+// ----------------------------------------------------------------------
+// Memoization data structure for handling shared dictionaries
+
+DictionaryMemo::DictionaryMemo() {}
+
+// Returns KeyError if dictionary not found
+Status DictionaryMemo::GetDictionary(
+ int64_t id, std::shared_ptr<Array>* dictionary) const {
+ auto it = id_to_dictionary_.find(id);
+ if (it == id_to_dictionary_.end()) {
+ std::stringstream ss;
+ ss << "Dictionary with id " << id << " not found";
+ return Status::KeyError(ss.str());
+ }
+ *dictionary = it->second;
+ return Status::OK();
+}
+
+int64_t DictionaryMemo::GetId(const std::shared_ptr<Array>& dictionary) {
+ intptr_t address = reinterpret_cast<intptr_t>(dictionary.get());
+ auto it = dictionary_to_id_.find(address);
+ if (it != dictionary_to_id_.end()) {
+ // Dictionary already observed, return the id
+ return it->second;
+ } else {
+ int64_t new_id = static_cast<int64_t>(dictionary_to_id_.size()) + 1;
+ dictionary_to_id_[address] = new_id;
+ id_to_dictionary_[new_id] = dictionary;
+ return new_id;
+ }
+}
+
+bool DictionaryMemo::HasDictionary(const std::shared_ptr<Array>& dictionary) const {
+ intptr_t address = reinterpret_cast<intptr_t>(dictionary.get());
+ auto it = dictionary_to_id_.find(address);
+ return it != dictionary_to_id_.end();
+}
+
+bool DictionaryMemo::HasDictionaryId(int64_t id) const {
+ auto it = id_to_dictionary_.find(id);
+ return it != id_to_dictionary_.end();
+}
+
+Status DictionaryMemo::AddDictionary(
+ int64_t id, const std::shared_ptr<Array>& dictionary) {
+ if (HasDictionaryId(id)) {
+ std::stringstream ss;
+ ss << "Dictionary with id " << id << " already exists";
+ return Status::KeyError(ss.str());
+ }
+ intptr_t address = reinterpret_cast<intptr_t>(dictionary.get());
+ id_to_dictionary_[id] = dictionary;
+ dictionary_to_id_[address] = id;
+ return Status::OK();
}
//----------------------------------------------------------------------
@@ -113,10 +163,35 @@ class SchemaMetadata::SchemaMetadataImpl {
explicit SchemaMetadataImpl(const void* schema)
: schema_(static_cast<const flatbuf::Schema*>(schema)) {}
- const flatbuf::Field* field(int i) const { return schema_->fields()->Get(i); }
+ const flatbuf::Field* get_field(int i) const { return schema_->fields()->Get(i); }
int num_fields() const { return schema_->fields()->size(); }
+ Status VisitField(const flatbuf::Field* field, DictionaryTypeMap* id_to_field) const {
+ const flatbuf::DictionaryEncoding* dict_metadata = field->dictionary();
+ if (dict_metadata == nullptr) {
+ // Field is not dictionary encoded. Visit children
+ auto children = field->children();
+ for (flatbuffers::uoffset_t i = 0; i < children->size(); ++i) {
+ RETURN_NOT_OK(VisitField(children->Get(i), id_to_field));
+ }
+ } else {
+ // Field is dictionary encoded. Construct the data type for the
+ // dictionary (no descendents can be dictionary encoded)
+ std::shared_ptr<Field> dictionary_field;
+ RETURN_NOT_OK(FieldFromFlatbufferDictionary(field, &dictionary_field));
+ (*id_to_field)[dict_metadata->id()] = dictionary_field;
+ }
+ return Status::OK();
+ }
+
+ Status GetDictionaryTypes(DictionaryTypeMap* id_to_field) const {
+ for (int i = 0; i < num_fields(); ++i) {
+ RETURN_NOT_OK(VisitField(get_field(i), id_to_field));
+ }
+ return Status::OK();
+ }
+
private:
const flatbuf::Schema* schema_;
};
@@ -138,15 +213,16 @@ int SchemaMetadata::num_fields() const {
return impl_->num_fields();
}
-Status SchemaMetadata::GetField(int i, std::shared_ptr<Field>* out) const {
- const flatbuf::Field* field = impl_->field(i);
- return FieldFromFlatbuffer(field, out);
+Status SchemaMetadata::GetDictionaryTypes(DictionaryTypeMap* id_to_field) const {
+ return impl_->GetDictionaryTypes(id_to_field);
}
-Status SchemaMetadata::GetSchema(std::shared_ptr<Schema>* out) const {
+Status SchemaMetadata::GetSchema(
+ const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out) const {
std::vector<std::shared_ptr<Field>> fields(num_fields());
for (int i = 0; i < this->num_fields(); ++i) {
- RETURN_NOT_OK(GetField(i, &fields[i]));
+ const flatbuf::Field* field = impl_->get_field(i);
+ RETURN_NOT_OK(FieldFromFlatbuffer(field, dictionary_memo, &fields[i]));
}
*out = std::make_shared<Schema>(fields);
return Status::OK();
@@ -173,28 +249,34 @@ class RecordBatchMetadata::RecordBatchMetadataImpl {
int num_fields() const { return batch_->nodes()->size(); }
+ void set_message(const std::shared_ptr<Message>& message) { message_ = message; }
+
+ void set_buffer(const std::shared_ptr<Buffer>& buffer) { buffer_ = buffer; }
+
private:
const flatbuf::RecordBatch* batch_;
const flatbuffers::Vector<const flatbuf::FieldNode*>* nodes_;
const flatbuffers::Vector<const flatbuf::Buffer*>* buffers_;
+
+ // Possible parents, owns the flatbuffer data
+ std::shared_ptr<Message> message_;
+ std::shared_ptr<Buffer> buffer_;
};
RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message) {
- message_ = message;
impl_.reset(new RecordBatchMetadataImpl(message->impl_->header()));
+ impl_->set_message(message);
}
-RecordBatchMetadata::RecordBatchMetadata(
- const std::shared_ptr<Buffer>& buffer, int64_t offset) {
- message_ = nullptr;
- buffer_ = buffer;
-
- const flatbuf::RecordBatch* metadata =
- flatbuffers::GetRoot<flatbuf::RecordBatch>(buffer->data() + offset);
-
- // TODO(wesm): validate table
+RecordBatchMetadata::RecordBatchMetadata(const void* header) {
+ impl_.reset(new RecordBatchMetadataImpl(header));
+}
- impl_.reset(new RecordBatchMetadataImpl(metadata));
+RecordBatchMetadata::RecordBatchMetadata(
+ const std::shared_ptr<Buffer>& buffer, int64_t offset)
+ : RecordBatchMetadata(buffer->data() + offset) {
+ // Preserve ownership
+ impl_->set_buffer(buffer);
}
RecordBatchMetadata::~RecordBatchMetadata() {}
@@ -232,5 +314,64 @@ int RecordBatchMetadata::num_fields() const {
return impl_->num_fields();
}
+// ----------------------------------------------------------------------
+// DictionaryBatchMetadata
+
+class DictionaryBatchMetadata::DictionaryBatchMetadataImpl {
+ public:
+ explicit DictionaryBatchMetadataImpl(const void* dictionary)
+ : metadata_(static_cast<const flatbuf::DictionaryBatch*>(dictionary)) {
+ record_batch_.reset(new RecordBatchMetadata(metadata_->data()));
+ }
+
+ int64_t id() const { return metadata_->id(); }
+ const RecordBatchMetadata& record_batch() const { return *record_batch_; }
+
+ void set_message(const std::shared_ptr<Message>& message) { message_ = message; }
+
+ private:
+ const flatbuf::DictionaryBatch* metadata_;
+
+ std::unique_ptr<RecordBatchMetadata> record_batch_;
+
+ // Parent, owns the flatbuffer data
+ std::shared_ptr<Message> message_;
+};
+
+DictionaryBatchMetadata::DictionaryBatchMetadata(
+ const std::shared_ptr<Message>& message) {
+ impl_.reset(new DictionaryBatchMetadataImpl(message->impl_->header()));
+ impl_->set_message(message);
+}
+
+DictionaryBatchMetadata::~DictionaryBatchMetadata() {}
+
+int64_t DictionaryBatchMetadata::id() const {
+ return impl_->id();
+}
+
+const RecordBatchMetadata& DictionaryBatchMetadata::record_batch() const {
+ return impl_->record_batch();
+}
+
+// ----------------------------------------------------------------------
+// Conveniences
+
+Status ReadMessage(int64_t offset, int32_t metadata_length,
+ io::ReadableFileInterface* file, std::shared_ptr<Message>* message) {
+ std::shared_ptr<Buffer> buffer;
+ RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));
+
+ int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
+
+ if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) {
+ std::stringstream ss;
+ ss << "flatbuffer size " << metadata_length << " invalid. File offset: " << offset
+ << ", metadata length: " << metadata_length;
+ return Status::Invalid(ss.str());
+ }
+ return Message::Open(buffer, 4, message);
+}
+
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/d28f1c1e/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 81e3dbd..0091067 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -22,13 +22,17 @@
#include <cstdint>
#include <memory>
+#include <unordered_map>
#include <vector>
+#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
namespace arrow {
+class Array;
class Buffer;
+struct DataType;
struct Field;
class Schema;
class Status;
@@ -36,6 +40,7 @@ class Status;
namespace io {
class OutputStream;
+class ReadableFileInterface;
} // namespace io
@@ -47,9 +52,38 @@ struct MetadataVersion {
//----------------------------------------------------------------------
-// Serialize arrow::Schema as a Flatbuffer
-ARROW_EXPORT
-Status WriteSchema(const Schema& schema, std::shared_ptr<Buffer>* out);
+using DictionaryMap = std::unordered_map<int64_t, std::shared_ptr<Array>>;
+using DictionaryTypeMap = std::unordered_map<int64_t, std::shared_ptr<Field>>;
+
+// Memoization data structure for handling shared dictionaries
+class DictionaryMemo {
+ public:
+ DictionaryMemo();
+
+ // Returns KeyError if dictionary not found
+ Status GetDictionary(int64_t id, std::shared_ptr<Array>* dictionary) const;
+
+ int64_t GetId(const std::shared_ptr<Array>& dictionary);
+
+ bool HasDictionary(const std::shared_ptr<Array>& dictionary) const;
+ bool HasDictionaryId(int64_t id) const;
+
+ // Add a dictionary to the memo with a particular id. Returns KeyError if
+ // that dictionary already exists
+ Status AddDictionary(int64_t id, const std::shared_ptr<Array>& dictionary);
+
+ const DictionaryMap& id_to_dictionary() const { return id_to_dictionary_; }
+
+ private:
+ // Dictionary memory addresses, to track whether a dictionary has been seen
+ // before
+ std::unordered_map<intptr_t, int64_t> dictionary_to_id_;
+
+ // Map of dictionary id to dictionary array
+ DictionaryMap id_to_dictionary_;
+
+ DISALLOW_COPY_AND_ASSIGN(DictionaryMemo);
+};
// Read interface classes. We do not fully deserialize the flatbuffers so that
// individual fields metadata can be retrieved from very large schema without
@@ -69,12 +103,15 @@ class ARROW_EXPORT SchemaMetadata {
int num_fields() const;
- // Construct an arrow::Field for the i-th value in the metadata
- Status GetField(int i, std::shared_ptr<Field>* out) const;
+ // Retrieve a list of all the dictionary ids and types required by the schema for
+ // reconstruction. The presumption is that these will be loaded either from
+ // the stream or file (or they may already be somewhere else in memory)
+ Status GetDictionaryTypes(DictionaryTypeMap* id_to_field) const;
// Construct a complete Schema from the message. May be expensive for very
// large schemas if you are only interested in a few fields
- Status GetSchema(std::shared_ptr<Schema>* out) const;
+ Status GetSchema(
+ const DictionaryMemo& dictionary_memo, std::shared_ptr<Schema>* out) const;
private:
// Parent, owns the flatbuffer data
@@ -82,6 +119,8 @@ class ARROW_EXPORT SchemaMetadata {
class SchemaMetadataImpl;
std::unique_ptr<SchemaMetadataImpl> impl_;
+
+ DISALLOW_COPY_AND_ASSIGN(SchemaMetadata);
};
// Field metadata
@@ -99,8 +138,10 @@ struct ARROW_EXPORT BufferMetadata {
// Container for serialized record batch metadata contained in an IPC message
class ARROW_EXPORT RecordBatchMetadata {
public:
+ // Instantiate from opaque pointer. Memory ownership must be preserved
+ // elsewhere (e.g. in a dictionary batch)
+ explicit RecordBatchMetadata(const void* header);
explicit RecordBatchMetadata(const std::shared_ptr<Message>& message);
-
RecordBatchMetadata(const std::shared_ptr<Buffer>& message, int64_t offset);
~RecordBatchMetadata();
@@ -113,18 +154,25 @@ class ARROW_EXPORT RecordBatchMetadata {
int num_fields() const;
private:
- // Parent, owns the flatbuffer data
- std::shared_ptr<Message> message_;
- std::shared_ptr<Buffer> buffer_;
-
class RecordBatchMetadataImpl;
std::unique_ptr<RecordBatchMetadataImpl> impl_;
+
+ DISALLOW_COPY_AND_ASSIGN(RecordBatchMetadata);
};
class ARROW_EXPORT DictionaryBatchMetadata {
public:
+ explicit DictionaryBatchMetadata(const std::shared_ptr<Message>& message);
+ ~DictionaryBatchMetadata();
+
int64_t id() const;
- std::unique_ptr<RecordBatchMetadata> data() const;
+ const RecordBatchMetadata& record_batch() const;
+
+ private:
+ class DictionaryBatchMetadataImpl;
+ std::unique_ptr<DictionaryBatchMetadataImpl> impl_;
+
+ DISALLOW_COPY_AND_ASSIGN(DictionaryBatchMetadata);
};
class ARROW_EXPORT Message {
@@ -141,24 +189,31 @@ class ARROW_EXPORT Message {
private:
Message(const std::shared_ptr<Buffer>& buffer, int64_t offset);
+ friend class DictionaryBatchMetadata;
friend class RecordBatchMetadata;
friend class SchemaMetadata;
// Hide serialization details from user API
class MessageImpl;
std::unique_ptr<MessageImpl> impl_;
-};
-struct ARROW_EXPORT FileBlock {
- FileBlock() {}
- FileBlock(int64_t offset, int32_t metadata_length, int64_t body_length)
- : offset(offset), metadata_length(metadata_length), body_length(body_length) {}
-
- int64_t offset;
- int32_t metadata_length;
- int64_t body_length;
+ DISALLOW_COPY_AND_ASSIGN(Message);
};
+/// Read a length-prefixed message flatbuffer starting at the indicated file
+/// offset
+///
+/// The metadata_length includes at least the length prefix and the flatbuffer
+///
+/// \param[in] offset the position in the file where the message starts. The
+/// first 4 bytes after the offset are the message length
+/// \param[in] metadata_length the total number of bytes to read from file
+/// \param[in] file the seekable file interface to read from
+/// \param[out] message the message read
+/// \return Status success or failure
+Status ReadMessage(int64_t offset, int32_t metadata_length,
+ io::ReadableFileInterface* file, std::shared_ptr<Message>* message);
+
} // namespace ipc
} // namespace arrow