Posted to commits@arrow.apache.org by we...@apache.org on 2016/04/18 19:44:45 UTC
arrow git commit: ARROW-82: Initial IPC support for ListArray
Repository: arrow
Updated Branches:
refs/heads/master 5843e6872 -> 0b472d860
ARROW-82: Initial IPC support for ListArray
This is a work in progress because I can't get clang-tidy to shut up on parameterized test files (see the last commit, which I need to revert before merge). I'd like to make sure this is a clean build and that people are OK with these changes. This PR also has a lot of collateral damage from small and large things I cleaned up on my way to making this work. I tried to split the commits up logically, but if people would prefer separate pull requests I can try to do that as well.
Open questions:
1. For supporting strings, binary, etc. I was thinking of changing their type definitions to inherit from ListType, and to hard-code the child type. This would allow for simpler IPC code (all of the instantiation of types would happen in construct.h/.cc?) vs handling each of these types separately for IPC.
2. There are some TODOs I left sprinkled in the code and I would like people's thoughts on whether they are urgent/worthwhile to follow up on.
Open issues:
1. Supporting the rest of the List-backed logical types.
2. More unit tests for added functionality.
As part of this commit I also refactored the Builder interfaces a little bit for the following reasons:
1. It seems that if ArrayBuilder owns the bitmap it should be responsible for having methods to manipulate it.
2. This allows ListBuilder to use the parent class + a BufferBuilder instead of inheriting Int32Builder, which means it doesn't need to do strange length/capacity hacks.
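To illustrate the refactoring described above, here is a minimal sketch, not the code from this commit. SketchArrayBuilder and SketchListBuilder are hypothetical stand-ins that use std::vector instead of Arrow buffers; the point is only that the base class owns and manipulates the validity bitmap, while the list builder keeps its offsets in a separate container rather than inheriting from an integer builder.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, simplified stand-in for ArrayBuilder: it owns the validity
// bitmap and is the only class that manipulates it.
class SketchArrayBuilder {
 public:
  virtual ~SketchArrayBuilder() = default;

  // Append one validity bit; a cleared bit marks a null slot.
  void AppendToBitmap(bool is_valid) {
    if (length_ % 8 == 0) { bitmap_.push_back(0); }  // grow by one byte
    if (is_valid) {
      bitmap_[length_ / 8] |= static_cast<uint8_t>(1u << (length_ % 8));
    } else {
      ++null_count_;
    }
    ++length_;
  }

  int32_t length() const { return length_; }
  int32_t null_count() const { return null_count_; }
  bool IsValid(int32_t i) const { return ((bitmap_[i / 8] >> (i % 8)) & 1) != 0; }

 private:
  std::vector<uint8_t> bitmap_;
  int32_t length_ = 0;
  int32_t null_count_ = 0;
};

// Hypothetical stand-in for ListBuilder: parent bitmap plus its own offsets.
class SketchListBuilder : public SketchArrayBuilder {
 public:
  // Start a new list slot: record where its values begin, then mark validity.
  void Append(bool is_valid = true) {
    offsets_.push_back(static_cast<int32_t>(values_.size()));
    AppendToBitmap(is_valid);
  }

  // Append a value to the list slot most recently started.
  void AppendValue(int32_t value) { values_.push_back(value); }

  // Offsets with one trailing entry marking the end of the last list.
  std::vector<int32_t> FinishOffsets() const {
    std::vector<int32_t> out = offsets_;
    out.push_back(static_cast<int32_t>(values_.size()));
    return out;
  }

 private:
  std::vector<int32_t> offsets_;
  std::vector<int32_t> values_;
};
```

With this split the list builder needs no length/capacity bookkeeping of its own: slot count and null count come from the parent, and the offsets container grows independently.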
Other misc things here:
1. Native popcount in test-util.h
2. Ability to build a new list on top of an existing one by incrementally adding offsets/sizes.
3. Added missing primitive types in construct.h.
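As an illustration of the "native popcount" item, the following is a sketch of counting set bits in a bitmap with a compiler intrinsic. It is not the helper added to test-util.h; SketchBitmapPopcount is a hypothetical name, and the sketch assumes GCC or Clang for __builtin_popcount, with a portable fallback otherwise.

```cpp
#include <cassert>
#include <cstdint>

// Count the set bits among the first `length` bits of `data`
// (least-significant bit first within each byte).
inline int32_t SketchBitmapPopcount(const uint8_t* data, int32_t length) {
  int32_t count = 0;
  const int32_t full_bytes = length / 8;
  for (int32_t i = 0; i < full_bytes; ++i) {
#if defined(__GNUC__) || defined(__clang__)
    count += __builtin_popcount(data[i]);  // maps to a native instruction where available
#else
    // Portable fallback: clear the lowest set bit until none remain.
    for (uint8_t b = data[i]; b != 0; b &= static_cast<uint8_t>(b - 1)) { ++count; }
#endif
  }
  // Handle the trailing bits that do not fill a whole byte.
  const int32_t rem = length % 8;
  if (rem > 0) {
    const uint8_t mask = static_cast<uint8_t>((1u << rem) - 1);
    uint8_t last = static_cast<uint8_t>(data[full_bytes] & mask);
    for (; last != 0; last &= static_cast<uint8_t>(last - 1)) { ++count; }
  }
  return count;
}
```

Masking the final partial byte matters because bits past `length` in a bitmap buffer are padding and must not be counted.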
Author: Micah Kornfield <em...@gmail.com>
Closes #59 from emkornfield/emk_list_ipc_PR and squashes the following commits:
0c5162d [Micah Kornfield] another format fix
0af558b [Micah Kornfield] remove a now unnecessary NOLINT, but mostly to trigger another travis-ci job that failed due to apt get issue
7789205 [Micah Kornfield] make clang-format-3.7 happy
6e57728 [Micah Kornfield] make format fixes
5e15815 [Micah Kornfield] fix make lint
8982723 [Micah Kornfield] remaining style cleanup
be04b3e [Micah Kornfield] add unit tests for zero length row batches and non-null batches. fix bugs
10e6651 [Micah Kornfield] add in maximum recursion depth, surfaced possible recursion issue with flatbuffers
3b219a1 [Micah Kornfield] Make append is_null parameter is_valid for api consistency
2e6c477 [Micah Kornfield] add missing RETURN_NOT_OK
e71810b [Micah Kornfield] make Resize and Init virtual on builder
8ab5315 [Micah Kornfield] make clang tidy ignore a little bit less hacky
53d37bc [Micah Kornfield] filter out ipc-adapter-test from tidy
8e464b5 [Micah Kornfield] Fixes per tidy and lint
aa0602c [Micah Kornfield] add potentially useful pool factories to test utils
39c57ed [Micah Kornfield] add potentially useful methods for generative arrays to ipc test-common
a2e1e52 [Micah Kornfield] native popcount
61b0481 [Micah Kornfield] small fixes to naming/style for c++ and potential bugs
5f87aef [Micah Kornfield] Refactor ipc-adapter-test to make it paramaterizable. add unit test for lists. make unit test pass and and construction method for list arrays
45e41c0 [Micah Kornfield] Make BufferBuilder more useable for appending primitives
1374485 [Micah Kornfield] augment python unittest to have null element in list
20f984b [Micah Kornfield] refactor primitive builders to use parent builders bitmap
3895d34 [Micah Kornfield] Refactor list builder to use ArrayBuilders bitmap methods and a separate buffer builder
01c50be [Micah Kornfield] Add utility methods for managing null bitmap directly to ArrayBuilder
cc7f851 [Micah Kornfield] add Validate method to array and implementation for ListArray
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/0b472d86
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/0b472d86
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/0b472d86
Branch: refs/heads/master
Commit: 0b472d860260f7063aee742939be23b921382741
Parents: 5843e68
Author: Micah Kornfield <em...@gmail.com>
Authored: Mon Apr 18 19:44:29 2016 +0200
Committer: Wes McKinney <we...@apache.org>
Committed: Mon Apr 18 19:44:29 2016 +0200
----------------------------------------------------------------------
cpp/CMakeLists.txt | 2 +-
cpp/README.md | 9 +-
cpp/src/.clang-tidy-ignore | 1 +
cpp/src/arrow/array.cc | 5 +
cpp/src/arrow/array.h | 6 +-
cpp/src/arrow/builder.cc | 56 ++++++++
cpp/src/arrow/builder.h | 46 ++++--
cpp/src/arrow/ipc/adapter.cc | 136 +++++++++++++-----
cpp/src/arrow/ipc/adapter.h | 11 +-
cpp/src/arrow/ipc/ipc-adapter-test.cc | 216 +++++++++++++++++++++++-----
cpp/src/arrow/ipc/memory.cc | 1 +
cpp/src/arrow/ipc/metadata-internal.cc | 3 +-
cpp/src/arrow/ipc/metadata-internal.h | 3 +-
cpp/src/arrow/ipc/metadata.cc | 3 +-
cpp/src/arrow/ipc/test-common.h | 67 +++++++++
cpp/src/arrow/parquet/schema.cc | 2 +-
cpp/src/arrow/schema.cc | 2 +-
cpp/src/arrow/test-util.h | 49 ++++++-
cpp/src/arrow/type.h | 2 +-
cpp/src/arrow/types/construct.cc | 43 +++++-
cpp/src/arrow/types/construct.h | 9 ++
cpp/src/arrow/types/list-test.cc | 80 ++++++++---
cpp/src/arrow/types/list.cc | 60 +++++++-
cpp/src/arrow/types/list.h | 112 +++++++++------
cpp/src/arrow/types/primitive-test.cc | 6 +-
cpp/src/arrow/types/primitive.cc | 45 +-----
cpp/src/arrow/types/primitive.h | 41 +++---
cpp/src/arrow/types/string.h | 5 +-
cpp/src/arrow/util/buffer.h | 59 ++++++--
cpp/src/arrow/util/logging.h | 2 +-
cpp/src/arrow/util/memory-pool.cc | 2 +-
python/pyarrow/tests/test_array.py | 5 +-
32 files changed, 839 insertions(+), 250 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index f803c0f..b38f91e 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -565,7 +565,7 @@ if (${CLANG_TIDY_FOUND})
`find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc | sed -e '/_generated/g'`)
# runs clang-tidy and exits with a non-zero exit code if any errors are found.
add_custom_target(check-clang-tidy ${BUILD_SUPPORT_DIR}/run-clang-tidy.sh ${CLANG_TIDY_BIN} ${CMAKE_BINARY_DIR}/compile_commands.json
- 0 `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc | sed -e '/_generated/g'`)
+ 0 `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc |grep -v -F -f ${CMAKE_CURRENT_SOURCE_DIR}/src/.clang-tidy-ignore | sed -e '/_generated/g'`)
endif()
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/README.md
----------------------------------------------------------------------
diff --git a/cpp/README.md b/cpp/README.md
index 3f5da21..c8cd86f 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -76,4 +76,11 @@ build failures by running the following checks before submitting your pull reque
Note that the clang-tidy target may take a while to run. You might consider
running clang-tidy separately on the files you have added/changed before
-invoking the make target to reduce iteration time.
+invoking the make target to reduce iteration time. Also, it might generate warnings
+that aren't valid. To avoid these you can add a line comment `// NOLINT`. If
+NOLINT doesn't suppress the warnings, you can add the file in question to
+the .clang-tidy-ignore file. This will allow `make check-clang-tidy` to pass in
+travis-CI (but still surface the potential warnings in `make clang-tidy`). Ideally,
+both of these options would be used rarely. Current known use cases when they are required:
+
+* Parameterized tests in google test.
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/.clang-tidy-ignore
----------------------------------------------------------------------
diff --git a/cpp/src/.clang-tidy-ignore b/cpp/src/.clang-tidy-ignore
new file mode 100644
index 0000000..a128c38
--- /dev/null
+++ b/cpp/src/.clang-tidy-ignore
@@ -0,0 +1 @@
+ipc-adapter-test.cc
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index a153686..c6b9b15 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -20,6 +20,7 @@
#include <cstdint>
#include "arrow/util/buffer.h"
+#include "arrow/util/status.h"
namespace arrow {
@@ -47,6 +48,10 @@ bool Array::EqualsExact(const Array& other) const {
return true;
}
+Status Array::Validate() const {
+ return Status::OK();
+}
+
bool NullArray::Equals(const std::shared_ptr<Array>& arr) const {
if (this == arr.get()) { return true; }
if (Type::NA != arr->type_enum()) { return false; }
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index c6735f8..f98c4c2 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -28,6 +28,7 @@
namespace arrow {
class Buffer;
+class Status;
// Immutable data array with some logical type and some length. Any memory is
// owned by the respective Buffer instance (or its parents).
@@ -39,7 +40,7 @@ class Array {
Array(const std::shared_ptr<DataType>& type, int32_t length, int32_t null_count = 0,
const std::shared_ptr<Buffer>& null_bitmap = nullptr);
- virtual ~Array() {}
+ virtual ~Array() = default;
// Determine if a slot is null. For inner loops. Does *not* boundscheck
bool IsNull(int i) const {
@@ -58,6 +59,9 @@ class Array {
bool EqualsExact(const Array& arr) const;
virtual bool Equals(const std::shared_ptr<Array>& arr) const = 0;
+ // Determines if the array is internally consistent. Defaults to always
+ // returning Status::OK. This can be an expensive check.
+ virtual Status Validate() const;
protected:
std::shared_ptr<DataType> type_;
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 1447078..87c1219 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -25,6 +25,25 @@
namespace arrow {
+Status ArrayBuilder::AppendToBitmap(bool is_valid) {
+ if (length_ == capacity_) {
+ // If the capacity was not already a multiple of 2, do so here
+ // TODO(emkornfield) doubling isn't great default allocation practice
+ // see https://github.com/facebook/folly/blob/master/folly/docs/FBVector.md
+ // for discussion
+ RETURN_NOT_OK(Resize(util::next_power2(capacity_ + 1)));
+ }
+ UnsafeAppendToBitmap(is_valid);
+ return Status::OK();
+}
+
+Status ArrayBuilder::AppendToBitmap(const uint8_t* valid_bytes, int32_t length) {
+ RETURN_NOT_OK(Reserve(length));
+
+ UnsafeAppendToBitmap(valid_bytes, length);
+ return Status::OK();
+}
+
Status ArrayBuilder::Init(int32_t capacity) {
capacity_ = capacity;
int32_t to_alloc = util::ceil_byte(capacity) / 8;
@@ -36,6 +55,7 @@ Status ArrayBuilder::Init(int32_t capacity) {
}
Status ArrayBuilder::Resize(int32_t new_bits) {
+ if (!null_bitmap_) { return Init(new_bits); }
int32_t new_bytes = util::ceil_byte(new_bits) / 8;
int32_t old_bytes = null_bitmap_->size();
RETURN_NOT_OK(null_bitmap_->Resize(new_bytes));
@@ -56,10 +76,46 @@ Status ArrayBuilder::Advance(int32_t elements) {
Status ArrayBuilder::Reserve(int32_t elements) {
if (length_ + elements > capacity_) {
+ // TODO(emkornfield) power of 2 growth is potentially suboptimal
int32_t new_capacity = util::next_power2(length_ + elements);
return Resize(new_capacity);
}
return Status::OK();
}
+Status ArrayBuilder::SetNotNull(int32_t length) {
+ RETURN_NOT_OK(Reserve(length));
+ UnsafeSetNotNull(length);
+ return Status::OK();
+}
+
+void ArrayBuilder::UnsafeAppendToBitmap(bool is_valid) {
+ if (is_valid) {
+ util::set_bit(null_bitmap_data_, length_);
+ } else {
+ ++null_count_;
+ }
+ ++length_;
+}
+
+void ArrayBuilder::UnsafeAppendToBitmap(const uint8_t* valid_bytes, int32_t length) {
+ if (valid_bytes == nullptr) {
+ UnsafeSetNotNull(length);
+ return;
+ }
+ for (int32_t i = 0; i < length; ++i) {
+ // TODO(emkornfield) Optimize for large values of length?
+ UnsafeAppendToBitmap(valid_bytes[i] > 0);
+ }
+}
+
+void ArrayBuilder::UnsafeSetNotNull(int32_t length) {
+ const int32_t new_length = length + length_;
+ // TODO(emkornfield) Optimize for large values of length?
+ for (int32_t i = length_; i < new_length; ++i) {
+ util::set_bit(null_bitmap_data_, i);
+ }
+ length_ = new_length;
+}
+
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index 21a6341..7d3f439 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -34,7 +34,10 @@ class PoolBuffer;
static constexpr int32_t MIN_BUILDER_CAPACITY = 1 << 5;
-// Base class for all data array builders
+// Base class for all data array builders.
+// This class provides facilities for incrementally building the null bitmap
+// (see Append methods) and as a side effect the current number of slots and
+// the null count.
class ArrayBuilder {
public:
explicit ArrayBuilder(MemoryPool* pool, const TypePtr& type)
@@ -46,7 +49,7 @@ class ArrayBuilder {
length_(0),
capacity_(0) {}
- virtual ~ArrayBuilder() {}
+ virtual ~ArrayBuilder() = default;
// For nested types. Since the objects are owned by this class instance, we
// skip shared pointers and just return a raw pointer
@@ -58,14 +61,27 @@ class ArrayBuilder {
int32_t null_count() const { return null_count_; }
int32_t capacity() const { return capacity_; }
- // Allocates requires memory at this level, but children need to be
- // initialized independently
- Status Init(int32_t capacity);
+ // Append to null bitmap
+ Status AppendToBitmap(bool is_valid);
+ // Vector append. Treat each zero byte as a null. If valid_bytes is null
+ // assume all of length bits are valid.
+ Status AppendToBitmap(const uint8_t* valid_bytes, int32_t length);
+ // Set the next length bits to not null (i.e. valid).
+ Status SetNotNull(int32_t length);
- // Resizes the null_bitmap array
- Status Resize(int32_t new_bits);
+ // Allocates initial capacity requirements for the builder. In most
+ // cases subclasses should override and call their parent class's
+ // method as well.
+ virtual Status Init(int32_t capacity);
- Status Reserve(int32_t extra_bits);
+ // Resizes the null_bitmap array. In most
+ // cases subclasses should override and call their parent class's
+ // method as well.
+ virtual Status Resize(int32_t new_bits);
+
+ // Ensures there is enough space for adding the number of elements by checking
+ // capacity and calling Resize if necessary.
+ Status Reserve(int32_t elements);
// For cases where raw data was memcpy'd into the internal buffers, allows us
// to advance the length of the builder. It is your responsibility to use
@@ -75,7 +91,7 @@ class ArrayBuilder {
const std::shared_ptr<PoolBuffer>& null_bitmap() const { return null_bitmap_; }
// Creates new array object to hold the contents of the builder and transfers
- // ownership of the data
+ // ownership of the data. This resets all variables on the builder.
virtual std::shared_ptr<Array> Finish() = 0;
const std::shared_ptr<DataType>& type() const { return type_; }
@@ -97,6 +113,18 @@ class ArrayBuilder {
// Child value array builders. These are owned by this class
std::vector<std::unique_ptr<ArrayBuilder>> children_;
+ //
+ // Unsafe operations (don't check capacity/don't resize)
+ //
+
+ // Append to null bitmap.
+ void UnsafeAppendToBitmap(bool is_valid);
+ // Vector append. Treat each zero byte as a null. If valid_bytes is null
+ // assume all of length bits are valid.
+ void UnsafeAppendToBitmap(const uint8_t* valid_bytes, int32_t length);
+ // Set the next length bits to not null (i.e. valid).
+ void UnsafeSetNotNull(int32_t length);
+
private:
DISALLOW_COPY_AND_ASSIGN(ArrayBuilder);
};
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index 2f72c3a..bf6fa94 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -19,17 +19,19 @@
#include <cstdint>
#include <cstring>
+#include <sstream>
#include <vector>
#include "arrow/array.h"
-#include "arrow/ipc/memory.h"
#include "arrow/ipc/Message_generated.h"
-#include "arrow/ipc/metadata.h"
+#include "arrow/ipc/memory.h"
#include "arrow/ipc/metadata-internal.h"
+#include "arrow/ipc/metadata.h"
#include "arrow/schema.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/types/construct.h"
+#include "arrow/types/list.h"
#include "arrow/types/primitive.h"
#include "arrow/util/buffer.h"
#include "arrow/util/logging.h"
@@ -63,44 +65,70 @@ static bool IsPrimitive(const DataType* type) {
}
}
+static bool IsListType(const DataType* type) {
+ DCHECK(type != nullptr);
+ switch (type->type) {
+ // TODO(emkornfield) grouping like this are used in a few places in the
+ // code consider using pattern like:
+ // http://stackoverflow.com/questions/26784685/c-macro-for-calling-function-based-on-enum-type
+ //
+ // TODO(emkornfield) Fix type systems so these are all considered lists and
+ // the types behave the same way?
+ // case Type::BINARY:
+ // case Type::CHAR:
+ case Type::LIST:
+ // see todo on common types
+ // case Type::STRING:
+ // case Type::VARCHAR:
+ return true;
+ default:
+ return false;
+ }
+}
+
// ----------------------------------------------------------------------
// Row batch write path
Status VisitArray(const Array* arr, std::vector<flatbuf::FieldNode>* field_nodes,
- std::vector<std::shared_ptr<Buffer>>* buffers) {
- if (IsPrimitive(arr->type().get())) {
- const PrimitiveArray* prim_arr = static_cast<const PrimitiveArray*>(arr);
-
- field_nodes->push_back(
- flatbuf::FieldNode(prim_arr->length(), prim_arr->null_count()));
+ std::vector<std::shared_ptr<Buffer>>* buffers, int max_recursion_depth) {
+ if (max_recursion_depth <= 0) { return Status::Invalid("Max recursion depth reached"); }
+ DCHECK(arr);
+ DCHECK(field_nodes);
+ // push back all common elements
+ field_nodes->push_back(flatbuf::FieldNode(arr->length(), arr->null_count()));
+ if (arr->null_count() > 0) {
+ buffers->push_back(arr->null_bitmap());
+ } else {
+ // Push a dummy zero-length buffer, not to be copied
+ buffers->push_back(std::make_shared<Buffer>(nullptr, 0));
+ }
- if (prim_arr->null_count() > 0) {
- buffers->push_back(prim_arr->null_bitmap());
- } else {
- // Push a dummy zero-length buffer, not to be copied
- buffers->push_back(std::make_shared<Buffer>(nullptr, 0));
- }
+ const DataType* arr_type = arr->type().get();
+ if (IsPrimitive(arr_type)) {
+ const auto prim_arr = static_cast<const PrimitiveArray*>(arr);
buffers->push_back(prim_arr->data());
- } else if (arr->type_enum() == Type::LIST) {
- // TODO(wesm)
- return Status::NotImplemented("List type");
+ } else if (IsListType(arr_type)) {
+ const auto list_arr = static_cast<const ListArray*>(arr);
+ buffers->push_back(list_arr->offset_buffer());
+ RETURN_NOT_OK(VisitArray(
+ list_arr->values().get(), field_nodes, buffers, max_recursion_depth - 1));
} else if (arr->type_enum() == Type::STRUCT) {
// TODO(wesm)
return Status::NotImplemented("Struct type");
}
-
return Status::OK();
}
class RowBatchWriter {
public:
- explicit RowBatchWriter(const RowBatch* batch) : batch_(batch) {}
+ RowBatchWriter(const RowBatch* batch, int max_recursion_depth)
+ : batch_(batch), max_recursion_depth_(max_recursion_depth) {}
Status AssemblePayload() {
// Perform depth-first traversal of the row-batch
for (int i = 0; i < batch_->num_columns(); ++i) {
const Array* arr = batch_->column(i).get();
- RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_));
+ RETURN_NOT_OK(VisitArray(arr, &field_nodes_, &buffers_, max_recursion_depth_));
}
return Status::OK();
}
@@ -111,8 +139,10 @@ class RowBatchWriter {
int64_t offset = 0;
for (size_t i = 0; i < buffers_.size(); ++i) {
const Buffer* buffer = buffers_[i].get();
- int64_t size = buffer->size();
+ int64_t size = 0;
+ // The buffer might be null if we are handling zero row lengths.
+ if (buffer) { size = buffer->size(); }
// TODO(wesm): We currently have no notion of shared memory page id's,
// but we've included it in the metadata IDL for when we have it in the
// future. Use page=0 for now
@@ -171,11 +201,13 @@ class RowBatchWriter {
std::vector<flatbuf::FieldNode> field_nodes_;
std::vector<flatbuf::Buffer> buffer_meta_;
std::vector<std::shared_ptr<Buffer>> buffers_;
+ int max_recursion_depth_;
};
-Status WriteRowBatch(
- MemorySource* dst, const RowBatch* batch, int64_t position, int64_t* header_offset) {
- RowBatchWriter serializer(batch);
+Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
+ int64_t* header_offset, int max_recursion_depth) {
+ DCHECK_GT(max_recursion_depth, 0);
+ RowBatchWriter serializer(batch, max_recursion_depth);
RETURN_NOT_OK(serializer.AssemblePayload());
return serializer.Write(dst, position, header_offset);
}
@@ -186,8 +218,9 @@ static constexpr int64_t INIT_METADATA_SIZE = 4096;
class RowBatchReader::Impl {
public:
- Impl(MemorySource* source, const std::shared_ptr<RecordBatchMessage>& metadata)
- : source_(source), metadata_(metadata) {
+ Impl(MemorySource* source, const std::shared_ptr<RecordBatchMessage>& metadata,
+ int max_recursion_depth)
+ : source_(source), metadata_(metadata), max_recursion_depth_(max_recursion_depth) {
num_buffers_ = metadata->num_buffers();
num_flattened_fields_ = metadata->num_fields();
}
@@ -203,7 +236,7 @@ class RowBatchReader::Impl {
buffer_index_ = 0;
for (int i = 0; i < schema->num_fields(); ++i) {
const Field* field = schema->field(i).get();
- RETURN_NOT_OK(NextArray(field, &arrays[i]));
+ RETURN_NOT_OK(NextArray(field, max_recursion_depth_, &arrays[i]));
}
*out = std::make_shared<RowBatch>(schema, metadata_->length(), arrays);
@@ -213,8 +246,12 @@ class RowBatchReader::Impl {
private:
// Traverse the flattened record batch metadata and reassemble the
// corresponding array containers
- Status NextArray(const Field* field, std::shared_ptr<Array>* out) {
- const std::shared_ptr<DataType>& type = field->type;
+ Status NextArray(
+ const Field* field, int max_recursion_depth, std::shared_ptr<Array>* out) {
+ const TypePtr& type = field->type;
+ if (max_recursion_depth <= 0) {
+ return Status::Invalid("Max recursion depth reached");
+ }
// pop off a field
if (field_index_ >= num_flattened_fields_) {
@@ -226,23 +263,42 @@ class RowBatchReader::Impl {
// we can skip that buffer without reading from shared memory
FieldMetadata field_meta = metadata_->field(field_index_++);
+ // extract null_bitmap which is common to all arrays
+ std::shared_ptr<Buffer> null_bitmap;
+ if (field_meta.null_count == 0) {
+ ++buffer_index_;
+ } else {
+ RETURN_NOT_OK(GetBuffer(buffer_index_++, &null_bitmap));
+ }
+
if (IsPrimitive(type.get())) {
- std::shared_ptr<Buffer> null_bitmap;
std::shared_ptr<Buffer> data;
- if (field_meta.null_count == 0) {
- null_bitmap = nullptr;
- ++buffer_index_;
- } else {
- RETURN_NOT_OK(GetBuffer(buffer_index_++, &null_bitmap));
- }
if (field_meta.length > 0) {
RETURN_NOT_OK(GetBuffer(buffer_index_++, &data));
} else {
+ buffer_index_++;
data.reset(new Buffer(nullptr, 0));
}
return MakePrimitiveArray(
type, field_meta.length, data, field_meta.null_count, null_bitmap, out);
}
+
+ if (IsListType(type.get())) {
+ std::shared_ptr<Buffer> offsets;
+ RETURN_NOT_OK(GetBuffer(buffer_index_++, &offsets));
+ const int num_children = type->num_children();
+ if (num_children != 1) {
+ std::stringstream ss;
+ ss << "Field: " << field->ToString()
+ << " has wrong number of children:" << num_children;
+ return Status::Invalid(ss.str());
+ }
+ std::shared_ptr<Array> values_array;
+ RETURN_NOT_OK(
+ NextArray(type->child(0).get(), max_recursion_depth - 1, &values_array));
+ return MakeListArray(type, field_meta.length, offsets, values_array,
+ field_meta.null_count, null_bitmap, out);
+ }
return Status::NotImplemented("Non-primitive types not complete yet");
}
@@ -256,12 +312,18 @@ class RowBatchReader::Impl {
int field_index_;
int buffer_index_;
+ int max_recursion_depth_;
int num_buffers_;
int num_flattened_fields_;
};
Status RowBatchReader::Open(
MemorySource* source, int64_t position, std::shared_ptr<RowBatchReader>* out) {
+ return Open(source, position, kMaxIpcRecursionDepth, out);
+}
+
+Status RowBatchReader::Open(MemorySource* source, int64_t position,
+ int max_recursion_depth, std::shared_ptr<RowBatchReader>* out) {
std::shared_ptr<Buffer> metadata;
RETURN_NOT_OK(source->ReadAt(position, INIT_METADATA_SIZE, &metadata));
@@ -286,7 +348,7 @@ Status RowBatchReader::Open(
std::shared_ptr<RecordBatchMessage> batch_meta = message->GetRecordBatch();
std::shared_ptr<RowBatchReader> result(new RowBatchReader());
- result->impl_.reset(new Impl(source, batch_meta));
+ result->impl_.reset(new Impl(source, batch_meta, max_recursion_depth));
*out = result;
return Status::OK();
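
Note on the adapter.cc change above: the write and read paths both walk nested arrays depth-first and give up once a recursion budget is exhausted. The sketch below shows that pattern in isolation. It is an illustration under simplifying assumptions, not the Arrow implementation: SketchArray, SketchFieldNode, SketchVisit and SketchNest are hypothetical names, buffers are omitted, and failure is reported with a bool instead of a Status.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical array node: a list array has a child `values` array,
// a primitive array has none.
struct SketchArray {
  int length = 0;
  int null_count = 0;
  std::shared_ptr<SketchArray> values;
};

// Flattened per-array metadata, one entry per array visited.
struct SketchFieldNode {
  int length;
  int null_count;
};

// Depth-first traversal that appends one node per array. Returns false
// when the nesting is deeper than max_recursion_depth allows.
bool SketchVisit(const SketchArray& arr, std::vector<SketchFieldNode>* nodes,
    int max_recursion_depth) {
  if (max_recursion_depth <= 0) { return false; }
  nodes->push_back({arr.length, arr.null_count});
  if (arr.values) { return SketchVisit(*arr.values, nodes, max_recursion_depth - 1); }
  return true;
}

// Build a chain of `depth` arrays: depth - 1 list levels around one leaf.
std::shared_ptr<SketchArray> SketchNest(int depth) {
  auto arr = std::make_shared<SketchArray>();
  for (int i = 1; i < depth; ++i) {
    auto parent = std::make_shared<SketchArray>();
    parent->values = arr;
    arr = parent;
  }
  return arr;
}
```

With a budget of 64, a chain of exactly 64 arrays is accepted and a chain of 65 is rejected, since each level consumes one unit of depth before visiting its child.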
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index d453fa0..4c9a8a9 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -38,7 +38,9 @@ class RecordBatchMessage;
// ----------------------------------------------------------------------
// Write path
-
+// We have trouble decoding flatbuffers if the size i > 70, so 64 is a nice round number
+// TODO(emkornfield) investigate this more
+constexpr int kMaxIpcRecursionDepth = 64;
// Write the RowBatch (collection of equal-length Arrow arrays) to the memory
// source at the indicated position
//
@@ -52,8 +54,8 @@ class RecordBatchMessage;
//
// Finally, the memory offset to the start of the metadata / data header is
// returned in an out-variable
-Status WriteRowBatch(
- MemorySource* dst, const RowBatch* batch, int64_t position, int64_t* header_offset);
+Status WriteRowBatch(MemorySource* dst, const RowBatch* batch, int64_t position,
+ int64_t* header_offset, int max_recursion_depth = kMaxIpcRecursionDepth);
// int64_t GetRowBatchMetadata(const RowBatch* batch);
@@ -70,6 +72,9 @@ class RowBatchReader {
static Status Open(
MemorySource* source, int64_t position, std::shared_ptr<RowBatchReader>* out);
+ static Status Open(MemorySource* source, int64_t position, int max_recursion_depth,
+ std::shared_ptr<RowBatchReader>* out);
+
// Reassemble the row batch. A Schema is required to be able to construct the
// right array containers
Status GetRowBatch(
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index fbdda77..c243cfb 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -18,9 +18,7 @@
#include <cstdint>
#include <cstdio>
#include <cstring>
-#include <limits>
#include <memory>
-#include <random>
#include <string>
#include <vector>
@@ -31,6 +29,7 @@
#include "arrow/ipc/test-common.h"
#include "arrow/test-util.h"
+#include "arrow/types/list.h"
#include "arrow/types/primitive.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/buffer.h"
@@ -40,25 +39,56 @@
namespace arrow {
namespace ipc {
-class TestWriteRowBatch : public ::testing::Test, public MemoryMapFixture {
+// TODO(emkornfield) convert to google style kInt32, etc?
+const auto INT32 = std::make_shared<Int32Type>();
+const auto LIST_INT32 = std::make_shared<ListType>(INT32);
+const auto LIST_LIST_INT32 = std::make_shared<ListType>(LIST_INT32);
+
+typedef Status MakeRowBatch(std::shared_ptr<RowBatch>* out);
+
+class TestWriteRowBatch : public ::testing::TestWithParam<MakeRowBatch*>,
+ public MemoryMapFixture {
public:
void SetUp() { pool_ = default_memory_pool(); }
void TearDown() { MemoryMapFixture::TearDown(); }
- void InitMemoryMap(int64_t size) {
+ Status RoundTripHelper(const RowBatch& batch, int memory_map_size,
+ std::shared_ptr<RowBatch>* batch_result) {
std::string path = "test-write-row-batch";
- MemoryMapFixture::CreateFile(path, size);
- ASSERT_OK(MemoryMappedSource::Open(path, MemorySource::READ_WRITE, &mmap_));
+ MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
+ int64_t header_location;
+ RETURN_NOT_OK(WriteRowBatch(mmap_.get(), &batch, 0, &header_location));
+
+ std::shared_ptr<RowBatchReader> reader;
+ RETURN_NOT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader));
+
+ RETURN_NOT_OK(reader->GetRowBatch(batch.schema(), batch_result));
+ return Status::OK();
}
protected:
- MemoryPool* pool_;
std::shared_ptr<MemoryMappedSource> mmap_;
+ MemoryPool* pool_;
};
-const auto INT32 = std::make_shared<Int32Type>();
+TEST_P(TestWriteRowBatch, RoundTrip) {
+ std::shared_ptr<RowBatch> batch;
+ ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue
+ std::shared_ptr<RowBatch> batch_result;
+ ASSERT_OK(RoundTripHelper(*batch, 1 << 16, &batch_result));
+
+ // do checks
+ ASSERT_TRUE(batch->schema()->Equals(batch_result->schema()));
+ ASSERT_EQ(batch->num_columns(), batch_result->num_columns())
+ << batch->schema()->ToString() << " result: " << batch_result->schema()->ToString();
+ EXPECT_EQ(batch->num_rows(), batch_result->num_rows());
+ for (int i = 0; i < batch->num_columns(); ++i) {
+ EXPECT_TRUE(batch->column(i)->Equals(batch_result->column(i)))
+ << "Idx: " << i << " Name: " << batch->column_name(i);
+ }
+}
-TEST_F(TestWriteRowBatch, IntegerRoundTrip) {
+Status MakeIntRowBatch(std::shared_ptr<RowBatch>* out) {
const int length = 1000;
// Make the schema
@@ -67,41 +97,159 @@ TEST_F(TestWriteRowBatch, IntegerRoundTrip) {
std::shared_ptr<Schema> schema(new Schema({f0, f1}));
// Example data
+ std::shared_ptr<Array> a0, a1;
+ MemoryPool* pool = default_memory_pool();
+ RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0));
+ RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1));
+ out->reset(new RowBatch(schema, length, {a0, a1}));
+ return Status::OK();
+}
- auto data = std::make_shared<PoolBuffer>(pool_);
- ASSERT_OK(data->Resize(length * sizeof(int32_t)));
- test::rand_uniform_int(length, 0, 0, std::numeric_limits<int32_t>::max(),
- reinterpret_cast<int32_t*>(data->mutable_data()));
+Status MakeListRowBatch(std::shared_ptr<RowBatch>* out) {
+ // Make the schema
+ auto f0 = std::make_shared<Field>("f0", LIST_INT32);
+ auto f1 = std::make_shared<Field>("f1", LIST_LIST_INT32);
+ auto f2 = std::make_shared<Field>("f2", INT32);
+ std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
- auto null_bitmap = std::make_shared<PoolBuffer>(pool_);
- int null_bytes = util::bytes_for_bits(length);
- ASSERT_OK(null_bitmap->Resize(null_bytes));
- test::random_bytes(null_bytes, 0, null_bitmap->mutable_data());
+ // Example data
- auto a0 = std::make_shared<Int32Array>(length, data);
- auto a1 = std::make_shared<Int32Array>(
- length, data, test::bitmap_popcount(null_bitmap->data(), length), null_bitmap);
+ MemoryPool* pool = default_memory_pool();
+ const int length = 200;
+ std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
+ const bool include_nulls = true;
+ RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &leaf_values));
+ RETURN_NOT_OK(
+ MakeRandomListArray(leaf_values, length, include_nulls, pool, &list_array));
+ RETURN_NOT_OK(
+ MakeRandomListArray(list_array, length, include_nulls, pool, &list_list_array));
+ RETURN_NOT_OK(MakeRandomInt32Array(length, include_nulls, pool, &flat_array));
+ out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array}));
+ return Status::OK();
+}
- RowBatch batch(schema, length, {a0, a1});
+Status MakeZeroLengthRowBatch(std::shared_ptr<RowBatch>* out) {
+ // Make the schema
+ auto f0 = std::make_shared<Field>("f0", LIST_INT32);
+ auto f1 = std::make_shared<Field>("f1", LIST_LIST_INT32);
+ auto f2 = std::make_shared<Field>("f2", INT32);
+ std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
- // TODO(wesm): computing memory requirements for a row batch
- // 64k is plenty of space
- InitMemoryMap(1 << 16);
+ // Example data
+ MemoryPool* pool = default_memory_pool();
+ const int length = 200;
+ const bool include_nulls = true;
+ std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
+ RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &leaf_values));
+ RETURN_NOT_OK(MakeRandomListArray(leaf_values, 0, include_nulls, pool, &list_array));
+ RETURN_NOT_OK(
+ MakeRandomListArray(list_array, 0, include_nulls, pool, &list_list_array));
+ RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array));
+ out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array}));
+ return Status::OK();
+}
- int64_t header_location;
- ASSERT_OK(WriteRowBatch(mmap_.get(), &batch, 0, &header_location));
+Status MakeNonNullRowBatch(std::shared_ptr<RowBatch>* out) {
+ // Make the schema
+ auto f0 = std::make_shared<Field>("f0", LIST_INT32);
+ auto f1 = std::make_shared<Field>("f1", LIST_LIST_INT32);
+ auto f2 = std::make_shared<Field>("f2", INT32);
+ std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
- std::shared_ptr<RowBatchReader> result;
- ASSERT_OK(RowBatchReader::Open(mmap_.get(), header_location, &result));
+ // Example data
+ MemoryPool* pool = default_memory_pool();
+ const int length = 200;
+ std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
- std::shared_ptr<RowBatch> batch_result;
- ASSERT_OK(result->GetRowBatch(schema, &batch_result));
- EXPECT_EQ(batch.num_rows(), batch_result->num_rows());
+ RETURN_NOT_OK(MakeRandomInt32Array(1000, true, pool, &leaf_values));
+ bool include_nulls = false;
+ RETURN_NOT_OK(MakeRandomListArray(leaf_values, 50, include_nulls, pool, &list_array));
+ RETURN_NOT_OK(
+ MakeRandomListArray(list_array, 50, include_nulls, pool, &list_list_array));
+ RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array));
+ out->reset(new RowBatch(schema, length, {list_array, list_list_array, flat_array}));
+ return Status::OK();
+}
- for (int i = 0; i < batch.num_columns(); ++i) {
- EXPECT_TRUE(batch.column(i)->Equals(batch_result->column(i))) << i
- << batch.column_name(i);
+Status MakeDeeplyNestedList(std::shared_ptr<RowBatch>* out) {
+ const int batch_length = 5;
+ TypePtr type = INT32;
+
+ MemoryPool* pool = default_memory_pool();
+ ArrayPtr array;
+ const bool include_nulls = true;
+ RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool, &array));
+ for (int i = 0; i < 63; ++i) {
+ type = std::static_pointer_cast<DataType>(std::make_shared<ListType>(type));
+ RETURN_NOT_OK(MakeRandomListArray(array, batch_length, include_nulls, pool, &array));
+ }
+
+ auto f0 = std::make_shared<Field>("f0", type);
+ std::shared_ptr<Schema> schema(new Schema({f0}));
+ std::vector<ArrayPtr> arrays = {array};
+ out->reset(new RowBatch(schema, batch_length, arrays));
+ return Status::OK();
+}
+
+INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRowBatch,
+ ::testing::Values(&MakeIntRowBatch, &MakeListRowBatch, &MakeNonNullRowBatch,
+ &MakeZeroLengthRowBatch, &MakeDeeplyNestedList));
+
+class RecursionLimits : public ::testing::Test, public MemoryMapFixture {
+ public:
+ void SetUp() { pool_ = default_memory_pool(); }
+ void TearDown() { MemoryMapFixture::TearDown(); }
+
+ Status WriteToMmap(int recursion_level, bool override_level,
+ int64_t* header_out = nullptr, std::shared_ptr<Schema>* schema_out = nullptr) {
+ const int batch_length = 5;
+ TypePtr type = INT32;
+ ArrayPtr array;
+ const bool include_nulls = true;
+ RETURN_NOT_OK(MakeRandomInt32Array(1000, include_nulls, pool_, &array));
+ for (int i = 0; i < recursion_level; ++i) {
+ type = std::static_pointer_cast<DataType>(std::make_shared<ListType>(type));
+ RETURN_NOT_OK(
+ MakeRandomListArray(array, batch_length, include_nulls, pool_, &array));
+ }
+
+ auto f0 = std::make_shared<Field>("f0", type);
+ std::shared_ptr<Schema> schema(new Schema({f0}));
+ if (schema_out != nullptr) { *schema_out = schema; }
+ std::vector<ArrayPtr> arrays = {array};
+ auto batch = std::make_shared<RowBatch>(schema, batch_length, arrays);
+
+ std::string path = "test-write-past-max-recursion";
+ const int memory_map_size = 1 << 16;
+ MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
+ int64_t header_location;
+ int64_t* header_out_param = header_out == nullptr ? &header_location : header_out;
+ if (override_level) {
+ return WriteRowBatch(
+ mmap_.get(), batch.get(), 0, header_out_param, recursion_level + 1);
+ } else {
+ return WriteRowBatch(mmap_.get(), batch.get(), 0, header_out_param);
+ }
}
+
+ protected:
+ std::shared_ptr<MemoryMappedSource> mmap_;
+ MemoryPool* pool_;
+};
+
+TEST_F(RecursionLimits, WriteLimit) {
+ ASSERT_RAISES(Invalid, WriteToMmap((1 << 8) + 1, false));
+}
+
+TEST_F(RecursionLimits, ReadLimit) {
+ int64_t header_location;
+ std::shared_ptr<Schema> schema;
+ ASSERT_OK(WriteToMmap(64, true, &header_location, &schema));
+
+ std::shared_ptr<RowBatchReader> reader;
+ ASSERT_OK(RowBatchReader::Open(mmap_.get(), header_location, &reader));
+ std::shared_ptr<RowBatch> batch_result;
+ ASSERT_RAISES(Invalid, reader->GetRowBatch(schema, &batch_result));
}
} // namespace ipc
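
Editor's illustration (not part of the commit): the RecursionLimits tests above exercise a cap on how deeply list types may nest when a row batch is written or read. The general technique is to pass a remaining-depth budget down the recursion and fail once it is exhausted. The sketch below uses hypothetical names (NestedType, WithinDepthLimit, MakeNested) and a toy type chain. It is chosen to be consistent with the tests above, where 63 list levels round-trip and 64 levels fail to read, but the real accounting inside the IPC code may differ.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for a chain of nested list types; a null child
// marks the leaf (e.g. an int32 column).
struct NestedType {
  std::shared_ptr<NestedType> child;
};

// Returns true if the chain rooted at `type` has at most `max_depth` nodes.
// Each recursive step spends one unit of the depth budget.
bool WithinDepthLimit(const NestedType& type, int max_depth) {
  if (max_depth <= 0) { return false; }
  if (!type.child) { return true; }
  return WithinDepthLimit(*type.child, max_depth - 1);
}

// Builds a leaf wrapped in `levels` list layers, mirroring the loop in
// MakeDeeplyNestedList that repeatedly wraps the type in a ListType.
std::shared_ptr<NestedType> MakeNested(int levels) {
  assert(levels >= 0);
  auto type = std::make_shared<NestedType>();
  for (int i = 0; i < levels; ++i) {
    auto parent = std::make_shared<NestedType>();
    parent->child = type;
    type = parent;
  }
  return type;
}
```

With a budget of 64, a leaf wrapped in 63 layers passes and one wrapped in 64 layers is rejected.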
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/memory.cc b/cpp/src/arrow/ipc/memory.cc
index 2b077e9..84cbc18 100644
--- a/cpp/src/arrow/ipc/memory.cc
+++ b/cpp/src/arrow/ipc/memory.cc
@@ -18,6 +18,7 @@
#include "arrow/ipc/memory.h"
#include <sys/mman.h> // For memory-mapping
+
#include <algorithm>
#include <cerrno>
#include <cstdint>
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/metadata-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc
index ad5951d..1b1d50f 100644
--- a/cpp/src/arrow/ipc/metadata-internal.cc
+++ b/cpp/src/arrow/ipc/metadata-internal.cc
@@ -17,13 +17,14 @@
#include "arrow/ipc/metadata-internal.h"
-#include <flatbuffers/flatbuffers.h>
#include <cstdint>
#include <cstring>
#include <memory>
#include <sstream>
#include <string>
+#include "flatbuffers/flatbuffers.h"
+
#include "arrow/ipc/Message_generated.h"
#include "arrow/schema.h"
#include "arrow/type.h"
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/metadata-internal.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h
index 779c5a3..871b5bc 100644
--- a/cpp/src/arrow/ipc/metadata-internal.h
+++ b/cpp/src/arrow/ipc/metadata-internal.h
@@ -18,11 +18,12 @@
#ifndef ARROW_IPC_METADATA_INTERNAL_H
#define ARROW_IPC_METADATA_INTERNAL_H
-#include <flatbuffers/flatbuffers.h>
#include <cstdint>
#include <memory>
#include <vector>
+#include "flatbuffers/flatbuffers.h"
+
#include "arrow/ipc/Message_generated.h"
namespace arrow {
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index bcf104f..4fc8ec5 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -17,11 +17,12 @@
#include "arrow/ipc/metadata.h"
-#include <flatbuffers/flatbuffers.h>
#include <cstdint>
#include <memory>
#include <vector>
+#include "flatbuffers/flatbuffers.h"
+
// Generated C++ flatbuffer IDL
#include "arrow/ipc/Message_generated.h"
#include "arrow/ipc/metadata-internal.h"
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index 65c837d..e7dbb84 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -18,11 +18,19 @@
#ifndef ARROW_IPC_TEST_COMMON_H
#define ARROW_IPC_TEST_COMMON_H
+#include <algorithm>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
+#include "arrow/array.h"
+#include "arrow/test-util.h"
+#include "arrow/types/list.h"
+#include "arrow/types/primitive.h"
+#include "arrow/util/buffer.h"
+#include "arrow/util/memory-pool.h"
+
namespace arrow {
namespace ipc {
@@ -41,10 +49,69 @@ class MemoryMapFixture {
fclose(file);
}
+ Status InitMemoryMap(
+ int64_t size, const std::string& path, std::shared_ptr<MemoryMappedSource>* mmap) {
+ CreateFile(path, size);
+ return MemoryMappedSource::Open(path, MemorySource::READ_WRITE, mmap);
+ }
+
private:
std::vector<std::string> tmp_files_;
};
+Status MakeRandomInt32Array(
+ int32_t length, bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* array) {
+ std::shared_ptr<PoolBuffer> data;
+ test::MakeRandomInt32PoolBuffer(length, pool, &data);
+ const auto INT32 = std::make_shared<Int32Type>();
+ Int32Builder builder(pool, INT32);
+ if (include_nulls) {
+ std::shared_ptr<PoolBuffer> valid_bytes;
+ test::MakeRandomBytePoolBuffer(length, pool, &valid_bytes);
+ RETURN_NOT_OK(builder.Append(
+ reinterpret_cast<const int32_t*>(data->data()), length, valid_bytes->data()));
+ *array = builder.Finish();
+ return Status::OK();
+ }
+ RETURN_NOT_OK(builder.Append(reinterpret_cast<const int32_t*>(data->data()), length));
+ *array = builder.Finish();
+ return Status::OK();
+}
+
+Status MakeRandomListArray(const std::shared_ptr<Array>& child_array, int num_lists,
+ bool include_nulls, MemoryPool* pool, std::shared_ptr<Array>* array) {
+ // Create the null list values
+ std::vector<uint8_t> valid_lists(num_lists);
+ const double null_percent = include_nulls ? 0.1 : 0;
+ test::random_null_bytes(num_lists, null_percent, valid_lists.data());
+
+ // Create list offsets
+ const int max_list_size = 10;
+
+ std::vector<int32_t> list_sizes(num_lists, 0);
+ std::vector<int32_t> offsets(
+ num_lists + 1, 0); // +1 so we can shift for nulls. See partial sum below.
+ const int seed = child_array->length();
+ if (num_lists > 0) {
+ test::rand_uniform_int(num_lists, seed, 0, max_list_size, list_sizes.data());
+ // make sure sizes are consistent with null
+ std::transform(list_sizes.begin(), list_sizes.end(), valid_lists.begin(),
+ list_sizes.begin(),
+ [](int32_t size, int32_t valid) { return valid == 0 ? 0 : size; });
+ std::partial_sum(list_sizes.begin(), list_sizes.end(), ++offsets.begin());
+
+ // Force invariants
+ const int child_length = child_array->length();
+ offsets[0] = 0;
+ std::replace_if(offsets.begin(), offsets.end(),
+ [child_length](int32_t offset) { return offset > child_length; }, child_length);
+ }
+ ListBuilder builder(pool, child_array);
+ RETURN_NOT_OK(builder.Append(offsets.data(), num_lists, valid_lists.data()));
+ *array = builder.Finish();
+ return (*array)->Validate();
+}
+
} // namespace ipc
} // namespace arrow
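
Editor's illustration (not part of the commit): MakeRandomListArray above turns per-list sizes into offsets by zeroing the sizes of null slots, taking a running sum shifted by one position, and clamping to the child length. A minimal standalone sketch of those three steps follows. SizesToOffsets is a hypothetical name, and the sketch works on std::vector rather than Arrow buffers. Note that std::partial_sum is declared in <numeric>.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

// Hypothetical helper: converts list sizes into num_lists + 1 offsets.
// Null slots (is_valid[i] == 0) are forced to size zero, and every offset is
// clamped so that it never points past the end of the child array.
std::vector<int32_t> SizesToOffsets(std::vector<int32_t> sizes,
    const std::vector<uint8_t>& is_valid, int32_t child_length) {
  assert(sizes.size() == is_valid.size());
  // A null list must not consume any child values.
  std::transform(sizes.begin(), sizes.end(), is_valid.begin(), sizes.begin(),
      [](int32_t size, uint8_t valid) { return valid == 0 ? 0 : size; });
  // offsets[0] stays 0; offsets[i + 1] is the running sum of sizes[0..i].
  std::vector<int32_t> offsets(sizes.size() + 1, 0);
  std::partial_sum(sizes.begin(), sizes.end(), offsets.begin() + 1);
  // Clamp offsets that overshoot the child array.
  for (int32_t& offset : offsets) { offset = std::min(offset, child_length); }
  return offsets;
}
```

For sizes {3, 5, 4} with the middle slot null and a child of length 7, this yields the offsets {0, 3, 3, 7} used in the list tests.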
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/parquet/schema.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc
index 066388b..560e283 100644
--- a/cpp/src/arrow/parquet/schema.cc
+++ b/cpp/src/arrow/parquet/schema.cc
@@ -21,8 +21,8 @@
#include "parquet/api/schema.h"
-#include "arrow/util/status.h"
#include "arrow/types/decimal.h"
+#include "arrow/util/status.h"
using parquet::schema::Node;
using parquet::schema::NodePtr;
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/schema.cc b/cpp/src/arrow/schema.cc
index a38acaa..ff3ea19 100644
--- a/cpp/src/arrow/schema.cc
+++ b/cpp/src/arrow/schema.cc
@@ -18,8 +18,8 @@
#include "arrow/schema.h"
#include <memory>
-#include <string>
#include <sstream>
+#include <string>
#include <vector>
#include "arrow/type.h"
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 538d9b2..2f81161 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -19,6 +19,7 @@
#define ARROW_TEST_UTIL_H_
#include <cstdint>
+#include <limits>
#include <memory>
#include <random>
#include <string>
@@ -26,12 +27,13 @@
#include "gtest/gtest.h"
-#include "arrow/type.h"
#include "arrow/column.h"
#include "arrow/schema.h"
#include "arrow/table.h"
+#include "arrow/type.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/buffer.h"
+#include "arrow/util/logging.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/random.h"
#include "arrow/util/status.h"
@@ -103,10 +105,12 @@ std::shared_ptr<Buffer> to_buffer(const std::vector<T>& values) {
reinterpret_cast<const uint8_t*>(values.data()), values.size() * sizeof(T));
}
-void random_null_bitmap(int64_t n, double pct_null, uint8_t* null_bitmap) {
+// Sets approximately pct_null of the first n bytes in null_bytes to zero
+// and the rest to non-zero (true) values.
+void random_null_bytes(int64_t n, double pct_null, uint8_t* null_bytes) {
Random rng(random_seed());
for (int i = 0; i < n; ++i) {
- null_bitmap[i] = rng.NextDoubleFraction() > pct_null;
+ null_bytes[i] = rng.NextDoubleFraction() > pct_null;
}
}
@@ -121,6 +125,7 @@ static inline void random_bytes(int n, uint32_t seed, uint8_t* out) {
template <typename T>
void rand_uniform_int(int n, uint32_t seed, T min_value, T max_value, T* out) {
+ DCHECK(out);
std::mt19937 gen(seed);
std::uniform_int_distribution<T> d(min_value, max_value);
for (int i = 0; i < n; ++i) {
@@ -129,11 +134,25 @@ void rand_uniform_int(int n, uint32_t seed, T min_value, T max_value, T* out) {
}
static inline int bitmap_popcount(const uint8_t* data, int length) {
+ // bookkeeping: length is a bit count, so one 64-bit word covers 64 bits
+ constexpr int pop_len = sizeof(uint64_t) * 8;
+ const uint64_t* i64_data = reinterpret_cast<const uint64_t*>(data);
+ const int fast_counts = length / pop_len;
+ const uint64_t* end = i64_data + fast_counts;
+
int count = 0;
- for (int i = 0; i < length; ++i) {
- // TODO(wesm): accelerate this
+ // popcount as much as possible with the widest possible count
+ for (auto iter = i64_data; iter < end; ++iter) {
+ count += __builtin_popcountll(*iter);
+ }
+
+ // Account for leftover bits (in theory we could fall back to smaller
+ // versions of popcount but the code complexity is likely not worth it)
+ const int loop_tail_index = fast_counts * pop_len;
+ for (int i = loop_tail_index; i < length; ++i) {
if (util::get_bit(data, i)) { ++count; }
}
+
return count;
}
@@ -153,6 +172,26 @@ std::shared_ptr<Buffer> bytes_to_null_buffer(const std::vector<uint8_t>& bytes)
return out;
}
+Status MakeRandomInt32PoolBuffer(int32_t length, MemoryPool* pool,
+ std::shared_ptr<PoolBuffer>* pool_buffer, uint32_t seed = 0) {
+ DCHECK(pool);
+ auto data = std::make_shared<PoolBuffer>(pool);
+ RETURN_NOT_OK(data->Resize(length * sizeof(int32_t)));
+ test::rand_uniform_int(length, seed, 0, std::numeric_limits<int32_t>::max(),
+ reinterpret_cast<int32_t*>(data->mutable_data()));
+ *pool_buffer = data;
+ return Status::OK();
+}
+
+Status MakeRandomBytePoolBuffer(int32_t length, MemoryPool* pool,
+ std::shared_ptr<PoolBuffer>* pool_buffer, uint32_t seed = 0) {
+ auto bytes = std::make_shared<PoolBuffer>(pool);
+ RETURN_NOT_OK(bytes->Resize(length));
+ test::random_bytes(length, seed, bytes->mutable_data());
+ *pool_buffer = bytes;
+ return Status::OK();
+}
+
} // namespace test
} // namespace arrow
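
Editor's illustration (not part of the commit): the idea behind the accelerated bitmap_popcount above is to count whole 64-bit words with a hardware popcount and then finish the remaining bits one at a time. The sketch below is a standalone version under two assumptions: the length is a count of bits, and bits are numbered least-significant first within each byte. CountSetBits is a hypothetical name, and __builtin_popcountll is a GCC/Clang builtin, so other compilers would need a substitute.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// Hypothetical helper: counts the set bits among the first num_bits bits of
// data, assuming LSB-first bit numbering within each byte.
inline int CountSetBits(const uint8_t* data, int num_bits) {
  assert(data != nullptr || num_bits == 0);
  constexpr int kBitsPerWord = 64;
  const int num_words = num_bits / kBitsPerWord;

  int count = 0;
  // Fast path: one popcount per full 64-bit word.
  for (int w = 0; w < num_words; ++w) {
    uint64_t word;
    // memcpy sidesteps alignment concerns when reading 8 bytes at a time.
    std::memcpy(&word, data + w * sizeof(uint64_t), sizeof(word));
    count += __builtin_popcountll(word);
  }
  // Slow path: test the trailing bits that do not fill a whole word.
  for (int i = num_words * kBitsPerWord; i < num_bits; ++i) {
    if (data[i / 8] & (1 << (i % 8))) { ++count; }
  }
  return count;
}
```

Because popcount does not depend on the order of bits inside a word, the fast path gives the same result on little-endian and big-endian machines.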
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 051ab46..77404cd 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -116,7 +116,7 @@ struct DataType {
bool Equals(const DataType* other) {
// Call with a pointer so more friendly to subclasses
- return this == other || (this->type == other->type);
+ return other && ((this == other) || (this->type == other->type));
}
bool Equals(const std::shared_ptr<DataType>& other) { return Equals(other.get()); }
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/construct.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/construct.cc b/cpp/src/arrow/types/construct.cc
index 0a30929..78036d4 100644
--- a/cpp/src/arrow/types/construct.cc
+++ b/cpp/src/arrow/types/construct.cc
@@ -20,8 +20,8 @@
#include <memory>
#include "arrow/type.h"
-#include "arrow/types/primitive.h"
#include "arrow/types/list.h"
+#include "arrow/types/primitive.h"
#include "arrow/types/string.h"
#include "arrow/util/buffer.h"
#include "arrow/util/status.h"
@@ -60,11 +60,10 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
case Type::LIST: {
std::shared_ptr<ArrayBuilder> value_builder;
-
const std::shared_ptr<DataType>& value_type =
static_cast<ListType*>(type.get())->value_type();
RETURN_NOT_OK(MakeBuilder(pool, value_type, &value_builder));
- out->reset(new ListBuilder(pool, type, value_builder));
+ out->reset(new ListBuilder(pool, value_builder));
return Status::OK();
}
default:
@@ -75,11 +74,11 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
#define MAKE_PRIMITIVE_ARRAY_CASE(ENUM, ArrayType) \
case Type::ENUM: \
out->reset(new ArrayType(type, length, data, null_count, null_bitmap)); \
- return Status::OK();
+ break;
-Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int32_t length,
+Status MakePrimitiveArray(const TypePtr& type, int32_t length,
const std::shared_ptr<Buffer>& data, int32_t null_count,
- const std::shared_ptr<Buffer>& null_bitmap, std::shared_ptr<Array>* out) {
+ const std::shared_ptr<Buffer>& null_bitmap, ArrayPtr* out) {
switch (type->type) {
MAKE_PRIMITIVE_ARRAY_CASE(BOOL, BooleanArray);
MAKE_PRIMITIVE_ARRAY_CASE(UINT8, UInt8Array);
@@ -90,11 +89,43 @@ Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int32_t length,
MAKE_PRIMITIVE_ARRAY_CASE(INT32, Int32Array);
MAKE_PRIMITIVE_ARRAY_CASE(UINT64, UInt64Array);
MAKE_PRIMITIVE_ARRAY_CASE(INT64, Int64Array);
+ MAKE_PRIMITIVE_ARRAY_CASE(TIME, Int64Array);
+ MAKE_PRIMITIVE_ARRAY_CASE(TIMESTAMP, Int64Array);
MAKE_PRIMITIVE_ARRAY_CASE(FLOAT, FloatArray);
MAKE_PRIMITIVE_ARRAY_CASE(DOUBLE, DoubleArray);
+ MAKE_PRIMITIVE_ARRAY_CASE(TIMESTAMP_DOUBLE, DoubleArray);
+ default:
+ return Status::NotImplemented(type->ToString());
+ }
+#ifdef NDEBUG
+ return Status::OK();
+#else
+ return (*out)->Validate();
+#endif
+}
+
+Status MakeListArray(const TypePtr& type, int32_t length,
+ const std::shared_ptr<Buffer>& offsets, const ArrayPtr& values, int32_t null_count,
+ const std::shared_ptr<Buffer>& null_bitmap, ArrayPtr* out) {
+ switch (type->type) {
+ case Type::BINARY:
+ case Type::LIST:
+ out->reset(new ListArray(type, length, offsets, values, null_count, null_bitmap));
+ break;
+ case Type::CHAR:
+ case Type::DECIMAL_TEXT:
+ case Type::STRING:
+ case Type::VARCHAR:
+ out->reset(new StringArray(type, length, offsets, values, null_count, null_bitmap));
+ break;
default:
return Status::NotImplemented(type->ToString());
}
+#ifdef NDEBUG
+ return Status::OK();
+#else
+ return (*out)->Validate();
+#endif
}
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/construct.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/construct.h b/cpp/src/arrow/types/construct.h
index 27fb7bd..43c0018 100644
--- a/cpp/src/arrow/types/construct.h
+++ b/cpp/src/arrow/types/construct.h
@@ -33,10 +33,19 @@ class Status;
Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
std::shared_ptr<ArrayBuilder>* out);
+// Create new arrays for logical types that are backed by primitive arrays.
Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int32_t length,
const std::shared_ptr<Buffer>& data, int32_t null_count,
const std::shared_ptr<Buffer>& null_bitmap, std::shared_ptr<Array>* out);
+// Create new list arrays for logical types that are backed by ListArrays (e.g. list of
+// primitives and strings)
+// TODO(emkornfield) split up string vs list?
+Status MakeListArray(const std::shared_ptr<DataType>& type, int32_t length,
+ const std::shared_ptr<Buffer>& offsets, const std::shared_ptr<Array>& values,
+ int32_t null_count, const std::shared_ptr<Buffer>& null_bitmap,
+ std::shared_ptr<Array>* out);
+
} // namespace arrow
#endif // ARROW_BUILDER_H_
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/list-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/list-test.cc b/cpp/src/arrow/types/list-test.cc
index aa34f23..6a8ad9a 100644
--- a/cpp/src/arrow/types/list-test.cc
+++ b/cpp/src/arrow/types/list-test.cc
@@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-#include <cstdlib>
#include <cstdint>
+#include <cstdlib>
#include <memory>
#include <string>
#include <vector>
@@ -94,6 +94,7 @@ TEST_F(TestListBuilder, TestAppendNull) {
Done();
+ ASSERT_OK(result_->Validate());
ASSERT_TRUE(result_->IsNull(0));
ASSERT_TRUE(result_->IsNull(1));
@@ -105,50 +106,93 @@ TEST_F(TestListBuilder, TestAppendNull) {
ASSERT_EQ(0, values->length());
}
+void ValidateBasicListArray(const ListArray* result, const vector<int32_t>& values,
+ const vector<uint8_t>& is_valid) {
+ ASSERT_OK(result->Validate());
+ ASSERT_EQ(1, result->null_count());
+ ASSERT_EQ(0, result->values()->null_count());
+
+ ASSERT_EQ(3, result->length());
+ vector<int32_t> ex_offsets = {0, 3, 3, 7};
+ for (size_t i = 0; i < ex_offsets.size(); ++i) {
+ ASSERT_EQ(ex_offsets[i], result->offset(i));
+ }
+
+ for (int i = 0; i < result->length(); ++i) {
+ ASSERT_EQ(!static_cast<bool>(is_valid[i]), result->IsNull(i));
+ }
+
+ ASSERT_EQ(7, result->values()->length());
+ Int32Array* varr = static_cast<Int32Array*>(result->values().get());
+
+ for (size_t i = 0; i < values.size(); ++i) {
+ ASSERT_EQ(values[i], varr->Value(i));
+ }
+}
+
TEST_F(TestListBuilder, TestBasics) {
vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6};
vector<int> lengths = {3, 0, 4};
- vector<uint8_t> is_null = {0, 1, 0};
+ vector<uint8_t> is_valid = {1, 0, 1};
Int32Builder* vb = static_cast<Int32Builder*>(builder_->value_builder().get());
- EXPECT_OK(builder_->Reserve(lengths.size()));
- EXPECT_OK(vb->Reserve(values.size()));
+ ASSERT_OK(builder_->Reserve(lengths.size()));
+ ASSERT_OK(vb->Reserve(values.size()));
int pos = 0;
for (size_t i = 0; i < lengths.size(); ++i) {
- ASSERT_OK(builder_->Append(is_null[i] > 0));
+ ASSERT_OK(builder_->Append(is_valid[i] > 0));
for (int j = 0; j < lengths[i]; ++j) {
vb->Append(values[pos++]);
}
}
Done();
+ ValidateBasicListArray(result_.get(), values, is_valid);
+}
- ASSERT_EQ(1, result_->null_count());
- ASSERT_EQ(0, result_->values()->null_count());
+TEST_F(TestListBuilder, BulkAppend) {
+ vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6};
+ vector<int> lengths = {3, 0, 4};
+ vector<uint8_t> is_valid = {1, 0, 1};
+ vector<int32_t> offsets = {0, 3, 3};
- ASSERT_EQ(3, result_->length());
- vector<int32_t> ex_offsets = {0, 3, 3, 7};
- for (size_t i = 0; i < ex_offsets.size(); ++i) {
- ASSERT_EQ(ex_offsets[i], result_->offset(i));
- }
+ Int32Builder* vb = static_cast<Int32Builder*>(builder_->value_builder().get());
+ ASSERT_OK(vb->Reserve(values.size()));
- for (int i = 0; i < result_->length(); ++i) {
- ASSERT_EQ(static_cast<bool>(is_null[i]), result_->IsNull(i));
+ builder_->Append(offsets.data(), offsets.size(), is_valid.data());
+ for (int32_t value : values) {
+ vb->Append(value);
}
+ Done();
+ ValidateBasicListArray(result_.get(), values, is_valid);
+}
- ASSERT_EQ(7, result_->values()->length());
- Int32Array* varr = static_cast<Int32Array*>(result_->values().get());
+TEST_F(TestListBuilder, BulkAppendInvalid) {
+ vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6};
+ vector<int> lengths = {3, 0, 4};
+ vector<uint8_t> is_null = {0, 1, 0};
+ vector<uint8_t> is_valid = {1, 0, 1};
+ vector<int32_t> offsets = {0, 2, 4}; // should be 0, 3, 3 given the is_null array
- for (size_t i = 0; i < values.size(); ++i) {
- ASSERT_EQ(values[i], varr->Value(i));
+ Int32Builder* vb = static_cast<Int32Builder*>(builder_->value_builder().get());
+ ASSERT_OK(vb->Reserve(values.size()));
+
+ builder_->Append(offsets.data(), offsets.size(), is_valid.data());
+ builder_->Append(offsets.data(), offsets.size(), is_valid.data());
+ for (int32_t value : values) {
+ vb->Append(value);
}
+
+ Done();
+ ASSERT_RAISES(Invalid, result_->Validate());
}
TEST_F(TestListBuilder, TestZeroLength) {
// All buffers are null
Done();
+ ASSERT_OK(result_->Validate());
}
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/list.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/list.cc b/cpp/src/arrow/types/list.cc
index 23f12dd..fc33311 100644
--- a/cpp/src/arrow/types/list.cc
+++ b/cpp/src/arrow/types/list.cc
@@ -14,23 +14,26 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
#include "arrow/types/list.h"
+#include <sstream>
+
namespace arrow {
bool ListArray::EqualsExact(const ListArray& other) const {
if (this == &other) { return true; }
if (null_count_ != other.null_count_) { return false; }
- bool equal_offsets = offset_buf_->Equals(*other.offset_buf_, length_ + 1);
+ bool equal_offsets =
+ offset_buf_->Equals(*other.offset_buf_, (length_ + 1) * sizeof(int32_t));
+ if (!equal_offsets) { return false; }
bool equal_null_bitmap = true;
if (null_count_ > 0) {
equal_null_bitmap =
null_bitmap_->Equals(*other.null_bitmap_, util::bytes_for_bits(length_));
}
- if (!(equal_offsets && equal_null_bitmap)) { return false; }
+ if (!equal_null_bitmap) { return false; }
return values()->Equals(other.values());
}
@@ -41,4 +44,55 @@ bool ListArray::Equals(const std::shared_ptr<Array>& arr) const {
return EqualsExact(*static_cast<const ListArray*>(arr.get()));
}
+Status ListArray::Validate() const {
+ if (length_ < 0) { return Status::Invalid("Length was negative"); }
+ if (!offset_buf_) { return Status::Invalid("offset_buf_ was null"); }
+ if (offset_buf_->size() / sizeof(int32_t) < length_) {
+ std::stringstream ss;
+ ss << "offset buffer size (bytes): " << offset_buf_->size()
+ << " isn't large enough for length: " << length_;
+ return Status::Invalid(ss.str());
+ }
+ const int32_t last_offset = offset(length_);
+ if (last_offset > 0) {
+ if (!values_) {
+ return Status::Invalid("last offset was non-zero and values was null");
+ }
+ if (values_->length() != last_offset) {
+ std::stringstream ss;
+ ss << "Final offset invariant not equal to values length: " << last_offset
+ << "!=" << values_->length();
+ return Status::Invalid(ss.str());
+ }
+
+ const Status child_valid = values_->Validate();
+ if (!child_valid.ok()) {
+ std::stringstream ss;
+ ss << "Child array invalid: " << child_valid.ToString();
+ return Status::Invalid(ss.str());
+ }
+ }
+
+ int32_t prev_offset = offset(0);
+ if (prev_offset != 0) { return Status::Invalid("The first offset wasn't zero"); }
+ for (int32_t i = 1; i <= length_; ++i) {
+ int32_t current_offset = offset(i);
+ if (IsNull(i - 1) && current_offset != prev_offset) {
+ std::stringstream ss;
+ ss << "Offset invariant failure at: " << i << " inconsistent offsets for null slot: "
+ << current_offset << "!=" << prev_offset;
+ return Status::Invalid(ss.str());
+ }
+ if (current_offset < prev_offset) {
+ std::stringstream ss;
+ ss << "Offset invariant failure: " << i
+ << " inconsistent offset for non-null slot: " << current_offset << "<"
+ << prev_offset;
+ return Status::Invalid(ss.str());
+ }
+ prev_offset = current_offset;
+ }
+ return Status::OK();
+}
+
} // namespace arrow
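
Editor's illustration (not part of the commit): ListArray::Validate above enforces a small set of offset invariants: the first offset is zero, offsets never decrease, a null slot has the same offset as its predecessor, and the final offset matches the length of the values array. The sketch below restates those checks over plain vectors. CheckListOffsets is a hypothetical name, an empty return string stands in for Status::OK(), and unlike the code above it compares the final offset against the values length even when that offset is zero.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper: returns an empty string when the offsets are
// consistent, otherwise a short description of the first violation.
std::string CheckListOffsets(const std::vector<int32_t>& offsets,
    const std::vector<uint8_t>& is_valid, int32_t values_length) {
  // One offset per list plus a final end offset.
  assert(offsets.size() == is_valid.size() + 1);
  if (offsets.front() != 0) { return "first offset is not zero"; }
  for (std::size_t i = 1; i < offsets.size(); ++i) {
    if (offsets[i] < offsets[i - 1]) {
      return "offsets decrease at index " + std::to_string(i);
    }
    // A null list must be empty, i.e. it must not advance the offset.
    if (is_valid[i - 1] == 0 && offsets[i] != offsets[i - 1]) {
      return "null slot " + std::to_string(i - 1) + " is not empty";
    }
  }
  if (offsets.back() != values_length) {
    return "final offset does not match values length";
  }
  return "";
}
```

The offsets {0, 3, 3, 7} with a null middle slot pass, while {0, 2, 4, 7} fail because the null slot would span two values, which is the situation the BulkAppendInvalid test constructs.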
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/list.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/list.h b/cpp/src/arrow/types/list.h
index 6b81546..e2302d9 100644
--- a/cpp/src/arrow/types/list.h
+++ b/cpp/src/arrow/types/list.h
@@ -28,6 +28,7 @@
#include "arrow/types/primitive.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/buffer.h"
+#include "arrow/util/logging.h"
#include "arrow/util/status.h"
namespace arrow {
@@ -46,11 +47,16 @@ class ListArray : public Array {
values_ = values;
}
- virtual ~ListArray() {}
+ Status Validate() const override;
+
+ virtual ~ListArray() = default;
// Return a shared pointer in case the requestor desires to share ownership
// with this array.
const std::shared_ptr<Array>& values() const { return values_; }
+ const std::shared_ptr<Buffer> offset_buffer() const {
+ return std::static_pointer_cast<Buffer>(offset_buf_);
+ }
const std::shared_ptr<DataType>& value_type() const { return values_->type(); }
@@ -78,59 +84,73 @@ class ListArray : public Array {
//
// To use this class, you must append values to the child array builder and use
// the Append function to delimit each distinct list value (once the values
-// have been appended to the child array)
-class ListBuilder : public Int32Builder {
+// have been appended to the child array) or use the bulk API to append
+// a sequence of offsets and null values.
+//
+// A note on types: per arrow/type.h, all types in the C++ implementation are
+// logical, so even though this class always builds an Array of lists, it can
+// represent multiple different logical types. If no logical type is provided
+// at construction time, the class defaults to List<T>, where T is taken from the
+// value_builder/values that the object is constructed with.
+class ListBuilder : public ArrayBuilder {
public:
+ // Use this constructor to incrementally build the value array along with offsets and
+ // null bitmap.
+ ListBuilder(MemoryPool* pool, std::shared_ptr<ArrayBuilder> value_builder,
+ const TypePtr& type = nullptr)
+ : ArrayBuilder(
+ pool, type ? type : std::static_pointer_cast<DataType>(
+ std::make_shared<ListType>(value_builder->type()))),
+ offset_builder_(pool),
+ value_builder_(value_builder) {}
+
+ // Use this constructor to build the list with a pre-existing values array
ListBuilder(
- MemoryPool* pool, const TypePtr& type, std::shared_ptr<ArrayBuilder> value_builder)
- : Int32Builder(pool, type), value_builder_(value_builder) {}
-
- Status Init(int32_t elements) {
- // One more than requested.
- //
- // XXX: This is slightly imprecise, because we might trigger null mask
- // resizes that are unnecessary when creating arrays with power-of-two size
- return Int32Builder::Init(elements + 1);
+ MemoryPool* pool, std::shared_ptr<Array> values, const TypePtr& type = nullptr)
+ : ArrayBuilder(pool, type ? type : std::static_pointer_cast<DataType>(
+ std::make_shared<ListType>(values->type()))),
+ offset_builder_(pool),
+ values_(values) {}
+
+ Status Init(int32_t elements) override {
+ RETURN_NOT_OK(ArrayBuilder::Init(elements));
+ // one more than requested for offsets
+ return offset_builder_.Resize((elements + 1) * sizeof(int32_t));
}
- Status Resize(int32_t capacity) {
- // Need space for the end offset
- RETURN_NOT_OK(Int32Builder::Resize(capacity + 1));
-
- // Slight hack, as the "real" capacity is one less
- --capacity_;
- return Status::OK();
+ Status Resize(int32_t capacity) override {
+ // one more than requested for offsets
+ RETURN_NOT_OK(offset_builder_.Resize((capacity + 1) * sizeof(int32_t)));
+ return ArrayBuilder::Resize(capacity);
}
// Vector append
//
// If passed, valid_bytes is of equal length to values, and any zero byte
// will be considered as a null for that slot
- Status Append(value_type* values, int32_t length, uint8_t* valid_bytes = nullptr) {
- if (length_ + length > capacity_) {
- int32_t new_capacity = util::next_power2(length_ + length);
- RETURN_NOT_OK(Resize(new_capacity));
- }
- memcpy(raw_data_ + length_, values, type_traits<Int32Type>::bytes_required(length));
-
- if (valid_bytes != nullptr) { AppendNulls(valid_bytes, length); }
-
- length_ += length;
+ Status Append(
+ const int32_t* offsets, int32_t length, const uint8_t* valid_bytes = nullptr) {
+ RETURN_NOT_OK(Reserve(length));
+ UnsafeAppendToBitmap(valid_bytes, length);
+ offset_builder_.UnsafeAppend<int32_t>(offsets, length);
return Status::OK();
}
+ // The same as Finalize but allows for overriding the c++ type
template <typename Container>
std::shared_ptr<Array> Transfer() {
- std::shared_ptr<Array> items = value_builder_->Finish();
+ std::shared_ptr<Array> items = values_;
+ if (!items) { items = value_builder_->Finish(); }
- // Add final offset if the length is non-zero
- if (length_) { raw_data_[length_] = items->length(); }
+ offset_builder_.Append<int32_t>(items->length());
+ const auto offsets_buffer = offset_builder_.Finish();
auto result = std::make_shared<Container>(
- type_, length_, data_, items, null_count_, null_bitmap_);
+ type_, length_, offsets_buffer, items, null_count_, null_bitmap_);
- data_ = null_bitmap_ = nullptr;
+ // TODO(emkornfield) make a reset method
capacity_ = length_ = null_count_ = 0;
+ null_bitmap_ = nullptr;
return result;
}
@@ -141,26 +161,24 @@ class ListBuilder : public Int32Builder {
//
// This function should be called before beginning to append elements to the
// value builder
- Status Append(bool is_null = false) {
- if (length_ == capacity_) {
- // If the capacity was not already a multiple of 2, do so here
- RETURN_NOT_OK(Resize(util::next_power2(capacity_ + 1)));
- }
- if (is_null) {
- ++null_count_;
- } else {
- util::set_bit(null_bitmap_data_, length_);
- }
- raw_data_[length_++] = value_builder_->length();
+ Status Append(bool is_valid = true) {
+ RETURN_NOT_OK(Reserve(1));
+ UnsafeAppendToBitmap(is_valid);
+ RETURN_NOT_OK(offset_builder_.Append<int32_t>(value_builder_->length()));
return Status::OK();
}
- Status AppendNull() { return Append(true); }
+ Status AppendNull() { return Append(false); }
- const std::shared_ptr<ArrayBuilder>& value_builder() const { return value_builder_; }
+ const std::shared_ptr<ArrayBuilder>& value_builder() const {
+ DCHECK(!values_) << "Using value builder is pointless when values_ is set";
+ return value_builder_;
+ }
protected:
+ BufferBuilder offset_builder_;
std::shared_ptr<ArrayBuilder> value_builder_;
+ std::shared_ptr<Array> values_;
};
} // namespace arrow
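The offset bookkeeping that the reworked ListBuilder performs can be illustrated with a small standalone sketch. It does not use the Arrow classes; MiniListBuilder, AppendValue, FinishOffsets and BuildExample are hypothetical names chosen for illustration, and std::vector stands in for BufferBuilder and the null bitmap. It assumes only what the diff above shows: one offset is recorded per list slot, and the final end offset is added when the array is finished.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for ListBuilder (not the Arrow class): records one
// start offset and one validity flag per list slot.
struct MiniListBuilder {
  std::vector<int32_t> values;   // flattened child values
  std::vector<int32_t> offsets;  // start offset of each list slot
  std::vector<bool> valid;       // validity flag of each list slot
  int32_t null_count = 0;

  // Start a new list slot; call this before appending its child values.
  void Append(bool is_valid = true) {
    offsets.push_back(static_cast<int32_t>(values.size()));
    valid.push_back(is_valid);
    if (!is_valid) { ++null_count; }
  }

  void AppendNull() { Append(false); }

  void AppendValue(int32_t v) { values.push_back(v); }

  // Returns length + 1 offsets: slot i spans [result[i], result[i + 1]).
  std::vector<int32_t> FinishOffsets() const {
    std::vector<int32_t> result = offsets;
    result.push_back(static_cast<int32_t>(values.size()));
    return result;
  }
};

// Builds the logical array [[1], null, [2, 3]].
MiniListBuilder BuildExample() {
  MiniListBuilder builder;
  builder.Append();
  builder.AppendValue(1);
  builder.AppendNull();
  builder.Append();
  builder.AppendValue(2);
  builder.AppendValue(3);
  return builder;
}
```

A null slot still gets an offset, so it shows up as an empty range between two equal offsets. This is why the offsets buffer needs room for one more entry than the number of list slots.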
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/primitive-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive-test.cc b/cpp/src/arrow/types/primitive-test.cc
index 6bd9e73..2b4c087 100644
--- a/cpp/src/arrow/types/primitive-test.cc
+++ b/cpp/src/arrow/types/primitive-test.cc
@@ -102,7 +102,7 @@ class TestPrimitiveBuilder : public TestBuilder {
Attrs::draw(N, &draws_);
valid_bytes_.resize(N);
- test::random_null_bitmap(N, pct_null, valid_bytes_.data());
+ test::random_null_bytes(N, pct_null, valid_bytes_.data());
}
void Check(const std::shared_ptr<BuilderType>& builder, bool nullable) {
@@ -193,8 +193,8 @@ void TestPrimitiveBuilder<PBoolean>::RandomData(int N, double pct_null) {
draws_.resize(N);
valid_bytes_.resize(N);
- test::random_null_bitmap(N, 0.5, draws_.data());
- test::random_null_bitmap(N, pct_null, valid_bytes_.data());
+ test::random_null_bytes(N, 0.5, draws_.data());
+ test::random_null_bytes(N, pct_null, valid_bytes_.data());
}
template <>
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/primitive.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive.cc b/cpp/src/arrow/types/primitive.cc
index 9549c47..9102c53 100644
--- a/cpp/src/arrow/types/primitive.cc
+++ b/cpp/src/arrow/types/primitive.cc
@@ -57,12 +57,14 @@ bool PrimitiveArray::EqualsExact(const PrimitiveArray& other) const {
}
return true;
} else {
+ if (length_ == 0 && other.length_ == 0) { return true; }
return data_->Equals(*other.data_, length_);
}
}
bool PrimitiveArray::Equals(const std::shared_ptr<Array>& arr) const {
if (this == arr.get()) { return true; }
+ if (!arr) { return false; }
if (this->type_enum() != arr->type_enum()) { return false; }
return EqualsExact(*static_cast<const PrimitiveArray*>(arr.get()));
}
@@ -102,48 +104,21 @@ Status PrimitiveBuilder<T>::Resize(int32_t capacity) {
}
template <typename T>
-Status PrimitiveBuilder<T>::Reserve(int32_t elements) {
- if (length_ + elements > capacity_) {
- int32_t new_capacity = util::next_power2(length_ + elements);
- return Resize(new_capacity);
- }
- return Status::OK();
-}
-
-template <typename T>
Status PrimitiveBuilder<T>::Append(
const value_type* values, int32_t length, const uint8_t* valid_bytes) {
- RETURN_NOT_OK(PrimitiveBuilder<T>::Reserve(length));
+ RETURN_NOT_OK(Reserve(length));
if (length > 0) {
memcpy(raw_data_ + length_, values, type_traits<T>::bytes_required(length));
}
- if (valid_bytes != nullptr) {
- PrimitiveBuilder<T>::AppendNulls(valid_bytes, length);
- } else {
- for (int i = 0; i < length; ++i) {
- util::set_bit(null_bitmap_data_, length_ + i);
- }
- }
+ // length_ is updated by this call
+ ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
- length_ += length;
return Status::OK();
}
template <typename T>
-void PrimitiveBuilder<T>::AppendNulls(const uint8_t* valid_bytes, int32_t length) {
- // If valid_bytes is all not null, then none of the values are null
- for (int i = 0; i < length; ++i) {
- if (valid_bytes[i] == 0) {
- ++null_count_;
- } else {
- util::set_bit(null_bitmap_data_, length_ + i);
- }
- }
-}
-
-template <typename T>
std::shared_ptr<Array> PrimitiveBuilder<T>::Finish() {
std::shared_ptr<Array> result = std::make_shared<typename type_traits<T>::ArrayType>(
type_, length_, data_, null_count_, null_bitmap_);
@@ -166,14 +141,8 @@ Status PrimitiveBuilder<BooleanType>::Append(
}
}
- if (valid_bytes != nullptr) {
- PrimitiveBuilder<BooleanType>::AppendNulls(valid_bytes, length);
- } else {
- for (int i = 0; i < length; ++i) {
- util::set_bit(null_bitmap_data_, length_ + i);
- }
- }
- length_ += length;
+ // this updates length_
+ ArrayBuilder::UnsafeAppendToBitmap(valid_bytes, length);
return Status::OK();
}
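The body of ArrayBuilder::UnsafeAppendToBitmap is not part of this hunk, so the following is only a sketch of the behavior implied by the code it replaces: set the validity bit for each non-zero byte, count a null for each zero byte, treat a null valid_bytes pointer as "all valid", and advance the length. MiniBitmap and its members are hypothetical names, and std::vector stands in for the pool-allocated bitmap.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the bitmap handling in ArrayBuilder.
struct MiniBitmap {
  std::vector<uint8_t> bits;  // one bit per slot, least significant bit first
  int32_t length = 0;
  int32_t null_count = 0;

  // Make room for `additional` more slots; new bytes start out as zero.
  void Reserve(int32_t additional) {
    bits.resize((length + additional + 7) / 8, 0);
  }

  // Does not capacity-check; call Reserve beforehand.
  // A null valid_bytes pointer means every appended slot is valid.
  void UnsafeAppend(const uint8_t* valid_bytes, int32_t n) {
    for (int32_t i = 0; i < n; ++i) {
      const int32_t pos = length + i;
      if (valid_bytes == nullptr || valid_bytes[i] != 0) {
        bits[pos / 8] |= static_cast<uint8_t>(1u << (pos % 8));
      } else {
        ++null_count;
      }
    }
    length += n;  // the caller must not advance the length again
  }

  bool IsValid(int32_t i) const { return ((bits[i / 8] >> (i % 8)) & 1) != 0; }
};
```

Because the helper advances the length itself, callers such as PrimitiveBuilder<T>::Append no longer need their own `length_ += length` statement, which matches the lines removed in the hunks above.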
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/primitive.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/primitive.h b/cpp/src/arrow/types/primitive.h
index fcd3db4..6f6b2fe 100644
--- a/cpp/src/arrow/types/primitive.h
+++ b/cpp/src/arrow/types/primitive.h
@@ -95,15 +95,13 @@ class PrimitiveBuilder : public ArrayBuilder {
using ArrayBuilder::Advance;
// Write nulls as uint8_t* (0 value indicates null) into pre-allocated memory
- void AppendNulls(const uint8_t* valid_bytes, int32_t length);
+ void AppendNulls(const uint8_t* valid_bytes, int32_t length) {
+ UnsafeAppendToBitmap(valid_bytes, length);
+ }
Status AppendNull() {
- if (length_ == capacity_) {
- // If the capacity was not already a multiple of 2, do so here
- RETURN_NOT_OK(Resize(util::next_power2(capacity_ + 1)));
- }
- ++null_count_;
- ++length_;
+ RETURN_NOT_OK(Reserve(1));
+ UnsafeAppendToBitmap(false);
return Status::OK();
}
@@ -116,21 +114,17 @@ class PrimitiveBuilder : public ArrayBuilder {
Status Append(
const value_type* values, int32_t length, const uint8_t* valid_bytes = nullptr);
- // Ensure that builder can accommodate an additional number of
- // elements. Resizes if the current capacity is not sufficient
- Status Reserve(int32_t elements);
-
std::shared_ptr<Array> Finish() override;
- protected:
- std::shared_ptr<PoolBuffer> data_;
- value_type* raw_data_;
-
- Status Init(int32_t capacity);
+ Status Init(int32_t capacity) override;
// Increase the capacity of the builder to accommodate at least the indicated
// number of elements
- Status Resize(int32_t capacity);
+ Status Resize(int32_t capacity) override;
+
+ protected:
+ std::shared_ptr<PoolBuffer> data_;
+ value_type* raw_data_;
};
template <typename T>
@@ -140,9 +134,17 @@ class NumericBuilder : public PrimitiveBuilder<T> {
using PrimitiveBuilder<T>::PrimitiveBuilder;
using PrimitiveBuilder<T>::Append;
+ using PrimitiveBuilder<T>::Init;
+ using PrimitiveBuilder<T>::Resize;
- // Scalar append. Does not capacity-check; make sure to call Reserve beforehand
+ // Scalar append.
void Append(value_type val) {
+ ArrayBuilder::Reserve(1);
+ UnsafeAppend(val);
+ }
+
+ // Does not capacity-check; make sure to call Reserve beforehand
+ void UnsafeAppend(value_type val) {
util::set_bit(null_bitmap_data_, length_);
raw_data_[length_++] = val;
}
@@ -151,9 +153,6 @@ class NumericBuilder : public PrimitiveBuilder<T> {
using PrimitiveBuilder<T>::length_;
using PrimitiveBuilder<T>::null_bitmap_data_;
using PrimitiveBuilder<T>::raw_data_;
-
- using PrimitiveBuilder<T>::Init;
- using PrimitiveBuilder<T>::Resize;
};
template <>
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/types/string.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/types/string.h b/cpp/src/arrow/types/string.h
index c5cbe10..d2d3c5b 100644
--- a/cpp/src/arrow/types/string.h
+++ b/cpp/src/arrow/types/string.h
@@ -89,11 +89,11 @@ class StringArray : public ListArray {
const uint8_t* raw_bytes_;
};
-// Array builder
+// String builder
class StringBuilder : public ListBuilder {
public:
explicit StringBuilder(MemoryPool* pool, const TypePtr& type)
- : ListBuilder(pool, type, std::make_shared<UInt8Builder>(pool, value_type_)) {
+ : ListBuilder(pool, std::make_shared<UInt8Builder>(pool, value_type_), type) {
byte_builder_ = static_cast<UInt8Builder*>(value_builder_.get());
}
@@ -110,7 +110,6 @@ class StringBuilder : public ListBuilder {
}
protected:
- std::shared_ptr<ListBuilder> list_builder_;
UInt8Builder* byte_builder_;
static TypePtr value_type_;
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/util/buffer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/buffer.h b/cpp/src/arrow/util/buffer.h
index 56532be..5ef0076 100644
--- a/cpp/src/arrow/util/buffer.h
+++ b/cpp/src/arrow/util/buffer.h
@@ -23,6 +23,7 @@
#include <cstring>
#include <memory>
+#include "arrow/util/bit-util.h"
#include "arrow/util/macros.h"
#include "arrow/util/status.h"
@@ -137,26 +138,64 @@ class BufferBuilder {
public:
explicit BufferBuilder(MemoryPool* pool) : pool_(pool), capacity_(0), size_(0) {}
+ Status Resize(int32_t elements) {
+ if (capacity_ == 0) { buffer_ = std::make_shared<PoolBuffer>(pool_); }
+ capacity_ = elements;
+ RETURN_NOT_OK(buffer_->Resize(capacity_));
+ data_ = buffer_->mutable_data();
+ return Status::OK();
+ }
+
Status Append(const uint8_t* data, int length) {
- if (capacity_ < length + size_) {
- if (capacity_ == 0) { buffer_ = std::make_shared<PoolBuffer>(pool_); }
- capacity_ = std::max(MIN_BUFFER_CAPACITY, capacity_);
- while (capacity_ < length + size_) {
- capacity_ *= 2;
- }
- RETURN_NOT_OK(buffer_->Resize(capacity_));
- data_ = buffer_->mutable_data();
- }
+ if (capacity_ < length + size_) { RETURN_NOT_OK(Resize(length + size_)); }
+ UnsafeAppend(data, length);
+ return Status::OK();
+ }
+
+ template <typename T>
+ Status Append(T arithmetic_value) {
+ static_assert(std::is_arithmetic<T>::value,
+ "Convenience buffer append only supports arithmetic types");
+ return Append(reinterpret_cast<uint8_t*>(&arithmetic_value), sizeof(T));
+ }
+
+ template <typename T>
+ Status Append(const T* arithmetic_values, int num_elements) {
+ static_assert(std::is_arithmetic<T>::value,
+ "Convenience buffer append only supports arithmetic types");
+ return Append(
+ reinterpret_cast<const uint8_t*>(arithmetic_values), num_elements * sizeof(T));
+ }
+
+ // Unsafe methods don't check existing size
+ void UnsafeAppend(const uint8_t* data, int length) {
memcpy(data_ + size_, data, length);
size_ += length;
- return Status::OK();
+ }
+
+ template <typename T>
+ void UnsafeAppend(T arithmetic_value) {
+ static_assert(std::is_arithmetic<T>::value,
+ "Convenience buffer append only supports arithmetic types");
+ UnsafeAppend(reinterpret_cast<uint8_t*>(&arithmetic_value), sizeof(T));
+ }
+
+ template <typename T>
+ void UnsafeAppend(const T* arithmetic_values, int num_elements) {
+ static_assert(std::is_arithmetic<T>::value,
+ "Convenience buffer append only supports arithmetic types");
+ UnsafeAppend(
+ reinterpret_cast<const uint8_t*>(arithmetic_values), num_elements * sizeof(T));
}
std::shared_ptr<Buffer> Finish() {
auto result = buffer_;
buffer_ = nullptr;
+ capacity_ = size_ = 0;
return result;
}
+ int capacity() { return capacity_; }
+ int length() { return size_; }
private:
std::shared_ptr<PoolBuffer> buffer_;
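The typed Append overloads added to BufferBuilder reinterpret arithmetic values as raw bytes before copying them. The sketch below shows the same idea outside Arrow. MiniBufferBuilder, AppendBytes and Read are hypothetical names, and std::vector replaces PoolBuffer, so the allocation and Status handling of the real class are deliberately left out.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical stand-in for BufferBuilder: a growable byte buffer with typed
// convenience appends for arithmetic values.
class MiniBufferBuilder {
 public:
  // Raw byte append; the vector grows as needed.
  void AppendBytes(const uint8_t* data, int length) {
    bytes_.insert(bytes_.end(), data, data + length);
  }

  // Append a single arithmetic value as sizeof(T) bytes.
  template <typename T>
  void Append(T value) {
    static_assert(std::is_arithmetic<T>::value,
        "Convenience buffer append only supports arithmetic types");
    AppendBytes(reinterpret_cast<const uint8_t*>(&value),
        static_cast<int>(sizeof(T)));
  }

  // Append num_elements arithmetic values as num_elements * sizeof(T) bytes.
  template <typename T>
  void Append(const T* values, int num_elements) {
    static_assert(std::is_arithmetic<T>::value,
        "Convenience buffer append only supports arithmetic types");
    AppendBytes(reinterpret_cast<const uint8_t*>(values),
        num_elements * static_cast<int>(sizeof(T)));
  }

  // Size in bytes, not in elements.
  int length() const { return static_cast<int>(bytes_.size()); }

  // Read back the index-th value of type T (memcpy avoids alignment issues).
  template <typename T>
  T Read(int index) const {
    T out;
    std::memcpy(&out, bytes_.data() + index * sizeof(T), sizeof(T));
    return out;
  }

 private:
  std::vector<uint8_t> bytes_;
};
```

As in the diff, sizes are tracked in bytes. That is why ListBuilder passes (elements + 1) * sizeof(int32_t) when it resizes its offsets buffer.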
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index 527ce42..fccc5e3 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -18,8 +18,8 @@
#ifndef ARROW_UTIL_LOGGING_H
#define ARROW_UTIL_LOGGING_H
-#include <iostream>
#include <cstdlib>
+#include <iostream>
namespace arrow {
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/cpp/src/arrow/util/memory-pool.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/memory-pool.cc b/cpp/src/arrow/util/memory-pool.cc
index fb417e7..961554f 100644
--- a/cpp/src/arrow/util/memory-pool.cc
+++ b/cpp/src/arrow/util/memory-pool.cc
@@ -18,8 +18,8 @@
#include "arrow/util/memory-pool.h"
#include <cstdlib>
-#include <sstream>
#include <mutex>
+#include <sstream>
#include "arrow/util/status.h"
http://git-wip-us.apache.org/repos/asf/arrow/blob/0b472d86/python/pyarrow/tests/test_array.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_array.py b/python/pyarrow/tests/test_array.py
index d608f81..bf5a220 100644
--- a/python/pyarrow/tests/test_array.py
+++ b/python/pyarrow/tests/test_array.py
@@ -31,14 +31,15 @@ class TestArrayAPI(unittest.TestCase):
assert arr[1] is pyarrow.NA
def test_list_format(self):
- arr = pyarrow.from_pylist([[1], None, [2, 3]])
+ arr = pyarrow.from_pylist([[1], None, [2, 3, None]])
result = fmt.array_format(arr)
expected = """\
[
[1],
NA,
[2,
- 3]
+ 3,
+ NA]
]"""
assert result == expected