You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by mr...@apache.org on 2020/04/23 06:34:45 UTC

[arrow] branch master updated: ARROW-8162: [Format][Python] Add serialization for CSF sparse tensors to Python

This is an automated email from the ASF dual-hosted git repository.

mrkn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 45a0ad3  ARROW-8162: [Format][Python] Add serialization for CSF sparse tensors to Python
45a0ad3 is described below

commit 45a0ad38929699c600dee8a7b44898f49da19a26
Author: Rok <ro...@mihevc.org>
AuthorDate: Thu Apr 23 15:34:12 2020 +0900

    ARROW-8162: [Format][Python] Add serialization for CSF sparse tensors to Python
    
    This is to resolve [ARROW-8162](https://issues.apache.org/jira/browse/ARROW-8162).
    Once [ARROW-7428](https://issues.apache.org/jira/browse/ARROW-7428) is complete, serialization for CSF sparse tensors should be enabled in Python too.
    
    This should be reviewed after #6340
    
    Closes #6667 from rok/ARROW-8162
    
    Authored-by: Rok <ro...@mihevc.org>
    Signed-off-by: Kenta Murata <mr...@mrkn.jp>
---
 cpp/src/arrow/ipc/reader.cc                | 21 +++++---
 cpp/src/arrow/python/deserialize.cc        |  8 +++
 cpp/src/arrow/python/deserialize.h         | 10 ++--
 cpp/src/arrow/python/serialize.cc          | 20 +++++++
 cpp/src/arrow/python/serialize.h           |  1 +
 cpp/src/generated/SparseTensor_generated.h |  1 +
 python/pyarrow/includes/libarrow.pxd       |  1 +
 python/pyarrow/serialization.pxi           |  3 ++
 python/pyarrow/tests/test_serialization.py | 83 ++++++++++++++++++++++++++++--
 9 files changed, 134 insertions(+), 14 deletions(-)

diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 95b1c5a..b13ba29 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1324,7 +1324,8 @@ namespace internal {
 
 namespace {
 
-Result<size_t> GetSparseTensorBodyBufferCount(SparseTensorFormat::type format_id) {
+Result<size_t> GetSparseTensorBodyBufferCount(SparseTensorFormat::type format_id,
+                                              const size_t ndim) {
   switch (format_id) {
     case SparseTensorFormat::COO:
       return 2;
@@ -1336,18 +1337,19 @@ Result<size_t> GetSparseTensorBodyBufferCount(SparseTensorFormat::type format_id
       return 3;
 
     case SparseTensorFormat::CSF:
-      return 3;
+      return 2 * ndim;
 
     default:
       return Status::Invalid("Unrecognized sparse tensor format");
   }
 }
 
-Status CheckSparseTensorBodyBufferCount(
-    const IpcPayload& payload, SparseTensorFormat::type sparse_tensor_format_id) {
+Status CheckSparseTensorBodyBufferCount(const IpcPayload& payload,
+                                        SparseTensorFormat::type sparse_tensor_format_id,
+                                        const size_t ndim) {
   size_t expected_body_buffer_count = 0;
   ARROW_ASSIGN_OR_RAISE(expected_body_buffer_count,
-                        GetSparseTensorBodyBufferCount(sparse_tensor_format_id));
+                        GetSparseTensorBodyBufferCount(sparse_tensor_format_id, ndim));
   if (payload.body_buffers.size() != expected_body_buffer_count) {
     return Status::Invalid("Invalid body buffer count for a sparse tensor");
   }
@@ -1359,10 +1361,12 @@ Status CheckSparseTensorBodyBufferCount(
 
 Result<size_t> ReadSparseTensorBodyBufferCount(const Buffer& metadata) {
   SparseTensorFormat::type format_id;
+  std::vector<int64_t> shape;  // only shape.size() (ndim) is used below
 
-  RETURN_NOT_OK(internal::GetSparseTensorMetadata(metadata, nullptr, nullptr, nullptr,
+  RETURN_NOT_OK(internal::GetSparseTensorMetadata(metadata, nullptr, &shape, nullptr,
                                                   nullptr, &format_id));
-  return GetSparseTensorBodyBufferCount(format_id);
+  // The CSF body buffer count depends on ndim, so the shape is requested above.
+  return GetSparseTensorBodyBufferCount(format_id, static_cast<size_t>(shape.size()));
 }
 
 Result<std::shared_ptr<SparseTensor>> ReadSparseTensorPayload(const IpcPayload& payload) {
@@ -1378,7 +1382,8 @@ Result<std::shared_ptr<SparseTensor>> ReadSparseTensorPayload(const IpcPayload&
                                          &non_zero_length, &sparse_tensor_format_id,
                                          &sparse_tensor, &buffer));
 
-  RETURN_NOT_OK(CheckSparseTensorBodyBufferCount(payload, sparse_tensor_format_id));
+  RETURN_NOT_OK(CheckSparseTensorBodyBufferCount(payload, sparse_tensor_format_id,
+                                                 static_cast<size_t>(shape.size())));
 
   switch (sparse_tensor_format_id) {
     case SparseTensorFormat::COO: {
diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc
index a1a3737..60c6534 100644
--- a/cpp/src/arrow/python/deserialize.cc
+++ b/cpp/src/arrow/python/deserialize.cc
@@ -204,6 +204,14 @@ Status GetValue(PyObject* context, const Array& arr, int64_t index, int8_t type,
       *result = wrap_sparse_csc_matrix(sparse_csc_matrix);
       return Status::OK();
     }
+    case PythonType::SPARSECSFTENSOR: {
+      int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
+      const std::shared_ptr<SparseCSFTensor>& sparse_csf_tensor =
+          arrow::internal::checked_pointer_cast<SparseCSFTensor>(
+              blobs.sparse_tensors[ref]);
+      *result = wrap_sparse_csf_tensor(sparse_csf_tensor);
+      return Status::OK();
+    }
     case PythonType::NDARRAY: {
       int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
       return DeserializeArray(ref, base, blobs, result);
diff --git a/cpp/src/arrow/python/deserialize.h b/cpp/src/arrow/python/deserialize.h
index 9568eef..41b6a13 100644
--- a/cpp/src/arrow/python/deserialize.h
+++ b/cpp/src/arrow/python/deserialize.h
@@ -42,9 +42,13 @@ struct ARROW_PYTHON_EXPORT SparseTensorCounts {
   int coo;
   int csr;
   int csc;
+  int csf;  // number of CSF tensors
+  int ndim_csf;  // sum of ndim() over all CSF tensors, not a per-tensor value
 
-  int num_total_tensors() const { return coo + csr + csc; }
-  int num_total_buffers() const { return coo * 3 + csr * 4 + csc * 4; }
+  int num_total_tensors() const { return coo + csr + csc + csf; }
+  int num_total_buffers() const {  // each CSF tensor contributes 2 * ndim + 1
+    return coo * 3 + csr * 4 + csc * 4 + 2 * ndim_csf + csf;
+  }
 };
 
 /// \brief Read serialized Python sequence from file interface using Arrow IPC
@@ -63,7 +67,7 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out);
 /// \param[in] num_buffers number of buffers in the object
 /// \param[in] data a list containing pyarrow.Buffer instances. It must be 1 +
 /// num_tensors * 2 + num_coo_tensors * 3 + num_csr_tensors * 4 + num_csc_tensors * 4 +
-/// num_buffers in length
+/// num_csf_tensors + 2 * (sum of CSF tensor ndims) + num_buffers in length
 /// \param[out] out the reconstructed object
 /// \return Status
 ARROW_PYTHON_EXPORT
diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc
index 1a2897f..e469f6a 100644
--- a/cpp/src/arrow/python/serialize.cc
+++ b/cpp/src/arrow/python/serialize.cc
@@ -181,6 +181,16 @@ class SequenceBuilder {
     return sparse_csc_matrix_indices_->Append(sparse_csc_matrix_index);
   }
 
+  // Appending a sparse CSF tensor as an Int32 index under PythonType::SPARSECSFTENSOR
+  // The tensor itself is stored separately; only its position is recorded here.
+  // \param sparse_csf_tensor_index Index of the tensor in the sparse_tensors blob list.
+  Status AppendSparseCSFTensor(const int32_t sparse_csf_tensor_index) {
+    RETURN_NOT_OK(CreateAndUpdate(&sparse_csf_tensor_indices_,
+                                  PythonType::SPARSECSFTENSOR,
+                                  [this]() { return new Int32Builder(pool_); }));
+    return sparse_csf_tensor_indices_->Append(sparse_csf_tensor_index);
+  }
+  }
+
   // Appending a numpy ndarray to the sequence
   //
   // \param tensor_index Index of the tensor in the object.
@@ -277,6 +287,7 @@ class SequenceBuilder {
   std::shared_ptr<Int32Builder> sparse_coo_tensor_indices_;
   std::shared_ptr<Int32Builder> sparse_csr_matrix_indices_;
   std::shared_ptr<Int32Builder> sparse_csc_matrix_indices_;
+  std::shared_ptr<Int32Builder> sparse_csf_tensor_indices_;
   std::shared_ptr<Int32Builder> ndarray_indices_;
   std::shared_ptr<Int32Builder> buffer_indices_;
 
@@ -515,6 +526,11 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
         static_cast<int32_t>(blobs_out->sparse_tensors.size())));
     ARROW_ASSIGN_OR_RAISE(auto matrix, unwrap_sparse_csc_matrix(elem));
     blobs_out->sparse_tensors.push_back(matrix);
+  } else if (is_sparse_csf_tensor(elem)) {
+    RETURN_NOT_OK(builder->AppendSparseCSFTensor(
+        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
+    ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_sparse_csf_tensor(elem));
+    blobs_out->sparse_tensors.push_back(tensor);
   } else {
     // Attempt to serialize the object using the custom callback.
     PyObject* serialized_object;
@@ -659,6 +675,7 @@ Status CountSparseTensors(
   size_t num_csr = 0;
   size_t num_csc = 0;
   size_t num_csf = 0;
+  size_t ndim_csf = 0;
 
   for (const auto& sparse_tensor : sparse_tensors) {
     switch (sparse_tensor->format_id()) {
@@ -673,6 +690,7 @@ Status CountSparseTensors(
         break;
       case SparseTensorFormat::CSF:
         ++num_csf;
+        ndim_csf += sparse_tensor->ndim();
         break;
     }
   }
@@ -681,6 +699,7 @@ Status CountSparseTensors(
   PyDict_SetItemString(num_sparse_tensors.obj(), "csr", PyLong_FromSize_t(num_csr));
   PyDict_SetItemString(num_sparse_tensors.obj(), "csc", PyLong_FromSize_t(num_csc));
   PyDict_SetItemString(num_sparse_tensors.obj(), "csf", PyLong_FromSize_t(num_csf));
+  PyDict_SetItemString(num_sparse_tensors.obj(), "ndim_csf", PyLong_FromSize_t(ndim_csf));
   RETURN_IF_PYERROR();
 
   *out = num_sparse_tensors.detach();
@@ -704,6 +723,7 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out
                        PyLong_FromSize_t(this->tensors.size()));
   RETURN_NOT_OK(CountSparseTensors(this->sparse_tensors, &num_sparse_tensors));
   PyDict_SetItemString(result.obj(), "num_sparse_tensors", num_sparse_tensors);
+  PyDict_SetItemString(result.obj(), "ndim_csf", num_sparse_tensors);
   PyDict_SetItemString(result.obj(), "num_ndarrays",
                        PyLong_FromSize_t(this->ndarrays.size()));
   PyDict_SetItemString(result.obj(), "num_buffers",
diff --git a/cpp/src/arrow/python/serialize.h b/cpp/src/arrow/python/serialize.h
index 8ef5516..191e279 100644
--- a/cpp/src/arrow/python/serialize.h
+++ b/cpp/src/arrow/python/serialize.h
@@ -134,6 +134,7 @@ struct PythonType {
     SPARSECOOTENSOR,
     SPARSECSRMATRIX,
     SPARSECSCMATRIX,
+    SPARSECSFTENSOR,
     NUM_PYTHON_TYPES
   };
 };
diff --git a/cpp/src/generated/SparseTensor_generated.h b/cpp/src/generated/SparseTensor_generated.h
index 023d6cc..9c00326 100644
--- a/cpp/src/generated/SparseTensor_generated.h
+++ b/cpp/src/generated/SparseTensor_generated.h
@@ -20,6 +20,7 @@ struct SparseMatrixIndexCSX;
 struct SparseMatrixIndexCSXBuilder;
 
 struct SparseTensorIndexCSF;
+struct SparseTensorIndexCSFBuilder;
 
 struct SparseTensor;
 struct SparseTensorBuilder;
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 8a5b0b7..7545d0b 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1721,6 +1721,7 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil:
         int csr
         int csc
         int csf
+        int ndim_csf
         int num_total_tensors() const
         int num_total_buffers() const
 
diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi
index 757bd15..3a9901c 100644
--- a/python/pyarrow/serialization.pxi
+++ b/python/pyarrow/serialization.pxi
@@ -308,6 +308,9 @@ cdef class SerializedPyObject:
         num_sparse_tensors.coo = components['num_sparse_tensors']['coo']
         num_sparse_tensors.csr = components['num_sparse_tensors']['csr']
         num_sparse_tensors.csc = components['num_sparse_tensors']['csc']
+        num_sparse_tensors.csf = components['num_sparse_tensors']['csf']
+        num_sparse_tensors.ndim_csf = \
+            components['num_sparse_tensors']['ndim_csf']
 
         with nogil:
             check_status(GetSerializedFromComponents(num_tensors,
diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py
index e54cd88..ede2b33 100644
--- a/python/pyarrow/tests/test_serialization.py
+++ b/python/pyarrow/tests/test_serialization.py
@@ -125,6 +125,9 @@ def assert_equal(obj1, obj2):
     elif isinstance(obj1, pa.SparseCSCMatrix) and \
             isinstance(obj2, pa.SparseCSCMatrix):
         assert obj1.equals(obj2)
+    elif isinstance(obj1, pa.SparseCSFTensor) and \
+            isinstance(obj2, pa.SparseCSFTensor):
+        assert obj1.equals(obj2)
     elif isinstance(obj1, pa.RecordBatch) and isinstance(obj2, pa.RecordBatch):
         assert obj1.equals(obj2)
     elif isinstance(obj1, pa.Table) and isinstance(obj2, pa.Table):
@@ -158,7 +161,6 @@ index_types = ('i1', 'i2', 'i4', 'i8', 'u1', 'u2', 'u4', 'u8')
 tensor_types = ('i1', 'i2', 'i4', 'i8', 'u1', 'u2', 'u4', 'u8',
                 'f2', 'f4', 'f8')
 
-
 PRIMITIVE_OBJECTS += [0, np.array([["hi", "hi"], [1.3, 1]])]
 
 
@@ -727,6 +729,77 @@ def test_scipy_sparse_csc_matrix_serialization():
     assert np.array_equal(sparse_array.toarray(), result.toarray())
 
 
+@pytest.mark.parametrize('tensor_type', tensor_types)
+@pytest.mark.parametrize('index_type', index_types)
+def test_sparse_csf_tensor_serialization(index_type, tensor_type):
+    tensor_dtype = np.dtype(tensor_type)
+    index_dtype = np.dtype(index_type)
+    data = np.array([[1, 2, 3, 4, 5, 6, 7, 8]]).T.astype(tensor_dtype)
+    indptr = [
+        np.array([0, 2, 3]),
+        np.array([0, 1, 3, 4]),
+        np.array([0, 2, 4, 5, 8]),
+    ]
+    indices = [
+        np.array([0, 1]),
+        np.array([0, 1, 1]),
+        np.array([0, 0, 1, 1]),
+        np.array([1, 2, 0, 2, 0, 0, 1, 2]),
+    ]
+    indptr = [x.astype(index_dtype) for x in indptr]
+    indices = [x.astype(index_dtype) for x in indices]
+    shape = (2, 3, 4, 5)
+    axis_order = (0, 1, 2, 3)
+    dim_names = ("a", "b", "c", "d")
+    # NOTE(review): for ndim < 4, len(indices[ndim - 1]) != len(data).
+    for ndim in [2, 3, 4]:
+        sparse_tensor = pa.SparseCSFTensor.from_numpy(data, indptr[:ndim - 1],
+                                                      indices[:ndim],
+                                                      shape[:ndim],
+                                                      axis_order[:ndim],
+                                                      dim_names[:ndim])
+        # Serialize to a single buffer, deserialize, and compare.
+        context = pa.default_serialization_context()
+        serialized = pa.serialize(sparse_tensor, context=context).to_buffer()
+        result = pa.deserialize(serialized)
+        assert_equal(result, sparse_tensor)
+        assert isinstance(result, pa.SparseCSFTensor)
+
+
+@pytest.mark.parametrize('tensor_type', tensor_types)
+@pytest.mark.parametrize('index_type', index_types)
+def test_sparse_csf_tensor_components_serialization(large_buffer,
+                                                    index_type, tensor_type):
+    tensor_dtype = np.dtype(tensor_type)
+    index_dtype = np.dtype(index_type)
+    data = np.array([[1, 2, 3, 4, 5, 6, 7, 8]]).T.astype(tensor_dtype)
+    indptr = [
+        np.array([0, 2, 3]),
+        np.array([0, 1, 3, 4]),
+        np.array([0, 2, 4, 5, 8]),
+    ]
+    indices = [
+        np.array([0, 1]),
+        np.array([0, 1, 1]),
+        np.array([0, 0, 1, 1]),
+        np.array([1, 2, 0, 2, 0, 0, 1, 2]),
+    ]
+    indptr = [x.astype(index_dtype) for x in indptr]
+    indices = [x.astype(index_dtype) for x in indices]
+    shape = (2, 3, 4, 5)
+    axis_order = (0, 1, 2, 3)
+    dim_names = ("a", "b", "c", "d")
+    # NOTE(review): for ndim < 4, len(indices[ndim - 1]) != len(data).
+    for ndim in [2, 3, 4]:
+        sparse_tensor = pa.SparseCSFTensor.from_numpy(data, indptr[:ndim - 1],
+                                                      indices[:ndim],
+                                                      shape[:ndim],
+                                                      axis_order[:ndim],
+                                                      dim_names[:ndim])
+
+        serialization_roundtrip(sparse_tensor, large_buffer)
+
+
 @pytest.mark.filterwarnings(
     "ignore:the matrix subclass:PendingDeprecationWarning")
 def test_numpy_matrix_serialization(tmpdir):
@@ -929,7 +1002,9 @@ def test_serialize_to_components_invalid_cases():
 
     components = {
         'num_tensors': 0,
-        'num_sparse_tensors': {'coo': 0, 'csr': 0, 'csc': 0},
+        'num_sparse_tensors': {
+            'coo': 0, 'csr': 0, 'csc': 0, 'csf': 0, 'ndim_csf': 0
+        },
         'num_ndarrays': 0,
         'num_buffers': 1,
         'data': [buf]
@@ -940,7 +1015,9 @@ def test_serialize_to_components_invalid_cases():
 
     components = {
         'num_tensors': 0,
-        'num_sparse_tensors': {'coo': 0, 'csr': 0, 'csc': 0},
+        'num_sparse_tensors': {
+            'coo': 0, 'csr': 0, 'csc': 0, 'csf': 0, 'ndim_csf': 0
+        },
         'num_ndarrays': 1,
         'num_buffers': 0,
         'data': [buf, buf]