You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost when the page was converted to text.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/03/09 19:00:59 UTC
arrow git commit: ARROW-605: [C++] Refactor IPC adapter code into generic ArrayLoader class. Add Date32Type
Repository: arrow
Updated Branches:
refs/heads/master b109a246f -> 6b3ae2aec
ARROW-605: [C++] Refactor IPC adapter code into generic ArrayLoader class. Add Date32Type
These are various changes introduced to support the Feather merge in ARROW-452 (#361).
Author: Wes McKinney <we...@twosigma.com>
Closes #365 from wesm/array-loader and squashes the following commits:
bc22872 [Wes McKinney] Revert Array::type_id to type_enum since Parquet uses this API
344e6b1 [Wes McKinney] fix compiler warning
997b7a2 [Wes McKinney] Refactor IPC adapter code into generic ArrayLoader class. Add Date32Type
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/6b3ae2ae
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/6b3ae2ae
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/6b3ae2ae
Branch: refs/heads/master
Commit: 6b3ae2aecc8cd31425035a021fa04b9ed3385a8d
Parents: b109a24
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Mar 9 14:00:48 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Thu Mar 9 14:00:48 2017 -0500
----------------------------------------------------------------------
cpp/CMakeLists.txt | 1 +
cpp/src/arrow/CMakeLists.txt | 5 +-
cpp/src/arrow/array.cc | 52 +----
cpp/src/arrow/array.h | 17 +-
cpp/src/arrow/builder.cc | 1 +
cpp/src/arrow/builder.h | 1 +
cpp/src/arrow/column.cc | 3 +
cpp/src/arrow/column.h | 3 +
cpp/src/arrow/compare.cc | 8 +-
cpp/src/arrow/io/memory.cc | 19 +-
cpp/src/arrow/io/memory.h | 6 +
cpp/src/arrow/ipc/adapter.cc | 252 ++------------------
cpp/src/arrow/ipc/adapter.h | 8 +-
cpp/src/arrow/ipc/metadata.cc | 1 +
cpp/src/arrow/ipc/metadata.h | 7 +-
cpp/src/arrow/loader.cc | 285 +++++++++++++++++++++++
cpp/src/arrow/loader.h | 89 +++++++
cpp/src/arrow/pretty_print.cc | 6 +-
cpp/src/arrow/type.cc | 22 +-
cpp/src/arrow/type.h | 38 ++-
cpp/src/arrow/type_fwd.h | 5 +
cpp/src/arrow/type_traits.h | 12 +
python/pyarrow/array.pyx | 31 ++-
python/pyarrow/table.pyx | 3 +-
python/pyarrow/tests/test_convert_pandas.py | 6 +-
python/src/pyarrow/adapters/pandas.cc | 17 +-
26 files changed, 558 insertions(+), 340 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 22c6e9a..294c439 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -799,6 +799,7 @@ set(ARROW_SRCS
src/arrow/builder.cc
src/arrow/column.cc
src/arrow/compare.cc
+ src/arrow/loader.cc
src/arrow/memory_pool.cc
src/arrow/pretty_print.cc
src/arrow/schema.cc
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index d1efa02..ddeb81c 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -19,10 +19,11 @@
install(FILES
api.h
array.h
- column.h
- compare.h
buffer.h
builder.h
+ column.h
+ compare.h
+ loader.h
memory_pool.h
pretty_print.h
schema.h
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 284bb57..49da6bb 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -165,6 +165,7 @@ template class NumericArray<Int32Type>;
template class NumericArray<Int64Type>;
template class NumericArray<TimestampType>;
template class NumericArray<DateType>;
+template class NumericArray<Date32Type>;
template class NumericArray<TimeType>;
template class NumericArray<HalfFloatType>;
template class NumericArray<FloatType>;
@@ -193,7 +194,7 @@ std::shared_ptr<Array> BooleanArray::Slice(int64_t offset, int64_t length) const
Status ListArray::Validate() const {
if (length_ < 0) { return Status::Invalid("Length was negative"); }
- if (!value_offsets_) { return Status::Invalid("value_offsets_ was null"); }
+ if (length_ && !value_offsets_) { return Status::Invalid("value_offsets_ was null"); }
if (value_offsets_->size() / static_cast<int>(sizeof(int32_t)) < length_) {
std::stringstream ss;
ss << "offset buffer size (bytes): " << value_offsets_->size()
@@ -425,20 +426,6 @@ std::shared_ptr<Array> UnionArray::Slice(int64_t offset, int64_t length) const {
// ----------------------------------------------------------------------
// DictionaryArray
-Status DictionaryArray::FromBuffer(const std::shared_ptr<DataType>& type, int64_t length,
- const std::shared_ptr<Buffer>& indices, const std::shared_ptr<Buffer>& null_bitmap,
- int64_t null_count, int64_t offset, std::shared_ptr<DictionaryArray>* out) {
- DCHECK_EQ(type->type, Type::DICTIONARY);
- const auto& dict_type = static_cast<const DictionaryType*>(type.get());
-
- std::shared_ptr<Array> boxed_indices;
- RETURN_NOT_OK(MakePrimitiveArray(dict_type->index_type(), length, indices, null_bitmap,
- null_count, offset, &boxed_indices));
-
- *out = std::make_shared<DictionaryArray>(type, boxed_indices);
- return Status::OK();
-}
-
DictionaryArray::DictionaryArray(
const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>& indices)
: Array(type, indices->length(), indices->null_bitmap(), indices->null_count(),
@@ -470,40 +457,6 @@ std::shared_ptr<Array> DictionaryArray::Slice(int64_t offset, int64_t length) co
}
// ----------------------------------------------------------------------
-
-#define MAKE_PRIMITIVE_ARRAY_CASE(ENUM, ArrayType) \
- case Type::ENUM: \
- out->reset(new ArrayType(type, length, data, null_bitmap, null_count, offset)); \
- break;
-
-Status MakePrimitiveArray(const std::shared_ptr<DataType>& type, int64_t length,
- const std::shared_ptr<Buffer>& data, const std::shared_ptr<Buffer>& null_bitmap,
- int64_t null_count, int64_t offset, std::shared_ptr<Array>* out) {
- switch (type->type) {
- MAKE_PRIMITIVE_ARRAY_CASE(BOOL, BooleanArray);
- MAKE_PRIMITIVE_ARRAY_CASE(UINT8, UInt8Array);
- MAKE_PRIMITIVE_ARRAY_CASE(INT8, Int8Array);
- MAKE_PRIMITIVE_ARRAY_CASE(UINT16, UInt16Array);
- MAKE_PRIMITIVE_ARRAY_CASE(INT16, Int16Array);
- MAKE_PRIMITIVE_ARRAY_CASE(UINT32, UInt32Array);
- MAKE_PRIMITIVE_ARRAY_CASE(INT32, Int32Array);
- MAKE_PRIMITIVE_ARRAY_CASE(UINT64, UInt64Array);
- MAKE_PRIMITIVE_ARRAY_CASE(INT64, Int64Array);
- MAKE_PRIMITIVE_ARRAY_CASE(FLOAT, FloatArray);
- MAKE_PRIMITIVE_ARRAY_CASE(DOUBLE, DoubleArray);
- MAKE_PRIMITIVE_ARRAY_CASE(TIME, Int64Array);
- MAKE_PRIMITIVE_ARRAY_CASE(TIMESTAMP, TimestampArray);
- default:
- return Status::NotImplemented(type->ToString());
- }
-#ifdef NDEBUG
- return Status::OK();
-#else
- return (*out)->Validate();
-#endif
-}
-
-// ----------------------------------------------------------------------
// Default implementations of ArrayVisitor methods
#define ARRAY_VISITOR_DEFAULT(ARRAY_CLASS) \
@@ -527,6 +480,7 @@ ARRAY_VISITOR_DEFAULT(DoubleArray);
ARRAY_VISITOR_DEFAULT(StringArray);
ARRAY_VISITOR_DEFAULT(BinaryArray);
ARRAY_VISITOR_DEFAULT(DateArray);
+ARRAY_VISITOR_DEFAULT(Date32Array);
ARRAY_VISITOR_DEFAULT(TimeArray);
ARRAY_VISITOR_DEFAULT(TimestampArray);
ARRAY_VISITOR_DEFAULT(IntervalArray);
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index f20f212..f111609 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -58,6 +58,7 @@ class ARROW_EXPORT ArrayVisitor {
virtual Status Visit(const StringArray& array);
virtual Status Visit(const BinaryArray& array);
virtual Status Visit(const DateArray& array);
+ virtual Status Visit(const Date32Array& array);
virtual Status Visit(const TimeArray& array);
virtual Status Visit(const TimestampArray& array);
virtual Status Visit(const IntervalArray& array);
@@ -485,12 +486,6 @@ class ARROW_EXPORT DictionaryArray : public Array {
DictionaryArray(
const std::shared_ptr<DataType>& type, const std::shared_ptr<Array>& indices);
- // Alternate ctor; other attributes (like null count) are inherited from the
- // passed indices array
- static Status FromBuffer(const std::shared_ptr<DataType>& type, int64_t length,
- const std::shared_ptr<Buffer>& indices, const std::shared_ptr<Buffer>& null_bitmap,
- int64_t null_count, int64_t offset, std::shared_ptr<DictionaryArray>* out);
-
Status Validate() const override;
std::shared_ptr<Array> indices() const { return indices_; }
@@ -531,21 +526,13 @@ extern template class ARROW_EXPORT NumericArray<FloatType>;
extern template class ARROW_EXPORT NumericArray<DoubleType>;
extern template class ARROW_EXPORT NumericArray<TimestampType>;
extern template class ARROW_EXPORT NumericArray<DateType>;
+extern template class ARROW_EXPORT NumericArray<Date32Type>;
extern template class ARROW_EXPORT NumericArray<TimeType>;
#if defined(__GNUC__) && !defined(__clang__)
#pragma GCC diagnostic pop
#endif
-// ----------------------------------------------------------------------
-// Helper functions
-
-// Create new arrays for logical types that are backed by primitive arrays.
-Status ARROW_EXPORT MakePrimitiveArray(const std::shared_ptr<DataType>& type,
- int64_t length, const std::shared_ptr<Buffer>& data,
- const std::shared_ptr<Buffer>& null_bitmap, int64_t null_count, int64_t offset,
- std::shared_ptr<Array>* out);
-
} // namespace arrow
#endif
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/builder.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc
index 9086598..4372925 100644
--- a/cpp/src/arrow/builder.cc
+++ b/cpp/src/arrow/builder.cc
@@ -238,6 +238,7 @@ template class PrimitiveBuilder<Int16Type>;
template class PrimitiveBuilder<Int32Type>;
template class PrimitiveBuilder<Int64Type>;
template class PrimitiveBuilder<DateType>;
+template class PrimitiveBuilder<Date32Type>;
template class PrimitiveBuilder<TimestampType>;
template class PrimitiveBuilder<TimeType>;
template class PrimitiveBuilder<HalfFloatType>;
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/builder.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/builder.h b/cpp/src/arrow/builder.h
index e642d3c..ebc683a 100644
--- a/cpp/src/arrow/builder.h
+++ b/cpp/src/arrow/builder.h
@@ -233,6 +233,7 @@ using Int64Builder = NumericBuilder<Int64Type>;
using TimestampBuilder = NumericBuilder<TimestampType>;
using TimeBuilder = NumericBuilder<TimeType>;
using DateBuilder = NumericBuilder<DateType>;
+using Date32Builder = NumericBuilder<Date32Type>;
using HalfFloatBuilder = NumericBuilder<HalfFloatType>;
using FloatBuilder = NumericBuilder<FloatType>;
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/column.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column.cc b/cpp/src/arrow/column.cc
index 1822870..78501f9 100644
--- a/cpp/src/arrow/column.cc
+++ b/cpp/src/arrow/column.cc
@@ -97,6 +97,9 @@ Column::Column(const std::shared_ptr<Field>& field, const std::shared_ptr<Array>
}
}
+Column::Column(const std::string& name, const std::shared_ptr<Array>& data)
+ : Column(::arrow::field(name, data->type()), data) {}
+
Column::Column(
const std::shared_ptr<Field>& field, const std::shared_ptr<ChunkedArray>& data)
: field_(field), data_(data) {}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/column.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/column.h b/cpp/src/arrow/column.h
index 93a34c7..bfcfd8e 100644
--- a/cpp/src/arrow/column.h
+++ b/cpp/src/arrow/column.h
@@ -69,6 +69,9 @@ class ARROW_EXPORT Column {
Column(const std::shared_ptr<Field>& field, const std::shared_ptr<Array>& data);
+ /// Construct from name and array
+ Column(const std::string& name, const std::shared_ptr<Array>& data);
+
int64_t length() const { return data_->length(); }
int64_t null_count() const { return data_->null_count(); }
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/compare.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc
index f38f8d6..17b8833 100644
--- a/cpp/src/arrow/compare.cc
+++ b/cpp/src/arrow/compare.cc
@@ -145,6 +145,10 @@ class RangeEqualsVisitor : public ArrayVisitor {
Status Visit(const DateArray& left) override { return CompareValues<DateArray>(left); }
+ Status Visit(const Date32Array& left) override {
+ return CompareValues<Date32Array>(left);
+ }
+
Status Visit(const TimeArray& left) override { return CompareValues<TimeArray>(left); }
Status Visit(const TimestampArray& left) override {
@@ -381,6 +385,8 @@ class ArrayEqualsVisitor : public RangeEqualsVisitor {
Status Visit(const DateArray& left) override { return ComparePrimitive(left); }
+ Status Visit(const Date32Array& left) override { return ComparePrimitive(left); }
+
Status Visit(const TimeArray& left) override { return ComparePrimitive(left); }
Status Visit(const TimestampArray& left) override { return ComparePrimitive(left); }
@@ -622,7 +628,7 @@ class TypeEqualsVisitor : public TypeVisitor {
Status Visit(const TimestampType& left) override {
const auto& right = static_cast<const TimestampType&>(right_);
- result_ = left.unit == right.unit;
+ result_ = left.unit == right.unit && left.timezone == right.timezone;
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 1339a99..5b5c864 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -28,6 +28,7 @@
#include "arrow/buffer.h"
#include "arrow/io/interfaces.h"
#include "arrow/status.h"
+#include "arrow/util/logging.h"
namespace arrow {
namespace io {
@@ -43,9 +44,17 @@ BufferOutputStream::BufferOutputStream(const std::shared_ptr<ResizableBuffer>& b
position_(0),
mutable_data_(buffer->mutable_data()) {}
+Status BufferOutputStream::Create(int64_t initial_capacity, MemoryPool* pool,
+ std::shared_ptr<BufferOutputStream>* out) {
+ std::shared_ptr<ResizableBuffer> buffer;
+ RETURN_NOT_OK(AllocateResizableBuffer(pool, initial_capacity, &buffer));
+ *out = std::make_shared<BufferOutputStream>(buffer);
+ return Status::OK();
+}
+
BufferOutputStream::~BufferOutputStream() {
// This can fail, better to explicitly call close
- Close();
+ if (buffer_) { Close(); }
}
Status BufferOutputStream::Close() {
@@ -56,12 +65,20 @@ Status BufferOutputStream::Close() {
}
}
+Status BufferOutputStream::Finish(std::shared_ptr<Buffer>* result) {
+ RETURN_NOT_OK(Close());
+ *result = buffer_;
+ buffer_ = nullptr;
+ return Status::OK();
+}
+
Status BufferOutputStream::Tell(int64_t* position) {
*position = position_;
return Status::OK();
}
Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) {
+ DCHECK(buffer_);
RETURN_NOT_OK(Reserve(nbytes));
std::memcpy(mutable_data_ + position_, data, nbytes);
position_ += nbytes;
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index 2d3df42..8280750 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -43,6 +43,9 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
public:
explicit BufferOutputStream(const std::shared_ptr<ResizableBuffer>& buffer);
+ static Status Create(int64_t initial_capacity, MemoryPool* pool,
+ std::shared_ptr<BufferOutputStream>* out);
+
~BufferOutputStream();
// Implement the OutputStream interface
@@ -50,6 +53,9 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
Status Tell(int64_t* position) override;
Status Write(const uint8_t* data, int64_t nbytes) override;
+ /// Close the stream and return the buffer
+ Status Finish(std::shared_ptr<Buffer>* result);
+
private:
// Ensures there is sufficient space available to write nbytes
Status Reserve(int64_t nbytes);
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/ipc/adapter.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc
index f11c88a..78d5810 100644
--- a/cpp/src/arrow/ipc/adapter.cc
+++ b/cpp/src/arrow/ipc/adapter.cc
@@ -32,6 +32,7 @@
#include "arrow/ipc/metadata-internal.h"
#include "arrow/ipc/metadata.h"
#include "arrow/ipc/util.h"
+#include "arrow/loader.h"
#include "arrow/memory_pool.h"
#include "arrow/schema.h"
#include "arrow/status.h"
@@ -531,12 +532,12 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length,
int64_t* body_length, MemoryPool* pool) {
- DictionaryWriter writer(pool, buffer_start_offset, kMaxIpcRecursionDepth);
+ DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth);
return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length);
}
Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
- RecordBatchWriter writer(default_memory_pool(), 0, kMaxIpcRecursionDepth);
+ RecordBatchWriter writer(default_memory_pool(), 0, kMaxNestingDepth);
RETURN_NOT_OK(writer.GetTotalSize(batch, size));
return Status::OK();
}
@@ -544,235 +545,33 @@ Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) {
// ----------------------------------------------------------------------
// Record batch read path
-struct RecordBatchContext {
- const RecordBatchMetadata* metadata;
- int buffer_index;
- int field_index;
- int max_recursion_depth;
-};
-
-// Traverse the flattened record batch metadata and reassemble the
-// corresponding array containers
-class ArrayLoader : public TypeVisitor {
+class IpcComponentSource : public ArrayComponentSource {
public:
- ArrayLoader(
- const Field& field, RecordBatchContext* context, io::ReadableFileInterface* file)
- : field_(field), context_(context), file_(file) {}
-
- Status Load(std::shared_ptr<Array>* out) {
- if (context_->max_recursion_depth <= 0) {
- return Status::Invalid("Max recursion depth reached");
- }
-
- // Load the array
- RETURN_NOT_OK(field_.type->Accept(this));
+ IpcComponentSource(const RecordBatchMetadata& metadata, io::ReadableFileInterface* file)
+ : metadata_(metadata), file_(file) {}
- *out = std::move(result_);
- return Status::OK();
- }
-
- private:
- const Field& field_;
- RecordBatchContext* context_;
- io::ReadableFileInterface* file_;
-
- // Used in visitor pattern
- std::shared_ptr<Array> result_;
-
- Status LoadChild(const Field& field, std::shared_ptr<Array>* out) {
- ArrayLoader loader(field, context_, file_);
- --context_->max_recursion_depth;
- RETURN_NOT_OK(loader.Load(out));
- ++context_->max_recursion_depth;
- return Status::OK();
- }
-
- Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
- BufferMetadata metadata = context_->metadata->buffer(buffer_index);
-
- if (metadata.length == 0) {
+ Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
+ BufferMetadata buffer_meta = metadata_.buffer(buffer_index);
+ if (buffer_meta.length == 0) {
*out = nullptr;
return Status::OK();
} else {
- return file_->ReadAt(metadata.offset, metadata.length, out);
+ return file_->ReadAt(buffer_meta.offset, buffer_meta.length, out);
}
}
- Status LoadCommon(FieldMetadata* field_meta, std::shared_ptr<Buffer>* null_bitmap) {
+ Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override {
// pop off a field
- if (context_->field_index >= context_->metadata->num_fields()) {
+ if (field_index >= metadata_.num_fields()) {
return Status::Invalid("Ran out of field metadata, likely malformed");
}
-
- // This only contains the length and null count, which we need to figure
- // out what to do with the buffers. For example, if null_count == 0, then
- // we can skip that buffer without reading from shared memory
- *field_meta = context_->metadata->field(context_->field_index++);
-
- // extract null_bitmap which is common to all arrays
- if (field_meta->null_count == 0) {
- *null_bitmap = nullptr;
- } else {
- RETURN_NOT_OK(GetBuffer(context_->buffer_index, null_bitmap));
- }
- context_->buffer_index++;
- return Status::OK();
- }
-
- Status LoadPrimitive(const DataType& type) {
- FieldMetadata field_meta;
- std::shared_ptr<Buffer> null_bitmap, data;
-
- RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
- if (field_meta.length > 0) {
- RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &data));
- } else {
- context_->buffer_index++;
- data.reset(new Buffer(nullptr, 0));
- }
- return MakePrimitiveArray(field_.type, field_meta.length, data, null_bitmap,
- field_meta.null_count, 0, &result_);
- }
-
- template <typename CONTAINER>
- Status LoadBinary() {
- FieldMetadata field_meta;
- std::shared_ptr<Buffer> null_bitmap, offsets, values;
-
- RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
- if (field_meta.length > 0) {
- RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets));
- RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values));
- } else {
- context_->buffer_index += 2;
- offsets = values = nullptr;
- }
-
- result_ = std::make_shared<CONTAINER>(
- field_meta.length, offsets, values, null_bitmap, field_meta.null_count);
- return Status::OK();
- }
-
- Status Visit(const BooleanType& type) override { return LoadPrimitive(type); }
-
- Status Visit(const Int8Type& type) override { return LoadPrimitive(type); }
-
- Status Visit(const Int16Type& type) override { return LoadPrimitive(type); }
-
- Status Visit(const Int32Type& type) override { return LoadPrimitive(type); }
-
- Status Visit(const Int64Type& type) override { return LoadPrimitive(type); }
-
- Status Visit(const UInt8Type& type) override { return LoadPrimitive(type); }
-
- Status Visit(const UInt16Type& type) override { return LoadPrimitive(type); }
-
- Status Visit(const UInt32Type& type) override { return LoadPrimitive(type); }
-
- Status Visit(const UInt64Type& type) override { return LoadPrimitive(type); }
-
- Status Visit(const HalfFloatType& type) override { return LoadPrimitive(type); }
-
- Status Visit(const FloatType& type) override { return LoadPrimitive(type); }
-
- Status Visit(const DoubleType& type) override { return LoadPrimitive(type); }
-
- Status Visit(const StringType& type) override { return LoadBinary<StringArray>(); }
-
- Status Visit(const BinaryType& type) override { return LoadBinary<BinaryArray>(); }
-
- Status Visit(const DateType& type) override { return LoadPrimitive(type); }
-
- Status Visit(const TimeType& type) override { return LoadPrimitive(type); }
-
- Status Visit(const TimestampType& type) override { return LoadPrimitive(type); }
-
- Status Visit(const ListType& type) override {
- FieldMetadata field_meta;
- std::shared_ptr<Buffer> null_bitmap, offsets;
-
- RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
- if (field_meta.length > 0) {
- RETURN_NOT_OK(GetBuffer(context_->buffer_index, &offsets));
- } else {
- offsets = nullptr;
- }
- ++context_->buffer_index;
-
- const int num_children = type.num_children();
- if (num_children != 1) {
- std::stringstream ss;
- ss << "Wrong number of children: " << num_children;
- return Status::Invalid(ss.str());
- }
- std::shared_ptr<Array> values_array;
-
- RETURN_NOT_OK(LoadChild(*type.child(0).get(), &values_array));
-
- result_ = std::make_shared<ListArray>(field_.type, field_meta.length, offsets,
- values_array, null_bitmap, field_meta.null_count);
- return Status::OK();
- }
-
- Status LoadChildren(std::vector<std::shared_ptr<Field>> child_fields,
- std::vector<std::shared_ptr<Array>>* arrays) {
- arrays->reserve(static_cast<int>(child_fields.size()));
-
- for (const auto& child_field : child_fields) {
- std::shared_ptr<Array> field_array;
- RETURN_NOT_OK(LoadChild(*child_field.get(), &field_array));
- arrays->emplace_back(field_array);
- }
+ *metadata = metadata_.field(field_index);
return Status::OK();
}
- Status Visit(const StructType& type) override {
- FieldMetadata field_meta;
- std::shared_ptr<Buffer> null_bitmap;
- RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
-
- std::vector<std::shared_ptr<Array>> fields;
- RETURN_NOT_OK(LoadChildren(type.children(), &fields));
-
- result_ = std::make_shared<StructArray>(
- field_.type, field_meta.length, fields, null_bitmap, field_meta.null_count);
- return Status::OK();
- }
-
- Status Visit(const UnionType& type) override {
- FieldMetadata field_meta;
- std::shared_ptr<Buffer> null_bitmap, type_ids, offsets;
-
- RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
- if (field_meta.length > 0) {
- RETURN_NOT_OK(GetBuffer(context_->buffer_index, &type_ids));
- if (type.mode == UnionMode::DENSE) {
- RETURN_NOT_OK(GetBuffer(context_->buffer_index + 1, &offsets));
- }
- }
- context_->buffer_index += type.mode == UnionMode::DENSE ? 2 : 1;
-
- std::vector<std::shared_ptr<Array>> fields;
- RETURN_NOT_OK(LoadChildren(type.children(), &fields));
-
- result_ = std::make_shared<UnionArray>(field_.type, field_meta.length, fields,
- type_ids, offsets, null_bitmap, field_meta.null_count);
- return Status::OK();
- }
-
- Status Visit(const DictionaryType& type) override {
- FieldMetadata field_meta;
- std::shared_ptr<Buffer> null_bitmap, indices_data;
- RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
- RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &indices_data));
-
- std::shared_ptr<Array> indices;
- RETURN_NOT_OK(MakePrimitiveArray(type.index_type(), field_meta.length, indices_data,
- null_bitmap, field_meta.null_count, 0, &indices));
-
- result_ = std::make_shared<DictionaryArray>(field_.type, indices);
- return Status::OK();
- };
+ private:
+ const RecordBatchMetadata& metadata_;
+ io::ReadableFileInterface* file_;
};
class RecordBatchReader {
@@ -788,17 +587,15 @@ class RecordBatchReader {
Status Read(std::shared_ptr<RecordBatch>* out) {
std::vector<std::shared_ptr<Array>> arrays(schema_->num_fields());
- // The field_index and buffer_index are incremented in the ArrayLoader
- // based on how much of the batch is "consumed" (through nested data
- // reconstruction, for example)
- context_.metadata = &metadata_;
- context_.field_index = 0;
- context_.buffer_index = 0;
- context_.max_recursion_depth = max_recursion_depth_;
+ IpcComponentSource source(metadata_, file_);
+ ArrayLoaderContext context;
+ context.source = &source;
+ context.field_index = 0;
+ context.buffer_index = 0;
+ context.max_recursion_depth = max_recursion_depth_;
for (int i = 0; i < schema_->num_fields(); ++i) {
- ArrayLoader loader(*schema_->field(i).get(), &context_, file_);
- RETURN_NOT_OK(loader.Load(&arrays[i]));
+ RETURN_NOT_OK(LoadArray(schema_->field(i)->type, &context, &arrays[i]));
}
*out = std::make_shared<RecordBatch>(schema_, metadata_.length(), arrays);
@@ -806,7 +603,6 @@ class RecordBatchReader {
}
private:
- RecordBatchContext context_;
const RecordBatchMetadata& metadata_;
std::shared_ptr<Schema> schema_;
int max_recursion_depth_;
@@ -816,7 +612,7 @@ class RecordBatchReader {
Status ReadRecordBatch(const RecordBatchMetadata& metadata,
const std::shared_ptr<Schema>& schema, io::ReadableFileInterface* file,
std::shared_ptr<RecordBatch>* out) {
- return ReadRecordBatch(metadata, schema, kMaxIpcRecursionDepth, file, out);
+ return ReadRecordBatch(metadata, schema, kMaxNestingDepth, file, out);
}
Status ReadRecordBatch(const RecordBatchMetadata& metadata,
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/ipc/adapter.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/adapter.h b/cpp/src/arrow/ipc/adapter.h
index 933d3a4..21d814d 100644
--- a/cpp/src/arrow/ipc/adapter.h
+++ b/cpp/src/arrow/ipc/adapter.h
@@ -26,6 +26,7 @@
#include <vector>
#include "arrow/ipc/metadata.h"
+#include "arrow/loader.h"
#include "arrow/util/visibility.h"
namespace arrow {
@@ -47,11 +48,6 @@ namespace ipc {
// ----------------------------------------------------------------------
// Write path
-//
-// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
-// deeply nested schemas, it is expected the user will indicate explicitly the
-// maximum allowed recursion depth
-constexpr int kMaxIpcRecursionDepth = 64;
// Write the RecordBatch (collection of equal-length Arrow arrays) to the
// output stream in a contiguous block. The record batch metadata is written as
@@ -75,7 +71,7 @@ constexpr int kMaxIpcRecursionDepth = 64;
// padding bytes
Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset,
io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length,
- MemoryPool* pool, int max_recursion_depth = kMaxIpcRecursionDepth);
+ MemoryPool* pool, int max_recursion_depth = kMaxNestingDepth);
// Write Array as a DictionaryBatch message
Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/ipc/metadata.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.cc b/cpp/src/arrow/ipc/metadata.cc
index 2ba44ac..695e788 100644
--- a/cpp/src/arrow/ipc/metadata.cc
+++ b/cpp/src/arrow/ipc/metadata.cc
@@ -289,6 +289,7 @@ FieldMetadata RecordBatchMetadata::field(int i) const {
FieldMetadata result;
result.length = node->length();
result.null_count = node->null_count();
+ result.offset = 0;
return result;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/ipc/metadata.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/metadata.h b/cpp/src/arrow/ipc/metadata.h
index f12529b..f6a0a3a 100644
--- a/cpp/src/arrow/ipc/metadata.h
+++ b/cpp/src/arrow/ipc/metadata.h
@@ -25,6 +25,7 @@
#include <unordered_map>
#include <vector>
+#include "arrow/loader.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
@@ -135,12 +136,6 @@ class ARROW_EXPORT SchemaMetadata {
DISALLOW_COPY_AND_ASSIGN(SchemaMetadata);
};
-// Field metadata
-struct ARROW_EXPORT FieldMetadata {
- int32_t length;
- int32_t null_count;
-};
-
struct ARROW_EXPORT BufferMetadata {
int32_t page;
int64_t offset;
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/loader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/loader.cc b/cpp/src/arrow/loader.cc
new file mode 100644
index 0000000..3cb51ae
--- /dev/null
+++ b/cpp/src/arrow/loader.cc
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/loader.h"
+
+#include <cstdint>
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include "arrow/array.h"
+#include "arrow/buffer.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Array;
+struct DataType;
+class Status;
+
+/// Rebuilds an Array from the components exposed by an ArrayComponentSource.
+///
+/// The loader visits the logical type (TypeVisitor) and, for each array node,
+/// reads one FieldMetadata entry followed by the buffers that type needs,
+/// advancing the cursors in the shared ArrayLoaderContext. Children are loaded
+/// recursively right after their parent, so the source must present its
+/// components in depth-first (pre-order) order.
+class ArrayLoader : public TypeVisitor {
+ public:
+  ArrayLoader(const std::shared_ptr<DataType>& type, ArrayLoaderContext* context)
+      : type_(type), context_(context) {}
+
+  /// Load the array described by type_ into *out.
+  ///
+  /// Returns Status::Invalid when the context's recursion budget is exhausted,
+  /// otherwise any error reported while reading from the source.
+  Status Load(std::shared_ptr<Array>* out) {
+    if (context_->max_recursion_depth <= 0) {
+      return Status::Invalid("Max recursion depth reached");
+    }
+
+    // Dispatch to the Visit overload of the concrete type; it fills result_
+    RETURN_NOT_OK(type_->Accept(this));
+
+    *out = std::move(result_);
+    return Status::OK();
+  }
+
+  /// Fetch buffer number buffer_index from the source (cursors unchanged)
+  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) {
+    return context_->source->GetBuffer(buffer_index, out);
+  }
+
+  /// Read the next field metadata entry and the validity-bitmap slot.
+  ///
+  /// On success field_index and buffer_index each advance by one. The bitmap
+  /// slot is consumed even when null_count == 0, in which case *null_bitmap is
+  /// set to null and the source is not asked for that buffer.
+  Status LoadCommon(FieldMetadata* field_meta, std::shared_ptr<Buffer>* null_bitmap) {
+    // This only contains the length and null count, which we need to figure
+    // out what to do with the buffers. For example, if null_count == 0, then
+    // we can skip that buffer without reading from shared memory
+    RETURN_NOT_OK(
+        context_->source->GetFieldMetadata(context_->field_index++, field_meta));
+
+    // extract null_bitmap which is common to all arrays
+    if (field_meta->null_count == 0) {
+      *null_bitmap = nullptr;
+    } else {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index, null_bitmap));
+    }
+    context_->buffer_index++;
+    return Status::OK();
+  }
+
+  /// Fixed-width array: validity-bitmap slot followed by one data buffer
+  template <typename TYPE>
+  Status LoadPrimitive() {
+    using ArrayType = typename TypeTraits<TYPE>::ArrayType;
+
+    FieldMetadata field_meta;
+    std::shared_ptr<Buffer> null_bitmap, data;
+
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+    if (field_meta.length > 0) {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &data));
+    } else {
+      // Empty array: skip the data slot and substitute a zero-size buffer
+      context_->buffer_index++;
+      data = std::make_shared<Buffer>(nullptr, 0);
+    }
+    result_ = std::make_shared<ArrayType>(type_, field_meta.length, data, null_bitmap,
+        field_meta.null_count, field_meta.offset);
+    return Status::OK();
+  }
+
+  /// Variable-width binary/string array: validity-bitmap slot, value offsets
+  /// buffer, then the values buffer
+  template <typename CONTAINER>
+  Status LoadBinary() {
+    FieldMetadata field_meta;
+    std::shared_ptr<Buffer> null_bitmap, offsets, values;
+
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+    if (field_meta.length > 0) {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &offsets));
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index++, &values));
+    } else {
+      context_->buffer_index += 2;
+      offsets = values = nullptr;
+    }
+
+    // Pass the slice offset through, as LoadPrimitive does, so that a source
+    // providing a non-zero FieldMetadata::offset is not silently ignored
+    result_ = std::make_shared<CONTAINER>(field_meta.length, offsets, values,
+        null_bitmap, field_meta.null_count, field_meta.offset);
+    return Status::OK();
+  }
+
+  /// Load the array for one child field, one nesting level deeper
+  Status LoadChild(const Field& field, std::shared_ptr<Array>* out) {
+    ArrayLoader loader(field.type, context_);
+    // Spend one level of the recursion budget while the child loads and give
+    // it back afterwards -- also when the child fails -- so the shared context
+    // is left consistent.
+    --context_->max_recursion_depth;
+    Status status = loader.Load(out);
+    ++context_->max_recursion_depth;
+    return status;
+  }
+
+  /// Load every child field in order, appending the arrays to *arrays
+  Status LoadChildren(const std::vector<std::shared_ptr<Field>>& child_fields,
+      std::vector<std::shared_ptr<Array>>* arrays) {
+    arrays->reserve(child_fields.size());
+
+    for (const auto& child_field : child_fields) {
+      std::shared_ptr<Array> field_array;
+      RETURN_NOT_OK(LoadChild(*child_field, &field_array));
+      arrays->emplace_back(std::move(field_array));
+    }
+    return Status::OK();
+  }
+
+#define VISIT_PRIMITIVE(TYPE) \
+  Status Visit(const TYPE& type) override { return LoadPrimitive<TYPE>(); }
+
+  VISIT_PRIMITIVE(BooleanType);
+  VISIT_PRIMITIVE(Int8Type);
+  VISIT_PRIMITIVE(Int16Type);
+  VISIT_PRIMITIVE(Int32Type);
+  VISIT_PRIMITIVE(Int64Type);
+  VISIT_PRIMITIVE(UInt8Type);
+  VISIT_PRIMITIVE(UInt16Type);
+  VISIT_PRIMITIVE(UInt32Type);
+  VISIT_PRIMITIVE(UInt64Type);
+  VISIT_PRIMITIVE(HalfFloatType);
+  VISIT_PRIMITIVE(FloatType);
+  VISIT_PRIMITIVE(DoubleType);
+  VISIT_PRIMITIVE(DateType);
+  VISIT_PRIMITIVE(Date32Type);
+  VISIT_PRIMITIVE(TimeType);
+  VISIT_PRIMITIVE(TimestampType);
+
+#undef VISIT_PRIMITIVE
+
+  Status Visit(const StringType& type) override { return LoadBinary<StringArray>(); }
+
+  Status Visit(const BinaryType& type) override { return LoadBinary<BinaryArray>(); }
+
+  /// List: validity-bitmap slot, offsets buffer, then exactly one child array
+  Status Visit(const ListType& type) override {
+    FieldMetadata field_meta;
+    std::shared_ptr<Buffer> null_bitmap, offsets;
+
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+    if (field_meta.length > 0) {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index, &offsets));
+    } else {
+      offsets = nullptr;
+    }
+    ++context_->buffer_index;
+
+    const int num_children = type.num_children();
+    if (num_children != 1) {
+      std::stringstream ss;
+      ss << "Wrong number of children: " << num_children;
+      return Status::Invalid(ss.str());
+    }
+    std::shared_ptr<Array> values_array;
+
+    RETURN_NOT_OK(LoadChild(*type.child(0), &values_array));
+
+    result_ = std::make_shared<ListArray>(type_, field_meta.length, offsets, values_array,
+        null_bitmap, field_meta.null_count, field_meta.offset);
+    return Status::OK();
+  }
+
+  /// Struct: validity-bitmap slot, then one child array per field
+  Status Visit(const StructType& type) override {
+    FieldMetadata field_meta;
+    std::shared_ptr<Buffer> null_bitmap;
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+
+    std::vector<std::shared_ptr<Array>> fields;
+    RETURN_NOT_OK(LoadChildren(type.children(), &fields));
+
+    result_ = std::make_shared<StructArray>(type_, field_meta.length, fields,
+        null_bitmap, field_meta.null_count, field_meta.offset);
+    return Status::OK();
+  }
+
+  /// Union: validity-bitmap slot, type ids buffer, an offsets buffer in dense
+  /// mode only, then one child array per member
+  Status Visit(const UnionType& type) override {
+    FieldMetadata field_meta;
+    std::shared_ptr<Buffer> null_bitmap, type_ids, offsets;
+
+    RETURN_NOT_OK(LoadCommon(&field_meta, &null_bitmap));
+    if (field_meta.length > 0) {
+      RETURN_NOT_OK(GetBuffer(context_->buffer_index, &type_ids));
+      if (type.mode == UnionMode::DENSE) {
+        RETURN_NOT_OK(GetBuffer(context_->buffer_index + 1, &offsets));
+      }
+    }
+    context_->buffer_index += type.mode == UnionMode::DENSE ? 2 : 1;
+
+    std::vector<std::shared_ptr<Array>> fields;
+    RETURN_NOT_OK(LoadChildren(type.children(), &fields));
+
+    result_ = std::make_shared<UnionArray>(type_, field_meta.length, fields, type_ids,
+        offsets, null_bitmap, field_meta.null_count, field_meta.offset);
+    return Status::OK();
+  }
+
+  /// Dictionary: only the indices (of type.index_type()) are read from the
+  /// source; the DictionaryArray is built from type_ and those indices
+  Status Visit(const DictionaryType& type) override {
+    std::shared_ptr<Array> indices;
+    RETURN_NOT_OK(LoadArray(type.index_type(), context_, &indices));
+    result_ = std::make_shared<DictionaryArray>(type_, indices);
+    return Status::OK();
+  }
+
+  std::shared_ptr<Array> result() const { return result_; }
+
+ private:
+  const std::shared_ptr<DataType> type_;
+  ArrayLoaderContext* context_;
+
+  // Used in visitor pattern
+  std::shared_ptr<Array> result_;
+};
+
+/// Load an array from a source, starting both cursors at index 0 with the
+/// default nesting budget (kMaxNestingDepth).
+Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
+    ArrayComponentSource* source, std::shared_ptr<Array>* out) {
+  ArrayLoaderContext context;
+  context.max_recursion_depth = kMaxNestingDepth;
+  context.buffer_index = 0;
+  context.field_index = 0;
+  context.source = source;
+
+  // Delegate to the context-based overload, which does the actual work
+  return LoadArray(type, &context, out);
+}
+
+/// Load an array using an existing context; the context is shared, not
+/// copied, so its cursors advance past whatever this array consumes.
+Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
+    ArrayLoaderContext* context, std::shared_ptr<Array>* out) {
+  ArrayLoader array_loader(type, context);
+  return array_loader.Load(out);
+}
+
+/// ArrayComponentSource backed by caller-provided vectors of field metadata
+/// and buffers.
+///
+/// Only references to the two vectors are stored, so both must outlive this
+/// object; it is meant to live for the duration of a single LoadArray call.
+class InMemorySource : public ArrayComponentSource {
+ public:
+  InMemorySource(const std::vector<FieldMetadata>& fields,
+      const std::vector<std::shared_ptr<Buffer>>& buffers)
+      : fields_(fields), buffers_(buffers) {}
+
+  /// Copy buffers[buffer_index] into *out; Status::Invalid if out of range.
+  Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) override {
+    // Checked in every build mode (previously DCHECK only): too few buffers
+    // for the requested type would otherwise read past the vector in release.
+    if (buffer_index < 0 || static_cast<size_t>(buffer_index) >= buffers_.size()) {
+      std::stringstream ss;
+      ss << "Buffer index " << buffer_index << " out of range, only "
+         << buffers_.size() << " buffers available";
+      return Status::Invalid(ss.str());
+    }
+    *out = buffers_[buffer_index];
+    return Status::OK();
+  }
+
+  /// Copy fields[field_index] into *metadata; Status::Invalid if out of range.
+  Status GetFieldMetadata(int field_index, FieldMetadata* metadata) override {
+    if (field_index < 0 || static_cast<size_t>(field_index) >= fields_.size()) {
+      std::stringstream ss;
+      ss << "Field index " << field_index << " out of range, only "
+         << fields_.size() << " field metadata entries available";
+      return Status::Invalid(ss.str());
+    }
+    *metadata = fields_[field_index];
+    return Status::OK();
+  }
+
+ private:
+  const std::vector<FieldMetadata>& fields_;
+  const std::vector<std::shared_ptr<Buffer>>& buffers_;
+};
+
+/// Load an array from in-memory field metadata and buffers by wrapping them
+/// (by reference) in an InMemorySource and using the generic loader.
+Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
+    const std::vector<FieldMetadata>& fields,
+    const std::vector<std::shared_ptr<Buffer>>& buffers, std::shared_ptr<Array>* out) {
+  InMemorySource in_memory(fields, buffers);
+  ArrayComponentSource* component_source = &in_memory;
+  return LoadArray(type, component_source, out);
+}
+
+} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/loader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/loader.h b/cpp/src/arrow/loader.h
new file mode 100644
index 0000000..b4949f2
--- /dev/null
+++ b/cpp/src/arrow/loader.h
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Functions for constructing Array objects from field metadata and raw memory
+// buffers
+
+#ifndef ARROW_LOADER_H
+#define ARROW_LOADER_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+class Array;
+class Buffer;
+struct DataType;
+
+// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
+// deeply nested schemas, it is expected the user will indicate explicitly the
+// maximum allowed recursion depth
+//
+// This is the initial ArrayLoaderContext::max_recursion_depth used by the
+// LoadArray overloads that create their own loader context.
+constexpr int kMaxNestingDepth = 64;
+
+/// Per-array-node metadata handed to the array loader by an
+/// ArrayComponentSource. The members have no default initializers, so set all
+/// three explicitly.
+struct ARROW_EXPORT FieldMetadata {
+ /// Number of slots in the array (used as the constructed array's length)
+ int64_t length;
+ /// Number of null slots; when 0 the loader does not request the validity
+ /// bitmap buffer from the source
+ int64_t null_count;
+ /// Passed to the constructed array as its offset (the IPC metadata reader
+ /// always sets 0)
+ int64_t offset;
+};
+
+/// Implement this to create new types of Arrow data loaders
+///
+/// The array loader pulls components by index: GetFieldMetadata is called
+/// once per array node with field indices 0, 1, 2, ... in order, and GetBuffer
+/// with increasing buffer indices. Buffer slots the loader can skip (the
+/// validity bitmap when null_count is 0, data buffers when length is 0) are
+/// never requested, so an implementation must not assume every index is used.
+class ARROW_EXPORT ArrayComponentSource {
+ public:
+ virtual ~ArrayComponentSource() = default;
+
+ /// Store the buffer at position buffer_index in *out
+ virtual Status GetBuffer(int buffer_index, std::shared_ptr<Buffer>* out) = 0;
+ /// Store the metadata of the field_index-th array node in *metadata
+ virtual Status GetFieldMetadata(int field_index, FieldMetadata* metadata) = 0;
+};
+
+/// Bookkeeping struct for loading array objects from their constituent pieces of raw data
+///
+/// The field_index and buffer_index are incremented in the ArrayLoader
+/// based on how much of the batch is "consumed" (through nested data
+/// reconstruction, for example)
+///
+/// The members have no default initializers; set all four before use.
+struct ArrayLoaderContext {
+ /// Where field metadata and buffers are read from (not owned by the context)
+ ArrayComponentSource* source;
+ /// Index of the next buffer slot to consume
+ int buffer_index;
+ /// Index of the next field metadata entry to consume
+ int field_index;
+ /// Remaining nesting budget: loading fails once it is <= 0, and it is
+ /// lowered by one while each level of child arrays is loaded
+ int max_recursion_depth;
+};
+
+/// Construct an Array container from type metadata and a collection of memory
+/// buffers
+///
+/// Field metadata and buffers are requested from the source starting at
+/// index 0, in depth-first order, with a nesting limit of kMaxNestingDepth.
+///
+/// \param[in] type the data type of the array being loaded
+/// \param[in] source an implementation of ArrayComponentSource
+/// \param[out] out the constructed array
+/// \return Status indicating success or failure
+Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
+    ArrayComponentSource* source, std::shared_ptr<Array>* out);
+
+/// Construct an Array using (and advancing) an existing loader context
+///
+/// Use this overload to load several arrays in sequence from one source. On
+/// success the context's field_index and buffer_index point past the
+/// components consumed by this array.
+///
+/// \param[in] type the data type of the array being loaded
+/// \param[in,out] context the source, read cursors and remaining nesting depth
+/// \param[out] out the constructed array
+/// \return Status indicating success or failure
+Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
+    ArrayLoaderContext* context, std::shared_ptr<Array>* out);
+
+/// Construct an Array from field metadata and buffers already held in memory
+///
+/// Both vectors are only borrowed for the duration of the call.
+///
+/// \param[in] type the data type of the array being loaded
+/// \param[in] fields one FieldMetadata entry per array node, depth-first
+/// \param[in] buffers the buffers, in the order the loader consumes them
+/// (a validity-bitmap slot comes first for every array node)
+/// \param[out] out the constructed array
+/// \return Status indicating success or failure
+Status ARROW_EXPORT LoadArray(const std::shared_ptr<DataType>& type,
+    const std::vector<FieldMetadata>& fields,
+    const std::vector<std::shared_ptr<Buffer>>& buffers, std::shared_ptr<Array>* out);
+
+} // namespace arrow
+
+#endif // ARROW_LOADER_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/pretty_print.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/pretty_print.cc b/cpp/src/arrow/pretty_print.cc
index 7e69e42..2508fa5 100644
--- a/cpp/src/arrow/pretty_print.cc
+++ b/cpp/src/arrow/pretty_print.cc
@@ -145,9 +145,11 @@ class ArrayPrinter : public ArrayVisitor {
Status Visit(const BinaryArray& array) override { return WriteVarBytes(array); }
- Status Visit(const DateArray& array) override { return Status::NotImplemented("date"); }
+ Status Visit(const DateArray& array) override { return WritePrimitive(array); }
- Status Visit(const TimeArray& array) override { return Status::NotImplemented("time"); }
+ Status Visit(const Date32Array& array) override { return WritePrimitive(array); }
+
+ Status Visit(const TimeArray& array) override { return WritePrimitive(array); }
Status Visit(const TimestampArray& array) override {
return Status::NotImplemented("timestamp");
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/type.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 7e5f13a..4679a2f 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -54,9 +54,7 @@ bool DataType::Equals(const DataType& other) const {
}
bool DataType::Equals(const std::shared_ptr<DataType>& other) const {
- if (!other) {
- return false;
- }
+ if (!other) { return false; }
return Equals(*other.get());
}
@@ -106,6 +104,10 @@ std::string DateType::ToString() const {
return std::string("date");
}
+std::string Date32Type::ToString() const { return "date32"; }
+
// ----------------------------------------------------------------------
// Union type
@@ -135,11 +137,12 @@ std::string UnionType::ToString() const {
// ----------------------------------------------------------------------
// DictionaryType
-DictionaryType::DictionaryType(
- const std::shared_ptr<DataType>& index_type, const std::shared_ptr<Array>& dictionary)
+DictionaryType::DictionaryType(const std::shared_ptr<DataType>& index_type,
+ const std::shared_ptr<Array>& dictionary, bool ordered)
: FixedWidthType(Type::DICTIONARY),
index_type_(index_type),
- dictionary_(dictionary) {}
+ dictionary_(dictionary),
+ ordered_(ordered) {}
int DictionaryType::bit_width() const {
return static_cast<const FixedWidthType*>(index_type_.get())->bit_width();
@@ -178,6 +181,7 @@ ACCEPT_VISITOR(StructType);
ACCEPT_VISITOR(DecimalType);
ACCEPT_VISITOR(UnionType);
ACCEPT_VISITOR(DateType);
+ACCEPT_VISITOR(Date32Type);
ACCEPT_VISITOR(TimeType);
ACCEPT_VISITOR(TimestampType);
ACCEPT_VISITOR(IntervalType);
@@ -205,11 +209,16 @@ TYPE_FACTORY(float64, DoubleType);
TYPE_FACTORY(utf8, StringType);
TYPE_FACTORY(binary, BinaryType);
TYPE_FACTORY(date, DateType);
+TYPE_FACTORY(date32, Date32Type);
std::shared_ptr<DataType> timestamp(TimeUnit unit) {
return std::make_shared<TimestampType>(unit);
}
+// Timestamp type factory carrying an explicit timezone string
+std::shared_ptr<DataType> timestamp(const std::string& timezone, TimeUnit unit) {
+  auto ts_type = std::make_shared<TimestampType>(timezone, unit);
+  return ts_type;
+}
+
std::shared_ptr<DataType> time(TimeUnit unit) {
return std::make_shared<TimeType>(unit);
}
@@ -313,6 +322,7 @@ TYPE_VISITOR_DEFAULT(DoubleType);
TYPE_VISITOR_DEFAULT(StringType);
TYPE_VISITOR_DEFAULT(BinaryType);
TYPE_VISITOR_DEFAULT(DateType);
+TYPE_VISITOR_DEFAULT(Date32Type);
TYPE_VISITOR_DEFAULT(TimeType);
TYPE_VISITOR_DEFAULT(TimestampType);
TYPE_VISITOR_DEFAULT(IntervalType);
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/type.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 9b1ab32..aa0d70e 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -67,9 +67,12 @@ struct Type {
// Variable-length bytes (no guarantee of UTF8-ness)
BINARY,
- // By default, int32 days since the UNIX epoch
+ // int64_t milliseconds since the UNIX epoch
DATE,
+ // int32_t days since the UNIX epoch
+ DATE32,
+
// Exact timestamp encoded with int64 since UNIX epoch
// Default unit millisecond
TIMESTAMP,
@@ -132,6 +135,7 @@ class ARROW_EXPORT TypeVisitor {
virtual Status Visit(const StringType& type);
virtual Status Visit(const BinaryType& type);
virtual Status Visit(const DateType& type);
+ virtual Status Visit(const Date32Type& type);
virtual Status Visit(const TimeType& type);
virtual Status Visit(const TimestampType& type);
virtual Status Visit(const IntervalType& type);
@@ -425,6 +429,7 @@ struct ARROW_EXPORT UnionType : public DataType {
// ----------------------------------------------------------------------
// Date and time types
+/// Date as int64_t milliseconds since UNIX epoch
struct ARROW_EXPORT DateType : public FixedWidthType {
static constexpr Type::type type_id = Type::DATE;
@@ -439,6 +444,20 @@ struct ARROW_EXPORT DateType : public FixedWidthType {
static std::string name() { return "date"; }
};
+/// Date as int32_t days since UNIX epoch
+struct ARROW_EXPORT Date32Type : public FixedWidthType {
+  static constexpr Type::type type_id = Type::DATE32;
+
+  using c_type = int32_t;
+
+  Date32Type() : FixedWidthType(Type::DATE32) {}
+
+  int bit_width() const override { return static_cast<int>(sizeof(c_type) * 8); }
+
+  Status Accept(TypeVisitor* visitor) const override;
+  std::string ToString() const override;
+
+  // Static type name, provided for parity with DateType::name() and
+  // TimestampType::name()
+  static std::string name() { return "date32"; }
+};
+
enum class TimeUnit : char { SECOND = 0, MILLI = 1, MICRO = 2, NANO = 3 };
struct ARROW_EXPORT TimeType : public FixedWidthType {
@@ -467,16 +486,20 @@ struct ARROW_EXPORT TimestampType : public FixedWidthType {
int bit_width() const override { return static_cast<int>(sizeof(int64_t) * 8); }
- TimeUnit unit;
-
explicit TimestampType(TimeUnit unit = TimeUnit::MILLI)
: FixedWidthType(Type::TIMESTAMP), unit(unit) {}
+ explicit TimestampType(const std::string& timezone, TimeUnit unit = TimeUnit::MILLI)
+ : FixedWidthType(Type::TIMESTAMP), unit(unit), timezone(timezone) {}
+
TimestampType(const TimestampType& other) : TimestampType(other.unit) {}
Status Accept(TypeVisitor* visitor) const override;
std::string ToString() const override { return name(); }
static std::string name() { return "timestamp"; }
+
+ TimeUnit unit;
+ std::string timezone;
};
struct ARROW_EXPORT IntervalType : public FixedWidthType {
@@ -507,7 +530,7 @@ class ARROW_EXPORT DictionaryType : public FixedWidthType {
static constexpr Type::type type_id = Type::DICTIONARY;
DictionaryType(const std::shared_ptr<DataType>& index_type,
- const std::shared_ptr<Array>& dictionary);
+ const std::shared_ptr<Array>& dictionary, bool ordered = false);
int bit_width() const override;
@@ -518,11 +541,13 @@ class ARROW_EXPORT DictionaryType : public FixedWidthType {
Status Accept(TypeVisitor* visitor) const override;
std::string ToString() const override;
+ bool ordered() const { return ordered_; }
+
private:
// Must be an integer type (not currently checked)
std::shared_ptr<DataType> index_type_;
-
std::shared_ptr<Array> dictionary_;
+ bool ordered_;
};
// ----------------------------------------------------------------------
@@ -532,6 +557,8 @@ std::shared_ptr<DataType> ARROW_EXPORT list(const std::shared_ptr<Field>& value_
std::shared_ptr<DataType> ARROW_EXPORT list(const std::shared_ptr<DataType>& value_type);
std::shared_ptr<DataType> ARROW_EXPORT timestamp(TimeUnit unit);
+std::shared_ptr<DataType> ARROW_EXPORT timestamp(
+ const std::string& timezone, TimeUnit unit);
std::shared_ptr<DataType> ARROW_EXPORT time(TimeUnit unit);
std::shared_ptr<DataType> ARROW_EXPORT struct_(
@@ -595,6 +622,7 @@ static inline bool is_primitive(Type::type type_id) {
case Type::FLOAT:
case Type::DOUBLE:
case Type::DATE:
+ case Type::DATE32:
case Type::TIMESTAMP:
case Type::TIME:
case Type::INTERVAL:
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/type_fwd.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type_fwd.h b/cpp/src/arrow/type_fwd.h
index fc4ad3d..e53afe1 100644
--- a/cpp/src/arrow/type_fwd.h
+++ b/cpp/src/arrow/type_fwd.h
@@ -95,6 +95,10 @@ struct DateType;
using DateArray = NumericArray<DateType>;
using DateBuilder = NumericBuilder<DateType>;
+struct Date32Type;
+using Date32Array = NumericArray<Date32Type>;
+using Date32Builder = NumericBuilder<Date32Type>;
+
struct TimeType;
using TimeArray = NumericArray<TimeType>;
using TimeBuilder = NumericBuilder<TimeType>;
@@ -125,6 +129,7 @@ std::shared_ptr<DataType> ARROW_EXPORT float64();
std::shared_ptr<DataType> ARROW_EXPORT utf8();
std::shared_ptr<DataType> ARROW_EXPORT binary();
std::shared_ptr<DataType> ARROW_EXPORT date();
+std::shared_ptr<DataType> ARROW_EXPORT date32();
} // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/cpp/src/arrow/type_traits.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/type_traits.h b/cpp/src/arrow/type_traits.h
index d6687c1..2cd1420 100644
--- a/cpp/src/arrow/type_traits.h
+++ b/cpp/src/arrow/type_traits.h
@@ -131,6 +131,18 @@ struct TypeTraits<DateType> {
};
template <>
+struct TypeTraits<Date32Type> {
+  using ArrayType = Date32Array;
+  using BuilderType = Date32Builder;
+  constexpr static bool is_parameter_free = true;
+
+  // Storage needed for `elements` values of 4-byte (int32_t) day counts
+  static inline int64_t bytes_required(int64_t elements) {
+    return static_cast<int64_t>(elements * sizeof(int32_t));
+  }
+
+  static inline std::shared_ptr<DataType> type_singleton() { return date32(); }
+};
+
+template <>
struct TypeTraits<TimestampType> {
using ArrayType = TimestampArray;
// using BuilderType = TimestampBuilder;
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/python/pyarrow/array.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx
index 7787e95..6a6b4ba 100644
--- a/python/pyarrow/array.pyx
+++ b/python/pyarrow/array.pyx
@@ -54,7 +54,8 @@ cdef class Array:
self.type.init(self.sp_array.get().type())
@staticmethod
- def from_pandas(obj, mask=None, timestamps_to_ms=False, Field field=None, MemoryPool memory_pool=None):
+ def from_pandas(obj, mask=None, timestamps_to_ms=False, Field field=None,
+ MemoryPool memory_pool=None):
"""
Convert pandas.Series to an Arrow Array.
@@ -75,8 +76,9 @@ cdef class Array:
Notes
-----
- Localized timestamps will currently be returned as UTC (pandas's native representation).
- Timezone-naive data will be implicitly interpreted as UTC.
+ Localized timestamps will currently be returned as UTC (pandas's native
+ representation). Timezone-naive data will be implicitly interpreted as
+ UTC.
Examples
--------
@@ -119,9 +121,9 @@ cdef class Array:
series_values = get_series_values(obj)
if isinstance(series_values, pd.Categorical):
- return DictionaryArray.from_arrays(series_values.codes,
- series_values.categories.values,
- mask=mask, memory_pool=memory_pool)
+ return DictionaryArray.from_arrays(
+ series_values.codes, series_values.categories.values,
+ mask=mask, memory_pool=memory_pool)
else:
if series_values.dtype.type == np.datetime64 and timestamps_to_ms:
series_values = series_values.astype('datetime64[ms]')
@@ -134,7 +136,8 @@ cdef class Array:
return box_array(out)
@staticmethod
- def from_list(object list_obj, DataType type=None, MemoryPool memory_pool=None):
+ def from_list(object list_obj, DataType type=None,
+ MemoryPool memory_pool=None):
"""
Convert Python list to Arrow array
@@ -358,7 +361,8 @@ cdef class BinaryArray(Array):
cdef class DictionaryArray(Array):
@staticmethod
- def from_arrays(indices, dictionary, mask=None, MemoryPool memory_pool=None):
+ def from_arrays(indices, dictionary, mask=None,
+ MemoryPool memory_pool=None):
"""
Construct Arrow DictionaryArray from array of indices (must be
non-negative integers) and corresponding array of dictionary values
@@ -380,8 +384,15 @@ cdef class DictionaryArray(Array):
shared_ptr[CDataType] c_type
shared_ptr[CArray] c_result
- arrow_indices = Array.from_pandas(indices, mask=mask, memory_pool=memory_pool)
- arrow_dictionary = Array.from_pandas(dictionary, memory_pool=memory_pool)
+ if mask is None:
+ mask = indices == -1
+ else:
+ mask = mask | (indices == -1)
+
+ arrow_indices = Array.from_pandas(indices, mask=mask,
+ memory_pool=memory_pool)
+ arrow_dictionary = Array.from_pandas(dictionary,
+ memory_pool=memory_pool)
if not isinstance(arrow_indices, IntegerArray):
raise ValueError('Indices must be integer type')
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index 93bc6dd..ad5af1b 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -359,7 +359,8 @@ cdef class RecordBatch:
"""
Number of rows
- Due to the definition of a RecordBatch, all columns have the same number of rows.
+ Due to the definition of a RecordBatch, all columns have the same
+ number of rows.
Returns
-------
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/python/pyarrow/tests/test_convert_pandas.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index 960653d..953fa2c 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -81,7 +81,11 @@ class TestPandasConversion(unittest.TestCase):
arr = A.Array.from_pandas(values, timestamps_to_ms=timestamps_to_ms,
field=field)
result = arr.to_pandas()
- tm.assert_series_equal(pd.Series(result), pd.Series(values), check_names=False)
+
+ assert arr.null_count == pd.isnull(values).sum()
+
+ tm.assert_series_equal(pd.Series(result), pd.Series(values),
+ check_names=False)
def test_float_no_nulls(self):
data = {}
http://git-wip-us.apache.org/repos/asf/arrow/blob/6b3ae2ae/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index cadb53e..c707ada 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -34,6 +34,7 @@
#include <unordered_map>
#include "arrow/api.h"
+#include "arrow/loader.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
@@ -610,6 +611,7 @@ class PandasBlock {
DOUBLE,
BOOL,
DATETIME,
+ DATETIME_WITH_TZ,
CATEGORICAL
};
@@ -1157,7 +1159,7 @@ class DataFrameBlockCreator {
}
int block_placement = 0;
- if (column_type == Type::DICTIONARY) {
+ if (output_type == PandasBlock::CATEGORICAL) {
std::shared_ptr<PandasBlock> block;
RETURN_NOT_OK(MakeCategoricalBlock(col->type(), table_->num_rows(), &block));
categorical_blocks_[i] = block;
@@ -1518,15 +1520,16 @@ inline Status ArrowSerializer<TYPE>::Convert(std::shared_ptr<Array>* out) {
null_count = ValuesToBitmap<TYPE>(PyArray_DATA(arr_), length_, null_bitmap_data_);
}
- // For readability
- constexpr int64_t kOffset = 0;
-
RETURN_NOT_OK(ConvertData());
std::shared_ptr<DataType> type;
RETURN_NOT_OK(MakeDataType(&type));
- RETURN_NOT_OK(
- MakePrimitiveArray(type, length_, data_, null_bitmap_, null_count, kOffset, out));
- return Status::OK();
+
+ std::vector<arrow::FieldMetadata> fields(1);
+ fields[0].length = length_;
+ fields[0].null_count = null_count;
+ fields[0].offset = 0;
+
+ return arrow::LoadArray(type, fields, {null_bitmap_, data_}, out);
}
template <>