You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by we...@apache.org on 2017/02/06 16:25:35 UTC
[2/3] arrow git commit: ARROW-33: [C++] Implement zero-copy array slicing, integrate with IPC code paths
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/compare.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index d039bba..27fad71 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -76,10 +76,10 @@ class RangeEqualsVisitor : public ArrayVisitor {
const bool is_null = left.IsNull(i);
if (is_null != right.IsNull(o_i)) { return false; }
if (is_null) continue;
- const int32_t begin_offset = left.offset(i);
- const int32_t end_offset = left.offset(i + 1);
- const int32_t right_begin_offset = right.offset(o_i);
- const int32_t right_end_offset = right.offset(o_i + 1);
+ const int32_t begin_offset = left.value_offset(i);
+ const int32_t end_offset = left.value_offset(i + 1);
+ const int32_t right_begin_offset = right.value_offset(o_i);
+ const int32_t right_end_offset = right.value_offset(o_i + 1);
// Underlying can't be equal if the size isn't equal
if (end_offset - begin_offset != right_end_offset - right_begin_offset) {
return false;
@@ -169,10 +169,10 @@ class RangeEqualsVisitor : public ArrayVisitor {
const bool is_null = left.IsNull(i);
if (is_null != right.IsNull(o_i)) { return false; }
if (is_null) continue;
- const int32_t begin_offset = left.offset(i);
- const int32_t end_offset = left.offset(i + 1);
- const int32_t right_begin_offset = right.offset(o_i);
- const int32_t right_end_offset = right.offset(o_i + 1);
+ const int32_t begin_offset = left.value_offset(i);
+ const int32_t end_offset = left.value_offset(i + 1);
+ const int32_t right_begin_offset = right.value_offset(o_i);
+ const int32_t right_end_offset = right.value_offset(o_i + 1);
// Underlying can't be equal if the size isn't equal
if (end_offset - begin_offset != right_end_offset - right_begin_offset) {
return false;
@@ -200,7 +200,11 @@ class RangeEqualsVisitor : public ArrayVisitor {
for (size_t j = 0; j < left.fields().size(); ++j) {
// TODO: really we should be comparing stretches of non-null data rather
// than looking at one value at a time.
- equal_fields = left.field(j)->RangeEquals(i, i + 1, o_i, right.field(j));
+ const int left_abs_index = i + left.offset();
+ const int right_abs_index = o_i + right.offset();
+
+ equal_fields = left.field(j)->RangeEquals(
+ left_abs_index, left_abs_index + 1, right_abs_index, right.field(j));
if (!equal_fields) { return false; }
}
}
@@ -223,7 +227,7 @@ class RangeEqualsVisitor : public ArrayVisitor {
// Define a mapping from the type id to child number
uint8_t max_code = 0;
- const std::vector<uint8_t> type_codes = left_type.type_ids;
+ const std::vector<uint8_t> type_codes = left_type.type_codes;
for (size_t i = 0; i < type_codes.size(); ++i) {
const uint8_t code = type_codes[i];
if (code > max_code) { max_code = code; }
@@ -248,15 +252,19 @@ class RangeEqualsVisitor : public ArrayVisitor {
id = left_ids[i];
child_num = type_id_to_child_num[id];
+ const int left_abs_index = i + left.offset();
+ const int right_abs_index = o_i + right.offset();
+
// TODO(wesm): really we should be comparing stretches of non-null data
// rather than looking at one value at a time.
if (union_mode == UnionMode::SPARSE) {
- if (!left.child(child_num)->RangeEquals(i, i + 1, o_i, right.child(child_num))) {
+ if (!left.child(child_num)->RangeEquals(left_abs_index, left_abs_index + 1,
+ right_abs_index, right.child(child_num))) {
return false;
}
} else {
- const int32_t offset = left.raw_offsets()[i];
- const int32_t o_offset = right.raw_offsets()[i];
+ const int32_t offset = left.raw_value_offsets()[i];
+ const int32_t o_offset = right.raw_value_offsets()[o_i];  // right-hand slot is o_i, not i
if (!left.child(child_num)->RangeEquals(
offset, offset + 1, o_offset, right.child(child_num))) {
return false;
@@ -315,20 +323,29 @@ class EqualsVisitor : public RangeEqualsVisitor {
}
result_ = true;
} else {
- result_ = left.data()->Equals(*right.data(), BitUtil::BytesForBits(left.length()));
+ result_ = BitmapEquals(left.data()->data(), left.offset(), right.data()->data(),
+ right.offset(), left.length());
}
return Status::OK();
}
bool IsEqualPrimitive(const PrimitiveArray& left) {
const auto& right = static_cast<const PrimitiveArray&>(right_);
- if (left.null_count() > 0) {
- const uint8_t* left_data = left.data()->data();
- const uint8_t* right_data = right.data()->data();
- const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type());
- const int value_byte_size = size_meta.bit_width() / 8;
- DCHECK_GT(value_byte_size, 0);
+ const auto& size_meta = dynamic_cast<const FixedWidthType&>(*left.type());
+ const int value_byte_size = size_meta.bit_width() / 8;
+ DCHECK_GT(value_byte_size, 0);
+
+ const uint8_t* left_data = nullptr;
+ if (left.length() > 0) {
+ left_data = left.data()->data() + left.offset() * value_byte_size;
+ }
+
+ const uint8_t* right_data = nullptr;
+ if (right.length() > 0) {
+ right_data = right.data()->data() + right.offset() * value_byte_size;
+ }
+ if (left.null_count() > 0) {
for (int i = 0; i < left.length(); ++i) {
if (!left.IsNull(i) && memcmp(left_data, right_data, value_byte_size)) {
return false;
@@ -339,7 +356,7 @@ class EqualsVisitor : public RangeEqualsVisitor {
return true;
} else {
if (left.length() == 0) { return true; }
- return left.data()->Equals(*right.data(), left.length());
+ return memcmp(left_data, right_data, value_byte_size * left.length()) == 0;
}
}
@@ -376,13 +393,46 @@ class EqualsVisitor : public RangeEqualsVisitor {
Status Visit(const IntervalArray& left) override { return ComparePrimitive(left); }
+ template <typename ArrayType>
+ bool ValueOffsetsEqual(const ArrayType& left) {
+ const auto& right = static_cast<const ArrayType&>(right_);
+
+ if (left.offset() == 0 && right.offset() == 0) {
+ return left.value_offsets()->Equals(
+ *right.value_offsets(), (left.length() + 1) * sizeof(int32_t));
+ } else {
+ // One of the arrays is sliced; logic is more complicated because the
+ // value offsets are not both 0-based
+ auto left_offsets =
+ reinterpret_cast<const int32_t*>(left.value_offsets()->data()) + left.offset();
+ auto right_offsets =
+ reinterpret_cast<const int32_t*>(right.value_offsets()->data()) +
+ right.offset();
+
+ for (int32_t i = 0; i < left.length() + 1; ++i) {
+ if (left_offsets[i] - left_offsets[0] != right_offsets[i] - right_offsets[0]) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
bool CompareBinary(const BinaryArray& left) {
const auto& right = static_cast<const BinaryArray&>(right_);
- bool equal_offsets =
- left.offsets()->Equals(*right.offsets(), (left.length() + 1) * sizeof(int32_t));
+
+ bool equal_offsets = ValueOffsetsEqual<BinaryArray>(left);
if (!equal_offsets) { return false; }
- if (!left.data() && !(right.data())) { return true; }
- return left.data()->Equals(*right.data(), left.raw_offsets()[left.length()]);
+
+ if (left.offset() == 0 && right.offset() == 0) {
+ if (!left.data() || !right.data()) { return (!left.data() && !right.data()) || left.raw_value_offsets()[left.length()] == 0; }
+ return left.data()->Equals(*right.data(), left.raw_value_offsets()[left.length()]);
+ } else {
+ // Compare the corresponding data range; an empty range never touches the (possibly null) data buffers
+ const int64_t total_bytes = left.value_offset(left.length()) - left.value_offset(0);
+ return total_bytes == 0 || std::memcmp(left.data()->data() + left.value_offset(0),
+ right.data()->data() + right.value_offset(0), total_bytes) == 0;
+ }
}
Status Visit(const StringArray& left) override {
@@ -397,12 +447,20 @@ class EqualsVisitor : public RangeEqualsVisitor {
Status Visit(const ListArray& left) override {
const auto& right = static_cast<const ListArray&>(right_);
- if (!left.offsets()->Equals(
- *right.offsets(), (left.length() + 1) * sizeof(int32_t))) {
+ bool equal_offsets = ValueOffsetsEqual<ListArray>(left);
+ if (!equal_offsets) {
result_ = false;
- } else {
+ return Status::OK();
+ }
+
+ if (left.offset() == 0 && right.offset() == 0) {
result_ = left.values()->Equals(right.values());
+ } else {
+ // One of the arrays is sliced
+ result_ = left.values()->RangeEquals(left.value_offset(0),
+ left.value_offset(left.length()), right.value_offset(0), right.values());
}
+
return Status::OK();
}
@@ -422,8 +480,8 @@ inline bool FloatingApproxEquals(
const NumericArray<TYPE>& left, const NumericArray<TYPE>& right) {
using T = typename TYPE::c_type;
- auto left_data = reinterpret_cast<const T*>(left.data()->data());
- auto right_data = reinterpret_cast<const T*>(right.data()->data());
+ const T* left_data = left.raw_data();
+ const T* right_data = right.raw_data();
static constexpr T EPSILON = 1E-5;
@@ -465,8 +523,8 @@ static bool BaseDataEquals(const Array& left, const Array& right) {
return false;
}
if (left.null_count() > 0) {
- return left.null_bitmap()->Equals(
- *right.null_bitmap(), BitUtil::BytesForBits(left.length()));
+ return BitmapEquals(left.null_bitmap()->data(), left.offset(),
+ right.null_bitmap()->data(), right.offset(), left.length());
}
return true;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index ff58e53..c1f0ea4 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -401,8 +401,8 @@ class ReadableFile::ReadableFileImpl : public OSFile {
Status Open(const std::string& path) { return OpenReadable(path); }
Status ReadBuffer(int64_t nbytes, std::shared_ptr<Buffer>* out) {
- auto buffer = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(buffer->Resize(nbytes));
+ std::shared_ptr<ResizableBuffer> buffer;
+ RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
int64_t bytes_read = 0;
RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/io/hdfs.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/hdfs.cc b/cpp/src/arrow/io/hdfs.cc
index 2845b0d..5682f44 100644
--- a/cpp/src/arrow/io/hdfs.cc
+++ b/cpp/src/arrow/io/hdfs.cc
@@ -125,8 +125,8 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
}
Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) {
- auto buffer = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(buffer->Resize(nbytes));
+ std::shared_ptr<ResizableBuffer> buffer;
+ RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
int64_t bytes_read = 0;
RETURN_NOT_OK(ReadAt(position, nbytes, &bytes_read, buffer->mutable_data()));
@@ -152,8 +152,8 @@ class HdfsReadableFile::HdfsReadableFileImpl : public HdfsAnyFileImpl {
}
Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
- auto buffer = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(buffer->Resize(nbytes));
+ std::shared_ptr<ResizableBuffer> buffer;
+ RETURN_NOT_OK(AllocateResizableBuffer(pool_, nbytes, &buffer));
int64_t bytes_read = 0;
RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data()));
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/io/io-hdfs-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-hdfs-test.cc b/cpp/src/arrow/io/io-hdfs-test.cc
index 72e0ba8..f0e5a28 100644
--- a/cpp/src/arrow/io/io-hdfs-test.cc
+++ b/cpp/src/arrow/io/io-hdfs-test.cc
@@ -336,8 +336,9 @@ TYPED_TEST(TestHdfsClient, LargeFile) {
std::shared_ptr<HdfsReadableFile> file;
ASSERT_OK(this->client_->OpenReadable(path, &file));
- auto buffer = std::make_shared<PoolBuffer>();
- ASSERT_OK(buffer->Resize(size));
+ std::shared_ptr<MutableBuffer> buffer;
+ ASSERT_OK(AllocateBuffer(nullptr, size, &buffer));
+
int64_t bytes_read = 0;
ASSERT_OK(file->Read(size, &bytes_read, buffer->mutable_data()));
@@ -348,8 +349,9 @@ TYPED_TEST(TestHdfsClient, LargeFile) {
std::shared_ptr<HdfsReadableFile> file2;
ASSERT_OK(this->client_->OpenReadable(path, 1 << 18, &file2));
- auto buffer2 = std::make_shared<PoolBuffer>();
- ASSERT_OK(buffer2->Resize(size));
+ std::shared_ptr<MutableBuffer> buffer2;
+ ASSERT_OK(AllocateBuffer(nullptr, size, &buffer2));
+
ASSERT_OK(file2->Read(size, &bytes_read, buffer2->mutable_data()));
ASSERT_EQ(0, std::memcmp(buffer2->data(), data.data(), size));
ASSERT_EQ(size, bytes_read);
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/io/io-memory-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc
index c0b0165..442cd0c 100644
--- a/cpp/src/arrow/io/io-memory-test.cc
+++ b/cpp/src/arrow/io/io-memory-test.cc
@@ -73,8 +73,8 @@ TEST(TestBufferReader, RetainParentReference) {
std::shared_ptr<Buffer> slice1;
std::shared_ptr<Buffer> slice2;
{
- auto buffer = std::make_shared<PoolBuffer>();
- ASSERT_OK(buffer->Resize(static_cast<int64_t>(data.size())));
+ std::shared_ptr<MutableBuffer> buffer;
+ ASSERT_OK(AllocateBuffer(nullptr, static_cast<int64_t>(data.size()), &buffer));
std::memcpy(buffer->mutable_data(), data.c_str(), data.size());
BufferReader reader(buffer);
ASSERT_OK(reader.Read(4, &slice1));
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index c8e631c..3613ccb 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -17,6 +17,7 @@
#include "arrow/ipc/adapter.h"
+#include <algorithm>
#include <cstdint>
#include <cstring>
#include <sstream>
@@ -30,6 +31,7 @@
#include "arrow/ipc/metadata-internal.h"
#include "arrow/ipc/metadata.h"
#include "arrow/ipc/util.h"
+#include "arrow/memory_pool.h"
#include "arrow/schema.h"
#include "arrow/status.h"
#include "arrow/table.h"
@@ -49,9 +51,10 @@ namespace ipc {
class RecordBatchWriter : public ArrayVisitor {
public:
- RecordBatchWriter(
- const RecordBatch& batch, int64_t buffer_start_offset, int max_recursion_depth)
- : batch_(batch),
+ RecordBatchWriter(MemoryPool* pool, const RecordBatch& batch,
+ int64_t buffer_start_offset, int max_recursion_depth)
+ : pool_(pool),
+ batch_(batch),
max_recursion_depth_(max_recursion_depth),
buffer_start_offset_(buffer_start_offset) {}
@@ -62,7 +65,15 @@ class RecordBatchWriter : public ArrayVisitor {
// push back all common elements
field_nodes_.push_back(flatbuf::FieldNode(arr.length(), arr.null_count()));
if (arr.null_count() > 0) {
- buffers_.push_back(arr.null_bitmap());
+ std::shared_ptr<Buffer> bitmap = arr.null_bitmap();
+
+ if (arr.offset() != 0) {
+ // With a sliced array / non-zero offset, we must copy the bitmap
+ RETURN_NOT_OK(
+ CopyBitmap(pool_, bitmap->data(), arr.offset(), arr.length(), &bitmap));
+ }
+
+ buffers_.push_back(bitmap);
} else {
// Push a dummy zero-length buffer, not to be copied
buffers_.push_back(std::make_shared<Buffer>(nullptr, 0));
@@ -208,50 +219,136 @@ class RecordBatchWriter : public ArrayVisitor {
private:
Status Visit(const NullArray& array) override { return Status::NotImplemented("null"); }
- Status VisitPrimitive(const PrimitiveArray& array) {
- buffers_.push_back(array.data());
+ template <typename ArrayType>
+ Status VisitFixedWidth(const ArrayType& array) {
+ std::shared_ptr<Buffer> data_buffer = array.data();
+
+ if (array.offset() != 0) {
+ // Non-zero offset, slice the buffer
+ const auto& fw_type = static_cast<const FixedWidthType&>(*array.type());
+ const int type_width = fw_type.bit_width() / 8;
+ const int64_t byte_offset = array.offset() * type_width;
+
+ // Send padding if it's available
+ const int64_t buffer_length =
+ std::min(BitUtil::RoundUpToMultipleOf64(array.length() * type_width),
+ data_buffer->size() - byte_offset);
+ data_buffer = SliceBuffer(data_buffer, byte_offset, buffer_length);
+ }
+ buffers_.push_back(data_buffer);
+ return Status::OK();
+ }
+
+ template <typename ArrayType>
+ Status GetZeroBasedValueOffsets(
+ const ArrayType& array, std::shared_ptr<Buffer>* value_offsets) {
+ // Share slicing logic between ListArray and BinaryArray
+
+ auto offsets = array.value_offsets();
+
+ if (array.offset() != 0) {
+ // If we have a non-zero offset, then the value offsets do not start at
+ // zero. We must a) create a new offsets array with shifted offsets and
+ // b) slice the values array accordingly
+
+ std::shared_ptr<MutableBuffer> shifted_offsets;
+ RETURN_NOT_OK(AllocateBuffer(
+ pool_, sizeof(int32_t) * (array.length() + 1), &shifted_offsets));
+
+ int32_t* dest_offsets = reinterpret_cast<int32_t*>(shifted_offsets->mutable_data());
+ const int32_t start_offset = array.value_offset(0);
+
+ for (int i = 0; i < array.length(); ++i) {
+ dest_offsets[i] = array.value_offset(i) - start_offset;
+ }
+ // Final offset
+ dest_offsets[array.length()] = array.value_offset(array.length()) - start_offset;
+ offsets = shifted_offsets;
+ }
+
+ *value_offsets = offsets;
return Status::OK();
}
Status VisitBinary(const BinaryArray& array) {
- buffers_.push_back(array.offsets());
- buffers_.push_back(array.data());
+ std::shared_ptr<Buffer> value_offsets;
+ RETURN_NOT_OK(GetZeroBasedValueOffsets<BinaryArray>(array, &value_offsets));
+ auto data = array.data();
+
+ if (array.offset() != 0) {
+ // Slice data to [value_offset(0), value_offset(length)); SliceBuffer takes a length, not an end offset
+ data = SliceBuffer(data, array.value_offset(0), array.value_offset(array.length()) - array.value_offset(0));
+ }
+
+ buffers_.push_back(value_offsets);
+ buffers_.push_back(data);
return Status::OK();
}
- Status Visit(const BooleanArray& array) override { return VisitPrimitive(array); }
+ Status Visit(const BooleanArray& array) override {
+ buffers_.push_back(array.data());
+ return Status::OK();
+ }
- Status Visit(const Int8Array& array) override { return VisitPrimitive(array); }
+ Status Visit(const Int8Array& array) override {
+ return VisitFixedWidth<Int8Array>(array);
+ }
- Status Visit(const Int16Array& array) override { return VisitPrimitive(array); }
+ Status Visit(const Int16Array& array) override {
+ return VisitFixedWidth<Int16Array>(array);
+ }
- Status Visit(const Int32Array& array) override { return VisitPrimitive(array); }
+ Status Visit(const Int32Array& array) override {
+ return VisitFixedWidth<Int32Array>(array);
+ }
- Status Visit(const Int64Array& array) override { return VisitPrimitive(array); }
+ Status Visit(const Int64Array& array) override {
+ return VisitFixedWidth<Int64Array>(array);
+ }
- Status Visit(const UInt8Array& array) override { return VisitPrimitive(array); }
+ Status Visit(const UInt8Array& array) override {
+ return VisitFixedWidth<UInt8Array>(array);
+ }
- Status Visit(const UInt16Array& array) override { return VisitPrimitive(array); }
+ Status Visit(const UInt16Array& array) override {
+ return VisitFixedWidth<UInt16Array>(array);
+ }
- Status Visit(const UInt32Array& array) override { return VisitPrimitive(array); }
+ Status Visit(const UInt32Array& array) override {
+ return VisitFixedWidth<UInt32Array>(array);
+ }
- Status Visit(const UInt64Array& array) override { return VisitPrimitive(array); }
+ Status Visit(const UInt64Array& array) override {
+ return VisitFixedWidth<UInt64Array>(array);
+ }
- Status Visit(const HalfFloatArray& array) override { return VisitPrimitive(array); }
+ Status Visit(const HalfFloatArray& array) override {
+ return VisitFixedWidth<HalfFloatArray>(array);
+ }
- Status Visit(const FloatArray& array) override { return VisitPrimitive(array); }
+ Status Visit(const FloatArray& array) override {
+ return VisitFixedWidth<FloatArray>(array);
+ }
- Status Visit(const DoubleArray& array) override { return VisitPrimitive(array); }
+ Status Visit(const DoubleArray& array) override {
+ return VisitFixedWidth<DoubleArray>(array);
+ }
Status Visit(const StringArray& array) override { return VisitBinary(array); }
Status Visit(const BinaryArray& array) override { return VisitBinary(array); }
- Status Visit(const DateArray& array) override { return VisitPrimitive(array); }
+ Status Visit(const DateArray& array) override {
+ return VisitFixedWidth<DateArray>(array);
+ }
- Status Visit(const TimeArray& array) override { return VisitPrimitive(array); }
+ Status Visit(const TimeArray& array) override {
+ return VisitFixedWidth<TimeArray>(array);
+ }
- Status Visit(const TimestampArray& array) override { return VisitPrimitive(array); }
+ Status Visit(const TimestampArray& array) override {
+ return VisitFixedWidth<TimestampArray>(array);
+ }
Status Visit(const IntervalArray& array) override {
return Status::NotImplemented("interval");
@@ -262,30 +359,109 @@ class RecordBatchWriter : public ArrayVisitor {
}
Status Visit(const ListArray& array) override {
- buffers_.push_back(array.offsets());
+ std::shared_ptr<Buffer> value_offsets;
+ RETURN_NOT_OK(GetZeroBasedValueOffsets<ListArray>(array, &value_offsets));
+ buffers_.push_back(value_offsets);
+
--max_recursion_depth_;
- RETURN_NOT_OK(VisitArray(*array.values().get()));
+ std::shared_ptr<Array> values = array.values();
+
+ if (array.offset() != 0) {
+ // For non-zero offset, we slice the values array accordingly
+ const int32_t offset = array.value_offset(0);
+ const int32_t length = array.value_offset(array.length()) - offset;
+ values = values->Slice(offset, length);
+ }
+ RETURN_NOT_OK(VisitArray(*values));
++max_recursion_depth_;
return Status::OK();
}
Status Visit(const StructArray& array) override {
--max_recursion_depth_;
- for (const auto& field : array.fields()) {
- RETURN_NOT_OK(VisitArray(*field.get()));
+ for (std::shared_ptr<Array> field : array.fields()) {
+ if (array.offset() != 0) {
+ // If offset is non-zero, slice the child array
+ field = field->Slice(array.offset(), array.length());
+ }
+ RETURN_NOT_OK(VisitArray(*field));
}
++max_recursion_depth_;
return Status::OK();
}
Status Visit(const UnionArray& array) override {
- buffers_.push_back(array.type_ids());
+ auto type_ids = array.type_ids();
+ if (array.offset() != 0) {
+ type_ids = SliceBuffer(type_ids, array.offset() * sizeof(UnionArray::type_id_t),
+ array.length() * sizeof(UnionArray::type_id_t));
+ }
- if (array.mode() == UnionMode::DENSE) { buffers_.push_back(array.offsets()); }
+ buffers_.push_back(type_ids);
--max_recursion_depth_;
- for (const auto& field : array.children()) {
- RETURN_NOT_OK(VisitArray(*field.get()));
+ if (array.mode() == UnionMode::DENSE) {
+ const auto& type = static_cast<const UnionType&>(*array.type());
+ auto value_offsets = array.value_offsets();
+
+ // The Union type codes are not necessary 0-indexed
+ uint8_t max_code = 0;
+ for (uint8_t code : type.type_codes) {
+ if (code > max_code) { max_code = code; }
+ }
+
+ // Allocate an array of child offsets. Set all to -1 to indicate that we
+ // haven't observed a first occurrence of a particular child yet
+ std::vector<int32_t> child_offsets(max_code + 1, -1);
+ std::vector<int32_t> child_lengths(max_code + 1, 0);
+
+ if (array.offset() != 0) {
+ // This is an unpleasant case. Because the offsets are different for
+ // each child array, when we have a sliced array, we need to "rebase"
+ // the value_offsets for each array
+
+ const int32_t* unshifted_offsets = array.raw_value_offsets();
+ const uint8_t* type_ids = array.raw_type_ids();
+
+ // Allocate the shifted offsets
+ std::shared_ptr<MutableBuffer> shifted_offsets_buffer;
+ RETURN_NOT_OK(AllocateBuffer(
+ pool_, array.length() * sizeof(int32_t), &shifted_offsets_buffer));
+ int32_t* shifted_offsets =
+ reinterpret_cast<int32_t*>(shifted_offsets_buffer->mutable_data());
+
+ for (int32_t i = 0; i < array.length(); ++i) {
+ const uint8_t code = type_ids[i];
+ int32_t shift = child_offsets[code];
+ if (shift == -1) { child_offsets[code] = shift = unshifted_offsets[i]; }
+ shifted_offsets[i] = unshifted_offsets[i] - shift;
+
+ // Update the child length so it covers the highest rebased offset seen
+ child_lengths[code] = std::max(child_lengths[code], shifted_offsets[i] + 1);
+ }
+
+ value_offsets = shifted_offsets_buffer;
+ }
+ buffers_.push_back(value_offsets);
+
+ // Visit children and slice accordingly (a child absent from the slice gets an empty slice at 0)
+ for (int i = 0; i < type.num_children(); ++i) {
+ std::shared_ptr<Array> child = array.child(i);
+ if (array.offset() != 0) {
+ const uint8_t code = type.type_codes[i];
+ child = child->Slice(std::max<int32_t>(child_offsets[code], 0), child_lengths[code]);
+ }
+ RETURN_NOT_OK(VisitArray(*child));
+ }
+ } else {
+ for (std::shared_ptr<Array> child : array.children()) {
+ // Sparse union, slicing is simpler
+ if (array.offset() != 0) {
+ // If offset is non-zero, slice the child array
+ child = child->Slice(array.offset(), array.length());
+ }
+ RETURN_NOT_OK(VisitArray(*child));
+ }
}
++max_recursion_depth_;
return Status::OK();
@@ -298,6 +474,8 @@ class RecordBatchWriter : public ArrayVisitor {
return Status::OK();
}
+ // In some cases, intermediate buffers may need to be allocated (with sliced arrays)
+ MemoryPool* pool_;
const RecordBatch& batch_;
std::vector<flatbuf::FieldNode> field_nodes_;
@@ -310,14 +488,14 @@ class RecordBatchWriter : public ArrayVisitor {
Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
- int max_recursion_depth) {
+ MemoryPool* pool, int max_recursion_depth) {
DCHECK_GT(max_recursion_depth, 0);
- RecordBatchWriter serializer(batch, buffer_start_offset, max_recursion_depth);
+ RecordBatchWriter serializer(pool, batch, buffer_start_offset, max_recursion_depth);
return serializer.Write(dst, metadata_length, body_length);
}
Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
- RecordBatchWriter serializer(batch, 0, kMaxIpcRecursionDepth);
+ RecordBatchWriter serializer(default_memory_pool(), batch, 0, kMaxIpcRecursionDepth);
RETURN_NOT_OK(serializer.GetTotalSize(size));
return Status::OK();
}
@@ -372,7 +550,7 @@ class ArrayLoader : public TypeVisitor {
BufferMetadata metadata = context_->metadata->buffer(buffer_index);
if (metadata.length == 0) {
- *out = std::make_shared<Buffer>(nullptr, 0);
+ *out = nullptr;
return Status::OK();
} else {
return file_->ReadAt(metadata.offset, metadata.length, out);
@@ -412,8 +590,8 @@ class ArrayLoader : public TypeVisitor {
context_->buffer_index++;
data.reset(new Buffer(nullptr, 0));
}
- return MakePrimitiveArray(field_.type, field_meta.length, data, field_meta.null_count,
- null_bitmap, &result_);
+ return MakePrimitiveArray(field_.type, field_meta.length, data, null_bitmap,
+ field_meta.null_count, 0, &result_);
}
template <typename CONTAINER>
@@ -428,7 +606,7 @@ class ArrayLoader : public TypeVisitor {
RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values));
result_ = std::make_shared<CONTAINER>(
- field_meta.length, offsets, values, field_meta.null_count, null_bitmap);
+ field_meta.length, offsets, values, null_bitmap, field_meta.null_count);
return Status::OK();
}
@@ -496,7 +674,7 @@ class ArrayLoader : public TypeVisitor {
RETURN_NOT_OK(LoadChild(*type.child(0).get(), &values_array));
result_ = std::make_shared<ListArray>(field_.type, field_meta.length, offsets,
- values_array, field_meta.null_count, null_bitmap);
+ values_array, null_bitmap, field_meta.null_count);
return Status::OK();
}
@@ -521,7 +699,7 @@ class ArrayLoader : public TypeVisitor {
RETURN_NOT_OK(LoadChildren(type.children(), &fields));
result_ = std::make_shared<StructArray>(
- field_.type, field_meta.length, fields, field_meta.null_count, null_bitmap);
+ field_.type, field_meta.length, fields, null_bitmap, field_meta.null_count);
return Status::OK();
}
@@ -542,7 +720,7 @@ class ArrayLoader : public TypeVisitor {
RETURN_NOT_OK(LoadChildren(type.children(), &fields));
result_ = std::make_shared<UnionArray>(field_.type, field_meta.length, fields,
- type_ids, offsets, field_meta.null_count, null_bitmap);
+ type_ids, offsets, null_bitmap, field_meta.null_count);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index f9ef7d9..83542d0 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -30,6 +30,7 @@
namespace arrow {
class Array;
+class MemoryPool;
class RecordBatch;
class Schema;
class Status;
@@ -71,14 +72,15 @@ constexpr int kMaxIpcRecursionDepth = 64;
//
// @param(out) body_length: the size of the contiguous buffer block plus
// padding bytes
-ARROW_EXPORT Status WriteRecordBatch(const RecordBatch& batch,
+Status ARROW_EXPORT WriteRecordBatch(const RecordBatch& batch,
int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
- int64_t* body_length, int max_recursion_depth = kMaxIpcRecursionDepth);
+ int64_t* body_length, MemoryPool* pool,
+ int max_recursion_depth = kMaxIpcRecursionDepth);
// Compute the precise number of bytes needed in a contiguous memory segment to
// write the record batch. This involves generating the complete serialized
// Flatbuffers metadata.
-ARROW_EXPORT Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
+Status ARROW_EXPORT GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
// ----------------------------------------------------------------------
// "Read" path; does not copy data if the input supports zero copy reads
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/ipc-adapter-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-adapter-test.cc b/cpp/src/arrow/ipc/ipc-adapter-test.cc
index 17868f8..bae6578 100644
--- a/cpp/src/arrow/ipc/ipc-adapter-test.cc
+++ b/cpp/src/arrow/ipc/ipc-adapter-test.cc
@@ -32,6 +32,7 @@
#include "arrow/buffer.h"
#include "arrow/memory_pool.h"
+#include "arrow/pretty_print.h"
#include "arrow/status.h"
#include "arrow/test-util.h"
#include "arrow/util/bit-util.h"
@@ -56,7 +57,7 @@ class TestWriteRecordBatch : public ::testing::TestWithParam<MakeRecordBatch*>,
const int64_t buffer_offset = 0;
RETURN_NOT_OK(WriteRecordBatch(
- batch, buffer_offset, mmap_.get(), &metadata_length, &body_length));
+ batch, buffer_offset, mmap_.get(), &metadata_length, &body_length, pool_));
std::shared_ptr<RecordBatchMetadata> metadata;
RETURN_NOT_OK(ReadRecordBatchMetadata(0, metadata_length, mmap_.get(), &metadata));
@@ -92,17 +93,49 @@ TEST_P(TestWriteRecordBatch, RoundTrip) {
}
}
-INSTANTIATE_TEST_CASE_P(RoundTripTests, TestWriteRecordBatch,
- ::testing::Values(&MakeIntRecordBatch, &MakeListRecordBatch, &MakeNonNullRecordBatch,
- &MakeZeroLengthRecordBatch, &MakeDeeplyNestedList,
- &MakeStringTypesRecordBatch, &MakeStruct, &MakeUnion));
+TEST_P(TestWriteRecordBatch, SliceRoundTrip) {
+ std::shared_ptr<RecordBatch> batch;
+ ASSERT_OK((*GetParam())(&batch)); // NOLINT clang-tidy gtest issue
+ std::shared_ptr<RecordBatch> batch_result;
+
+ // Skip the zero-length case
+ if (batch->num_rows() < 2) { return; }
+
+ auto sliced_batch = batch->Slice(2, 10);
+
+ ASSERT_OK(RoundTripHelper(*sliced_batch, 1 << 16, &batch_result));
+
+ EXPECT_EQ(sliced_batch->num_rows(), batch_result->num_rows());
+
+ for (int i = 0; i < sliced_batch->num_columns(); ++i) {
+ const auto& left = *sliced_batch->column(i);
+ const auto& right = *batch_result->column(i);
+ if (!left.Equals(right)) {
+ std::stringstream pp_result;
+ std::stringstream pp_expected;
+
+ ASSERT_OK(PrettyPrint(left, 0, &pp_expected));
+ ASSERT_OK(PrettyPrint(right, 0, &pp_result));
+
+ FAIL() << "Index: " << i << " Expected: " << pp_expected.str()
+ << "\nGot: " << pp_result.str();
+ }
+ }
+}
+
+INSTANTIATE_TEST_CASE_P(
+ RoundTripTests, TestWriteRecordBatch,
+ ::testing::Values(&MakeIntRecordBatch, &MakeStringTypesRecordBatch,
+ &MakeNonNullRecordBatch, &MakeZeroLengthRecordBatch, &MakeListRecordBatch,
+ &MakeDeeplyNestedList, &MakeStruct, &MakeUnion));
+// Writes `batch` into a MockOutputStream and asserts that GetRecordBatchSize
+// reports exactly the number of bytes the writer emitted.
void TestGetRecordBatchSize(std::shared_ptr<RecordBatch> batch) {
ipc::MockOutputStream mock;
int32_t mock_metadata_length = -1;
int64_t mock_body_length = -1;
int64_t size = -1;
- ASSERT_OK(WriteRecordBatch(*batch, 0, &mock, &mock_metadata_length, &mock_body_length));
+ ASSERT_OK(WriteRecordBatch(
+ *batch, 0, &mock, &mock_metadata_length, &mock_body_length, default_memory_pool()));
ASSERT_OK(GetRecordBatchSize(*batch, &size));
ASSERT_EQ(mock.GetExtentBytesWritten(), size);
}
@@ -156,10 +189,11 @@ class RecursionLimits : public ::testing::Test, public io::MemoryMapFixture {
io::MemoryMapFixture::InitMemoryMap(memory_map_size, path, &mmap_);
if (override_level) {
- return WriteRecordBatch(
- *batch, 0, mmap_.get(), metadata_length, body_length, recursion_level + 1);
+ return WriteRecordBatch(*batch, 0, mmap_.get(), metadata_length, body_length, pool_,
+ recursion_level + 1);
} else {
- return WriteRecordBatch(*batch, 0, mmap_.get(), metadata_length, body_length);
+ return WriteRecordBatch(
+ *batch, 0, mmap_.get(), metadata_length, body_length, pool_);
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/ipc-json-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/ipc-json-test.cc b/cpp/src/arrow/ipc/ipc-json-test.cc
index 30f968c..3e759cc 100644
--- a/cpp/src/arrow/ipc/ipc-json-test.cc
+++ b/cpp/src/arrow/ipc/ipc-json-test.cc
@@ -80,7 +80,7 @@ template <typename T, typename ValueType>
void CheckPrimitive(const std::shared_ptr<DataType>& type,
const std::vector<bool>& is_valid, const std::vector<ValueType>& values) {
MemoryPool* pool = default_memory_pool();
- typename TypeTraits<T>::BuilderType builder(pool, type);
+ typename TypeTraits<T>::BuilderType builder(pool);
for (size_t i = 0; i < values.size(); ++i) {
if (is_valid[i]) {
@@ -146,12 +146,11 @@ TEST(TestJsonArrayWriter, NestedTypes) {
std::vector<int32_t> values = {0, 1, 2, 3, 4, 5, 6};
std::shared_ptr<Array> values_array;
- ArrayFromVector<Int32Type, int32_t>(int32(), values_is_valid, values, &values_array);
+ ArrayFromVector<Int32Type, int32_t>(values_is_valid, values, &values_array);
std::vector<int16_t> i16_values = {0, 1, 2, 3, 4, 5, 6};
std::shared_ptr<Array> i16_values_array;
- ArrayFromVector<Int16Type, int16_t>(
- int16(), values_is_valid, i16_values, &i16_values_array);
+ ArrayFromVector<Int16Type, int16_t>(values_is_valid, i16_values, &i16_values_array);
// List
std::vector<bool> list_is_valid = {true, false, true, true, true};
@@ -161,7 +160,7 @@ TEST(TestJsonArrayWriter, NestedTypes) {
ASSERT_OK(test::GetBitmapFromBoolVector(list_is_valid, &list_bitmap));
std::shared_ptr<Buffer> offsets_buffer = test::GetBufferFromVector(offsets);
- ListArray list_array(list(value_type), 5, offsets_buffer, values_array, 1, list_bitmap);
+ ListArray list_array(list(value_type), 5, offsets_buffer, values_array, list_bitmap, 1);
TestArrayRoundTrip(list_array);
@@ -175,7 +174,7 @@ TEST(TestJsonArrayWriter, NestedTypes) {
std::vector<std::shared_ptr<Array>> fields = {values_array, values_array, values_array};
StructArray struct_array(
- struct_type, static_cast<int>(struct_is_valid.size()), fields, 2, struct_bitmap);
+ struct_type, static_cast<int>(struct_is_valid.size()), fields, struct_bitmap, 2);
TestArrayRoundTrip(struct_array);
}
@@ -202,15 +201,15 @@ void MakeBatchArrays(const std::shared_ptr<Schema>& schema, const int num_rows,
test::randint<int32_t>(num_rows, 0, 100, &v2_values);
std::shared_ptr<Array> v1;
- ArrayFromVector<Int8Type, int8_t>(schema->field(0)->type, is_valid, v1_values, &v1);
+ ArrayFromVector<Int8Type, int8_t>(is_valid, v1_values, &v1);
std::shared_ptr<Array> v2;
- ArrayFromVector<Int32Type, int32_t>(schema->field(1)->type, is_valid, v2_values, &v2);
+ ArrayFromVector<Int32Type, int32_t>(is_valid, v2_values, &v2);
static const int kBufferSize = 10;
static uint8_t buffer[kBufferSize];
static uint32_t seed = 0;
- StringBuilder string_builder(default_memory_pool(), utf8());
+ StringBuilder string_builder(default_memory_pool());
for (int i = 0; i < num_rows; ++i) {
if (!is_valid[i]) {
string_builder.AppendNull();
@@ -338,13 +337,13 @@ TEST(TestJsonFileReadWrite, MinimalFormatExample) {
std::vector<bool> foo_valid = {true, false, true, true, true};
std::vector<int32_t> foo_values = {1, 2, 3, 4, 5};
std::shared_ptr<Array> foo;
- ArrayFromVector<Int32Type, int32_t>(int32(), foo_valid, foo_values, &foo);
+ ArrayFromVector<Int32Type, int32_t>(foo_valid, foo_values, &foo);
ASSERT_TRUE(batch->column(0)->Equals(foo));
std::vector<bool> bar_valid = {true, false, false, true, true};
std::vector<double> bar_values = {1, 2, 3, 4, 5};
std::shared_ptr<Array> bar;
- ArrayFromVector<DoubleType, double>(float64(), bar_valid, bar_values, &bar);
+ ArrayFromVector<DoubleType, double>(bar_valid, bar_values, &bar);
ASSERT_TRUE(batch->column(1)->Equals(bar));
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/json-integration-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-integration-test.cc b/cpp/src/arrow/ipc/json-integration-test.cc
index 95bc742..17ccc4a 100644
--- a/cpp/src/arrow/ipc/json-integration-test.cc
+++ b/cpp/src/arrow/ipc/json-integration-test.cc
@@ -144,10 +144,8 @@ static Status ValidateArrowVsJson(
if (!json_schema->Equals(arrow_schema)) {
std::stringstream ss;
- ss << "JSON schema: \n"
- << json_schema->ToString() << "\n"
- << "Arrow schema: \n"
- << arrow_schema->ToString();
+ ss << "JSON schema: \n" << json_schema->ToString() << "\n"
+ << "Arrow schema: \n" << arrow_schema->ToString();
if (FLAGS_verbose) { std::cout << ss.str() << std::endl; }
return Status::Invalid("Schemas did not match");
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/json-internal.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/json-internal.cc b/cpp/src/arrow/ipc/json-internal.cc
index 43bd8a4..1a95b2c 100644
--- a/cpp/src/arrow/ipc/json-internal.cc
+++ b/cpp/src/arrow/ipc/json-internal.cc
@@ -199,8 +199,8 @@ class JsonSchemaWriter : public TypeVisitor {
// Write type ids
writer_->Key("typeIds");
writer_->StartArray();
- for (size_t i = 0; i < type.type_ids.size(); ++i) {
- writer_->Uint(type.type_ids[i]);
+ for (size_t i = 0; i < type.type_codes.size(); ++i) {
+ writer_->Uint(type.type_codes[i]);
}
writer_->EndArray();
}
@@ -464,7 +464,7 @@ class JsonArrayWriter : public ArrayVisitor {
template <typename T>
Status WriteVarBytes(const T& array) {
WriteValidityField(array);
- WriteIntegerField("OFFSET", array.raw_offsets(), array.length() + 1);
+ WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1);
WriteDataField(array);
SetNoChildren();
return Status::OK();
@@ -532,7 +532,7 @@ class JsonArrayWriter : public ArrayVisitor {
Status Visit(const ListArray& array) override {
WriteValidityField(array);
- WriteIntegerField("OFFSET", array.raw_offsets(), array.length() + 1);
+ WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length() + 1);
auto type = static_cast<const ListType*>(array.type().get());
return WriteChildren(type->children(), {array.values()});
}
@@ -549,7 +549,7 @@ class JsonArrayWriter : public ArrayVisitor {
WriteIntegerField("TYPE_ID", array.raw_type_ids(), array.length());
if (type->mode == UnionMode::DENSE) {
- WriteIntegerField("OFFSET", array.raw_offsets(), array.length());
+ WriteIntegerField("OFFSET", array.raw_value_offsets(), array.length());
}
return WriteChildren(type->children(), array.children());
}
@@ -718,17 +718,17 @@ class JsonSchemaReader {
return Status::Invalid(ss.str());
}
- const auto& json_type_ids = json_type.FindMember("typeIds");
- RETURN_NOT_ARRAY("typeIds", json_type_ids, json_type);
+ const auto& json_type_codes = json_type.FindMember("typeIds");
+ RETURN_NOT_ARRAY("typeIds", json_type_codes, json_type);
- std::vector<uint8_t> type_ids;
- const auto& id_array = json_type_ids->value.GetArray();
+ std::vector<uint8_t> type_codes;
+ const auto& id_array = json_type_codes->value.GetArray();
for (const rj::Value& val : id_array) {
DCHECK(val.IsUint());
- type_ids.push_back(val.GetUint());
+ type_codes.push_back(val.GetUint());
}
- *type = union_(children, type_ids, mode);
+ *type = union_(children, type_codes, mode);
return Status::OK();
}
@@ -844,7 +844,7 @@ class JsonArrayReader {
typename std::enable_if<std::is_base_of<BinaryType, T>::value, Status>::type ReadArray(
const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
- typename TypeTraits<T>::BuilderType builder(pool_, type);
+ typename TypeTraits<T>::BuilderType builder(pool_);
const auto& json_data = json_array.FindMember("DATA");
RETURN_NOT_ARRAY("DATA", json_data, json_array);
@@ -869,8 +869,9 @@ class JsonArrayReader {
template <typename T>
Status GetIntArray(
const RjArray& json_array, const int32_t length, std::shared_ptr<Buffer>* out) {
- auto buffer = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(buffer->Resize(length * sizeof(T)));
+ std::shared_ptr<MutableBuffer> buffer;
+ RETURN_NOT_OK(AllocateBuffer(pool_, length * sizeof(T), &buffer));
+
T* values = reinterpret_cast<T*>(buffer->mutable_data());
for (int i = 0; i < length; ++i) {
const rj::Value& val = json_array[i];
@@ -901,7 +902,7 @@ class JsonArrayReader {
DCHECK_EQ(children.size(), 1);
*array = std::make_shared<ListArray>(
- type, length, offsets_buffer, children[0], null_count, validity_buffer);
+ type, length, offsets_buffer, children[0], validity_buffer, null_count);
return Status::OK();
}
@@ -918,7 +919,7 @@ class JsonArrayReader {
RETURN_NOT_OK(GetChildren(json_array, type, &fields));
*array =
- std::make_shared<StructArray>(type, length, fields, null_count, validity_buffer);
+ std::make_shared<StructArray>(type, length, fields, validity_buffer, null_count);
return Status::OK();
}
@@ -953,7 +954,7 @@ class JsonArrayReader {
RETURN_NOT_OK(GetChildren(json_array, type, &children));
*array = std::make_shared<UnionArray>(type, length, children, type_id_buffer,
- offsets_buffer, null_count, validity_buffer);
+ offsets_buffer, validity_buffer, null_count);
return Status::OK();
}
@@ -962,7 +963,7 @@ class JsonArrayReader {
typename std::enable_if<std::is_base_of<NullType, T>::value, Status>::type ReadArray(
const RjObject& json_array, int32_t length, const std::vector<bool>& is_valid,
const std::shared_ptr<DataType>& type, std::shared_ptr<Array>* array) {
- *array = std::make_shared<NullArray>(type, length);
+ *array = std::make_shared<NullArray>(length);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/stream.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.cc b/cpp/src/arrow/ipc/stream.cc
index c9057e8..72eb134 100644
--- a/cpp/src/arrow/ipc/stream.cc
+++ b/cpp/src/arrow/ipc/stream.cc
@@ -28,6 +28,7 @@
#include "arrow/ipc/adapter.h"
#include "arrow/ipc/metadata.h"
#include "arrow/ipc/util.h"
+#include "arrow/memory_pool.h"
#include "arrow/schema.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
@@ -41,7 +42,11 @@ namespace ipc {
StreamWriter::~StreamWriter() {}
StreamWriter::StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema)
- : sink_(sink), schema_(schema), position_(-1), started_(false) {}
+ : sink_(sink),
+ schema_(schema),
+ pool_(default_memory_pool()),  // replaceable via set_memory_pool()
+ position_(-1),  // refreshed from sink_->Tell() in UpdatePosition()
+ started_(false) {}
Status StreamWriter::UpdatePosition() {
return sink_->Tell(&position_);
@@ -76,8 +81,8 @@ Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, FileBlock* block
// Frame of reference in file format is 0, see ARROW-384
const int64_t buffer_start_offset = 0;
- RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(
- batch, buffer_start_offset, sink_, &block->metadata_length, &block->body_length));
+ RETURN_NOT_OK(arrow::ipc::WriteRecordBatch(batch, buffer_start_offset, sink_,
+ &block->metadata_length, &block->body_length, pool_));
RETURN_NOT_OK(UpdatePosition());
DCHECK(position_ % 8 == 0) << "WriteRecordBatch did not perform aligned writes";
@@ -85,6 +90,10 @@ Status StreamWriter::WriteRecordBatch(const RecordBatch& batch, FileBlock* block
return Status::OK();
}
+// Overrides the pool handed to WriteRecordBatch for allocations made while
+// writing; the constructor defaults it to default_memory_pool().
+void StreamWriter::set_memory_pool(MemoryPool* pool) {
+  // A null pool would presumably only fail later, inside WriteRecordBatch
+  DCHECK(pool != nullptr) << "memory pool must not be null";
+  pool_ = pool;
+}
+
// ----------------------------------------------------------------------
// StreamWriter implementation
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/stream.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/stream.h b/cpp/src/arrow/ipc/stream.h
index 53f51dc..12414fa 100644
--- a/cpp/src/arrow/ipc/stream.h
+++ b/cpp/src/arrow/ipc/stream.h
@@ -30,6 +30,7 @@ namespace arrow {
class Array;
class Buffer;
struct Field;
+class MemoryPool;
class RecordBatch;
class Schema;
class Status;
@@ -59,6 +60,10 @@ class ARROW_EXPORT StreamWriter {
/// closing the actual OutputStream
virtual Status Close();
+ // In some cases, writing may require memory allocation. We use the default
+ // memory pool, but provide the option to override
+ void set_memory_pool(MemoryPool* pool);
+
protected:
StreamWriter(io::OutputStream* sink, const std::shared_ptr<Schema>& schema);
@@ -81,6 +86,9 @@ class ARROW_EXPORT StreamWriter {
io::OutputStream* sink_;
std::shared_ptr<Schema> schema_;
+
+ MemoryPool* pool_;
+
int64_t position_;
bool started_;
};
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/ipc/test-common.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h
index ca790de..b4930c4 100644
--- a/cpp/src/arrow/ipc/test-common.h
+++ b/cpp/src/arrow/ipc/test-common.h
@@ -21,6 +21,7 @@
#include <algorithm>
#include <cstdint>
#include <memory>
+#include <numeric>
#include <string>
#include <vector>
@@ -28,6 +29,7 @@
#include "arrow/buffer.h"
#include "arrow/builder.h"
#include "arrow/memory_pool.h"
+#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/test-util.h"
#include "arrow/type.h"
@@ -104,8 +106,8 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
const int length = 1000;
// Make the schema
- auto f0 = std::make_shared<Field>("f0", int32());
- auto f1 = std::make_shared<Field>("f1", int32());
+ auto f0 = field("f0", int32());
+ auto f1 = field("f1", int32());
std::shared_ptr<Schema> schema(new Schema({f0, f1}));
// Example data
@@ -119,10 +121,10 @@ Status MakeIntRecordBatch(std::shared_ptr<RecordBatch>* out) {
template <class Builder, class RawType>
Status MakeRandomBinaryArray(
- const TypePtr& type, int32_t length, MemoryPool* pool, std::shared_ptr<Array>* out) {
+ int32_t length, MemoryPool* pool, std::shared_ptr<Array>* out) {
const std::vector<std::string> values = {
"", "", "abc", "123", "efg", "456!@#!@#", "12312"};
- Builder builder(pool, type);
+ Builder builder(pool);
const auto values_len = values.size();
for (int32_t i = 0; i < length; ++i) {
int values_index = i % values_len;
@@ -141,22 +143,22 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {
const int32_t length = 500;
auto string_type = utf8();
auto binary_type = binary();
- auto f0 = std::make_shared<Field>("f0", string_type);
- auto f1 = std::make_shared<Field>("f1", binary_type);
+ auto f0 = field("f0", string_type);
+ auto f1 = field("f1", binary_type);
std::shared_ptr<Schema> schema(new Schema({f0, f1}));
std::shared_ptr<Array> a0, a1;
MemoryPool* pool = default_memory_pool();
+ // Quirk with RETURN_NOT_OK macro and templated functions
{
- auto status =
- MakeRandomBinaryArray<StringBuilder, char>(string_type, length, pool, &a0);
- RETURN_NOT_OK(status);
+ auto s = MakeRandomBinaryArray<StringBuilder, char>(length, pool, &a0);
+ RETURN_NOT_OK(s);
}
+
{
- auto status =
- MakeRandomBinaryArray<BinaryBuilder, uint8_t>(binary_type, length, pool, &a1);
- RETURN_NOT_OK(status);
+ auto s = MakeRandomBinaryArray<BinaryBuilder, uint8_t>(length, pool, &a1);
+ RETURN_NOT_OK(s);
}
out->reset(new RecordBatch(schema, length, {a0, a1}));
return Status::OK();
@@ -164,9 +166,9 @@ Status MakeStringTypesRecordBatch(std::shared_ptr<RecordBatch>* out) {
Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
// Make the schema
- auto f0 = std::make_shared<Field>("f0", kListInt32);
- auto f1 = std::make_shared<Field>("f1", kListListInt32);
- auto f2 = std::make_shared<Field>("f2", int32());
+ auto f0 = field("f0", kListInt32);
+ auto f1 = field("f1", kListListInt32);
+ auto f2 = field("f2", int32());
std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
// Example data
@@ -187,14 +189,13 @@ Status MakeListRecordBatch(std::shared_ptr<RecordBatch>* out) {
Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) {
// Make the schema
- auto f0 = std::make_shared<Field>("f0", kListInt32);
- auto f1 = std::make_shared<Field>("f1", kListListInt32);
- auto f2 = std::make_shared<Field>("f2", int32());
+ auto f0 = field("f0", kListInt32);
+ auto f1 = field("f1", kListListInt32);
+ auto f2 = field("f2", int32());
std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
// Example data
MemoryPool* pool = default_memory_pool();
- const int length = 200;
const bool include_nulls = true;
std::shared_ptr<Array> leaf_values, list_array, list_list_array, flat_array;
RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &leaf_values));
@@ -202,15 +203,15 @@ Status MakeZeroLengthRecordBatch(std::shared_ptr<RecordBatch>* out) {
RETURN_NOT_OK(
MakeRandomListArray(list_array, 0, include_nulls, pool, &list_list_array));
RETURN_NOT_OK(MakeRandomInt32Array(0, include_nulls, pool, &flat_array));
- out->reset(new RecordBatch(schema, length, {list_array, list_list_array, flat_array}));
+ out->reset(new RecordBatch(schema, 0, {list_array, list_list_array, flat_array}));
return Status::OK();
}
Status MakeNonNullRecordBatch(std::shared_ptr<RecordBatch>* out) {
// Make the schema
- auto f0 = std::make_shared<Field>("f0", kListInt32);
- auto f1 = std::make_shared<Field>("f1", kListListInt32);
- auto f2 = std::make_shared<Field>("f2", int32());
+ auto f0 = field("f0", kListInt32);
+ auto f1 = field("f1", kListListInt32);
+ auto f2 = field("f2", int32());
std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
// Example data
@@ -242,7 +243,7 @@ Status MakeDeeplyNestedList(std::shared_ptr<RecordBatch>* out) {
RETURN_NOT_OK(MakeRandomListArray(array, batch_length, include_nulls, pool, &array));
}
- auto f0 = std::make_shared<Field>("f0", type);
+ auto f0 = field("f0", type);
std::shared_ptr<Schema> schema(new Schema({f0}));
std::vector<std::shared_ptr<Array>> arrays = {array};
out->reset(new RecordBatch(schema, batch_length, arrays));
@@ -260,8 +261,8 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
// Define schema
std::shared_ptr<DataType> type(new StructType(
{list_schema->field(0), list_schema->field(1), list_schema->field(2)}));
- auto f0 = std::make_shared<Field>("non_null_struct", type);
- auto f1 = std::make_shared<Field>("null_struct", type);
+ auto f0 = field("non_null_struct", type);
+ auto f1 = field("null_struct", type);
std::shared_ptr<Schema> schema(new Schema({f0, f1}));
// construct individual nullable/non-nullable struct arrays
@@ -271,7 +272,7 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
std::shared_ptr<Buffer> null_bitmask;
RETURN_NOT_OK(BitUtil::BytesToBits(null_bytes, &null_bitmask));
std::shared_ptr<Array> with_nulls(
- new StructArray(type, list_batch->num_rows(), columns, 1, null_bitmask));
+ new StructArray(type, list_batch->num_rows(), columns, null_bitmask, 1));
// construct batch
std::vector<std::shared_ptr<Array>> arrays = {no_nulls, with_nulls};
@@ -282,7 +283,7 @@ Status MakeStruct(std::shared_ptr<RecordBatch>* out) {
Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
// Define schema
std::vector<std::shared_ptr<Field>> union_types(
- {std::make_shared<Field>("u0", int32()), std::make_shared<Field>("u1", uint8())});
+ {field("u0", int32()), field("u1", uint8())});
std::vector<uint8_t> type_codes = {5, 10};
auto sparse_type =
@@ -291,9 +292,9 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
auto dense_type =
std::make_shared<UnionType>(union_types, type_codes, UnionMode::DENSE);
- auto f0 = std::make_shared<Field>("sparse_nonnull", sparse_type, false);
- auto f1 = std::make_shared<Field>("sparse", sparse_type);
- auto f2 = std::make_shared<Field>("dense", dense_type);
+ auto f0 = field("sparse_nonnull", sparse_type, false);
+ auto f1 = field("sparse", sparse_type);
+ auto f2 = field("dense", dense_type);
std::shared_ptr<Schema> schema(new Schema({f0, f1, f2}));
@@ -308,21 +309,17 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
RETURN_NOT_OK(test::CopyBufferFromVector(type_ids, &type_ids_buffer));
std::vector<int32_t> u0_values = {0, 1, 2, 3, 4, 5, 6};
- ArrayFromVector<Int32Type, int32_t>(
- sparse_type->child(0)->type, u0_values, &sparse_children[0]);
+ ArrayFromVector<Int32Type, int32_t>(u0_values, &sparse_children[0]);
std::vector<uint8_t> u1_values = {10, 11, 12, 13, 14, 15, 16};
- ArrayFromVector<UInt8Type, uint8_t>(
- sparse_type->child(1)->type, u1_values, &sparse_children[1]);
+ ArrayFromVector<UInt8Type, uint8_t>(u1_values, &sparse_children[1]);
// dense children
u0_values = {0, 2, 3, 7};
- ArrayFromVector<Int32Type, int32_t>(
- dense_type->child(0)->type, u0_values, &dense_children[0]);
+ ArrayFromVector<Int32Type, int32_t>(u0_values, &dense_children[0]);
u1_values = {11, 14, 15};
- ArrayFromVector<UInt8Type, uint8_t>(
- dense_type->child(1)->type, u1_values, &dense_children[1]);
+ ArrayFromVector<UInt8Type, uint8_t>(u1_values, &dense_children[1]);
std::shared_ptr<Buffer> offsets_buffer;
std::vector<int32_t> offsets = {0, 0, 1, 2, 1, 2, 3};
@@ -337,10 +334,10 @@ Status MakeUnion(std::shared_ptr<RecordBatch>* out) {
auto sparse_no_nulls =
std::make_shared<UnionArray>(sparse_type, length, sparse_children, type_ids_buffer);
auto sparse = std::make_shared<UnionArray>(
- sparse_type, length, sparse_children, type_ids_buffer, nullptr, 1, null_bitmask);
+ sparse_type, length, sparse_children, type_ids_buffer, nullptr, null_bitmask, 1);
auto dense = std::make_shared<UnionArray>(dense_type, length, dense_children,
- type_ids_buffer, offsets_buffer, 1, null_bitmask);
+ type_ids_buffer, offsets_buffer, null_bitmask, 1);
// construct batch
std::vector<std::shared_ptr<Array>> arrays = {sparse_no_nulls, sparse, dense};
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/pretty_print-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/pretty_print-test.cc b/cpp/src/arrow/pretty_print-test.cc
index 4725d5d..aca650f 100644
--- a/cpp/src/arrow/pretty_print-test.cc
+++ b/cpp/src/arrow/pretty_print-test.cc
@@ -55,7 +55,7 @@ template <typename TYPE, typename C_TYPE>
void CheckPrimitive(int indent, const std::vector<bool>& is_valid,
const std::vector<C_TYPE>& values, const char* expected) {
std::shared_ptr<Array> array;
- ArrayFromVector<TYPE, C_TYPE>(std::make_shared<TYPE>(), is_valid, values, &array);
+ ArrayFromVector<TYPE, C_TYPE>(is_valid, values, &array);
CheckArray(*array.get(), indent, expected);
}
@@ -76,12 +76,12 @@ TEST_F(TestPrettyPrint, DictionaryType) {
std::shared_ptr<Array> dict;
std::vector<std::string> dict_values = {"foo", "bar", "baz"};
- ArrayFromVector<StringType, std::string>(utf8(), dict_values, &dict);
+ ArrayFromVector<StringType, std::string>(dict_values, &dict);
std::shared_ptr<DataType> dict_type = dictionary(int16(), dict);
std::shared_ptr<Array> indices;
std::vector<int16_t> indices_values = {1, 2, -1, 0, 2, 0};
- ArrayFromVector<Int16Type, int16_t>(int16(), is_valid, indices_values, &indices);
+ ArrayFromVector<Int16Type, int16_t>(is_valid, indices_values, &indices);
auto arr = std::make_shared<DictionaryArray>(dict_type, indices);
static const char* expected = R"expected(
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/pretty_print.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index e30f4cc..23c0580 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -164,39 +164,56 @@ class ArrayPrinter : public ArrayVisitor {
Status WriteValidityBitmap(const Array& array) {
Newline();
Write("-- is_valid: ");
- BooleanArray is_valid(array.length(), array.null_bitmap());
- return PrettyPrint(is_valid, indent_ + 2, sink_);
+
+ if (array.null_count() > 0) {
+ // View the validity bitmap itself as a non-null BooleanArray, starting at this
+ // array's offset so a sliced array prints only its own bits
+ BooleanArray is_valid(
+ array.length(), array.null_bitmap(), nullptr, 0, array.offset());
+ return PrettyPrint(is_valid, indent_ + 2, sink_);
+ } else {
+ // No nulls: print a placeholder instead of the bitmap
+ Write("all not null");
+ return Status::OK();
+ }
}
Status Visit(const ListArray& array) override {
RETURN_NOT_OK(WriteValidityBitmap(array));
Newline();
- Write("-- offsets: ");
- Int32Array offsets(array.length() + 1, array.offsets());
- RETURN_NOT_OK(PrettyPrint(offsets, indent_ + 2, sink_));
+ Write("-- value_offsets: ");
+ Int32Array value_offsets(
+ array.length() + 1, array.value_offsets(), nullptr, 0, array.offset());
+ RETURN_NOT_OK(PrettyPrint(value_offsets, indent_ + 2, sink_));
Newline();
Write("-- values: ");
- RETURN_NOT_OK(PrettyPrint(*array.values().get(), indent_ + 2, sink_));
+ auto values = array.values();
+ if (array.offset() != 0) {
+ values = values->Slice(array.value_offset(0), array.value_offset(array.length()));
+ }
+ RETURN_NOT_OK(PrettyPrint(*values, indent_ + 2, sink_));
return Status::OK();
}
- Status PrintChildren(const std::vector<std::shared_ptr<Array>>& fields) {
+ // Prints each child array with its type. When `offset` is non-zero each child is
+ // viewed through Slice(offset, length); when it is zero the child is printed
+ // whole and `length` is not used.
+ Status PrintChildren(
+ const std::vector<std::shared_ptr<Array>>& fields, int32_t offset, int32_t length) {
for (size_t i = 0; i < fields.size(); ++i) {
Newline();
std::stringstream ss;
ss << "-- child " << i << " type: " << fields[i]->type()->ToString() << " values: ";
Write(ss.str());
- RETURN_NOT_OK(PrettyPrint(*fields[i].get(), indent_ + 2, sink_));
+
+ std::shared_ptr<Array> field = fields[i];
+ if (offset != 0) { field = field->Slice(offset, length); }
+
+ RETURN_NOT_OK(PrettyPrint(*field, indent_ + 2, sink_));
}
return Status::OK();
}
Status Visit(const StructArray& array) override {
RETURN_NOT_OK(WriteValidityBitmap(array));
- return PrintChildren(array.fields());
+ // Print the child fields through this struct's own offset/length window
+ return PrintChildren(array.fields(), array.offset(), array.length());
}
Status Visit(const UnionArray& array) override {
@@ -204,17 +221,19 @@ class ArrayPrinter : public ArrayVisitor {
Newline();
Write("-- type_ids: ");
- UInt8Array type_ids(array.length(), array.type_ids());
+ UInt8Array type_ids(array.length(), array.type_ids(), nullptr, 0, array.offset());
RETURN_NOT_OK(PrettyPrint(type_ids, indent_ + 2, sink_));
if (array.mode() == UnionMode::DENSE) {
Newline();
- Write("-- offsets: ");
- Int32Array offsets(array.length(), array.offsets());
- RETURN_NOT_OK(PrettyPrint(offsets, indent_ + 2, sink_));
+ Write("-- value_offsets: ");
+ Int32Array value_offsets(
+ array.length(), array.value_offsets(), nullptr, 0, array.offset());
+ RETURN_NOT_OK(PrettyPrint(value_offsets, indent_ + 2, sink_));
}
- return PrintChildren(array.children());
+ // Print the children without any offset, because the type ids are absolute
+ return PrintChildren(array.children(), 0, array.length() + array.offset());
}
Status Visit(const DictionaryArray& array) override {
@@ -222,11 +241,11 @@ class ArrayPrinter : public ArrayVisitor {
Newline();
Write("-- dictionary: ");
- RETURN_NOT_OK(PrettyPrint(*array.dictionary().get(), indent_ + 2, sink_));
+ RETURN_NOT_OK(PrettyPrint(*array.dictionary(), indent_ + 2, sink_));
Newline();
Write("-- indices: ");
- return PrettyPrint(*array.indices().get(), indent_ + 2, sink_);
+ return PrettyPrint(*array.indices(), indent_ + 2, sink_);
}
void Write(const char* data) { (*sink_) << data; }
@@ -260,7 +279,7 @@ Status PrettyPrint(const RecordBatch& batch, int indent, std::ostream* sink) {
for (int i = 0; i < batch.num_columns(); ++i) {
const std::string& name = batch.column_name(i);
(*sink) << name << ": ";
- RETURN_NOT_OK(PrettyPrint(*batch.column(i).get(), indent + 2, sink));
+ RETURN_NOT_OK(PrettyPrint(*batch.column(i), indent + 2, sink));
(*sink) << "\n";
}
return Status::OK();
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/table-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc
index 67c9f67..e7c5d66 100644
--- a/cpp/src/arrow/table-test.cc
+++ b/cpp/src/arrow/table-test.cc
@@ -242,4 +242,30 @@ TEST_F(TestRecordBatch, Equals) {
ASSERT_FALSE(b1.Equals(b4));
}
+// Checks that RecordBatch::Slice forwards offset and length to every column.
+TEST_F(TestRecordBatch, Slice) {
+  const int num_rows = 10;
+
+  vector<shared_ptr<Field>> fields = {
+      std::make_shared<Field>("f0", int32()), std::make_shared<Field>("f1", uint8())};
+  auto schema = std::make_shared<Schema>(fields);
+
+  // One int32 column and one uint8 column, both num_rows long
+  auto int_column = MakePrimitive<Int32Array>(num_rows);
+  auto byte_column = MakePrimitive<UInt8Array>(num_rows);
+  RecordBatch batch(schema, num_rows, {int_column, byte_column});
+
+  // Slice(offset) runs to the end; Slice(offset, length) takes an explicit count
+  auto tail = batch.Slice(2);
+  auto window = batch.Slice(1, 5);
+
+  for (int col = 0; col < batch.num_columns(); ++col) {
+    const auto& tail_col = tail->column(col);
+    ASSERT_EQ(2, tail_col->offset());
+    ASSERT_EQ(num_rows - 2, tail_col->length());
+
+    const auto& window_col = window->column(col);
+    ASSERT_EQ(1, window_col->offset());
+    ASSERT_EQ(5, window_col->length());
+  }
+}
+
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index b3563ea..9e31ba5 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -60,6 +60,19 @@ bool RecordBatch::ApproxEquals(const RecordBatch& other) const {
return true;
}
+/// Slice from `offset` through the final row of the batch.
+std::shared_ptr<RecordBatch> RecordBatch::Slice(int32_t offset) {
+  const int32_t rows_after_offset = num_rows_ - offset;
+  return Slice(offset, rows_after_offset);
+}
+
+/// Slice each column to [offset, offset + length) and wrap the results in a new
+/// RecordBatch that shares this batch's schema.
+std::shared_ptr<RecordBatch> RecordBatch::Slice(int32_t offset, int32_t length) {
+  std::vector<std::shared_ptr<Array>> arrays;
+  arrays.reserve(num_columns());
+  for (const auto& column : columns_) {
+    arrays.emplace_back(column->Slice(offset, length));
+  }
+  // The new batch must report the sliced row count, not this batch's num_rows_.
+  // Clamp to the rows remaining after `offset` (assumes Array::Slice clamps an
+  // over-long request the same way; TODO confirm).
+  const int32_t rows_remaining = num_rows_ - offset;
+  const int32_t num_rows = length < rows_remaining ? length : rows_remaining;
+  return std::make_shared<RecordBatch>(schema_, num_rows, arrays);
+}
+
// ----------------------------------------------------------------------
// Table methods
@@ -93,8 +106,7 @@ Status Table::FromRecordBatches(const std::string& name,
if (!batches[i]->schema()->Equals(schema)) {
std::stringstream ss;
ss << "Schema at index " << static_cast<int>(i) << " was different: \n"
- << schema->ToString() << "\nvs\n"
- << batches[i]->schema()->ToString();
+ << schema->ToString() << "\nvs\n" << batches[i]->schema()->ToString();
return Status::Invalid(ss.str());
}
}
@@ -126,8 +138,7 @@ Status ConcatenateTables(const std::string& output_name,
if (!tables[i]->schema()->Equals(schema)) {
std::stringstream ss;
ss << "Schema at index " << static_cast<int>(i) << " was different: \n"
- << schema->ToString() << "\nvs\n"
- << tables[i]->schema()->ToString();
+ << schema->ToString() << "\nvs\n" << tables[i]->schema()->ToString();
return Status::Invalid(ss.str());
}
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/table.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 583847c..fa56824 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -64,6 +64,10 @@ class ARROW_EXPORT RecordBatch {
// @returns: the number of rows (the corresponding length of each column)
int32_t num_rows() const { return num_rows_; }
+ /// Slice each of the arrays in the record batch and construct a new RecordBatch object
+ std::shared_ptr<RecordBatch> Slice(int32_t offset);
+ std::shared_ptr<RecordBatch> Slice(int32_t offset, int32_t length);
+
private:
std::shared_ptr<Schema> schema_;
int32_t num_rows_;
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index 4e52580..ffc7806 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -61,14 +61,6 @@
EXPECT_TRUE(s.ok()); \
} while (0)
-// Alias MSVC popcount to GCC name
-#ifdef _MSC_VER
-#include <intrin.h>
-#define __builtin_popcount __popcnt
-#include <nmmintrin.h>
-#define __builtin_popcountll _mm_popcnt_u64
-#endif
-
namespace arrow {
namespace test {
@@ -175,29 +167,6 @@ void rand_uniform_int(int n, uint32_t seed, T min_value, T max_value, T* out) {
}
}
-static inline int bitmap_popcount(const uint8_t* data, int length) {
- // book keeping
- constexpr int pop_len = sizeof(uint64_t);
- const uint64_t* i64_data = reinterpret_cast<const uint64_t*>(data);
- const int fast_counts = length / pop_len;
- const uint64_t* end = i64_data + fast_counts;
-
- int count = 0;
- // popcount as much as possible with the widest possible count
- for (auto iter = i64_data; iter < end; ++iter) {
- count += __builtin_popcountll(*iter);
- }
-
- // Account for left over bytes (in theory we could fall back to smaller
- // versions of popcount but the code complexity is likely not worth it)
- const int loop_tail_index = fast_counts * pop_len;
- for (int i = loop_tail_index; i < length; ++i) {
- if (BitUtil::GetBit(data, i)) { ++count; }
- }
-
- return count;
-}
-
static inline int null_count(const std::vector<uint8_t>& valid_bytes) {
int result = 0;
for (size_t i = 0; i < valid_bytes.size(); ++i) {
@@ -254,7 +223,7 @@ class TestBase : public ::testing::Test {
auto null_bitmap = std::make_shared<PoolBuffer>(pool_);
EXPECT_OK(null_bitmap->Resize(BitUtil::BytesForBits(length)));
- return std::make_shared<ArrayType>(length, data, null_count, null_bitmap);
+ return std::make_shared<ArrayType>(length, data, null_bitmap, null_count);
}
protected:
@@ -263,11 +232,10 @@ class TestBase : public ::testing::Test {
};
template <typename TYPE, typename C_TYPE>
-void ArrayFromVector(const std::shared_ptr<DataType>& type,
- const std::vector<bool>& is_valid, const std::vector<C_TYPE>& values,
+void ArrayFromVector(const std::vector<bool>& is_valid, const std::vector<C_TYPE>& values,
std::shared_ptr<Array>* out) {
MemoryPool* pool = default_memory_pool();
- typename TypeTraits<TYPE>::BuilderType builder(pool, std::make_shared<TYPE>());
+ typename TypeTraits<TYPE>::BuilderType builder(pool);
for (size_t i = 0; i < values.size(); ++i) {
if (is_valid[i]) {
ASSERT_OK(builder.Append(values[i]));
@@ -279,10 +247,9 @@ void ArrayFromVector(const std::shared_ptr<DataType>& type,
}
template <typename TYPE, typename C_TYPE>
-void ArrayFromVector(const std::shared_ptr<DataType>& type,
- const std::vector<C_TYPE>& values, std::shared_ptr<Array>* out) {
+void ArrayFromVector(const std::vector<C_TYPE>& values, std::shared_ptr<Array>* out) {
MemoryPool* pool = default_memory_pool();
- typename TypeTraits<TYPE>::BuilderType builder(pool, std::make_shared<TYPE>());
+ typename TypeTraits<TYPE>::BuilderType builder(pool);
for (size_t i = 0; i < values.size(); ++i) {
ASSERT_OK(builder.Append(values[i]));
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index ba77584..a1c2b79 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -115,7 +115,7 @@ std::string UnionType::ToString() const {
for (size_t i = 0; i < children_.size(); ++i) {
if (i) { s << ", "; }
- s << children_[i]->ToString() << "=" << static_cast<int>(type_ids[i]);
+ s << children_[i]->ToString() << "=" << static_cast<int>(type_codes[i]);
}
s << ">";
return s.str();
@@ -224,8 +224,8 @@ std::shared_ptr<DataType> struct_(const std::vector<std::shared_ptr<Field>>& fie
}
std::shared_ptr<DataType> union_(const std::vector<std::shared_ptr<Field>>& child_fields,
- const std::vector<uint8_t>& type_ids, UnionMode mode) {
- return std::make_shared<UnionType>(child_fields, type_ids, mode);
+ const std::vector<uint8_t>& type_codes, UnionMode mode) {
+ return std::make_shared<UnionType>(child_fields, type_codes, mode);
}
std::shared_ptr<DataType> dictionary(const std::shared_ptr<DataType>& index_type,
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 8638a3f..927b8a4 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -413,8 +413,8 @@ struct ARROW_EXPORT UnionType : public DataType {
static constexpr Type::type type_id = Type::UNION;
UnionType(const std::vector<std::shared_ptr<Field>>& fields,
- const std::vector<uint8_t>& type_ids, UnionMode mode = UnionMode::SPARSE)
- : DataType(Type::UNION), mode(mode), type_ids(type_ids) {
+ const std::vector<uint8_t>& type_codes, UnionMode mode = UnionMode::SPARSE)
+ : DataType(Type::UNION), mode(mode), type_codes(type_codes) {
children_ = fields;
}
@@ -429,7 +429,7 @@ struct ARROW_EXPORT UnionType : public DataType {
// The type id used in the data to indicate each data type in the union. For
// example, the first type in the union might be denoted by the id 5 (instead
// of 0).
- std::vector<uint8_t> type_ids;
+ std::vector<uint8_t> type_codes;
};
// ----------------------------------------------------------------------
@@ -551,7 +551,7 @@ std::shared_ptr<DataType> ARROW_EXPORT struct_(
std::shared_ptr<DataType> ARROW_EXPORT union_(
const std::vector<std::shared_ptr<Field>>& child_fields,
- const std::vector<uint8_t>& type_ids, UnionMode mode = UnionMode::SPARSE);
+ const std::vector<uint8_t>& type_codes, UnionMode mode = UnionMode::SPARSE);
std::shared_ptr<DataType> ARROW_EXPORT dictionary(
const std::shared_ptr<DataType>& index_type, const std::shared_ptr<Array>& values);
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/type_traits.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index 5cd5f45..c4898b1 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -126,6 +126,15 @@ struct TypeTraits<TimestampType> {
};
template <>
+struct TypeTraits<TimeType> {
+  using ArrayType = TimeArray;
+  // NOTE(review): no BuilderType is declared for TimeType in this specialization;
+  // the line below is a disabled placeholder, not an active alias.
+  // using BuilderType = TimestampBuilder;
+
+  // Each TimeArray value occupies sizeof(int64_t) bytes of value storage.
+  static inline int bytes_required(int elements) { return elements * sizeof(int64_t); }
+  // NOTE(review): false presumably because TimeType carries a parameter (e.g. a
+  // time unit) — confirm against the TimeType definition.
+  constexpr static bool is_parameter_free = false;
+};
+
+template <>
struct TypeTraits<HalfFloatType> {
using ArrayType = HalfFloatArray;
using BuilderType = HalfFloatBuilder;
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util-test.cc b/cpp/src/arrow/util/bit-util-test.cc
index cfdee04..cb2fd1a 100644
--- a/cpp/src/arrow/util/bit-util-test.cc
+++ b/cpp/src/arrow/util/bit-util-test.cc
@@ -17,11 +17,17 @@
#include "arrow/util/bit-util.h"
+#include <cstdint>
+#include <vector>
+
#include "gtest/gtest.h"
+#include "arrow/buffer.h"
+#include "arrow/test-util.h"
+
namespace arrow {
-TEST(UtilTests, TestIsMultipleOf64) {
+TEST(BitUtilTests, TestIsMultipleOf64) {
using BitUtil::IsMultipleOf64;
EXPECT_TRUE(IsMultipleOf64(64));
EXPECT_TRUE(IsMultipleOf64(0));
@@ -31,7 +37,7 @@ TEST(UtilTests, TestIsMultipleOf64) {
EXPECT_FALSE(IsMultipleOf64(32));
}
-TEST(UtilTests, TestNextPower2) {
+TEST(BitUtilTests, TestNextPower2) {
using BitUtil::NextPower2;
ASSERT_EQ(8, NextPower2(6));
@@ -51,4 +57,56 @@ TEST(UtilTests, TestNextPower2) {
ASSERT_EQ(1LL << 62, NextPower2((1LL << 62) - 1));
}
+// Reference bit counter for the CountSetBits tests: examines the bits in
+// [bit_offset, bit_offset + length) one at a time and tallies the set ones.
+static inline int64_t SlowCountBits(
+    const uint8_t* data, int64_t bit_offset, int64_t length) {
+  int64_t num_set = 0;
+  const int64_t stop = bit_offset + length;
+  for (int64_t pos = bit_offset; pos < stop; ++pos) {
+    num_set += BitUtil::GetBit(data, pos) ? 1 : 0;
+  }
+  return num_set;
+}
+
+// CountSetBits must agree with the bit-by-bit reference for long ranges that
+// start at assorted (aligned and unaligned) offsets, and for short ranges.
+TEST(BitUtilTests, TestCountSetBits) {
+  const int kBufferSize = 1000;
+  uint8_t buffer[kBufferSize] = {0};
+
+  test::random_bytes(kBufferSize, 0, buffer);
+
+  const int num_bits = kBufferSize * 8;
+
+  std::vector<int64_t> offsets = {
+      0, 12, 16, 32, 37, 63, 64, 128, num_bits - 30, num_bits - 64};
+  for (int64_t offset : offsets) {
+    int64_t result = CountSetBits(buffer, offset, num_bits - offset);
+    int64_t expected = SlowCountBits(buffer, offset, num_bits - offset);
+
+    ASSERT_EQ(expected, result);
+  }
+
+  // Short ranges (shorter than one 64-bit word, including empty ones), so only
+  // the bit-by-bit head/tail handling is exercised, never a whole-word count.
+  for (int64_t offset : offsets) {
+    for (int64_t length : {0, 1, 5, 17, 26}) {
+      ASSERT_EQ(SlowCountBits(buffer, offset, length),
+          CountSetBits(buffer, offset, length));
+    }
+  }
+}
+
+// CopyBitmap must reproduce, starting at bit 0 of the copy, exactly the source
+// bits from `offset` onward, for both byte-aligned and unaligned offsets.
+TEST(BitUtilTests, TestCopyBitmap) {
+  const int kBufferSize = 1000;
+
+  std::shared_ptr<MutableBuffer> buffer;
+  ASSERT_OK(AllocateBuffer(default_memory_pool(), kBufferSize, &buffer));
+  memset(buffer->mutable_data(), 0, kBufferSize);
+  test::random_bytes(kBufferSize, 0, buffer->mutable_data());
+
+  const int num_bits = kBufferSize * 8;
+
+  const uint8_t* src = buffer->data();
+
+  std::vector<int64_t> offsets = {0, 12, 16, 32, 37, 63, 64, 128};
+  for (int64_t offset : offsets) {
+    const int64_t copy_length = num_bits - offset;
+
+    std::shared_ptr<Buffer> copy;
+    ASSERT_OK(CopyBitmap(default_memory_pool(), src, offset, copy_length, &copy));
+    // The copy must be large enough to hold every copied bit
+    ASSERT_GE(copy->size(), BitUtil::BytesForBits(copy_length));
+
+    for (int64_t i = 0; i < copy_length; ++i) {
+      ASSERT_EQ(BitUtil::GetBit(src, i + offset), BitUtil::GetBit(copy->data(), i));
+    }
+  }
+}
+
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/bit-util.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util.cc b/cpp/src/arrow/util/bit-util.cc
index 9c82407..f3fbb41 100644
--- a/cpp/src/arrow/util/bit-util.cc
+++ b/cpp/src/arrow/util/bit-util.cc
@@ -15,10 +15,20 @@
// specific language governing permissions and limitations
// under the License.
+// Alias MSVC popcount to GCC name
+#ifdef _MSC_VER
+#include <intrin.h>
+#define __builtin_popcount __popcnt
+#include <nmmintrin.h>
+#define __builtin_popcountll _mm_popcnt_u64
+#endif
+
+#include <algorithm>
#include <cstring>
#include <vector>
#include "arrow/buffer.h"
+#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/bit-util.h"
@@ -34,8 +44,9 @@ Status BitUtil::BytesToBits(
const std::vector<uint8_t>& bytes, std::shared_ptr<Buffer>* out) {
int bit_length = BitUtil::BytesForBits(bytes.size());
- auto buffer = std::make_shared<PoolBuffer>();
- RETURN_NOT_OK(buffer->Resize(bit_length));
+ std::shared_ptr<MutableBuffer> buffer;
+ RETURN_NOT_OK(AllocateBuffer(default_memory_pool(), bit_length, &buffer));
+
memset(buffer->mutable_data(), 0, bit_length);
BytesToBits(bytes, buffer->mutable_data());
@@ -43,4 +54,72 @@ Status BitUtil::BytesToBits(
return Status::OK();
}
+/// Count the set bits in [bit_offset, bit_offset + length) of a bitmap: bit by
+/// bit up to the first 64-bit boundary (measured from `data`), then one hardware
+/// popcount per whole 64-bit word, then bit by bit for the remaining tail.
+int64_t CountSetBits(const uint8_t* data, int64_t bit_offset, int64_t length) {
+  constexpr int64_t pop_len = sizeof(uint64_t) * 8;
+
+  int64_t count = 0;
+
+  // The first bit offset where we can use a 64-bit wide hardware popcount
+  const int64_t fast_count_start = BitUtil::RoundUp(bit_offset, pop_len);
+
+  // The number of bits until fast_count_start (or the whole range, if shorter)
+  const int64_t initial_bits = std::min(length, fast_count_start - bit_offset);
+  for (int64_t i = bit_offset; i < bit_offset + initial_bits; ++i) {
+    if (BitUtil::GetBit(data, i)) { ++count; }
+  }
+
+  const int64_t fast_counts = (length - initial_bits) / pop_len;
+
+  // First byte of the first whole 64-bit word after the initial bits. It is
+  // 64-bit aligned only relative to `data`, not necessarily in memory (e.g. a
+  // stack byte array), so each word is loaded with memcpy: dereferencing a
+  // reinterpret_cast'ed uint64_t pointer would be undefined behavior
+  // (misaligned access and a strict-aliasing violation). Compilers typically
+  // lower this fixed-size memcpy to a single load.
+  const uint8_t* word_bytes = data + fast_count_start / 8;
+
+  // popcount as much as possible with the widest possible count
+  for (int64_t w = 0; w < fast_counts; ++w) {
+    uint64_t word;
+    std::memcpy(&word, word_bytes + w * sizeof(uint64_t), sizeof(uint64_t));
+    count += __builtin_popcountll(word);
+  }
+
+  // Account for left over bits (in theory we could fall back to smaller
+  // versions of popcount but the code complexity is likely not worth it)
+  const int64_t tail_index = bit_offset + initial_bits + fast_counts * pop_len;
+  for (int64_t i = tail_index; i < bit_offset + length; ++i) {
+    if (BitUtil::GetBit(data, i)) { ++count; }
+  }
+
+  return count;
+}
+
+// Allocate from `pool` a buffer of BitUtil::BytesForBits(length) bytes and
+// clear every byte of it, so all bits start out unset.
+Status GetEmptyBitmap(
+    MemoryPool* pool, int64_t length, std::shared_ptr<MutableBuffer>* result) {
+  const int64_t num_bytes = BitUtil::BytesForBits(length);
+  RETURN_NOT_OK(AllocateBuffer(pool, num_bytes, result));
+
+  MutableBuffer* bitmap = result->get();
+  memset(bitmap->mutable_data(), 0, bitmap->size());
+  return Status::OK();
+}
+
+// Copy `length` bits of `data`, starting at bit `offset`, into a freshly
+// allocated zeroed bitmap whose bit 0 corresponds to source bit `offset`.
+Status CopyBitmap(MemoryPool* pool, const uint8_t* data, int32_t offset, int32_t length,
+    std::shared_ptr<Buffer>* out) {
+  std::shared_ptr<MutableBuffer> copied;
+  RETURN_NOT_OK(GetEmptyBitmap(pool, length, &copied));
+
+  uint8_t* dst_bits = copied->mutable_data();
+  const uint8_t* src_bits = data;
+  for (int64_t dst_pos = 0, src_pos = offset; dst_pos < length; ++dst_pos, ++src_pos) {
+    BitUtil::SetBitTo(dst_bits, dst_pos, BitUtil::GetBit(src_bits, src_pos));
+  }
+
+  *out = copied;
+  return Status::OK();
+}
+
+// Return true iff the bit_length bits of `left` starting at bit left_offset
+// match, position by position, the bits of `right` starting at right_offset.
+bool BitmapEquals(const uint8_t* left, int64_t left_offset, const uint8_t* right,
+    int64_t right_offset, int64_t bit_length) {
+  // TODO(wesm): Make this faster using word-wise comparisons
+  int64_t k = 0;
+  while (k < bit_length && BitUtil::GetBit(left, left_offset + k) ==
+                               BitUtil::GetBit(right, right_offset + k)) {
+    ++k;
+  }
+  // Reaching the end without a mismatch (or an empty range) means equal
+  return k >= bit_length;
+}
+
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/bit-util.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h
index 5c8055f..a0fbdd2 100644
--- a/cpp/src/arrow/util/bit-util.h
+++ b/cpp/src/arrow/util/bit-util.h
@@ -28,6 +28,8 @@
namespace arrow {
class Buffer;
+class MemoryPool;
+class MutableBuffer;
class Status;
namespace BitUtil {
@@ -62,6 +64,12 @@ static inline void SetBit(uint8_t* bits, int i) {
bits[i / 8] |= kBitmask[i % 8];
}
+/// Set bit i of `bits` to 1 when bit_is_set is true and to 0 otherwise, without
+/// branching; the other bits of byte i / 8 are left unchanged.
+static inline void SetBitTo(uint8_t* bits, int i, bool bit_is_set) {
+  // See https://graphics.stanford.edu/~seander/bithacks.html
+  // "Conditionally set or clear bits without branching"
+  // -bit_is_set promotes to int 0 or -1 (all ones); XOR with the current byte
+  // marks the bits that differ from the desired value, the kBitmask entry keeps
+  // only bit i's position, and the final ^= flips bit i exactly when it differs.
+  bits[i / 8] ^= (-bit_is_set ^ bits[i / 8]) & kBitmask[i % 8];
+}
+
static inline int64_t NextPower2(int64_t n) {
n--;
n |= n >> 1;
@@ -82,6 +90,11 @@ static inline bool IsMultipleOf8(int64_t n) {
return (n & 7) == 0;
}
+/// Round 'value' up to the nearest multiple of 'factor'.
+/// Intended for value >= 0 and factor > 0; with a negative value the truncating
+/// integer division rounds toward zero instead.
+inline int64_t RoundUp(int64_t value, int64_t factor) {
+  const int64_t padded = value + (factor - 1);
+  // Dropping the remainder leaves the largest multiple of factor <= padded
+  return padded - padded % factor;
+}
+
inline int64_t RoundUpToMultipleOf64(int64_t num) {
// TODO(wesm): is this definitely needed?
// DCHECK_GE(num, 0);
@@ -98,6 +111,38 @@ void BytesToBits(const std::vector<uint8_t>& bytes, uint8_t* bits);
ARROW_EXPORT Status BytesToBits(const std::vector<uint8_t>&, std::shared_ptr<Buffer>*);
} // namespace BitUtil
+
+// ----------------------------------------------------------------------
+// Bitmap utilities
+
+/// Allocate a zero-initialized bitmap large enough to hold `length` bits
+///
+/// \param[in] pool memory pool to allocate memory from
+/// \param[in] length number of bits the bitmap must hold
+/// \param[out] result the newly allocated buffer, with every byte set to 0
+///
+/// \return Status message
+Status ARROW_EXPORT GetEmptyBitmap(
+    MemoryPool* pool, int64_t length, std::shared_ptr<MutableBuffer>* result);
+
+/// Copy a bit range of an existing bitmap
+///
+/// \param[in] pool memory pool to allocate memory from
+/// \param[in] bitmap source data
+/// \param[in] offset bit offset into the source data
+/// \param[in] length number of bits to copy
+/// \param[out] out the resulting copy
+///
+/// \return Status message
+Status ARROW_EXPORT CopyBitmap(MemoryPool* pool, const uint8_t* bitmap, int32_t offset,
+ int32_t length, std::shared_ptr<Buffer>* out);
+
+/// Compute the number of 1's in the given data array
+///
+/// \param[in] data a packed LSB-ordered bitmap as a byte array
+/// \param[in] bit_offset a bitwise offset into the bitmap
+/// \param[in] length the number of bits to inspect in the bitmap relative to the offset
+///
+/// \return The number of set (1) bits in the range
+int64_t ARROW_EXPORT CountSetBits(
+ const uint8_t* data, int64_t bit_offset, int64_t length);
+
+/// Compare two bit ranges for equality
+///
+/// \param[in] left first bitmap
+/// \param[in] left_offset bit offset into `left`
+/// \param[in] right second bitmap
+/// \param[in] right_offset bit offset into `right`
+/// \param[in] bit_length number of bits to compare
+///
+/// \return true if all bit_length bits match position by position
+bool ARROW_EXPORT BitmapEquals(const uint8_t* left, int64_t left_offset,
+    const uint8_t* right, int64_t right_offset, int64_t bit_length);
+
} // namespace arrow
#endif // ARROW_UTIL_BIT_UTIL_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index b22f07d..06ee841 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -118,9 +118,9 @@ class CerrLog {
class FatalLog : public CerrLog {
public:
explicit FatalLog(int /* severity */) // NOLINT
- : CerrLog(ARROW_FATAL){} // NOLINT
+ : CerrLog(ARROW_FATAL) {} // NOLINT
- [[noreturn]] ~FatalLog() {
+ [[noreturn]] ~FatalLog() {
if (has_logged_) { std::cerr << std::endl; }
std::exit(1);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/cpp/src/arrow/util/macros.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/macros.h b/cpp/src/arrow/util/macros.h
index c4a62a4..81a9b0c 100644
--- a/cpp/src/arrow/util/macros.h
+++ b/cpp/src/arrow/util/macros.h
@@ -25,6 +25,6 @@
TypeName& operator=(const TypeName&) = delete
#endif
-#define UNUSED(x) (void)x
+// Cast the argument to void (e.g. to mark it as intentionally unused); the
+// argument is parenthesized so that an expression argument is cast as a whole.
+#define UNUSED(x) (void)(x)
#endif // ARROW_UTIL_MACROS_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 842a219..ba26692 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -95,7 +95,7 @@ if ("${COMPILER_FAMILY}" STREQUAL "clang")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Qunused-arguments")
# Cython warnings in clang
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-parentheses-equality")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-parentheses-equality -Wno-constant-logical-operand")
endif()
set(PYARROW_LINK "a")
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 38883e8..ebfdc41 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -179,8 +179,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
double Value(int i)
cdef cppclass CListArray" arrow::ListArray"(CArray):
- const int32_t* offsets()
- int32_t offset(int i)
+ const int32_t* raw_value_offsets()
+ int32_t value_offset(int i)
int32_t value_length(int i)
shared_ptr[CArray] values()
shared_ptr[CDataType] value_type()
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/pyarrow/scalar.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/scalar.pyx b/python/pyarrow/scalar.pyx
index 30b9040..9d2b2b1 100644
--- a/python/pyarrow/scalar.pyx
+++ b/python/pyarrow/scalar.pyx
@@ -202,7 +202,7 @@ cdef class ListValue(ArrayValue):
self.value_type = box_data_type(self.ap.value_type())
cdef getitem(self, int i):
- cdef int j = self.ap.offset(self.index) + i
+ cdef int j = self.ap.value_offset(self.index) + i
return box_arrow_scalar(self.value_type, self.ap.values(), j)
def as_py(self):
http://git-wip-us.apache.org/repos/asf/arrow/blob/5439b715/python/src/pyarrow/adapters/builtin.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/builtin.cc b/python/src/pyarrow/adapters/builtin.cc
index 1abfb40..5fd8eef 100644
--- a/python/src/pyarrow/adapters/builtin.cc
+++ b/python/src/pyarrow/adapters/builtin.cc
@@ -505,7 +505,7 @@ Status ConvertPySequence(
// Handle NA / NullType case
if (type->type == Type::NA) {
- out->reset(new arrow::NullArray(type, size));
+ out->reset(new arrow::NullArray(size));
return Status::OK();
}