You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/09/08 22:32:04 UTC
arrow git commit: ARROW-1480: [Python] Improve performance of serializing sets
Repository: arrow
Updated Branches:
refs/heads/master 11ebe9387 -> 5aca7b669
ARROW-1480: [Python] Improve performance of serializing sets
Author: Philipp Moritz <pc...@gmail.com>
Author: Wes McKinney <we...@twosigma.com>
Closes #1060 from pcmoritz/serialize-sets and squashes the following commits:
86707aaa [Wes McKinney] Update RecordBatch::column docstring now that columns are being cached
cb451aab [Wes McKinney] Incorporate code review comments. Add internal caching of boxed arrays to StructArray, UnionArray, RecordBatch
89f191b0 [Philipp Moritz] fix linting
3d335e5a [Philipp Moritz] fix
c705e435 [Philipp Moritz] deserialization
a59cb989 [Philipp Moritz] support serializing sets
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5aca7b66
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5aca7b66
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5aca7b66
Branch: refs/heads/master
Commit: 5aca7b669530b20121b8dda566ddc20ac1dadbeb
Parents: 11ebe93
Author: Philipp Moritz <pc...@gmail.com>
Authored: Fri Sep 8 18:31:59 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Fri Sep 8 18:31:59 2017 -0400
----------------------------------------------------------------------
cpp/src/arrow/array-test.cc | 2 +
cpp/src/arrow/array.cc | 21 ++--
cpp/src/arrow/array.h | 7 ++
cpp/src/arrow/python/arrow_to_python.cc | 155 ++++++++++++++----------
cpp/src/arrow/python/python_to_arrow.cc | 55 ++++++---
cpp/src/arrow/table.cc | 26 ++--
cpp/src/arrow/table.h | 13 +-
python/pyarrow/tests/test_serialization.py | 1 +
8 files changed, 183 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/array-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index 5d9eb18..c92c23d 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -2460,6 +2460,8 @@ TEST(TestUnionArrayAdHoc, TestSliceEquals) {
auto CheckUnion = [&size](std::shared_ptr<Array> array) {
std::shared_ptr<Array> slice, slice2;
slice = array->Slice(2);
+ ASSERT_EQ(size - 2, slice->length());
+
slice2 = array->Slice(2);
ASSERT_EQ(size - 2, slice->length());
http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/array.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 34f0868..2d37274 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -329,6 +329,7 @@ std::string DecimalArray::FormatValue(int64_t i) const {
StructArray::StructArray(const std::shared_ptr<ArrayData>& data) {
DCHECK_EQ(data->type->id(), Type::STRUCT);
SetData(data);
+ boxed_fields_.resize(data->child_data.size());
}
StructArray::StructArray(const std::shared_ptr<DataType>& type, int64_t length,
@@ -341,12 +342,14 @@ StructArray::StructArray(const std::shared_ptr<DataType>& type, int64_t length,
for (const auto& child : children) {
data_->child_data.push_back(child->data());
}
+ boxed_fields_.resize(children.size());
}
-std::shared_ptr<Array> StructArray::field(int pos) const {
- std::shared_ptr<Array> result;
- DCHECK(internal::MakeArray(data_->child_data[pos], &result).ok());
- return result;
+std::shared_ptr<Array> StructArray::field(int i) const {
+ if (!boxed_fields_[i]) {
+ DCHECK(internal::MakeArray(data_->child_data[i], &boxed_fields_[i]).ok());
+ }
+ return boxed_fields_[i];
}
// ----------------------------------------------------------------------
@@ -362,6 +365,7 @@ void UnionArray::SetData(const std::shared_ptr<ArrayData>& data) {
raw_value_offsets_ = value_offsets == nullptr
? nullptr
: reinterpret_cast<const int32_t*>(value_offsets->data());
+ boxed_fields_.resize(data->child_data.size());
}
UnionArray::UnionArray(const std::shared_ptr<ArrayData>& data) {
@@ -384,10 +388,11 @@ UnionArray::UnionArray(const std::shared_ptr<DataType>& type, int64_t length,
SetData(internal_data);
}
-std::shared_ptr<Array> UnionArray::child(int pos) const {
- std::shared_ptr<Array> result;
- DCHECK(internal::MakeArray(data_->child_data[pos], &result).ok());
- return result;
+std::shared_ptr<Array> UnionArray::child(int i) const {
+ if (!boxed_fields_[i]) {
+ DCHECK(internal::MakeArray(data_->child_data[i], &boxed_fields_[i]).ok());
+ }
+ return boxed_fields_[i];
}
// ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 3faff71..bfeedd2 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -555,6 +555,10 @@ class ARROW_EXPORT StructArray : public Array {
// Return a shared pointer in case the requestor desires to share ownership
// with this array.
std::shared_ptr<Array> field(int pos) const;
+
+ private:
+ // For caching boxed child data
+ mutable std::vector<std::shared_ptr<Array>> boxed_fields_;
};
// ----------------------------------------------------------------------
@@ -592,6 +596,9 @@ class ARROW_EXPORT UnionArray : public Array {
const type_id_t* raw_type_ids_;
const int32_t* raw_value_offsets_;
+
+ // For caching boxed child data
+ mutable std::vector<std::shared_ptr<Array>> boxed_fields_;
};
// ----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/python/arrow_to_python.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc
index b127971..bc12ba7 100644
--- a/cpp/src/arrow/python/arrow_to_python.cc
+++ b/cpp/src/arrow/python/arrow_to_python.cc
@@ -37,26 +37,31 @@ namespace py {
Status CallDeserializeCallback(PyObject* context, PyObject* value,
PyObject** deserialized_object);
-Status DeserializeTuple(PyObject* context, std::shared_ptr<Array> array,
- int64_t start_idx, int64_t stop_idx, PyObject* base,
+Status DeserializeTuple(PyObject* context, const Array& array, int64_t start_idx,
+ int64_t stop_idx, PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out);
-Status DeserializeList(PyObject* context, std::shared_ptr<Array> array, int64_t start_idx,
+Status DeserializeList(PyObject* context, const Array& array, int64_t start_idx,
int64_t stop_idx, PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out);
-Status DeserializeDict(PyObject* context, std::shared_ptr<Array> array, int64_t start_idx,
+Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx,
+ int64_t stop_idx, PyObject* base,
+ const std::vector<std::shared_ptr<Tensor>>& tensors,
+ PyObject** out);
+
+Status DeserializeDict(PyObject* context, const Array& array, int64_t start_idx,
int64_t stop_idx, PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out) {
- auto data = std::dynamic_pointer_cast<StructArray>(array);
+ const auto& data = static_cast<const StructArray&>(array);
ScopedRef keys, vals;
ScopedRef result(PyDict_New());
- RETURN_NOT_OK(DeserializeList(context, data->field(0), start_idx, stop_idx, base,
+ RETURN_NOT_OK(DeserializeList(context, *data.field(0), start_idx, stop_idx, base,
tensors, keys.ref()));
- RETURN_NOT_OK(DeserializeList(context, data->field(1), start_idx, stop_idx, base,
+ RETURN_NOT_OK(DeserializeList(context, *data.field(1), start_idx, stop_idx, base,
tensors, vals.ref()));
for (int64_t i = start_idx; i < stop_idx; ++i) {
// PyDict_SetItem behaves differently from PyList_SetItem and PyTuple_SetItem.
@@ -75,11 +80,10 @@ Status DeserializeDict(PyObject* context, std::shared_ptr<Array> array, int64_t
return Status::OK();
}
-Status DeserializeArray(std::shared_ptr<Array> array, int64_t offset, PyObject* base,
+Status DeserializeArray(const Array& array, int64_t offset, PyObject* base,
const std::vector<std::shared_ptr<arrow::Tensor>>& tensors,
PyObject** out) {
- DCHECK(array);
- int32_t index = std::static_pointer_cast<Int32Array>(array)->Value(offset);
+ int32_t index = static_cast<const Int32Array&>(array).Value(offset);
RETURN_NOT_OK(py::TensorToNdarray(*tensors[index], base, out));
// Mark the array as immutable
ScopedRef flags(PyObject_GetAttrString(*out, "flags"));
@@ -90,54 +94,51 @@ Status DeserializeArray(std::shared_ptr<Array> array, int64_t offset, PyObject*
return Status::OK();
}
-Status GetValue(PyObject* context, std::shared_ptr<Array> arr, int64_t index,
- int32_t type, PyObject* base,
- const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** result) {
- switch (arr->type()->id()) {
+Status GetValue(PyObject* context, const Array& arr, int64_t index, int32_t type,
+ PyObject* base, const std::vector<std::shared_ptr<Tensor>>& tensors,
+ PyObject** result) {
+ switch (arr.type()->id()) {
case Type::BOOL:
- *result =
- PyBool_FromLong(std::static_pointer_cast<BooleanArray>(arr)->Value(index));
+ *result = PyBool_FromLong(static_cast<const BooleanArray&>(arr).Value(index));
return Status::OK();
case Type::INT64:
- *result =
- PyLong_FromSsize_t(std::static_pointer_cast<Int64Array>(arr)->Value(index));
+ *result = PyLong_FromSsize_t(static_cast<const Int64Array&>(arr).Value(index));
return Status::OK();
case Type::BINARY: {
int32_t nchars;
- const uint8_t* str =
- std::static_pointer_cast<BinaryArray>(arr)->GetValue(index, &nchars);
+ const uint8_t* str = static_cast<const BinaryArray&>(arr).GetValue(index, &nchars);
*result = PyBytes_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
return CheckPyError();
}
case Type::STRING: {
int32_t nchars;
- const uint8_t* str =
- std::static_pointer_cast<StringArray>(arr)->GetValue(index, &nchars);
+ const uint8_t* str = static_cast<const StringArray&>(arr).GetValue(index, &nchars);
*result = PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
return CheckPyError();
}
case Type::FLOAT:
- *result =
- PyFloat_FromDouble(std::static_pointer_cast<FloatArray>(arr)->Value(index));
+ *result = PyFloat_FromDouble(static_cast<const FloatArray&>(arr).Value(index));
return Status::OK();
case Type::DOUBLE:
- *result =
- PyFloat_FromDouble(std::static_pointer_cast<DoubleArray>(arr)->Value(index));
+ *result = PyFloat_FromDouble(static_cast<const DoubleArray&>(arr).Value(index));
return Status::OK();
case Type::STRUCT: {
- auto s = std::static_pointer_cast<StructArray>(arr);
- auto l = std::static_pointer_cast<ListArray>(s->field(0));
- if (s->type()->child(0)->name() == "list") {
- return DeserializeList(context, l->values(), l->value_offset(index),
- l->value_offset(index + 1), base, tensors, result);
- } else if (s->type()->child(0)->name() == "tuple") {
- return DeserializeTuple(context, l->values(), l->value_offset(index),
- l->value_offset(index + 1), base, tensors, result);
- } else if (s->type()->child(0)->name() == "dict") {
- return DeserializeDict(context, l->values(), l->value_offset(index),
- l->value_offset(index + 1), base, tensors, result);
+ const auto& s = static_cast<const StructArray&>(arr);
+ const auto& l = static_cast<const ListArray&>(*s.field(0));
+ if (s.type()->child(0)->name() == "list") {
+ return DeserializeList(context, *l.values(), l.value_offset(index),
+ l.value_offset(index + 1), base, tensors, result);
+ } else if (s.type()->child(0)->name() == "tuple") {
+ return DeserializeTuple(context, *l.values(), l.value_offset(index),
+ l.value_offset(index + 1), base, tensors, result);
+ } else if (s.type()->child(0)->name() == "dict") {
+ return DeserializeDict(context, *l.values(), l.value_offset(index),
+ l.value_offset(index + 1), base, tensors, result);
+ } else if (s.type()->child(0)->name() == "set") {
+ return DeserializeSet(context, *l.values(), l.value_offset(index),
+ l.value_offset(index + 1), base, tensors, result);
} else {
- DCHECK(false) << "unexpected StructArray type " << s->type()->child(0)->name();
+ DCHECK(false) << "unexpected StructArray type " << s.type()->child(0)->name();
}
}
// We use an Int32Builder here to distinguish the tensor indices from
@@ -151,42 +152,72 @@ Status GetValue(PyObject* context, std::shared_ptr<Array> arr, int64_t index,
return Status::OK();
}
-#define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN) \
- auto data = std::dynamic_pointer_cast<UnionArray>(array); \
- int64_t size = array->length(); \
- ScopedRef result(CREATE_FN(stop_idx - start_idx)); \
- auto types = std::make_shared<Int8Array>(size, data->type_ids()); \
- auto offsets = std::make_shared<Int32Array>(size, data->value_offsets()); \
- for (int64_t i = start_idx; i < stop_idx; ++i) { \
- if (data->IsNull(i)) { \
- Py_INCREF(Py_None); \
- SET_ITEM_FN(result.get(), i - start_idx, Py_None); \
- } else { \
- int64_t offset = offsets->Value(i); \
- int8_t type = types->Value(i); \
- std::shared_ptr<Array> arr = data->child(type); \
- PyObject* value; \
- RETURN_NOT_OK(GetValue(context, arr, offset, type, base, tensors, &value)); \
- SET_ITEM_FN(result.get(), i - start_idx, value); \
- } \
- } \
- *out = result.release(); \
+#define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN) \
+ const auto& data = static_cast<const UnionArray&>(array); \
+ int64_t size = array.length(); \
+ ScopedRef result(CREATE_FN(stop_idx - start_idx)); \
+ auto types = std::make_shared<Int8Array>(size, data.type_ids()); \
+ auto offsets = std::make_shared<Int32Array>(size, data.value_offsets()); \
+ for (int64_t i = start_idx; i < stop_idx; ++i) { \
+ if (data.IsNull(i)) { \
+ Py_INCREF(Py_None); \
+ SET_ITEM_FN(result.get(), i - start_idx, Py_None); \
+ } else { \
+ int64_t offset = offsets->Value(i); \
+ int8_t type = types->Value(i); \
+ PyObject* value; \
+ RETURN_NOT_OK( \
+ GetValue(context, *data.child(type), offset, type, base, tensors, &value)); \
+ SET_ITEM_FN(result.get(), i - start_idx, value); \
+ } \
+ } \
+ *out = result.release(); \
return Status::OK()
-Status DeserializeList(PyObject* context, std::shared_ptr<Array> array, int64_t start_idx,
+Status DeserializeList(PyObject* context, const Array& array, int64_t start_idx,
int64_t stop_idx, PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out) {
DESERIALIZE_SEQUENCE(PyList_New, PyList_SET_ITEM);
}
-Status DeserializeTuple(PyObject* context, std::shared_ptr<Array> array,
- int64_t start_idx, int64_t stop_idx, PyObject* base,
+Status DeserializeTuple(PyObject* context, const Array& array, int64_t start_idx,
+ int64_t stop_idx, PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out) {
DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SET_ITEM);
}
+Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx,
+ int64_t stop_idx, PyObject* base,
+ const std::vector<std::shared_ptr<Tensor>>& tensors,
+ PyObject** out) {
+ const auto& data = static_cast<const UnionArray&>(array);
+ int64_t size = array.length();
+ ScopedRef result(PySet_New(nullptr));
+ auto types = std::make_shared<Int8Array>(size, data.type_ids());
+ auto offsets = std::make_shared<Int32Array>(size, data.value_offsets());
+ for (int64_t i = start_idx; i < stop_idx; ++i) {
+ if (data.IsNull(i)) {
+ Py_INCREF(Py_None);
+ if (PySet_Add(result.get(), Py_None) < 0) {
+ RETURN_IF_PYERROR();
+ }
+ } else {
+ int64_t offset = offsets->Value(i);
+ int8_t type = types->Value(i);
+ PyObject* value;
+ RETURN_NOT_OK(
+ GetValue(context, *data.child(type), offset, type, base, tensors, &value));
+ if (PySet_Add(result.get(), value) < 0) {
+ RETURN_IF_PYERROR();
+ }
+ }
+ }
+ *out = result.release();
+ return Status::OK();
+}
+
Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) {
int64_t offset;
int64_t bytes_read;
@@ -213,7 +244,7 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out)
Status DeserializeObject(PyObject* context, const SerializedPyObject& obj, PyObject* base,
PyObject** out) {
PyAcquireGIL lock;
- return DeserializeList(context, obj.batch->column(0), 0, obj.batch->num_rows(), base,
+ return DeserializeList(context, *obj.batch->column(0), 0, obj.batch->num_rows(), base,
obj.tensors, out);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/python/python_to_arrow.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index 65e5f6a..c57091f 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -62,7 +62,8 @@ class SequenceBuilder {
tensor_indices_(::arrow::int32(), pool),
list_offsets_({0}),
tuple_offsets_({0}),
- dict_offsets_({0}) {}
+ dict_offsets_({0}),
+ set_offsets_({0}) {}
/// Appending a none to the sequence
Status AppendNone() {
@@ -163,6 +164,12 @@ class SequenceBuilder {
return Status::OK();
}
+ Status AppendSet(Py_ssize_t size) {
+ RETURN_NOT_OK(Update(set_offsets_.size() - 1, &set_tag_));
+ set_offsets_.push_back(set_offsets_.back() + static_cast<int32_t>(size));
+ return Status::OK();
+ }
+
template <typename BuilderType>
Status AddElement(const int8_t tag, BuilderType* out) {
if (tag != -1) {
@@ -200,7 +207,7 @@ class SequenceBuilder {
/// Finish building the sequence and return the result.
/// Input arrays may be nullptr
Status Finish(const Array* list_data, const Array* tuple_data, const Array* dict_data,
- std::shared_ptr<Array>* out) {
+ const Array* set_data, std::shared_ptr<Array>* out) {
fields_.resize(num_tags_);
children_.resize(num_tags_);
@@ -215,6 +222,7 @@ class SequenceBuilder {
RETURN_NOT_OK(AddSubsequence(list_tag_, list_data, list_offsets_, "list"));
RETURN_NOT_OK(AddSubsequence(tuple_tag_, tuple_data, tuple_offsets_, "tuple"));
RETURN_NOT_OK(AddSubsequence(dict_tag_, dict_data, dict_offsets_, "dict"));
+ RETURN_NOT_OK(AddSubsequence(set_tag_, set_data, set_offsets_, "set"));
auto type = ::arrow::union_(fields_, type_ids_, UnionMode::DENSE);
out->reset(new UnionArray(type, types_.length(), children_, types_.data(),
@@ -246,6 +254,7 @@ class SequenceBuilder {
std::vector<int32_t> list_offsets_;
std::vector<int32_t> tuple_offsets_;
std::vector<int32_t> dict_offsets_;
+ std::vector<int32_t> set_offsets_;
// Tags for members of the sequence. If they are set to -1 it means
// they are not used and will not part be of the metadata when we call
@@ -263,6 +272,7 @@ class SequenceBuilder {
int8_t list_tag_ = -1;
int8_t tuple_tag_ = -1;
int8_t dict_tag_ = -1;
+ int8_t set_tag_ = -1;
int8_t num_tags_ = 0;
@@ -297,12 +307,14 @@ class DictBuilder {
/// value list of the dictionary
Status Finish(const Array* key_tuple_data, const Array* key_dict_data,
const Array* val_list_data, const Array* val_tuple_data,
- const Array* val_dict_data, std::shared_ptr<Array>* out) {
- // lists and dicts can't be keys of dicts in Python, that is why for
+ const Array* val_dict_data, const Array* val_set_data,
+ std::shared_ptr<Array>* out) {
+ // lists and sets can't be keys of dicts in Python, that is why for
// the keys we do not need to collect sublists
std::shared_ptr<Array> keys, vals;
- RETURN_NOT_OK(keys_.Finish(nullptr, key_tuple_data, key_dict_data, &keys));
- RETURN_NOT_OK(vals_.Finish(val_list_data, val_tuple_data, val_dict_data, &vals));
+ RETURN_NOT_OK(keys_.Finish(nullptr, key_tuple_data, key_dict_data, nullptr, &keys));
+ RETURN_NOT_OK(
+ vals_.Finish(val_list_data, val_tuple_data, val_dict_data, val_set_data, &vals));
auto keys_field = std::make_shared<Field>("keys", keys->type());
auto vals_field = std::make_shared<Field>("vals", vals->type());
auto type = std::make_shared<StructType>(
@@ -411,7 +423,8 @@ Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
std::vector<PyObject*>* sublists, std::vector<PyObject*>* subtuples,
- std::vector<PyObject*>* subdicts, std::vector<PyObject*>* tensors_out) {
+ std::vector<PyObject*>* subdicts, std::vector<PyObject*>* subsets,
+ std::vector<PyObject*>* tensors_out) {
// The bool case must precede the int case (PyInt_Check passes for bools)
if (PyBool_Check(elem)) {
RETURN_NOT_OK(builder->AppendBool(elem == Py_True));
@@ -463,6 +476,9 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
} else if (PyTuple_CheckExact(elem)) {
RETURN_NOT_OK(builder->AppendTuple(PyTuple_Size(elem)));
subtuples->push_back(elem);
+ } else if (PySet_Check(elem)) {
+ RETURN_NOT_OK(builder->AppendSet(PySet_Size(elem)));
+ subsets->push_back(elem);
} else if (PyArray_IsScalar(elem, Generic)) {
RETURN_NOT_OK(AppendScalar(elem, builder));
} else if (PyArray_Check(elem)) {
@@ -522,14 +538,14 @@ Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences,
"recursively.");
}
SequenceBuilder builder(nullptr);
- std::vector<PyObject*> sublists, subtuples, subdicts;
+ std::vector<PyObject*> sublists, subtuples, subdicts, subsets;
for (const auto& sequence : sequences) {
ScopedRef iterator(PyObject_GetIter(sequence));
RETURN_IF_PYERROR();
ScopedRef item;
while (item.reset(PyIter_Next(iterator.get())), item.get()) {
RETURN_NOT_OK(Append(context, item.get(), &builder, &sublists, &subtuples,
- &subdicts, tensors_out));
+ &subdicts, &subsets, tensors_out));
}
}
std::shared_ptr<Array> list;
@@ -547,7 +563,12 @@ Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences,
RETURN_NOT_OK(
SerializeDict(context, subdicts, recursion_depth + 1, &dict, tensors_out));
}
- return builder.Finish(list.get(), tuple.get(), dict.get(), out);
+ std::shared_ptr<Array> set;
+ if (subsets.size() > 0) {
+ RETURN_NOT_OK(
+ SerializeSequences(context, subsets, recursion_depth + 1, &set, tensors_out));
+ }
+ return builder.Finish(list.get(), tuple.get(), dict.get(), set.get(), out);
}
Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts,
@@ -559,16 +580,17 @@ Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts,
"This object exceeds the maximum recursion depth. It may contain itself "
"recursively.");
}
- std::vector<PyObject*> key_tuples, key_dicts, val_lists, val_tuples, val_dicts, dummy;
+ std::vector<PyObject*> key_tuples, key_dicts, val_lists, val_tuples, val_dicts,
+ val_sets, dummy;
for (const auto& dict : dicts) {
PyObject *key, *value;
Py_ssize_t pos = 0;
while (PyDict_Next(dict, &pos, &key, &value)) {
RETURN_NOT_OK(Append(context, key, &result.keys(), &dummy, &key_tuples, &key_dicts,
- tensors_out));
+ &dummy, tensors_out));
DCHECK_EQ(dummy.size(), 0);
RETURN_NOT_OK(Append(context, value, &result.vals(), &val_lists, &val_tuples,
- &val_dicts, tensors_out));
+ &val_dicts, &val_sets, tensors_out));
}
}
std::shared_ptr<Array> key_tuples_arr;
@@ -596,9 +618,14 @@ Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts,
RETURN_NOT_OK(SerializeDict(context, val_dicts, recursion_depth + 1, &val_dict_arr,
tensors_out));
}
+ std::shared_ptr<Array> val_set_arr;
+ if (val_sets.size() > 0) {
+ RETURN_NOT_OK(SerializeSequences(context, val_sets, recursion_depth + 1, &val_set_arr,
+ tensors_out));
+ }
RETURN_NOT_OK(result.Finish(key_tuples_arr.get(), key_dicts_arr.get(),
val_list_arr.get(), val_tuples_arr.get(),
- val_dict_arr.get(), out));
+ val_dict_arr.get(), val_set_arr.get(), out));
// This block is used to decrement the reference counts of the results
// returned by the serialization callback, which is called in SerializeArray,
http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/table.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index ae48698..8c7c4e2 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -160,9 +160,15 @@ void AssertBatchValid(const RecordBatch& batch) {
}
}
+RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows)
+ : schema_(schema), num_rows_(num_rows) {
+ boxed_columns_.resize(schema->num_fields());
+}
+
RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
const std::vector<std::shared_ptr<Array>>& columns)
- : schema_(schema), num_rows_(num_rows), columns_(columns.size()) {
+ : RecordBatch(schema, num_rows) {
+ columns_.resize(columns.size());
for (size_t i = 0; i < columns.size(); ++i) {
columns_[i] = columns[i]->data();
}
@@ -170,7 +176,8 @@ RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows
RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
std::vector<std::shared_ptr<Array>>&& columns)
- : schema_(schema), num_rows_(num_rows), columns_(columns.size()) {
+ : RecordBatch(schema, num_rows) {
+ columns_.resize(columns.size());
for (size_t i = 0; i < columns.size(); ++i) {
columns_[i] = columns[i]->data();
}
@@ -178,16 +185,21 @@ RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows
RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
std::vector<std::shared_ptr<internal::ArrayData>>&& columns)
- : schema_(schema), num_rows_(num_rows), columns_(std::move(columns)) {}
+ : RecordBatch(schema, num_rows) {
+ columns_ = std::move(columns);
+}
RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows,
const std::vector<std::shared_ptr<internal::ArrayData>>& columns)
- : schema_(schema), num_rows_(num_rows), columns_(columns) {}
+ : RecordBatch(schema, num_rows) {
+ columns_ = columns;
+}
std::shared_ptr<Array> RecordBatch::column(int i) const {
- std::shared_ptr<Array> result;
- DCHECK(MakeArray(columns_[i], &result).ok());
- return result;
+ if (!boxed_columns_[i]) {
+ DCHECK(internal::MakeArray(columns_[i], &boxed_columns_[i]).ok());
+ }
+ return boxed_columns_[i];
}
const std::string& RecordBatch::column_name(int i) const {
http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/table.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 1145d11..da2722d 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -153,13 +153,9 @@ class ARROW_EXPORT RecordBatch {
/// \return true if batches are equal
std::shared_ptr<Schema> schema() const { return schema_; }
- /// \brief Retrieve an array from the record batch (new object)
+ /// \brief Retrieve an array from the record batch
/// \param[in] i field index, does not boundscheck
- /// \return a new Array object
- ///
- /// \note This function returns a new object. If you intend to dereference
- /// the pointer or access the internals, retain a reference to the
- /// std::shared_ptr returned.
+ /// \return an Array object
std::shared_ptr<Array> column(int i) const;
std::shared_ptr<internal::ArrayData> column_data(int i) const { return columns_[i]; }
@@ -197,9 +193,14 @@ class ARROW_EXPORT RecordBatch {
Status Validate() const;
private:
+ RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows);
+
std::shared_ptr<Schema> schema_;
int64_t num_rows_;
std::vector<std::shared_ptr<internal::ArrayData>> columns_;
+
+ // Caching boxed array data
+ mutable std::vector<std::shared_ptr<Array>> boxed_columns_;
};
/// \class Table
http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/python/pyarrow/tests/test_serialization.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index aca3848..7c8cace 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -90,6 +90,7 @@ PRIMITIVE_OBJECTS = [
[1, 2, 3, None], [(None,), 3, 1.0], ["h", "e", "l", "l", "o", None],
(None, None), ("hello", None), (True, False),
{True: "hello", False: "world"}, {"hello": "world", 1: 42, 2.5: 45},
+ {"hello": set([2, 3]), "world": set([42.0]), "this": None},
np.int8(3), np.int32(4), np.int64(5),
np.uint8(3), np.uint32(4), np.uint64(5), np.float32(1.9),
np.float64(1.9), np.zeros([100, 100]),