You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2016/11/29 02:29:28 UTC
[2/2] arrow git commit: ARROW-363: [Java/C++] integration testing
harness, initial integration tests
ARROW-363: [Java/C++] integration testing harness, initial integration tests
This also includes format reconciliation as discussed in ARROW-384.
Author: Wes McKinney <we...@twosigma.com>
Closes #211 from wesm/ARROW-363 and squashes the following commits:
6982c3c [Wes McKinney] Permit end of buffer IPC reads if length is 0
4d46c8b [Wes McKinney] Fix logical error with offsets array in JsonFileWriter. Add broken string test case to simple.json
36ab5d6 [Wes McKinney] Increment MetadataVersion in flatbuffer
844257e [Wes McKinney] cpplint
a2711f2 [Wes McKinney] Address other format incompatibilities, write vectorLayout to Arrow metadata
13608ef [Wes McKinney] Relax 64 byte padding. Do not write RecordBatch embedded in Message for now
6a66fc8 [Wes McKinney] Write record batch size prefix in Java
72ea42c [Wes McKinney] Note that padding is 64-bytes at start of file (for now)
c2ffde4 [Wes McKinney] More notes about the file format
aef4382 [Wes McKinney] cpplint
85128f7 [Wes McKinney] Refactor IPC/File record batch read/write structure to reflect discussion in ARROW-384
dbd6ed6 [Wes McKinney] Do not embed metadata length in WriteDataHeader
c529d63 [Wes McKinney] Fix JSON integration test example to make it further
d806aa6 [Wes McKinney] Exclude JSON files from Apache RAT checks
a7e2d4b [Wes McKinney] Draft testing harness
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e3c167bd
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e3c167bd
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e3c167bd
Branch: refs/heads/master
Commit: e3c167bd101734f92c3a2be2eb7f56f1fba91e67
Parents: 86f56a6
Author: Wes McKinney <we...@twosigma.com>
Authored: Mon Nov 28 21:29:19 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Nov 28 21:29:19 2016 -0500
----------------------------------------------------------------------
.gitignore | 26 ++
cpp/CMakeLists.txt | 1 -
cpp/src/arrow/io/io-file-test.cc | 2 +-
cpp/src/arrow/io/memory.cc | 25 +-
cpp/src/arrow/io/memory.h | 8 +-
cpp/src/arrow/ipc/adapter.cc | 251 ++++++++++---------
cpp/src/arrow/ipc/adapter.h | 65 ++---
cpp/src/arrow/ipc/file.cc | 31 ++-
cpp/src/arrow/ipc/ipc-adapter-test.cc | 85 ++++---
cpp/src/arrow/ipc/ipc-file-test.cc | 2 +-
cpp/src/arrow/ipc/ipc-json-test.cc | 20 +-
cpp/src/arrow/ipc/ipc-metadata-test.cc | 12 +-
cpp/src/arrow/ipc/json-integration-test.cc | 30 ++-
cpp/src/arrow/ipc/json-internal.cc | 110 +++-----
cpp/src/arrow/ipc/metadata-internal.cc | 100 +++++---
cpp/src/arrow/ipc/metadata-internal.h | 6 +-
cpp/src/arrow/ipc/metadata.cc | 115 +++++----
cpp/src/arrow/ipc/metadata.h | 50 ++--
cpp/src/arrow/ipc/test-common.h | 15 +-
cpp/src/arrow/ipc/util.h | 6 +-
cpp/src/arrow/test-util.h | 8 +-
cpp/src/arrow/type.cc | 46 +++-
cpp/src/arrow/type.h | 73 ++++--
cpp/src/arrow/types/primitive.cc | 2 +-
cpp/src/arrow/util/bit-util.h | 4 +
dev/release/run-rat.sh | 3 +-
format/IPC.md | 106 ++++++++
format/Message.fbs | 3 +-
integration/data/simple.json | 66 +++++
integration/integration_test.py | 177 +++++++++++++
java/pom.xml | 6 +-
java/tools/pom.xml | 6 +
.../org/apache/arrow/tools/Integration.java | 1 +
.../org/apache/arrow/vector/VectorLoader.java | 4 +-
.../apache/arrow/vector/file/ArrowReader.java | 6 +-
.../apache/arrow/vector/file/ArrowWriter.java | 23 +-
.../arrow/vector/file/json/JsonFileReader.java | 9 +-
.../arrow/vector/file/json/JsonFileWriter.java | 2 +-
python/.gitignore | 10 -
39 files changed, 1024 insertions(+), 491 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..a00cbba
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Compiled source
+*.a
+*.dll
+*.o
+*.py[ocd]
+*.so
+*.dylib
+.build_cache_dir
+MANIFEST
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 0edb8ce..1a97008 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -528,7 +528,6 @@ if(ARROW_BUILD_TESTS)
ExternalProject_Add(gflags_ep
GIT_REPOSITORY https://github.com/gflags/gflags.git
GIT_TAG cce68f0c9c5d054017425e6e6fd54f696d36e8ee
- # URL "https://github.com/gflags/gflags/archive/v${GFLAGS_VERSION}.tar.gz"
BUILD_IN_SOURCE 1
CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DCMAKE_INSTALL_PREFIX=${GFLAGS_PREFIX}
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/io/io-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 54c21d2..fad49ce 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -19,7 +19,7 @@
#include <cstdio>
#include <cstring>
#ifndef _MSC_VER
-# include <fcntl.h>
+#include <fcntl.h>
#endif
#include <fstream>
#include <memory>
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 71b0f1e..af495e2 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -258,8 +258,11 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
// ----------------------------------------------------------------------
// In-memory buffer reader
-BufferReader::BufferReader(const uint8_t* buffer, int buffer_size)
- : buffer_(buffer), buffer_size_(buffer_size), position_(0) {}
+BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer)
+ : buffer_(buffer), data_(buffer->data()), size_(buffer->size()), position_(0) {}
+
+BufferReader::BufferReader(const uint8_t* data, int64_t size)
+ : buffer_(nullptr), data_(data), size_(size), position_(0) {}
BufferReader::~BufferReader() {}
@@ -278,26 +281,32 @@ bool BufferReader::supports_zero_copy() const {
}
Status BufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
- memcpy(buffer, buffer_ + position_, nbytes);
- *bytes_read = std::min(nbytes, buffer_size_ - position_);
+ memcpy(buffer, data_ + position_, nbytes);
+ *bytes_read = std::min(nbytes, size_ - position_);
position_ += *bytes_read;
return Status::OK();
}
Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
- int64_t size = std::min(nbytes, buffer_size_ - position_);
- *out = std::make_shared<Buffer>(buffer_ + position_, size);
+ int64_t size = std::min(nbytes, size_ - position_);
+
+ if (buffer_ != nullptr) {
+ *out = SliceBuffer(buffer_, position_, size);
+ } else {
+ *out = std::make_shared<Buffer>(data_ + position_, size);
+ }
+
position_ += nbytes;
return Status::OK();
}
Status BufferReader::GetSize(int64_t* size) {
- *size = buffer_size_;
+ *size = size_;
return Status::OK();
}
Status BufferReader::Seek(int64_t position) {
- if (position < 0 || position >= buffer_size_) {
+ if (position < 0 || position >= size_) {
return Status::IOError("position out of bounds");
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index df2fe8d..b72f93b 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -99,7 +99,8 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
class ARROW_EXPORT BufferReader : public ReadableFileInterface {
public:
- BufferReader(const uint8_t* buffer, int buffer_size);
+ explicit BufferReader(const std::shared_ptr<Buffer>& buffer);
+ BufferReader(const uint8_t* data, int64_t size);
~BufferReader();
Status Close() override;
@@ -116,8 +117,9 @@ class ARROW_EXPORT BufferReader : public ReadableFileInterface {
bool supports_zero_copy() const override;
private:
- const uint8_t* buffer_;
- int buffer_size_;
+ std::shared_ptr<Buffer> buffer_;
+ const uint8_t* data_;
+ int64_t size_;
int64_t position_;
};
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index da718c0..edf716f 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -48,15 +48,6 @@ namespace flatbuf = org::apache::arrow::flatbuf;
namespace ipc {
-namespace {
-Status CheckMultipleOf64(int64_t size) {
- if (BitUtil::IsMultipleOf64(size)) { return Status::OK(); }
- return Status::Invalid(
- "Attempted to write a buffer that "
- "wasn't a multiple of 64 bytes");
-}
-}
-
static bool IsPrimitive(const DataType* type) {
DCHECK(type != nullptr);
switch (type->type) {
@@ -124,30 +115,30 @@ Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes
class RecordBatchWriter {
public:
RecordBatchWriter(const std::vector<std::shared_ptr<Array>>& columns, int32_t num_rows,
- int max_recursion_depth)
+ int64_t buffer_start_offset, int max_recursion_depth)
: columns_(&columns),
num_rows_(num_rows),
+ buffer_start_offset_(buffer_start_offset),
max_recursion_depth_(max_recursion_depth) {}
- Status AssemblePayload() {
+ Status AssemblePayload(int64_t* body_length) {
+ if (field_nodes_.size() > 0) {
+ field_nodes_.clear();
+ buffer_meta_.clear();
+ buffers_.clear();
+ }
+
// Perform depth-first traversal of the row-batch
for (size_t i = 0; i < columns_->size(); ++i) {
const Array* arr = (*columns_)[i].get();
RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_, max_recursion_depth_));
}
- return Status::OK();
- }
- Status Write(
- io::OutputStream* dst, int64_t* body_end_offset, int64_t* header_end_offset) {
- // Get the starting position
- int64_t start_position;
- RETURN_NOT_OK(dst->Tell(&start_position));
-
- // Keep track of the current position so we can determine the size of the
- // message body
- int64_t position = start_position;
+ // The position for the start of a buffer relative to the passed frame of
+ // reference. May be 0 or some other position in an address space
+ int64_t offset = buffer_start_offset_;
+ // Construct the buffer metadata for the record batch header
for (size_t i = 0; i < buffers_.size(); ++i) {
const Buffer* buffer = buffers_[i].get();
int64_t size = 0;
@@ -161,65 +152,103 @@ class RecordBatchWriter {
// TODO(wesm): We currently have no notion of shared memory page id's,
// but we've included it in the metadata IDL for when we have it in the
- // future. Use page=0 for now
+ // future. Use page = -1 for now
//
// Note that page ids are a bespoke notion for Arrow and not a feature we
// are using from any OS-level shared memory. The thought is that systems
// may (in the future) associate integer page id's with physical memory
// pages (according to whatever is the desired shared memory mechanism)
- buffer_meta_.push_back(flatbuf::Buffer(0, position, size + padding));
-
- if (size > 0) {
- RETURN_NOT_OK(dst->Write(buffer->data(), size));
- position += size;
- }
-
- if (padding > 0) {
- RETURN_NOT_OK(dst->Write(kPaddingBytes, padding));
- position += padding;
- }
+ buffer_meta_.push_back(flatbuf::Buffer(-1, offset, size + padding));
+ offset += size + padding;
}
- *body_end_offset = position;
+ *body_length = offset - buffer_start_offset_;
+ DCHECK(BitUtil::IsMultipleOf64(*body_length));
+
+ return Status::OK();
+ }
+ Status WriteMetadata(
+ int64_t body_length, io::OutputStream* dst, int32_t* metadata_length) {
// Now that we have computed the locations of all of the buffers in shared
// memory, the data header can be converted to a flatbuffer and written out
//
// Note: The memory written here is prefixed by the size of the flatbuffer
- // itself as an int32_t. On reading from a input, you will have to
- // determine the data header size then request a buffer such that you can
- // construct the flatbuffer data accessor object (see arrow::ipc::Message)
- std::shared_ptr<Buffer> data_header;
- RETURN_NOT_OK(WriteDataHeader(
- num_rows_, position - start_position, field_nodes_, buffer_meta_, &data_header));
+ // itself as an int32_t.
+ std::shared_ptr<Buffer> metadata_fb;
+ RETURN_NOT_OK(WriteRecordBatchMetadata(
+ num_rows_, body_length, field_nodes_, buffer_meta_, &metadata_fb));
+
+ // Need to write 4 bytes (metadata size), the metadata, plus padding to
+ // fall on a 64-byte offset
+ int64_t padded_metadata_length =
+ BitUtil::RoundUpToMultipleOf64(metadata_fb->size() + 4);
+
+ // The returned metadata size includes the length prefix, the flatbuffer,
+ // plus padding
+ *metadata_length = padded_metadata_length;
- // Write the data header at the end
- RETURN_NOT_OK(dst->Write(data_header->data(), data_header->size()));
+ // Write the flatbuffer size prefix
+ int32_t flatbuffer_size = metadata_fb->size();
+ RETURN_NOT_OK(
+ dst->Write(reinterpret_cast<const uint8_t*>(&flatbuffer_size), sizeof(int32_t)));
- position += data_header->size();
- *header_end_offset = position;
+ // Write the flatbuffer
+ RETURN_NOT_OK(dst->Write(metadata_fb->data(), metadata_fb->size()));
- return Align(dst, &position);
+ // Write any padding
+ int64_t padding = padded_metadata_length - metadata_fb->size() - 4;
+ if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
+
+ return Status::OK();
}
- Status Align(io::OutputStream* dst, int64_t* position) {
- // Write all buffers here on word boundaries
- // TODO(wesm): Is there benefit to 64-byte padding in IPC?
- int64_t remainder = PaddedLength(*position) - *position;
- if (remainder > 0) {
- RETURN_NOT_OK(dst->Write(kPaddingBytes, remainder));
- *position += remainder;
+ Status Write(io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) {
+ RETURN_NOT_OK(AssemblePayload(body_length));
+
+#ifndef NDEBUG
+ int64_t start_position, current_position;
+ RETURN_NOT_OK(dst->Tell(&start_position));
+#endif
+
+ RETURN_NOT_OK(WriteMetadata(*body_length, dst, metadata_length));
+
+#ifndef NDEBUG
+ RETURN_NOT_OK(dst->Tell(&current_position));
+ DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
+ // Now write the buffers
+ for (size_t i = 0; i < buffers_.size(); ++i) {
+ const Buffer* buffer = buffers_[i].get();
+ int64_t size = 0;
+ int64_t padding = 0;
+
+ // The buffer might be null if we are handling zero row lengths.
+ if (buffer) {
+ size = buffer->size();
+ padding = BitUtil::RoundUpToMultipleOf64(size) - size;
+ }
+
+ if (size > 0) { RETURN_NOT_OK(dst->Write(buffer->data(), size)); }
+
+ if (padding > 0) { RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); }
}
+
+#ifndef NDEBUG
+ RETURN_NOT_OK(dst->Tell(&current_position));
+ DCHECK(BitUtil::IsMultipleOf8(current_position));
+#endif
+
return Status::OK();
}
- // This must be called after invoking AssemblePayload
Status GetTotalSize(int64_t* size) {
// emulates the behavior of Write without actually writing
- int64_t body_offset;
- int64_t data_header_offset;
+ int32_t metadata_length;
+ int64_t body_length;
MockOutputStream dst;
- RETURN_NOT_OK(Write(&dst, &body_offset, &data_header_offset));
+ RETURN_NOT_OK(Write(&dst, &metadata_length, &body_length));
*size = dst.GetExtentBytesWritten();
return Status::OK();
}
@@ -228,6 +257,7 @@ class RecordBatchWriter {
// Do not copy this vector. Ownership must be retained elsewhere
const std::vector<std::shared_ptr<Array>>* columns_;
int32_t num_rows_;
+ int64_t buffer_start_offset_;
std::vector<flatbuf::FieldNode> field_nodes_;
std::vector<flatbuf::Buffer> buffer_meta_;
@@ -236,18 +266,17 @@ class RecordBatchWriter {
};
Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
- int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset,
- int64_t* header_end_offset, int max_recursion_depth) {
+ int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
+ int32_t* metadata_length, int64_t* body_length, int max_recursion_depth) {
DCHECK_GT(max_recursion_depth, 0);
- RecordBatchWriter serializer(columns, num_rows, max_recursion_depth);
- RETURN_NOT_OK(serializer.AssemblePayload());
- return serializer.Write(dst, body_end_offset, header_end_offset);
+ RecordBatchWriter serializer(
+ columns, num_rows, buffer_start_offset, max_recursion_depth);
+ return serializer.Write(dst, metadata_length, body_length);
}
Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
RecordBatchWriter serializer(
- batch->columns(), batch->num_rows(), kMaxIpcRecursionDepth);
- RETURN_NOT_OK(serializer.AssemblePayload());
+ batch->columns(), batch->num_rows(), 0, kMaxIpcRecursionDepth);
RETURN_NOT_OK(serializer.GetTotalSize(size));
return Status::OK();
}
@@ -255,30 +284,33 @@ Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size) {
// ----------------------------------------------------------------------
// Record batch read path
-class RecordBatchReader::RecordBatchReaderImpl {
+class RecordBatchReader {
public:
- RecordBatchReaderImpl(io::ReadableFileInterface* file,
- const std::shared_ptr<RecordBatchMessage>& metadata, int max_recursion_depth)
- : file_(file), metadata_(metadata), max_recursion_depth_(max_recursion_depth) {
+ RecordBatchReader(const std::shared_ptr<RecordBatchMetadata>& metadata,
+ const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+ io::ReadableFileInterface* file)
+ : metadata_(metadata),
+ schema_(schema),
+ max_recursion_depth_(max_recursion_depth),
+ file_(file) {
num_buffers_ = metadata->num_buffers();
num_flattened_fields_ = metadata->num_fields();
}
- Status AssembleBatch(
- const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out) {
- std::vector<std::shared_ptr<Array>> arrays(schema->num_fields());
+ Status Read(std::shared_ptr<RecordBatch>* out) {
+ std::vector<std::shared_ptr<Array>> arrays(schema_->num_fields());
// The field_index and buffer_index are incremented in NextArray based on
// how much of the batch is "consumed" (through nested data reconstruction,
// for example)
field_index_ = 0;
buffer_index_ = 0;
- for (int i = 0; i < schema->num_fields(); ++i) {
- const Field* field = schema->field(i).get();
+ for (int i = 0; i < schema_->num_fields(); ++i) {
+ const Field* field = schema_->field(i).get();
RETURN_NOT_OK(NextArray(field, max_recursion_depth_, &arrays[i]));
}
- *out = std::make_shared<RecordBatch>(schema, metadata_->length(), arrays);
+ *out = std::make_shared<RecordBatch>(schema_, metadata_->length(), arrays);
return Status::OK();
}
@@ -370,67 +402,56 @@ class RecordBatchReader::RecordBatchReaderImpl {
Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
BufferMetadata metadata = metadata_->buffer(buffer_index);
- RETURN_NOT_OK(CheckMultipleOf64(metadata.length));
- return file_->ReadAt(metadata.offset, metadata.length, out);
+
+ if (metadata.length == 0) {
+ *out = std::make_shared<Buffer>(nullptr, 0);
+ return Status::OK();
+ } else {
+ return file_->ReadAt(metadata.offset, metadata.length, out);
+ }
}
private:
+ std::shared_ptr<RecordBatchMetadata> metadata_;
+ std::shared_ptr<Schema> schema_;
+ int max_recursion_depth_;
io::ReadableFileInterface* file_;
- std::shared_ptr<RecordBatchMessage> metadata_;
int field_index_;
int buffer_index_;
- int max_recursion_depth_;
int num_buffers_;
int num_flattened_fields_;
};
-Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset,
- std::shared_ptr<RecordBatchReader>* out) {
- return Open(file, offset, kMaxIpcRecursionDepth, out);
-}
-
-Status RecordBatchReader::Open(io::ReadableFileInterface* file, int64_t offset,
- int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out) {
+Status ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
+ io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata) {
std::shared_ptr<Buffer> buffer;
- RETURN_NOT_OK(file->ReadAt(offset - sizeof(int32_t), sizeof(int32_t), &buffer));
-
- int32_t metadata_size = *reinterpret_cast<const int32_t*>(buffer->data());
+ RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));
- if (metadata_size + static_cast<int>(sizeof(int32_t)) > offset) {
- return Status::Invalid("metadata size invalid");
- }
-
- // Read the metadata
- RETURN_NOT_OK(
- file->ReadAt(offset - metadata_size - sizeof(int32_t), metadata_size, &buffer));
-
- // TODO(wesm): buffer slicing here would be better in case ReadAt returns
- // allocated memory
-
- std::shared_ptr<Message> message;
- RETURN_NOT_OK(Message::Open(buffer, &message));
+ int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
- if (message->type() != Message::RECORD_BATCH) {
- return Status::Invalid("Metadata message is not a record batch");
+ if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) {
+ std::stringstream ss;
+ ss << "flatbuffer size " << metadata_length << " invalid. File offset: " << offset
+ << ", metadata length: " << metadata_length;
+ return Status::Invalid(ss.str());
}
- std::shared_ptr<RecordBatchMessage> batch_meta = message->GetRecordBatch();
-
- std::shared_ptr<RecordBatchReader> result(new RecordBatchReader());
- result->impl_.reset(new RecordBatchReaderImpl(file, batch_meta, max_recursion_depth));
- *out = result;
-
+ *metadata = std::make_shared<RecordBatchMetadata>(buffer, sizeof(int32_t));
return Status::OK();
}
-// Here the explicit destructor is required for compilers to be aware of
-// the complete information of RecordBatchReader::RecordBatchReaderImpl class
-RecordBatchReader::~RecordBatchReader() {}
+Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+ const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file,
+ std::shared_ptr<RecordBatch>* out) {
+ return ReadRecordBatch(metadata, schema, kMaxIpcRecursionDepth, file, out);
+}
-Status RecordBatchReader::GetRecordBatch(
- const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out) {
- return impl_->AssembleBatch(schema, out);
+Status ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+ const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+ io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out) {
+ RecordBatchReader reader(metadata, schema, max_recursion_depth, file);
+ return reader.Read(out);
}
} // namespace ipc
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index b02de28..963b9ee 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -43,7 +43,7 @@ class OutputStream;
namespace ipc {
-class RecordBatchMessage;
+class RecordBatchMetadata;
// ----------------------------------------------------------------------
// Write path
@@ -51,22 +51,30 @@ class RecordBatchMessage;
// TODO(emkornfield) investigate this more
constexpr int kMaxIpcRecursionDepth = 64;
-// Write the RecordBatch (collection of equal-length Arrow arrays) to the output
-// stream
+// Write the RecordBatch (collection of equal-length Arrow arrays) to the
+// output stream in a contiguous block. The record batch metadata is written as
+// a flatbuffer (see format/Message.fbs -- the RecordBatch message type)
+// prefixed by its size, followed by each of the memory buffers in the batch
+// written end to end (with appropriate alignment and padding):
//
-// First, each of the memory buffers are written out end-to-end
-//
-// Then, this function writes the batch metadata as a flatbuffer (see
-// format/Message.fbs -- the RecordBatch message type) like so:
-//
-// <int32: metadata size> <uint8*: metadata>
+// <int32: metadata size> <uint8*: metadata> <buffers>
//
// Finally, the absolute offsets (relative to the start of the output stream)
// to the end of the body and end of the metadata / data header (suffixed by
// the header size) is returned in out-variables
+//
+// @param(in) buffer_start_offset: the start offset to use in the buffer metadata,
+// default should be 0
+//
+// @param(out) metadata_length: the size of the length-prefixed flatbuffer
+// including padding to a 64-byte boundary
+//
+// @param(out) body_length: the size of the contiguous buffer block plus
+// padding bytes
ARROW_EXPORT Status WriteRecordBatch(const std::vector<std::shared_ptr<Array>>& columns,
- int32_t num_rows, io::OutputStream* dst, int64_t* body_end_offset,
- int64_t* header_end_offset, int max_recursion_depth = kMaxIpcRecursionDepth);
+ int32_t num_rows, int64_t buffer_start_offset, io::OutputStream* dst,
+ int32_t* metadata_length, int64_t* body_length,
+ int max_recursion_depth = kMaxIpcRecursionDepth);
// int64_t GetRecordBatchMetadata(const RecordBatch* batch);
@@ -78,27 +86,20 @@ ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch* batch, int64_t* size);
// ----------------------------------------------------------------------
// "Read" path; does not copy data if the input supports zero copy reads
-class ARROW_EXPORT RecordBatchReader {
- public:
- // The offset is the absolute position to the *end* of the record batch data
- // header
- static Status Open(io::ReadableFileInterface* file, int64_t offset,
- std::shared_ptr<RecordBatchReader>* out);
-
- static Status Open(io::ReadableFileInterface* file, int64_t offset,
- int max_recursion_depth, std::shared_ptr<RecordBatchReader>* out);
-
- virtual ~RecordBatchReader();
-
- // Reassemble the record batch. A Schema is required to be able to construct
- // the right array containers
- Status GetRecordBatch(
- const std::shared_ptr<Schema>& schema, std::shared_ptr<RecordBatch>* out);
-
- private:
- class RecordBatchReaderImpl;
- std::unique_ptr<RecordBatchReaderImpl> impl_;
-};
+// Read the record batch flatbuffer metadata starting at the indicated file offset
+//
+// The flatbuffer is expected to be length-prefixed, so the metadata_length
+// includes at least the length prefix and the flatbuffer
+Status ARROW_EXPORT ReadRecordBatchMetadata(int64_t offset, int32_t metadata_length,
+ io::ReadableFileInterface* file, std::shared_ptr<RecordBatchMetadata>* metadata);
+
+Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+ const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file,
+ std::shared_ptr<RecordBatch>* out);
+
+Status ARROW_EXPORT ReadRecordBatch(const std::shared_ptr<RecordBatchMetadata>& metadata,
+ const std::shared_ptr<Schema>& schema, int max_recursion_depth,
+ io::ReadableFileInterface* file, std::shared_ptr<RecordBatch>* out);
} // namespace ipc
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc
index c68244d..06001cc 100644
--- a/cpp/src/arrow/ipc/file.cc
+++ b/cpp/src/arrow/ipc/file.cc
@@ -23,6 +23,7 @@
#include <vector>
#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
#include "arrow/ipc/adapter.h"
#include "arrow/ipc/metadata.h"
#include "arrow/ipc/util.h"
@@ -87,19 +88,19 @@ Status FileWriter::WriteRecordBatch(
int64_t offset = position_;
- int64_t body_end_offset;
- int64_t header_end_offset;
+ // There may be padding ever the end of the metadata, so we cannot rely on
+ // position_
+ int32_t metadata_length;
+ int64_t body_length;
+
+ // Frame of reference in file format is 0, see ARROW-384
+ const int64_t buffer_start_offset = 0;
RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
- columns, num_rows, sink_, &body_end_offset, &header_end_offset));
+ columns, num_rows, buffer_start_offset, sink_, &metadata_length, &body_length));
RETURN_NOT_OK(UpdatePosition());
DCHECK(position_ % 8 == 0) << "ipc::WriteRecordBatch did not perform aligned writes";
- // There may be padding ever the end of the metadata, so we cannot rely on
- // position_
- int32_t metadata_length = header_end_offset - body_end_offset;
- int32_t body_length = body_end_offset - offset;
-
// Append metadata, to be written in the footer later
record_batches_.emplace_back(offset, metadata_length, body_length);
@@ -198,12 +199,18 @@ Status FileReader::GetRecordBatch(int i, std::shared_ptr<RecordBatch>* batch) {
DCHECK_GE(i, 0);
DCHECK_LT(i, num_record_batches());
FileBlock block = footer_->record_batch(i);
- int64_t metadata_end_offset = block.offset + block.body_length + block.metadata_length;
- std::shared_ptr<RecordBatchReader> reader;
- RETURN_NOT_OK(RecordBatchReader::Open(file_.get(), metadata_end_offset, &reader));
+ std::shared_ptr<RecordBatchMetadata> metadata;
+ RETURN_NOT_OK(ReadRecordBatchMetadata(
+ block.offset, block.metadata_length, file_.get(), &metadata));
+
+ // TODO(wesm): ARROW-388 -- the buffer frame of reference is 0 (see
+ // ARROW-384).
+ std::shared_ptr<Buffer> buffer_block;
+ RETURN_NOT_OK(file_->Read(block.body_length, &buffer_block));
+ io::BufferReader reader(buffer_block);
- return reader->GetRecordBatch(schema_, batch);
+ return ReadRecordBatch(metadata, schema_, &reader, batch);
}
} // namespace ipc
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index f5611d4..1accfde 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -54,17 +54,24 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
std::string path = "test-write-row-batch";
io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
- int64_t body_end_offset;
- int64_t header_end_offset;
+ int32_t metadata_length;
+ int64_t body_length;
- RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), mmap_.get(),
- &body_end_offset, &header_end_offset));
+ const int64_t buffer_offset = 0;
- std::shared_ptr<RecordBatchReader> reader;
- RETURN_NOT_OK(RecordBatchReader::Open(mmap_.get(), header_end_offset, &reader));
+ RETURN_NOT_OK(WriteRecordBatch(batch.columns(), batch.num_rows(), buffer_offset,
+ mmap_.get(), &metadata_length, &body_length));
- RETURN_NOT_OK(reader->GetRecordBatch(batch.schema(), batch_result));
- return Status::OK();
+ std::shared_ptr<RecordBatchMetadata> metadata;
+ RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
+
+ // The buffer offsets start at 0, so we must construct a
+ // ReadableFileInterface according to that frame of reference
+ std::shared_ptr<Buffer> buffer_payload;
+ RETURN_NOT_OK(mmap_->ReadAt(metadata_length, body_length, &buffer_payload));
+ io::BufferReader buffer_reader(buffer_payload);
+
+ return ReadRecordBatch(metadata, batch.schema(), &buffer_reader, batch_result);
}
protected:
@@ -96,11 +103,11 @@ INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch,
void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
ipc::MockOutputStream mock;
- int64_t mock_header_offset = -1;
- int64_t mock_body_offset = -1;
+ int32_t mock_metadata_length = -1;
+ int64_t mock_body_length = -1;
int64_t size = -1;
- ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), &mock,
- &mock_body_offset, &mock_header_offset));
+ ASSERT_OK(WriteRecordBatch(batch->columns(), batch->num_rows(), 0, &mock,
+ &mock_metadata_length, &mock_body_length));
ASSERT_OK(GetRecordBatchSize(batch.get(), &size));
ASSERT_EQ(mock.GetExtentBytesWritten(), size);
}
@@ -129,39 +136,36 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
void SetUp() { pool_ = default_memory_pool(); }
void TearDown() { io::MemoryMapFixture::TearDown(); }
- Status WriteToMmap(int recursion_level, bool override_level,
- int64_t* header_out = nullptr, std::shared_ptr<Schema>* schema_out = nullptr) {
+ Status WriteToMmap(int recursion_level, bool override_level, int32_t* metadata_length,
+ int64_t* body_length, std::shared_ptr<Schema>* schema) {
const int batch_length = 5;
- TypePtr type = kInt32;
+ TypePtr type = int32();
ArrayPtr array;
const bool include_nulls = true;
RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array));
for (int i = 0; i < recursion_level; ++i) {
- type = std::static_pointer_cast<DataType>(std::make_shared<ListType>(type));
+ type = list(type);
RETURN_NOT_OK(
MakeRandomListArray(array, batch_length, include_nulls, pool_, &array));
}
- auto f0 = std::make_shared<Field>("f0", type);
- std::shared_ptr<Schema> schema(new Schema({f0}));
- if (schema_out != nullptr) { *schema_out = schema; }
+ auto f0 = field("f0", type);
+
+ *schema = std::shared_ptr<Schema>(new Schema({f0}));
+
std::vector<ArrayPtr> arrays = {array};
- auto batch = std::make_shared<RecordBatch>(schema, batch_length, arrays);
+ auto batch = std::make_shared<RecordBatch>(*schema, batch_length, arrays);
std::string path = "test-write-past-max-recursion";
const int memory_map_size = 1 << 16;
io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
- int64_t body_offset;
- int64_t header_offset;
-
- int64_t* header_out_param = header_out == nullptr ? &header_offset : header_out;
if (override_level) {
- return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(),
- &body_offset, header_out_param, recursion_level + 1);
+ return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(),
+ metadata_length, body_length, recursion_level + 1);
} else {
- return WriteRecordBatch(batch->columns(), batch->num_rows(), mmap_.get(),
- &body_offset, header_out_param);
+ return WriteRecordBatch(batch->columns(), batch->num_rows(), 0, mmap_.get(),
+ metadata_length, body_length);
}
}
@@ -171,18 +175,29 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
};
TEST_F(RecursionLimits, WriteLimit) {
- ASSERT_RAISES(Invalid, WriteToMmap((1 << 8) + 1, false));
+ int32_t metadata_length = -1;
+ int64_t body_length = -1;
+ std::shared_ptr<Schema> schema;
+ ASSERT_RAISES(
+ Invalid, WriteToMmap((1 << 8) + 1, false, &metadata_length, &body_length, &schema));
}
TEST_F(RecursionLimits, ReadLimit) {
- int64_t header_offset = -1;
+ int32_t metadata_length = -1;
+ int64_t body_length = -1;
std::shared_ptr<Schema> schema;
- ASSERT_OK(WriteToMmap(64, true, &header_offset, &schema));
+ ASSERT_OK(WriteToMmap(64, true, &metadata_length, &body_length, &schema));
- std::shared_ptr<RecordBatchReader> reader;
- ASSERT_OK(RecordBatchReader::Open(mmap_.get(), header_offset, &reader));
- std::shared_ptr<RecordBatch> batch_result;
- ASSERT_RAISES(Invalid, reader->GetRecordBatch(schema, &batch_result));
+ std::shared_ptr<RecordBatchMetadata> metadata;
+ ASSERT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
+
+ std::shared_ptr<Buffer> payload;
+ ASSERT_OK(mmap_->ReadAt(metadata_length, body_length, &payload));
+
+ io::BufferReader reader(payload);
+
+ std::shared_ptr<RecordBatch> batch;
+ ASSERT_RAISES(Invalid, ReadRecordBatch(metadata, schema, &reader, &batch));
}
} // namespace ipc
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-file-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-file-test.cc b/cpp/src/arrow/ipc/ipc-file-test.cc
index cd424bf..a1feac4 100644
--- a/cpp/src/arrow/ipc/ipc-file-test.cc
+++ b/cpp/src/arrow/ipc/ipc-file-test.cc
@@ -68,7 +68,7 @@ class TestFileFormat : public ::testing::TestWithParam<MakeRecordBatch*> {
RETURN_NOT_OK(sink_->Tell(&footer_offset));
// Open the file
- auto reader = std::make_shared<io::BufferReader>(buffer_->data(), buffer_->size());
+ auto reader = std::make_shared<io::BufferReader>(buffer_);
RETURN_NOT_OK(FileReader::Open(reader, footer_offset, &file_reader_));
EXPECT_EQ(num_batches, file_reader_->num_record_batches());
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-json-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
index a51371c..e5c3a08 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -284,19 +284,23 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) {
"name": "foo",
"type": {"name": "int", "isSigned": true, "bitWidth": 32},
"nullable": true, "children": [],
- "typeLayout": [
- {"type": "VALIDITY", "typeBitWidth": 1},
- {"type": "DATA", "typeBitWidth": 32}
- ]
+ "typeLayout": {
+ "vectors": [
+ {"type": "VALIDITY", "typeBitWidth": 1},
+ {"type": "DATA", "typeBitWidth": 32}
+ ]
+ }
},
{
"name": "bar",
"type": {"name": "floatingpoint", "precision": "DOUBLE"},
"nullable": true, "children": [],
- "typeLayout": [
- {"type": "VALIDITY", "typeBitWidth": 1},
- {"type": "DATA", "typeBitWidth": 64}
- ]
+ "typeLayout": {
+ "vectors": [
+ {"type": "VALIDITY", "typeBitWidth": 1},
+ {"type": "DATA", "typeBitWidth": 64}
+ ]
+ }
}
]
},
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/ipc-metadata-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-metadata-test.cc b/cpp/src/arrow/ipc/ipc-metadata-test.cc
index 1dc3969..d29583f 100644
--- a/cpp/src/arrow/ipc/ipc-metadata-test.cc
+++ b/cpp/src/arrow/ipc/ipc-metadata-test.cc
@@ -43,7 +43,7 @@ static inline void assert_schema_equal(const Schema* lhs, const Schema* rhs) {
}
}
-class TestSchemaMessage : public ::testing::Test {
+class TestSchemaMetadata : public ::testing::Test {
public:
void SetUp() {}
@@ -52,11 +52,11 @@ class TestSchemaMessage : public ::testing::Test {
ASSERT_OK(WriteSchema(schema, &buffer));
std::shared_ptr<Message> message;
- ASSERT_OK(Message::Open(buffer, &message));
+ ASSERT_OK(Message::Open(buffer, 0, &message));
ASSERT_EQ(Message::SCHEMA, message->type());
- std::shared_ptr<SchemaMessage> schema_msg = message->GetSchema();
+ auto schema_msg = std::make_shared<SchemaMetadata>(message);
ASSERT_EQ(schema->num_fields(), schema_msg->num_fields());
std::shared_ptr<Schema> schema2;
@@ -68,7 +68,7 @@ class TestSchemaMessage : public ::testing::Test {
const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
-TEST_F(TestSchemaMessage, PrimitiveFields) {
+TEST_F(TestSchemaMetadata, PrimitiveFields) {
auto f0 = std::make_shared<Field>("f0", std::make_shared<Int8Type>());
auto f1 = std::make_shared<Field>("f1", std::make_shared<Int16Type>());
auto f2 = std::make_shared<Field>("f2", std::make_shared<Int32Type>());
@@ -85,7 +85,7 @@ TEST_F(TestSchemaMessage, PrimitiveFields) {
CheckRoundtrip(&schema);
}
-TEST_F(TestSchemaMessage, NestedFields) {
+TEST_F(TestSchemaMetadata, NestedFields) {
auto type = std::make_shared<ListType>(std::make_shared<Int32Type>());
auto f0 = std::make_shared<Field>("f0", type);
@@ -111,7 +111,7 @@ class TestFileFooter : public ::testing::Test {
std::unique_ptr<FileFooter> footer;
ASSERT_OK(FileFooter::Open(buffer, &footer));
- ASSERT_EQ(MetadataVersion::V1_SNAPSHOT, footer->version());
+ ASSERT_EQ(MetadataVersion::V2, footer->version());
// Check schema
std::shared_ptr<Schema> schema2;
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index 5eff899..7a313f7 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -255,19 +255,23 @@ static const char* JSON_EXAMPLE = R"example(
"name": "foo",
"type": {"name": "int", "isSigned": true, "bitWidth": 32},
"nullable": true, "children": [],
- "typeLayout": [
- {"type": "VALIDITY", "typeBitWidth": 1},
- {"type": "DATA", "typeBitWidth": 32}
- ]
+ "typeLayout": {
+ "vectors": [
+ {"type": "VALIDITY", "typeBitWidth": 1},
+ {"type": "DATA", "typeBitWidth": 32}
+ ]
+ }
},
{
"name": "bar",
"type": {"name": "floatingpoint", "precision": "DOUBLE"},
"nullable": true, "children": [],
- "typeLayout": [
- {"type": "VALIDITY", "typeBitWidth": 1},
- {"type": "DATA", "typeBitWidth": 64}
- ]
+ "typeLayout": {
+ "vectors": [
+ {"type": "VALIDITY", "typeBitWidth": 1},
+ {"type": "DATA", "typeBitWidth": 64}
+ ]
+ }
}
]
},
@@ -301,10 +305,12 @@ static const char* JSON_EXAMPLE2 = R"example(
"name": "foo",
"type": {"name": "int", "isSigned": true, "bitWidth": 32},
"nullable": true, "children": [],
- "typeLayout": [
- {"type": "VALIDITY", "typeBitWidth": 1},
- {"type": "DATA", "typeBitWidth": 32}
- ]
+ "typeLayout": {
+ "vectors": [
+ {"type": "VALIDITY", "typeBitWidth": 1},
+ {"type": "DATA", "typeBitWidth": 32}
+ ]
+ }
}
]
},
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/json-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
index 31fe35b..e56bcb3 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -45,8 +45,6 @@ namespace ipc {
using RjArray = rj::Value::ConstArray;
using RjObject = rj::Value::ConstObject;
-enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY };
-
static std::string GetBufferTypeName(BufferType type) {
switch (type) {
case BufferType::DATA:
@@ -93,27 +91,6 @@ static std::string GetTimeUnitName(TimeUnit unit) {
return "UNKNOWN";
}
-class BufferLayout {
- public:
- BufferLayout(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {}
-
- BufferType type() const { return type_; }
- int bit_width() const { return bit_width_; }
-
- private:
- BufferType type_;
- int bit_width_;
-};
-
-static const BufferLayout kValidityBuffer(BufferType::VALIDITY, 1);
-static const BufferLayout kOffsetBuffer(BufferType::OFFSET, 32);
-static const BufferLayout kTypeBuffer(BufferType::TYPE, 32);
-static const BufferLayout kBooleanBuffer(BufferType::DATA, 1);
-static const BufferLayout kValues64(BufferType::DATA, 64);
-static const BufferLayout kValues32(BufferType::DATA, 32);
-static const BufferLayout kValues16(BufferType::DATA, 16);
-static const BufferLayout kValues8(BufferType::DATA, 8);
-
class JsonSchemaWriter : public TypeVisitor {
public:
explicit JsonSchemaWriter(const Schema& schema, RjWriter* writer)
@@ -154,9 +131,9 @@ class JsonSchemaWriter : public TypeVisitor {
}
template <typename T>
- typename std::enable_if<std::is_base_of<NoExtraMeta, T>::value ||
- std::is_base_of<BooleanType, T>::value ||
- std::is_base_of<NullType, T>::value,
+ typename std::enable_if<
+ std::is_base_of<NoExtraMeta, T>::value || std::is_base_of<BooleanType, T>::value ||
+ std::is_base_of<DateType, T>::value || std::is_base_of<NullType, T>::value,
void>::type
WriteTypeMetadata(const T& type) {}
@@ -243,11 +220,10 @@ class JsonSchemaWriter : public TypeVisitor {
}
template <typename T>
- Status WritePrimitive(const std::string& typeclass, const T& type,
- const std::vector<BufferLayout>& buffer_layout) {
+ Status WritePrimitive(const std::string& typeclass, const T& type) {
WriteName(typeclass, type);
SetNoChildren();
- WriteBufferLayout(buffer_layout);
+ WriteBufferLayout(type.GetBufferLayout());
return Status::OK();
}
@@ -255,15 +231,17 @@ class JsonSchemaWriter : public TypeVisitor {
Status WriteVarBytes(const std::string& typeclass, const T& type) {
WriteName(typeclass, type);
SetNoChildren();
- WriteBufferLayout({kValidityBuffer, kOffsetBuffer, kValues8});
+ WriteBufferLayout(type.GetBufferLayout());
return Status::OK();
}
- void WriteBufferLayout(const std::vector<BufferLayout>& buffer_layout) {
+ void WriteBufferLayout(const std::vector<BufferDescr>& buffer_layout) {
writer_->Key("typeLayout");
+ writer_->StartObject();
+ writer_->Key("vectors");
writer_->StartArray();
- for (const BufferLayout& buffer : buffer_layout) {
+ for (const BufferDescr& buffer : buffer_layout) {
writer_->StartObject();
writer_->Key("type");
writer_->String(GetBufferTypeName(buffer.type()));
@@ -274,6 +252,7 @@ class JsonSchemaWriter : public TypeVisitor {
writer_->EndObject();
}
writer_->EndArray();
+ writer_->EndObject();
}
Status WriteChildren(const std::vector<std::shared_ptr<Field>>& children) {
@@ -286,74 +265,52 @@ class JsonSchemaWriter : public TypeVisitor {
return Status::OK();
}
- Status Visit(const NullType& type) override { return WritePrimitive("null", type, {}); }
+ Status Visit(const NullType& type) override { return WritePrimitive("null", type); }
- Status Visit(const BooleanType& type) override {
- return WritePrimitive("bool", type, {kValidityBuffer, kBooleanBuffer});
- }
+ Status Visit(const BooleanType& type) override { return WritePrimitive("bool", type); }
- Status Visit(const Int8Type& type) override {
- return WritePrimitive("int", type, {kValidityBuffer, kValues8});
- }
+ Status Visit(const Int8Type& type) override { return WritePrimitive("int", type); }
- Status Visit(const Int16Type& type) override {
- return WritePrimitive("int", type, {kValidityBuffer, kValues16});
- }
+ Status Visit(const Int16Type& type) override { return WritePrimitive("int", type); }
- Status Visit(const Int32Type& type) override {
- return WritePrimitive("int", type, {kValidityBuffer, kValues32});
- }
+ Status Visit(const Int32Type& type) override { return WritePrimitive("int", type); }
- Status Visit(const Int64Type& type) override {
- return WritePrimitive("int", type, {kValidityBuffer, kValues64});
- }
+ Status Visit(const Int64Type& type) override { return WritePrimitive("int", type); }
- Status Visit(const UInt8Type& type) override {
- return WritePrimitive("int", type, {kValidityBuffer, kValues8});
- }
+ Status Visit(const UInt8Type& type) override { return WritePrimitive("int", type); }
- Status Visit(const UInt16Type& type) override {
- return WritePrimitive("int", type, {kValidityBuffer, kValues16});
- }
+ Status Visit(const UInt16Type& type) override { return WritePrimitive("int", type); }
- Status Visit(const UInt32Type& type) override {
- return WritePrimitive("int", type, {kValidityBuffer, kValues32});
- }
+ Status Visit(const UInt32Type& type) override { return WritePrimitive("int", type); }
- Status Visit(const UInt64Type& type) override {
- return WritePrimitive("int", type, {kValidityBuffer, kValues64});
- }
+ Status Visit(const UInt64Type& type) override { return WritePrimitive("int", type); }
Status Visit(const HalfFloatType& type) override {
- return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues16});
+ return WritePrimitive("floatingpoint", type);
}
Status Visit(const FloatType& type) override {
- return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues32});
+ return WritePrimitive("floatingpoint", type);
}
Status Visit(const DoubleType& type) override {
- return WritePrimitive("floatingpoint", type, {kValidityBuffer, kValues64});
+ return WritePrimitive("floatingpoint", type);
}
Status Visit(const StringType& type) override { return WriteVarBytes("utf8", type); }
Status Visit(const BinaryType& type) override { return WriteVarBytes("binary", type); }
- Status Visit(const DateType& type) override {
- return WritePrimitive("date", type, {kValidityBuffer, kValues64});
- }
+ Status Visit(const DateType& type) override { return WritePrimitive("date", type); }
- Status Visit(const TimeType& type) override {
- return WritePrimitive("time", type, {kValidityBuffer, kValues64});
- }
+ Status Visit(const TimeType& type) override { return WritePrimitive("time", type); }
Status Visit(const TimestampType& type) override {
- return WritePrimitive("timestamp", type, {kValidityBuffer, kValues64});
+ return WritePrimitive("timestamp", type);
}
Status Visit(const IntervalType& type) override {
- return WritePrimitive("interval", type, {kValidityBuffer, kValues64});
+ return WritePrimitive("interval", type);
}
Status Visit(const DecimalType& type) override { return Status::NotImplemented("NYI"); }
@@ -361,26 +318,21 @@ class JsonSchemaWriter : public TypeVisitor {
Status Visit(const ListType& type) override {
WriteName("list", type);
RETURN_NOT_OK(WriteChildren(type.children()));
- WriteBufferLayout({kValidityBuffer, kOffsetBuffer});
+ WriteBufferLayout(type.GetBufferLayout());
return Status::OK();
}
Status Visit(const StructType& type) override {
WriteName("struct", type);
WriteChildren(type.children());
- WriteBufferLayout({kValidityBuffer, kTypeBuffer});
+ WriteBufferLayout(type.GetBufferLayout());
return Status::OK();
}
Status Visit(const UnionType& type) override {
WriteName("union", type);
WriteChildren(type.children());
-
- if (type.mode == UnionMode::SPARSE) {
- WriteBufferLayout({kValidityBuffer, kTypeBuffer});
- } else {
- WriteBufferLayout({kValidityBuffer, kTypeBuffer, kOffsetBuffer});
- }
+ WriteBufferLayout(type.GetBufferLayout());
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index 7102012..b995228 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -37,20 +37,6 @@ namespace flatbuf = org::apache::arrow::flatbuf;
namespace ipc {
-const std::shared_ptr<DataType> BOOL = std::make_shared<BooleanType>();
-const std::shared_ptr<DataType> INT8 = std::make_shared<Int8Type>();
-const std::shared_ptr<DataType> INT16 = std::make_shared<Int16Type>();
-const std::shared_ptr<DataType> INT32 = std::make_shared<Int32Type>();
-const std::shared_ptr<DataType> INT64 = std::make_shared<Int64Type>();
-const std::shared_ptr<DataType> UINT8 = std::make_shared<UInt8Type>();
-const std::shared_ptr<DataType> UINT16 = std::make_shared<UInt16Type>();
-const std::shared_ptr<DataType> UINT32 = std::make_shared<UInt32Type>();
-const std::shared_ptr<DataType> UINT64 = std::make_shared<UInt64Type>();
-const std::shared_ptr<DataType> FLOAT = std::make_shared<FloatType>();
-const std::shared_ptr<DataType> DOUBLE = std::make_shared<DoubleType>();
-const std::shared_ptr<DataType> STRING = std::make_shared<StringType>();
-const std::shared_ptr<DataType> BINARY = std::make_shared<BinaryType>();
-
static Status IntFromFlatbuffer(
const flatbuf::Int* int_data, std::shared_ptr<DataType>* out) {
if (int_data->bitWidth() > 64) {
@@ -62,16 +48,16 @@ static Status IntFromFlatbuffer(
switch (int_data->bitWidth()) {
case 8:
- *out = int_data->is_signed() ? INT8 : UINT8;
+ *out = int_data->is_signed() ? int8() : uint8();
break;
case 16:
- *out = int_data->is_signed() ? INT16 : UINT16;
+ *out = int_data->is_signed() ? int16() : uint16();
break;
case 32:
- *out = int_data->is_signed() ? INT32 : UINT32;
+ *out = int_data->is_signed() ? int32() : uint32();
break;
case 64:
- *out = int_data->is_signed() ? INT64 : UINT64;
+ *out = int_data->is_signed() ? int64() : uint64();
break;
default:
return Status::NotImplemented("Integers not in cstdint are not implemented");
@@ -81,10 +67,12 @@ static Status IntFromFlatbuffer(
static Status FloatFromFlatuffer(
const flatbuf::FloatingPoint* float_data, std::shared_ptr<DataType>* out) {
- if (float_data->precision() == flatbuf::Precision_SINGLE) {
- *out = FLOAT;
+ if (float_data->precision() == flatbuf::Precision_HALF) {
+ *out = float16();
+ } else if (float_data->precision() == flatbuf::Precision_SINGLE) {
+ *out = float32();
} else {
- *out = DOUBLE;
+ *out = float64();
}
return Status::OK();
}
@@ -100,13 +88,13 @@ static Status TypeFromFlatbuffer(flatbuf::Type type, const void* type_data,
return FloatFromFlatuffer(
static_cast<const flatbuf::FloatingPoint*>(type_data), out);
case flatbuf::Type_Binary:
- *out = BINARY;
+ *out = binary();
return Status::OK();
case flatbuf::Type_Utf8:
- *out = STRING;
+ *out = utf8();
return Status::OK();
case flatbuf::Type_Bool:
- *out = BOOL;
+ *out = boolean();
return Status::OK();
case flatbuf::Type_Decimal:
case flatbuf::Type_Timestamp:
@@ -164,7 +152,32 @@ static Status StructToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type
break;
static Status TypeToFlatbuffer(FBB& fbb, const std::shared_ptr<DataType>& type,
- std::vector<FieldOffset>* children, flatbuf::Type* out_type, Offset* offset) {
+ std::vector<FieldOffset>* children, std::vector<VectorLayoutOffset>* layout,
+ flatbuf::Type* out_type, Offset* offset) {
+ std::vector<BufferDescr> buffer_layout = type->GetBufferLayout();
+ for (const BufferDescr& descr : buffer_layout) {
+ flatbuf::VectorType vector_type;
+ switch (descr.type()) {
+ case BufferType::OFFSET:
+ vector_type = flatbuf::VectorType_OFFSET;
+ break;
+ case BufferType::DATA:
+ vector_type = flatbuf::VectorType_DATA;
+ break;
+ case BufferType::VALIDITY:
+ vector_type = flatbuf::VectorType_VALIDITY;
+ break;
+ case BufferType::TYPE:
+ vector_type = flatbuf::VectorType_TYPE;
+ break;
+ default:
+ vector_type = flatbuf::VectorType_DATA;
+ break;
+ }
+ auto offset = flatbuf::CreateVectorLayout(fbb, descr.bit_width(), vector_type);
+ layout->push_back(offset);
+ }
+
switch (type->type) {
case Type::BOOL:
*out_type = flatbuf::Type_Bool;
@@ -223,14 +236,18 @@ static Status FieldToFlatbuffer(
flatbuf::Type type_enum;
Offset type_data;
+ Offset type_layout;
std::vector<FieldOffset> children;
+ std::vector<VectorLayoutOffset> layout;
- RETURN_NOT_OK(TypeToFlatbuffer(fbb, field->type, &children, &type_enum, &type_data));
+ RETURN_NOT_OK(
+ TypeToFlatbuffer(fbb, field->type, &children, &layout, &type_enum, &type_data));
auto fb_children = fbb.CreateVector(children);
+ auto fb_layout = fbb.CreateVector(layout);
// TODO: produce the list of VectorTypes
*offset = flatbuf::CreateField(fbb, fb_name, field->nullable, type_enum, type_data,
- field->dictionary, fb_children);
+ field->dictionary, fb_children, fb_layout);
return Status::OK();
}
@@ -300,13 +317,26 @@ Status MessageBuilder::SetRecordBatch(int32_t length, int64_t body_length,
return Status::OK();
}
-Status WriteDataHeader(int32_t length, int64_t body_length,
+Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
const std::vector<flatbuf::FieldNode>& nodes,
const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out) {
- MessageBuilder message;
- RETURN_NOT_OK(message.SetRecordBatch(length, body_length, nodes, buffers));
- RETURN_NOT_OK(message.Finish());
- return message.GetBuffer(out);
+ flatbuffers::FlatBufferBuilder fbb;
+
+ auto batch = flatbuf::CreateRecordBatch(
+ fbb, length, fbb.CreateVectorOfStructs(nodes), fbb.CreateVectorOfStructs(buffers));
+
+ fbb.Finish(batch);
+
+ int32_t size = fbb.GetSize();
+
+ auto result = std::make_shared<PoolBuffer>();
+ RETURN_NOT_OK(result->Resize(size));
+
+ uint8_t* dst = result->mutable_data();
+ memcpy(dst, fbb.GetBufferPointer(), size);
+
+ *out = result;
+ return Status::OK();
}
Status MessageBuilder::Finish() {
@@ -317,17 +347,13 @@ Status MessageBuilder::Finish() {
}
Status MessageBuilder::GetBuffer(std::shared_ptr<Buffer>* out) {
- // The message buffer is suffixed by the size of the complete flatbuffer as
- // int32_t
- // <uint8_t*: flatbuffer data><int32_t: flatbuffer size>
int32_t size = fbb_.GetSize();
auto result = std::make_shared<PoolBuffer>();
- RETURN_NOT_OK(result->Resize(size + sizeof(int32_t)));
+ RETURN_NOT_OK(result->Resize(size));
uint8_t* dst = result->mutable_data();
memcpy(dst, fbb_.GetBufferPointer(), size);
- memcpy(dst + size, reinterpret_cast<int32_t*>(&size), sizeof(int32_t));
*out = result;
return Status::OK();
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index c404cfd..4826ebe 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -41,10 +41,10 @@ namespace ipc {
using FBB = flatbuffers::FlatBufferBuilder;
using FieldOffset = flatbuffers::Offset<arrow::flatbuf::Field>;
+using VectorLayoutOffset = flatbuffers::Offset<arrow::flatbuf::VectorLayout>;
using Offset = flatbuffers::Offset<void>;
-static constexpr flatbuf::MetadataVersion kMetadataVersion =
- flatbuf::MetadataVersion_V1_SNAPSHOT;
+static constexpr flatbuf::MetadataVersion kMetadataVersion = flatbuf::MetadataVersion_V2;
Status FieldFromFlatbuffer(const flatbuf::Field* field, std::shared_ptr<Field>* out);
@@ -70,7 +70,7 @@ class MessageBuilder {
flatbuffers::FlatBufferBuilder fbb_;
};
-Status WriteDataHeader(int32_t length, int64_t body_length,
+Status WriteRecordBatchMetadata(int32_t length, int64_t body_length,
const std::vector<flatbuf::FieldNode>& nodes,
const std::vector<flatbuf::Buffer>& buffers, std::shared_ptr<Buffer>* out);
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index 66df8a6..44d3939 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -50,9 +50,15 @@ Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out) {
class Message::MessageImpl {
public:
- explicit MessageImpl(
- const std::shared_ptr<Buffer>& buffer, const flatbuf::Message* message)
- : buffer_(buffer), message_(message) {}
+ explicit MessageImpl(const std::shared_ptr<Buffer>& buffer, int64_t offset)
+ : buffer_(buffer), offset_(offset), message_(nullptr) {}
+
+ Status Open() {
+ message_ = flatbuf::GetMessage(buffer_->data() + offset_);
+
+ // TODO(wesm): verify the message
+ return Status::OK();
+ }
Message::Type type() const {
switch (message_->header_type()) {
@@ -72,25 +78,23 @@ class Message::MessageImpl {
int64_t body_length() const { return message_->bodyLength(); }
private:
- // Owns the memory this message accesses
+ // Retain reference to memory
std::shared_ptr<Buffer> buffer_;
+ int64_t offset_;
const flatbuf::Message* message_;
};
-Message::Message() {}
-
-Status Message::Open(
- const std::shared_ptr<Buffer>& buffer, std::shared_ptr<Message>* out) {
- std::shared_ptr<Message> result(new Message());
-
- const flatbuf::Message* message = flatbuf::GetMessage(buffer->data());
+Message::Message(const std::shared_ptr<Buffer>& buffer, int64_t offset) {
+ impl_.reset(new MessageImpl(buffer, offset));
+}
- // TODO(wesm): verify message
- result->impl_.reset(new MessageImpl(buffer, message));
- *out = result;
+Status Message::Open(const std::shared_ptr<Buffer>& buffer, int64_t offset,
+ std::shared_ptr<Message>* out) {
+ // ctor is private
- return Status::OK();
+ *out = std::shared_ptr<Message>(new Message(buffer, offset));
+ return (*out)->impl_->Open();
}
Message::Type Message::type() const {
@@ -101,20 +105,12 @@ int64_t Message::body_length() const {
return impl_->body_length();
}
-std::shared_ptr<Message> Message::get_shared_ptr() {
- return this->shared_from_this();
-}
-
-std::shared_ptr<SchemaMessage> Message::GetSchema() {
- return std::make_shared<SchemaMessage>(this->shared_from_this(), impl_->header());
-}
-
// ----------------------------------------------------------------------
-// SchemaMessage
+// SchemaMetadata
-class SchemaMessage::SchemaMessageImpl {
+class SchemaMetadata::SchemaMetadataImpl {
public:
- explicit SchemaMessageImpl(const void* schema)
+ explicit SchemaMetadataImpl(const void* schema)
: schema_(static_cast<const flatbuf::Schema*>(schema)) {}
const flatbuf::Field* field(int i) const { return schema_->fields()->Get(i); }
@@ -125,22 +121,29 @@ class SchemaMessage::SchemaMessageImpl {
const flatbuf::Schema* schema_;
};
-SchemaMessage::SchemaMessage(
- const std::shared_ptr<Message>& message, const void* schema) {
+SchemaMetadata::SchemaMetadata(
+ const std::shared_ptr<Message>& message, const void* flatbuf) {
+ message_ = message;
+ impl_.reset(new SchemaMetadataImpl(flatbuf));
+}
+
+SchemaMetadata::SchemaMetadata(const std::shared_ptr<Message>& message) {
message_ = message;
- impl_.reset(new SchemaMessageImpl(schema));
+ impl_.reset(new SchemaMetadataImpl(message->impl_->header()));
}
-int SchemaMessage::num_fields() const {
+SchemaMetadata::~SchemaMetadata() {}
+
+int SchemaMetadata::num_fields() const {
return impl_->num_fields();
}
-Status SchemaMessage::GetField(int i, std::shared_ptr<Field>* out) const {
+Status SchemaMetadata::GetField(int i, std::shared_ptr<Field>* out) const {
const flatbuf::Field* field = impl_->field(i);
return FieldFromFlatbuffer(field, out);
}
-Status SchemaMessage::GetSchema(std::shared_ptr<Schema>* out) const {
+Status SchemaMetadata::GetSchema(std::shared_ptr<Schema>* out) const {
std::vector<std::shared_ptr<Field>> fields(num_fields());
for (int i = 0; i < this->num_fields(); ++i) {
RETURN_NOT_OK(GetField(i, &fields[i]));
@@ -150,11 +153,11 @@ Status SchemaMessage::GetSchema(std::shared_ptr<Schema>* out) const {
}
// ----------------------------------------------------------------------
-// RecordBatchMessage
+// RecordBatchMetadata
-class RecordBatchMessage::RecordBatchMessageImpl {
+class RecordBatchMetadata::RecordBatchMetadataImpl {
public:
- explicit RecordBatchMessageImpl(const void* batch)
+ explicit RecordBatchMetadataImpl(const void* batch)
: batch_(static_cast<const flatbuf::RecordBatch*>(batch)) {
nodes_ = batch_->nodes();
buffers_ = batch_->buffers();
@@ -176,19 +179,29 @@ class RecordBatchMessage::RecordBatchMessageImpl {
const flatbuffers::Vector<const flatbuf::Buffer*>* buffers_;
};
-std::shared_ptr<RecordBatchMessage> Message::GetRecordBatch() {
- return std::make_shared<RecordBatchMessage>(this->shared_from_this(), impl_->header());
+RecordBatchMetadata::RecordBatchMetadata(const std::shared_ptr<Message>& message) {
+ message_ = message;
+ impl_.reset(new RecordBatchMetadataImpl(message->impl_->header()));
}
-RecordBatchMessage::RecordBatchMessage(
- const std::shared_ptr<Message>& message, const void* batch) {
- message_ = message;
- impl_.reset(new RecordBatchMessageImpl(batch));
+RecordBatchMetadata::RecordBatchMetadata(
+ const std::shared_ptr<Buffer>& buffer, int64_t offset) {
+ message_ = nullptr;
+ buffer_ = buffer;
+
+ const flatbuf::RecordBatch* metadata =
+ flatbuffers::GetRoot<flatbuf::RecordBatch>(buffer->data() + offset);
+
+ // TODO(wesm): validate table
+
+ impl_.reset(new RecordBatchMetadataImpl(metadata));
}
+RecordBatchMetadata::~RecordBatchMetadata() {}
+
// TODO(wesm): Copying the flatbuffer data isn't great, but this will do for
// now
-FieldMetadata RecordBatchMessage::field(int i) const {
+FieldMetadata RecordBatchMetadata::field(int i) const {
const flatbuf::FieldNode* node = impl_->field(i);
FieldMetadata result;
@@ -197,7 +210,7 @@ FieldMetadata RecordBatchMessage::field(int i) const {
return result;
}
-BufferMetadata RecordBatchMessage::buffer(int i) const {
+BufferMetadata RecordBatchMetadata::buffer(int i) const {
const flatbuf::Buffer* buffer = impl_->buffer(i);
BufferMetadata result;
@@ -207,15 +220,15 @@ BufferMetadata RecordBatchMessage::buffer(int i) const {
return result;
}
-int32_t RecordBatchMessage::length() const {
+int32_t RecordBatchMetadata::length() const {
return impl_->length();
}
-int RecordBatchMessage::num_buffers() const {
+int RecordBatchMetadata::num_buffers() const {
return impl_->num_buffers();
}
-int RecordBatchMessage::num_fields() const {
+int RecordBatchMetadata::num_fields() const {
return impl_->num_fields();
}
@@ -268,11 +281,13 @@ class FileFooter::FileFooterImpl {
MetadataVersion::type version() const {
switch (footer_->version()) {
- case flatbuf::MetadataVersion_V1_SNAPSHOT:
- return MetadataVersion::V1_SNAPSHOT;
+ case flatbuf::MetadataVersion_V1:
+ return MetadataVersion::V1;
+ case flatbuf::MetadataVersion_V2:
+ return MetadataVersion::V2;
// Add cases as other versions become available
default:
- return MetadataVersion::V1_SNAPSHOT;
+ return MetadataVersion::V2;
}
}
@@ -285,7 +300,7 @@ class FileFooter::FileFooterImpl {
}
Status GetSchema(std::shared_ptr<Schema>* out) const {
- auto schema_msg = std::make_shared<SchemaMessage>(nullptr, footer_->schema());
+ auto schema_msg = std::make_shared<SchemaMetadata>(nullptr, footer_->schema());
return schema_msg->GetSchema(out);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index 2f0e853..1c4ef64 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -42,7 +42,7 @@ class OutputStream;
namespace ipc {
struct MetadataVersion {
- enum type { V1_SNAPSHOT };
+ enum type { V1, V2 };
};
//----------------------------------------------------------------------
@@ -58,10 +58,14 @@ Status WriteSchema(const Schema* schema, std::shared_ptr<Buffer>* out);
class Message;
// Container for serialized Schema metadata contained in an IPC message
-class ARROW_EXPORT SchemaMessage {
+class ARROW_EXPORT SchemaMetadata {
public:
+ explicit SchemaMetadata(const std::shared_ptr<Message>& message);
+
// Accepts an opaque flatbuffer pointer
- SchemaMessage(const std::shared_ptr<Message>& message, const void* schema);
+ SchemaMetadata(const std::shared_ptr<Message>& message, const void* schema);
+
+ ~SchemaMetadata();
int num_fields() const;
@@ -76,8 +80,8 @@ class ARROW_EXPORT SchemaMessage {
// Parent, owns the flatbuffer data
std::shared_ptr<Message> message_;
- class SchemaMessageImpl;
- std::unique_ptr<SchemaMessageImpl> impl_;
+ class SchemaMetadataImpl;
+ std::unique_ptr<SchemaMetadataImpl> impl_;
};
// Field metadata
@@ -93,10 +97,13 @@ struct BufferMetadata {
};
// Container for serialized record batch metadata contained in an IPC message
-class ARROW_EXPORT RecordBatchMessage {
+class ARROW_EXPORT RecordBatchMetadata {
public:
- // Accepts an opaque flatbuffer pointer
- RecordBatchMessage(const std::shared_ptr<Message>& message, const void* batch_meta);
+ explicit RecordBatchMetadata(const std::shared_ptr<Message>& message);
+
+ RecordBatchMetadata(const std::shared_ptr<Buffer>& message, int64_t offset);
+
+ ~RecordBatchMetadata();
FieldMetadata field(int i) const;
BufferMetadata buffer(int i) const;
@@ -108,37 +115,34 @@ class ARROW_EXPORT RecordBatchMessage {
private:
// Parent, owns the flatbuffer data
std::shared_ptr<Message> message_;
+ std::shared_ptr<Buffer> buffer_;
- class RecordBatchMessageImpl;
- std::unique_ptr<RecordBatchMessageImpl> impl_;
+ class RecordBatchMetadataImpl;
+ std::unique_ptr<RecordBatchMetadataImpl> impl_;
};
-class ARROW_EXPORT DictionaryBatchMessage {
+class ARROW_EXPORT DictionaryBatchMetadata {
public:
int64_t id() const;
- std::unique_ptr<RecordBatchMessage> data() const;
+ std::unique_ptr<RecordBatchMetadata> data() const;
};
-class ARROW_EXPORT Message : public std::enable_shared_from_this<Message> {
+class ARROW_EXPORT Message {
public:
enum Type { NONE, SCHEMA, DICTIONARY_BATCH, RECORD_BATCH };
- static Status Open(
- const std::shared_ptr<Buffer>& buffer, std::shared_ptr<Message>* out);
-
- std::shared_ptr<Message> get_shared_ptr();
+ static Status Open(const std::shared_ptr<Buffer>& buffer, int64_t offset,
+ std::shared_ptr<Message>* out);
int64_t body_length() const;
Type type() const;
- // These methods only to be invoked if you have checked the message type
- std::shared_ptr<SchemaMessage> GetSchema();
- std::shared_ptr<RecordBatchMessage> GetRecordBatch();
- std::shared_ptr<DictionaryBatchMessage> GetDictionaryBatch();
-
private:
- Message();
+ Message(const std::shared_ptr<Buffer>& buffer, int64_t offset);
+
+ friend class RecordBatchMetadata;
+ friend class SchemaMetadata;
// Hide serialization details from user API
class MessageImpl;
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 9abc20d..65b3782 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -39,8 +39,7 @@
namespace arrow {
namespace ipc {
-const auto kInt32 = std::make_shared<Int32Type>();
-const auto kListInt32 = list(kInt32);
+const auto kListInt32 = list(int32());
const auto kListListInt32 = list(kListInt32);
Status MakeRandomInt32Array(
@@ -99,8 +98,8 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
const int length = 1000;
// Make the schema
- auto f0 = std::make_shared<Field>("f0", kInt32);
- auto f1 = std::make_shared<Field>("f1", kInt32);
+ auto f0 = std::make_shared<Field>("f0", int32());
+ auto f1 = std::make_shared<Field>("f1", int32());
std::shared_ptr<Schema> schema(new Schema({f0, f1}));
// Example data
@@ -161,7 +160,7 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
// Make the schema
auto f0 = std::make_shared<Field>("f0", kListInt32);
auto f1 = std::make_shared<Field>("f1", kListListInt32);
- auto f2 = std::make_shared<Field>("f2", kInt32);
+ auto f2 = std::make_shared<Field>("f2", int32());
std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
// Example data
@@ -184,7 +183,7 @@ Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) {
// Make the schema
auto f0 = std::make_shared<Field>("f0", kListInt32);
auto f1 = std::make_shared<Field>("f1", kListListInt32);
- auto f2 = std::make_shared<Field>("f2", kInt32);
+ auto f2 = std::make_shared<Field>("f2", int32());
std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
// Example data
@@ -205,7 +204,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
// Make the schema
auto f0 = std::make_shared<Field>("f0", kListInt32);
auto f1 = std::make_shared<Field>("f1", kListListInt32);
- auto f2 = std::make_shared<Field>("f2", kInt32);
+ auto f2 = std::make_shared<Field>("f2", int32());
std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
// Example data
@@ -226,7 +225,7 @@ Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out) {
const int batch_length = 5;
- TypePtr type = kInt32;
+ TypePtr type = int32();
MemoryPool* pool = default_memory_pool();
ArrayPtr array;
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/ipc/util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/util.h b/cpp/src/arrow/ipc/util.h
index 9000d1b..242d662 100644
--- a/cpp/src/arrow/ipc/util.h
+++ b/cpp/src/arrow/ipc/util.h
@@ -28,12 +28,10 @@ namespace arrow {
namespace ipc {
// Align on 8-byte boundaries
-static constexpr int kArrowAlignment = 8;
-
// Buffers are padded to 64-byte boundaries (for SIMD)
-static constexpr int kArrowBufferAlignment = 64;
+static constexpr int kArrowAlignment = 64;
-static constexpr uint8_t kPaddingBytes[kArrowBufferAlignment] = {0};
+static constexpr uint8_t kPaddingBytes[kArrowAlignment] = {0};
static inline int64_t PaddedLength(int64_t nbytes, int64_t alignment = kArrowAlignment) {
return ((nbytes + alignment - 1) / alignment) * alignment;
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 93dd5b6..63c2166 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -61,10 +61,10 @@
// Alias MSVC popcount to GCC name
#ifdef _MSC_VER
-# include <intrin.h>
-# define __builtin_popcount __popcnt
-# include <nmmintrin.h>
-# define __builtin_popcountll _mm_popcnt_u64
+#include <intrin.h>
+#define __builtin_popcount __popcnt
+#include <nmmintrin.h>
+#define __builtin_popcountll _mm_popcnt_u64
#endif
namespace arrow {
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 589bdad..80f295c 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -105,10 +105,6 @@ std::string UnionType::ToString() const {
return s.str();
}
-int NullType::bit_width() const {
- return 0;
-}
-
std::string NullType::ToString() const {
return name();
}
@@ -187,4 +183,46 @@ std::shared_ptr<Field> field(
return std::make_shared<Field>(name, type, nullable, dictionary);
}
+static const BufferDescr kValidityBuffer(BufferType::VALIDITY, 1);
+static const BufferDescr kOffsetBuffer(BufferType::OFFSET, 32);
+static const BufferDescr kTypeBuffer(BufferType::TYPE, 32);
+static const BufferDescr kBooleanBuffer(BufferType::DATA, 1);
+static const BufferDescr kValues64(BufferType::DATA, 64);
+static const BufferDescr kValues32(BufferType::DATA, 32);
+static const BufferDescr kValues16(BufferType::DATA, 16);
+static const BufferDescr kValues8(BufferType::DATA, 8);
+
+std::vector<BufferDescr> FixedWidthType::GetBufferLayout() const {
+ return {kValidityBuffer, BufferDescr(BufferType::DATA, bit_width())};
+}
+
+std::vector<BufferDescr> NullType::GetBufferLayout() const {
+ return {};
+}
+
+std::vector<BufferDescr> BinaryType::GetBufferLayout() const {
+ return {kValidityBuffer, kOffsetBuffer, kValues8};
+}
+
+std::vector<BufferDescr> ListType::GetBufferLayout() const {
+ return {kValidityBuffer, kOffsetBuffer};
+}
+
+std::vector<BufferDescr> StructType::GetBufferLayout() const {
+ return {kValidityBuffer, kTypeBuffer};
+}
+
+std::vector<BufferDescr> UnionType::GetBufferLayout() const {
+ if (mode == UnionMode::SPARSE) {
+ return {kValidityBuffer, kTypeBuffer};
+ } else {
+ return {kValidityBuffer, kTypeBuffer, kOffsetBuffer};
+ }
+}
+
+std::vector<BufferDescr> DecimalType::GetBufferLayout() const {
+ // TODO(wesm)
+ return {};
+}
+
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 876d7ea..3077738 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -101,6 +101,20 @@ struct Type {
};
};
+enum class BufferType : char { DATA, OFFSET, TYPE, VALIDITY };
+
+class BufferDescr {
+ public:
+ BufferDescr(BufferType type, int bit_width) : type_(type), bit_width_(bit_width) {}
+
+ BufferType type() const { return type_; }
+ int bit_width() const { return bit_width_; }
+
+ private:
+ BufferType type_;
+ int bit_width_;
+};
+
struct ARROW_EXPORT DataType {
Type::type type;
@@ -129,12 +143,18 @@ struct ARROW_EXPORT DataType {
virtual Status Accept(TypeVisitor* visitor) const = 0;
virtual std::string ToString() const = 0;
+
+ virtual std::vector<BufferDescr> GetBufferLayout() const = 0;
};
typedef std::shared_ptr<DataType> TypePtr;
-struct ARROW_EXPORT FixedWidthMeta {
+struct ARROW_EXPORT FixedWidthType : public DataType {
+ using DataType::DataType;
+
virtual int bit_width() const = 0;
+
+ std::vector<BufferDescr> GetBufferLayout() const override;
};
struct ARROW_EXPORT IntegerMeta {
@@ -184,12 +204,12 @@ struct ARROW_EXPORT Field {
};
typedef std::shared_ptr<Field> FieldPtr;
-struct ARROW_EXPORT PrimitiveCType : public DataType {
- using DataType::DataType;
+struct ARROW_EXPORT PrimitiveCType : public FixedWidthType {
+ using FixedWidthType::FixedWidthType;
};
template <typename DERIVED, Type::type TYPE_ID, typename C_TYPE>
-struct ARROW_EXPORT CTypeImpl : public PrimitiveCType, public FixedWidthMeta {
+struct ARROW_EXPORT CTypeImpl : public PrimitiveCType {
using c_type = C_TYPE;
static constexpr Type::type type_id = TYPE_ID;
@@ -204,16 +224,17 @@ struct ARROW_EXPORT CTypeImpl : public PrimitiveCType, public FixedWidthMeta {
std::string ToString() const override { return std::string(DERIVED::name()); }
};
-struct ARROW_EXPORT NullType : public DataType, public FixedWidthMeta {
+struct ARROW_EXPORT NullType : public DataType {
static constexpr Type::type type_id = Type::NA;
NullType() : DataType(Type::NA) {}
- int bit_width() const override;
Status Accept(TypeVisitor* visitor) const override;
std::string ToString() const override;
static std::string name() { return "null"; }
+
+ std::vector<BufferDescr> GetBufferLayout() const override;
};
template <typename DERIVED, Type::type TYPE_ID, typename C_TYPE>
@@ -221,10 +242,10 @@ struct IntegerTypeImpl : public CTypeImpl<DERIVED, TYPE_ID, C_TYPE>, public Inte
bool is_signed() const override { return std::is_signed<C_TYPE>::value; }
};
-struct ARROW_EXPORT BooleanType : public DataType, FixedWidthMeta {
+struct ARROW_EXPORT BooleanType : public FixedWidthType {
static constexpr Type::type type_id = Type::BOOL;
- BooleanType() : DataType(Type::BOOL) {}
+ BooleanType() : FixedWidthType(Type::BOOL) {}
Status Accept(TypeVisitor* visitor) const override;
std::string ToString() const override;
@@ -306,6 +327,8 @@ struct ARROW_EXPORT ListType : public DataType, public NoExtraMeta {
std::string ToString() const override;
static std::string name() { return "list"; }
+
+ std::vector<BufferDescr> GetBufferLayout() const override;
};
// BinaryType type is reprsents lists of 1-byte values.
@@ -318,6 +341,8 @@ struct ARROW_EXPORT BinaryType : public DataType, public NoExtraMeta {
std::string ToString() const override;
static std::string name() { return "binary"; }
+ std::vector<BufferDescr> GetBufferLayout() const override;
+
protected:
// Allow subclasses to change the logical type.
explicit BinaryType(Type::type logical_type) : DataType(logical_type) {}
@@ -345,6 +370,8 @@ struct ARROW_EXPORT StructType : public DataType, public NoExtraMeta {
Status Accept(TypeVisitor* visitor) const override;
std::string ToString() const override;
static std::string name() { return "struct"; }
+
+ std::vector<BufferDescr> GetBufferLayout() const override;
};
struct ARROW_EXPORT DecimalType : public DataType {
@@ -358,6 +385,8 @@ struct ARROW_EXPORT DecimalType : public DataType {
Status Accept(TypeVisitor* visitor) const override;
std::string ToString() const override;
static std::string name() { return "decimal"; }
+
+ std::vector<BufferDescr> GetBufferLayout() const override;
};
enum class UnionMode : char { SPARSE, DENSE };
@@ -375,14 +404,20 @@ struct ARROW_EXPORT UnionType : public DataType {
static std::string name() { return "union"; }
Status Accept(TypeVisitor* visitor) const override;
+ std::vector<BufferDescr> GetBufferLayout() const override;
+
UnionMode mode;
std::vector<uint8_t> type_ids;
};
-struct ARROW_EXPORT DateType : public DataType, public NoExtraMeta {
+struct ARROW_EXPORT DateType : public FixedWidthType {
static constexpr Type::type type_id = Type::DATE;
- DateType() : DataType(Type::DATE) {}
+ using c_type = int32_t;
+
+ DateType() : FixedWidthType(Type::DATE) {}
+
+ int bit_width() const override { return sizeof(c_type) * 8; }
Status Accept(TypeVisitor* visitor) const override;
std::string ToString() const override { return name(); }
@@ -391,13 +426,17 @@ struct ARROW_EXPORT DateType : public DataType, public NoExtraMeta {
enum class TimeUnit : char { SECOND = 0, MILLI = 1, MICRO = 2, NANO = 3 };
-struct ARROW_EXPORT TimeType : public DataType {
+struct ARROW_EXPORT TimeType : public FixedWidthType {
static constexpr Type::type type_id = Type::TIME;
using Unit = TimeUnit;
+ using c_type = int64_t;
TimeUnit unit;
- explicit TimeType(TimeUnit unit = TimeUnit::MILLI) : DataType(Type::TIME), unit(unit) {}
+ int bit_width() const override { return sizeof(c_type) * 8; }
+
+ explicit TimeType(TimeUnit unit = TimeUnit::MILLI)
+ : FixedWidthType(Type::TIME), unit(unit) {}
TimeType(const TimeType& other) : TimeType(other.unit) {}
Status Accept(TypeVisitor* visitor) const override;
@@ -405,7 +444,7 @@ struct ARROW_EXPORT TimeType : public DataType {
static std::string name() { return "time"; }
};
-struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta {
+struct ARROW_EXPORT TimestampType : public FixedWidthType {
using Unit = TimeUnit;
typedef int64_t c_type;
@@ -416,7 +455,7 @@ struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta {
TimeUnit unit;
explicit TimestampType(TimeUnit unit = TimeUnit::MILLI)
- : DataType(Type::TIMESTAMP), unit(unit) {}
+ : FixedWidthType(Type::TIMESTAMP), unit(unit) {}
TimestampType(const TimestampType& other) : TimestampType(other.unit) {}
@@ -425,10 +464,10 @@ struct ARROW_EXPORT TimestampType : public DataType, public FixedWidthMeta {
static std::string name() { return "timestamp"; }
};
-struct ARROW_EXPORT IntervalType : public DataType, public FixedWidthMeta {
+struct ARROW_EXPORT IntervalType : public FixedWidthType {
enum class Unit : char { YEAR_MONTH = 0, DAY_TIME = 1 };
- typedef int64_t c_type;
+ using c_type = int64_t;
static constexpr Type::type type_id = Type::INTERVAL;
int bit_width() const override { return sizeof(int64_t) * 8; }
@@ -436,7 +475,7 @@ struct ARROW_EXPORT IntervalType : public DataType, public FixedWidthMeta {
Unit unit;
explicit IntervalType(Unit unit = Unit::YEAR_MONTH)
- : DataType(Type::INTERVAL), unit(unit) {}
+ : FixedWidthType(Type::INTERVAL), unit(unit) {}
IntervalType(const IntervalType& other) : IntervalType(other.unit) {}
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/types/primitive.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive.cc b/cpp/src/arrow/types/primitive.cc
index 14667ee..f42a3ca 100644
--- a/cpp/src/arrow/types/primitive.cc
+++ b/cpp/src/arrow/types/primitive.cc
@@ -49,7 +49,7 @@ bool PrimitiveArray::EqualsExact(const PrimitiveArray& other) const {
const uint8_t* this_data = raw_data_;
const uint8_t* other_data = other.raw_data_;
- auto size_meta = dynamic_cast<const FixedWidthMeta*>(type_.get());
+ auto size_meta = dynamic_cast<const FixedWidthType*>(type_.get());
int value_byte_size = size_meta->bit_width() / 8;
DCHECK_GT(value_byte_size, 0);
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/cpp/src/arrow/util/bit-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h
index 13b7e19..5c8055f 100644
--- a/cpp/src/arrow/util/bit-util.h
+++ b/cpp/src/arrow/util/bit-util.h
@@ -78,6 +78,10 @@ static inline bool IsMultipleOf64(int64_t n) {
return (n & 63) == 0;
}
+static inline bool IsMultipleOf8(int64_t n) {
+ return (n & 7) == 0;
+}
+
inline int64_t RoundUpToMultipleOf64(int64_t num) {
// TODO(wesm): is this definitely needed?
// DCHECK_GE(num, 0);
http://git-wip-us.apache.org/repos/asf/arrow/blob/e3c167bd/dev/release/run-rat.sh
----------------------------------------------------------------------
diff --git a/dev/release/run-rat.sh b/dev/release/run-rat.sh
index d8ec650..e26dd58 100755
--- a/dev/release/run-rat.sh
+++ b/dev/release/run-rat.sh
@@ -28,6 +28,7 @@ $RAT $1 \
-e ".*" \
-e mman.h \
-e "*_generated.h" \
+ -e "*.json" \
-e random.h \
-e status.cc \
-e status.h \
@@ -49,5 +50,3 @@ else
echo "${UNAPPROVED} unapproved licences. Check rat report: rat.txt"
exit 1
fi
-
-