You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by mr...@apache.org on 2020/04/23 06:34:45 UTC
[arrow] branch master updated: ARROW-8162: [Format][Python] Add serialization for CSF sparse tensors to Python
This is an automated email from the ASF dual-hosted git repository.
mrkn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 45a0ad3 ARROW-8162: [Format][Python] Add serialization for CSF sparse tensors to Python
45a0ad3 is described below
commit 45a0ad38929699c600dee8a7b44898f49da19a26
Author: Rok <ro...@mihevc.org>
AuthorDate: Thu Apr 23 15:34:12 2020 +0900
ARROW-8162: [Format][Python] Add serialization for CSF sparse tensors to Python
This is to resolve [ARROW-8162](https://issues.apache.org/jira/browse/ARROW-8162).
Once [ARROW-7428](https://issues.apache.org/jira/browse/ARROW-7428) is complete, serialization for CSF sparse tensors should be enabled in Python too.
This should be reviewed after #6340.
Closes #6667 from rok/ARROW-8162
Authored-by: Rok <ro...@mihevc.org>
Signed-off-by: Kenta Murata <mr...@mrkn.jp>
---
cpp/src/arrow/ipc/reader.cc | 21 +++++---
cpp/src/arrow/python/deserialize.cc | 8 +++
cpp/src/arrow/python/deserialize.h | 10 ++--
cpp/src/arrow/python/serialize.cc | 20 +++++++
cpp/src/arrow/python/serialize.h | 1 +
cpp/src/generated/SparseTensor_generated.h | 1 +
python/pyarrow/includes/libarrow.pxd | 1 +
python/pyarrow/serialization.pxi | 3 ++
python/pyarrow/tests/test_serialization.py | 83 ++++++++++++++++++++++++++++--
9 files changed, 134 insertions(+), 14 deletions(-)
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 95b1c5a..b13ba29 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1324,7 +1324,8 @@ namespace internal {
namespace {
-Result<size_t> GetSparseTensorBodyBufferCount(SparseTensorFormat::type format_id) {
+Result<size_t> GetSparseTensorBodyBufferCount(SparseTensorFormat::type format_id,
+ const size_t ndim) {
switch (format_id) {
case SparseTensorFormat::COO:
return 2;
@@ -1336,18 +1337,19 @@ Result<size_t> GetSparseTensorBodyBufferCount(SparseTensorFormat::type format_id
return 3;
case SparseTensorFormat::CSF:
- return 3;
+ return 2 * ndim;
default:
return Status::Invalid("Unrecognized sparse tensor format");
}
}
-Status CheckSparseTensorBodyBufferCount(
- const IpcPayload& payload, SparseTensorFormat::type sparse_tensor_format_id) {
+Status CheckSparseTensorBodyBufferCount(const IpcPayload& payload,
+ SparseTensorFormat::type sparse_tensor_format_id,
+ const size_t ndim) {
size_t expected_body_buffer_count = 0;
ARROW_ASSIGN_OR_RAISE(expected_body_buffer_count,
- GetSparseTensorBodyBufferCount(sparse_tensor_format_id));
+ GetSparseTensorBodyBufferCount(sparse_tensor_format_id, ndim));
if (payload.body_buffers.size() != expected_body_buffer_count) {
return Status::Invalid("Invalid body buffer count for a sparse tensor");
}
@@ -1359,10 +1361,12 @@ Status CheckSparseTensorBodyBufferCount(
Result<size_t> ReadSparseTensorBodyBufferCount(const Buffer& metadata) {
SparseTensorFormat::type format_id;
+ std::vector<int64_t> shape;
- RETURN_NOT_OK(internal::GetSparseTensorMetadata(metadata, nullptr, nullptr, nullptr,
+ RETURN_NOT_OK(internal::GetSparseTensorMetadata(metadata, nullptr, &shape, nullptr,
nullptr, &format_id));
- return GetSparseTensorBodyBufferCount(format_id);
+
+ return GetSparseTensorBodyBufferCount(format_id, static_cast<size_t>(shape.size()));
}
Result<std::shared_ptr<SparseTensor>> ReadSparseTensorPayload(const IpcPayload& payload) {
@@ -1378,7 +1382,8 @@ Result<std::shared_ptr<SparseTensor>> ReadSparseTensorPayload(const IpcPayload&
&non_zero_length, &sparse_tensor_format_id,
&sparse_tensor, &buffer));
- RETURN_NOT_OK(CheckSparseTensorBodyBufferCount(payload, sparse_tensor_format_id));
+ RETURN_NOT_OK(CheckSparseTensorBodyBufferCount(payload, sparse_tensor_format_id,
+ static_cast<size_t>(shape.size())));
switch (sparse_tensor_format_id) {
case SparseTensorFormat::COO: {
diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc
index a1a3737..60c6534 100644
--- a/cpp/src/arrow/python/deserialize.cc
+++ b/cpp/src/arrow/python/deserialize.cc
@@ -204,6 +204,14 @@ Status GetValue(PyObject* context, const Array& arr, int64_t index, int8_t type,
*result = wrap_sparse_csc_matrix(sparse_csc_matrix);
return Status::OK();
}
+ case PythonType::SPARSECSFTENSOR: {
+ int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
+ const std::shared_ptr<SparseCSFTensor>& sparse_csf_tensor =
+ arrow::internal::checked_pointer_cast<SparseCSFTensor>(
+ blobs.sparse_tensors[ref]);
+ *result = wrap_sparse_csf_tensor(sparse_csf_tensor);
+ return Status::OK();
+ }
case PythonType::NDARRAY: {
int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
return DeserializeArray(ref, base, blobs, result);
diff --git a/cpp/src/arrow/python/deserialize.h b/cpp/src/arrow/python/deserialize.h
index 9568eef..41b6a13 100644
--- a/cpp/src/arrow/python/deserialize.h
+++ b/cpp/src/arrow/python/deserialize.h
@@ -42,9 +42,13 @@ struct ARROW_PYTHON_EXPORT SparseTensorCounts {
int coo;
int csr;
int csc;
+ int csf;
+ int ndim_csf;
- int num_total_tensors() const { return coo + csr + csc; }
- int num_total_buffers() const { return coo * 3 + csr * 4 + csc * 4; }
+ int num_total_tensors() const { return coo + csr + csc + csf; }
+ int num_total_buffers() const {
+ return coo * 3 + csr * 4 + csc * 4 + 2 * ndim_csf + csf;
+ }
};
/// \brief Read serialized Python sequence from file interface using Arrow IPC
@@ -63,7 +67,7 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out);
/// \param[in] num_buffers number of buffers in the object
/// \param[in] data a list containing pyarrow.Buffer instances. It must be 1 +
/// num_tensors * 2 + num_coo_tensors * 3 + num_csr_tensors * 4 + num_csc_tensors * 4 +
-/// num_buffers in length
+/// 2 * ndim_csf + num_csf_tensors + num_buffers in length
/// \param[out] out the reconstructed object
/// \return Status
ARROW_PYTHON_EXPORT
diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc
index 1a2897f..e469f6a 100644
--- a/cpp/src/arrow/python/serialize.cc
+++ b/cpp/src/arrow/python/serialize.cc
@@ -181,6 +181,16 @@ class SequenceBuilder {
return sparse_csc_matrix_indices_->Append(sparse_csc_matrix_index);
}
+ // Appending a sparse csf tensor to the sequence
+ //
+ // \param sparse_csf_tensor_index Index of the sparse csf tensor in the object.
+ Status AppendSparseCSFTensor(const int32_t sparse_csf_tensor_index) {
+ RETURN_NOT_OK(CreateAndUpdate(&sparse_csf_tensor_indices_,
+ PythonType::SPARSECSFTENSOR,
+ [this]() { return new Int32Builder(pool_); }));
+ return sparse_csf_tensor_indices_->Append(sparse_csf_tensor_index);
+ }
+
// Appending a numpy ndarray to the sequence
//
// \param tensor_index Index of the tensor in the object.
@@ -277,6 +287,7 @@ class SequenceBuilder {
std::shared_ptr<Int32Builder> sparse_coo_tensor_indices_;
std::shared_ptr<Int32Builder> sparse_csr_matrix_indices_;
std::shared_ptr<Int32Builder> sparse_csc_matrix_indices_;
+ std::shared_ptr<Int32Builder> sparse_csf_tensor_indices_;
std::shared_ptr<Int32Builder> ndarray_indices_;
std::shared_ptr<Int32Builder> buffer_indices_;
@@ -515,6 +526,11 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
static_cast<int32_t>(blobs_out->sparse_tensors.size())));
ARROW_ASSIGN_OR_RAISE(auto matrix, unwrap_sparse_csc_matrix(elem));
blobs_out->sparse_tensors.push_back(matrix);
+ } else if (is_sparse_csf_tensor(elem)) {
+ RETURN_NOT_OK(builder->AppendSparseCSFTensor(
+ static_cast<int32_t>(blobs_out->sparse_tensors.size())));
+ ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_sparse_csf_tensor(elem));
+ blobs_out->sparse_tensors.push_back(tensor);
} else {
// Attempt to serialize the object using the custom callback.
PyObject* serialized_object;
@@ -659,6 +675,7 @@ Status CountSparseTensors(
size_t num_csr = 0;
size_t num_csc = 0;
size_t num_csf = 0;
+ size_t ndim_csf = 0;
for (const auto& sparse_tensor : sparse_tensors) {
switch (sparse_tensor->format_id()) {
@@ -673,6 +690,7 @@ Status CountSparseTensors(
break;
case SparseTensorFormat::CSF:
++num_csf;
+ ndim_csf += sparse_tensor->ndim();
break;
}
}
@@ -681,6 +699,7 @@ Status CountSparseTensors(
PyDict_SetItemString(num_sparse_tensors.obj(), "csr", PyLong_FromSize_t(num_csr));
PyDict_SetItemString(num_sparse_tensors.obj(), "csc", PyLong_FromSize_t(num_csc));
PyDict_SetItemString(num_sparse_tensors.obj(), "csf", PyLong_FromSize_t(num_csf));
+ PyDict_SetItemString(num_sparse_tensors.obj(), "ndim_csf", PyLong_FromSize_t(ndim_csf));
RETURN_IF_PYERROR();
*out = num_sparse_tensors.detach();
@@ -704,6 +723,7 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out
PyLong_FromSize_t(this->tensors.size()));
RETURN_NOT_OK(CountSparseTensors(this->sparse_tensors, &num_sparse_tensors));
PyDict_SetItemString(result.obj(), "num_sparse_tensors", num_sparse_tensors);
+ PyDict_SetItemString(result.obj(), "ndim_csf", num_sparse_tensors);
PyDict_SetItemString(result.obj(), "num_ndarrays",
PyLong_FromSize_t(this->ndarrays.size()));
PyDict_SetItemString(result.obj(), "num_buffers",
diff --git a/cpp/src/arrow/python/serialize.h b/cpp/src/arrow/python/serialize.h
index 8ef5516..191e279 100644
--- a/cpp/src/arrow/python/serialize.h
+++ b/cpp/src/arrow/python/serialize.h
@@ -134,6 +134,7 @@ struct PythonType {
SPARSECOOTENSOR,
SPARSECSRMATRIX,
SPARSECSCMATRIX,
+ SPARSECSFTENSOR,
NUM_PYTHON_TYPES
};
};
diff --git a/cpp/src/generated/SparseTensor_generated.h b/cpp/src/generated/SparseTensor_generated.h
index 023d6cc..9c00326 100644
--- a/cpp/src/generated/SparseTensor_generated.h
+++ b/cpp/src/generated/SparseTensor_generated.h
@@ -20,6 +20,7 @@ struct SparseMatrixIndexCSX;
struct SparseMatrixIndexCSXBuilder;
struct SparseTensorIndexCSF;
+struct SparseTensorIndexCSFBuilder;
struct SparseTensor;
struct SparseTensorBuilder;
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 8a5b0b7..7545d0b 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1721,6 +1721,7 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
int csr
int csc
int csf
+ int ndim_csf
int num_total_tensors() const
int num_total_buffers() const
diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi
index 757bd15..3a9901c 100644
--- a/python/pyarrow/serialization.pxi
+++ b/python/pyarrow/serialization.pxi
@@ -308,6 +308,9 @@ cdef class SerializedPyObject:
num_sparse_tensors.coo = components['num_sparse_tensors']['coo']
num_sparse_tensors.csr = components['num_sparse_tensors']['csr']
num_sparse_tensors.csc = components['num_sparse_tensors']['csc']
+ num_sparse_tensors.csf = components['num_sparse_tensors']['csf']
+ num_sparse_tensors.ndim_csf = \
+ components['num_sparse_tensors']['ndim_csf']
with nogil:
check_status(GetSerializedFromComponents(num_tensors,
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index e54cd88..ede2b33 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -125,6 +125,9 @@ def assert_equal(obj1, obj2):
elif isinstance(obj1, pa.SparseCSCMatrix) and \
isinstance(obj2, pa.SparseCSCMatrix):
assert obj1.equals(obj2)
+ elif isinstance(obj1, pa.SparseCSFTensor) and \
+ isinstance(obj2, pa.SparseCSFTensor):
+ assert obj1.equals(obj2)
elif isinstance(obj1, pa.RecordBatch) and isinstance(obj2, pa.RecordBatch):
assert obj1.equals(obj2)
elif isinstance(obj1, pa.Table) and isinstance(obj2, pa.Table):
@@ -158,7 +161,6 @@ index_types = ('i1', 'i2', 'i4', 'i8', 'u1', 'u2', 'u4', 'u8')
tensor_types = ('i1', 'i2', 'i4', 'i8', 'u1', 'u2', 'u4', 'u8',
'f2', 'f4', 'f8')
-
PRIMITIVE_OBJECTS += [0, np.array([["hi", "hi"], [1.3, 1]])]
@@ -727,6 +729,77 @@ def test_scipy_sparse_csc_matrix_serialization():
assert np.array_equal(sparse_array.toarray(), result.toarray())
+@pytest.mark.parametrize('tensor_type', tensor_types)
+@pytest.mark.parametrize('index_type', index_types)
+def test_sparse_csf_tensor_serialization(index_type, tensor_type):
+ tensor_dtype = np.dtype(tensor_type)
+ index_dtype = np.dtype(index_type)
+ data = np.array([[1, 2, 3, 4, 5, 6, 7, 8]]).T.astype(tensor_dtype)
+ indptr = [
+ np.array([0, 2, 3]),
+ np.array([0, 1, 3, 4]),
+ np.array([0, 2, 4, 5, 8]),
+ ]
+ indices = [
+ np.array([0, 1]),
+ np.array([0, 1, 1]),
+ np.array([0, 0, 1, 1]),
+ np.array([1, 2, 0, 2, 0, 0, 1, 2]),
+ ]
+ indptr = [x.astype(index_dtype) for x in indptr]
+ indices = [x.astype(index_dtype) for x in indices]
+ shape = (2, 3, 4, 5)
+ axis_order = (0, 1, 2, 3)
+ dim_names = ("a", "b", "c", "d")
+
+ for ndim in [2, 3, 4]:
+ sparse_tensor = pa.SparseCSFTensor.from_numpy(data, indptr[:ndim - 1],
+ indices[:ndim],
+ shape[:ndim],
+ axis_order[:ndim],
+ dim_names[:ndim])
+
+ context = pa.default_serialization_context()
+ serialized = pa.serialize(sparse_tensor, context=context).to_buffer()
+ result = pa.deserialize(serialized)
+ assert_equal(result, sparse_tensor)
+ assert isinstance(result, pa.SparseCSFTensor)
+
+
+@pytest.mark.parametrize('tensor_type', tensor_types)
+@pytest.mark.parametrize('index_type', index_types)
+def test_sparse_csf_tensor_components_serialization(large_buffer,
+ index_type, tensor_type):
+ tensor_dtype = np.dtype(tensor_type)
+ index_dtype = np.dtype(index_type)
+ data = np.array([[1, 2, 3, 4, 5, 6, 7, 8]]).T.astype(tensor_dtype)
+ indptr = [
+ np.array([0, 2, 3]),
+ np.array([0, 1, 3, 4]),
+ np.array([0, 2, 4, 5, 8]),
+ ]
+ indices = [
+ np.array([0, 1]),
+ np.array([0, 1, 1]),
+ np.array([0, 0, 1, 1]),
+ np.array([1, 2, 0, 2, 0, 0, 1, 2]),
+ ]
+ indptr = [x.astype(index_dtype) for x in indptr]
+ indices = [x.astype(index_dtype) for x in indices]
+ shape = (2, 3, 4, 5)
+ axis_order = (0, 1, 2, 3)
+ dim_names = ("a", "b", "c", "d")
+
+ for ndim in [2, 3, 4]:
+ sparse_tensor = pa.SparseCSFTensor.from_numpy(data, indptr[:ndim - 1],
+ indices[:ndim],
+ shape[:ndim],
+ axis_order[:ndim],
+ dim_names[:ndim])
+
+ serialization_roundtrip(sparse_tensor, large_buffer)
+
+
@pytest.mark.filterwarnings(
"ignore:the matrix subclass:PendingDeprecationWarning")
def test_numpy_matrix_serialization(tmpdir):
@@ -929,7 +1002,9 @@ def test_serialize_to_components_invalid_cases():
components = {
'num_tensors': 0,
- 'num_sparse_tensors': {'coo': 0, 'csr': 0, 'csc': 0},
+ 'num_sparse_tensors': {
+ 'coo': 0, 'csr': 0, 'csc': 0, 'csf': 0, 'ndim_csf': 0
+ },
'num_ndarrays': 0,
'num_buffers': 1,
'data': [buf]
@@ -940,7 +1015,9 @@ def test_serialize_to_components_invalid_cases():
components = {
'num_tensors': 0,
- 'num_sparse_tensors': {'coo': 0, 'csr': 0, 'csc': 0},
+ 'num_sparse_tensors': {
+ 'coo': 0, 'csr': 0, 'csc': 0, 'csf': 0, 'ndim_csf': 0
+ },
'num_ndarrays': 1,
'num_buffers': 0,
'data': [buf, buf]