Posted to commits@arrow.apache.org by pc...@apache.org on 2018/12/01 08:27:03 UTC

[arrow] branch master updated: ARROW-3920: [plasma] Fix reference counting in custom tensorflow plasma operator.

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a667fca  ARROW-3920: [plasma] Fix reference counting in custom tensorflow plasma operator.
a667fca is described below

commit a667fca3b71772886bb2595986266d2039823dcc
Author: Robert Nishihara <ro...@gmail.com>
AuthorDate: Sat Dec 1 00:26:51 2018 -0800

    ARROW-3920: [plasma] Fix reference counting in custom tensorflow plasma operator.
    
    There was an issue where `Release` was never called in the plasma TF operator, so the client-side reference count of every object the op touched never dropped to zero and the store could never evict those objects.
    
    Note that I also changed the release delay in the plasma operator to 0, so released objects are returned to the store immediately instead of lingering in the client's release cache.
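    
    For reference, plasma's client-side reference counting works as follows: `Create` and `Get` pin an object in the client, and the store cannot evict the object until a matching `Release` drops that reference. A minimal sketch against the plasma client API as of this commit (the socket path and object ID are placeholders; error handling via ARROW_CHECK_OK as in the op itself):
    
        #include <memory>
        #include <string>
        
        #include <arrow/buffer.h>
        #include <arrow/util/logging.h>
        #include <plasma/client.h>
        
        int main() {
          plasma::PlasmaClient client;
          // A release delay of 0 means Release() hands the object back to
          // the store immediately instead of caching it client-side.
          ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", 0));
        
          plasma::ObjectID id = plasma::ObjectID::from_binary(std::string(20, 'x'));
          std::shared_ptr<arrow::Buffer> data;
          // Create() pins the object: its reference count stays above zero
          // until the matching Release() below.
          ARROW_CHECK_OK(client.Create(id, /*data_size=*/1024,
                                       /*metadata=*/nullptr, /*metadata_size=*/0,
                                       &data));
          ARROW_CHECK_OK(client.Seal(id));
          // This is the call the TF operator was missing; without it the
          // object stays pinned for the lifetime of the client.
          ARROW_CHECK_OK(client.Release(id));
          ARROW_CHECK_OK(client.Disconnect());
          return 0;
        }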
    
    Author: Robert Nishihara <ro...@gmail.com>
    Author: Philipp Moritz <pc...@gmail.com>
    
    Closes #3061 from robertnishihara/extrareleaseinplasmaop and squashes the following commits:
    
    c10956692 <Philipp Moritz> add include guards
    f89d5df8c <Philipp Moritz> lint
    4836342e0 <Philipp Moritz> unregister memory
    e3b3864ef <Robert Nishihara> Linting
    b948ce0f4 <Robert Nishihara> Add test.
    75f2bd99c <Robert Nishihara> Remove logging statement.
    f04a7d26d <Robert Nishihara> Fix
    574c03532 <Robert Nishihara> Fix ndarray/tensor confusion in plasma op.
    06985cd1c <Robert Nishihara> Have plasma op deserialize as numpy array.
    a2a9c36b3 <Robert Nishihara> Add release call into wrapped_callback.
    0db9154bd <Robert Nishihara> Change release delay to 0.
    f4340946c <Robert Nishihara> Add Release call in plasma op.
---
 cpp/src/arrow/python/deserialize.cc       | 17 +++++------
 cpp/src/arrow/python/deserialize.h        |  6 ++--
 cpp/src/arrow/python/serialize.cc         | 14 ++++-----
 cpp/src/arrow/python/serialize.h          |  6 ++--
 python/pyarrow/tensorflow/plasma_op.cc    | 48 ++++++++++++++++++++-----------
 python/pyarrow/tests/test_plasma_tf_op.py |  5 +++-
 6 files changed, 58 insertions(+), 38 deletions(-)

diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc
index 452d8dd..f1a7eab 100644
--- a/cpp/src/arrow/python/deserialize.cc
+++ b/cpp/src/arrow/python/deserialize.cc
@@ -361,7 +361,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
 
     ipc::Message message(metadata, body);
 
-    RETURN_NOT_OK(ReadTensor(message, &tensor));
+    RETURN_NOT_OK(ipc::ReadTensor(message, &tensor));
     out->tensors.emplace_back(std::move(tensor));
   }
 
@@ -375,7 +375,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
 
     ipc::Message message(metadata, body);
 
-    RETURN_NOT_OK(ReadTensor(message, &tensor));
+    RETURN_NOT_OK(ipc::ReadTensor(message, &tensor));
     out->ndarrays.emplace_back(std::move(tensor));
   }
 
@@ -389,19 +389,20 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
   return Status::OK();
 }
 
-Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out) {
-  if (object.tensors.size() != 1) {
-    return Status::Invalid("Object is not a Tensor");
+Status DeserializeNdarray(const SerializedPyObject& object,
+                          std::shared_ptr<Tensor>* out) {
+  if (object.ndarrays.size() != 1) {
+    return Status::Invalid("Object is not an Ndarray");
   }
-  *out = object.tensors[0];
+  *out = object.ndarrays[0];
   return Status::OK();
 }
 
-Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
+Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
   io::BufferReader reader(src);
   SerializedPyObject object;
   RETURN_NOT_OK(ReadSerializedObject(&reader, &object));
-  return DeserializeTensor(object, out);
+  return DeserializeNdarray(object, out);
 }
 
 }  // namespace py
diff --git a/cpp/src/arrow/python/deserialize.h b/cpp/src/arrow/python/deserialize.h
index a0286b1..754765a 100644
--- a/cpp/src/arrow/python/deserialize.h
+++ b/cpp/src/arrow/python/deserialize.h
@@ -76,15 +76,15 @@ ARROW_EXPORT
 Status DeserializeObject(PyObject* context, const SerializedPyObject& object,
                          PyObject* base, PyObject** out);
 
-/// \brief Reconstruct Tensor from Arrow-serialized representation
+/// \brief Reconstruct Ndarray from Arrow-serialized representation
 /// \param[in] object Object to deserialize
 /// \param[out] out The deserialized tensor
 /// \return Status
 ARROW_EXPORT
-Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out);
+Status DeserializeNdarray(const SerializedPyObject& object, std::shared_ptr<Tensor>* out);
 
 ARROW_EXPORT
-Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out);
+Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out);
 
 }  // namespace py
 }  // namespace arrow
diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc
index 2655280..7911557 100644
--- a/cpp/src/arrow/python/serialize.cc
+++ b/cpp/src/arrow/python/serialize.cc
@@ -752,23 +752,23 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject
   return Status::OK();
 }
 
-Status SerializeTensor(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
+Status SerializeNdarray(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
   std::shared_ptr<Array> array;
   SequenceBuilder builder;
-  RETURN_NOT_OK(builder.AppendTensor(static_cast<int32_t>(out->tensors.size())));
-  out->tensors.push_back(tensor);
+  RETURN_NOT_OK(builder.AppendNdarray(static_cast<int32_t>(out->ndarrays.size())));
+  out->ndarrays.push_back(tensor);
   RETURN_NOT_OK(builder.Finish(nullptr, nullptr, nullptr, nullptr, &array));
   out->batch = MakeBatch(array);
   return Status::OK();
 }
 
-Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
-                         const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
-                         io::OutputStream* dst) {
+Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
+                          const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
+                          io::OutputStream* dst) {
   auto empty_tensor = std::make_shared<Tensor>(
       dtype, std::make_shared<Buffer>(nullptr, tensor_num_bytes), shape);
   SerializedPyObject serialized_tensor;
-  RETURN_NOT_OK(SerializeTensor(empty_tensor, &serialized_tensor));
+  RETURN_NOT_OK(SerializeNdarray(empty_tensor, &serialized_tensor));
   return serialized_tensor.WriteTo(dst);
 }
 
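
Taken together with the deserialize changes above, the renamed entry points form a round trip: `SerializeNdarray` plus `SerializedPyObject::WriteTo` produce exactly the bytes that `NdarrayFromBuffer` consumes. A minimal sketch against the Arrow C++ API at this commit (the in-memory sink is illustrative; the plasma op writes into a plasma-allocated buffer instead):

    #include <memory>
    
    #include <arrow/buffer.h>
    #include <arrow/io/memory.h>
    #include <arrow/memory_pool.h>
    #include <arrow/python/deserialize.h>
    #include <arrow/python/serialize.h>
    #include <arrow/status.h>
    #include <arrow/tensor.h>
    
    // Serialize a tensor to an in-memory buffer and read it back through
    // the renamed ndarray entry points.
    arrow::Status RoundTrip(std::shared_ptr<arrow::Tensor> tensor,
                            std::shared_ptr<arrow::Tensor>* out) {
      arrow::py::SerializedPyObject serialized;
      RETURN_NOT_OK(arrow::py::SerializeNdarray(tensor, &serialized));
    
      // Write the serialized form (record batch plus ndarray payload) to
      // an in-memory sink.
      std::shared_ptr<arrow::io::BufferOutputStream> sink;
      RETURN_NOT_OK(arrow::io::BufferOutputStream::Create(
          1024, arrow::default_memory_pool(), &sink));
      RETURN_NOT_OK(serialized.WriteTo(sink.get()));
      std::shared_ptr<arrow::Buffer> buffer;
      RETURN_NOT_OK(sink->Finish(&buffer));
    
      // Reconstruct: NdarrayFromBuffer parses the stream and delegates to
      // DeserializeNdarray.
      return arrow::py::NdarrayFromBuffer(buffer, out);
    }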
diff --git a/cpp/src/arrow/python/serialize.h b/cpp/src/arrow/python/serialize.h
index f11cb5a..2759d0c 100644
--- a/cpp/src/arrow/python/serialize.h
+++ b/cpp/src/arrow/python/serialize.h
@@ -103,9 +103,9 @@ Status SerializeTensor(std::shared_ptr<Tensor> tensor, py::SerializedPyObject* o
 /// \param[in] dst The OutputStream to write the Tensor header to
 /// \return Status
 ARROW_EXPORT
-Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
-                         const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
-                         io::OutputStream* dst);
+Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
+                          const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
+                          io::OutputStream* dst);
 
 }  // namespace py
 
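
The signature change matters to the plasma op below, which calls `WriteNdarrayHeader` twice: once against a MockOutputStream to measure the header, then again to write it into the plasma-allocated buffer. Condensed from the op (arrow_dtype, shape, total_bytes, and data_buffer are as defined there):

    // Pass 1: a MockOutputStream stores nothing and only counts bytes,
    // which yields the header size needed when sizing the plasma object.
    arrow::io::MockOutputStream mock;
    ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, 0, &mock));
    int64_t header_size = mock.GetExtentBytesWritten();
    
    // Pass 2: write the real header (now carrying the true payload size)
    // at the start of the buffer; the tensor bytes follow at `offset`.
    int64_t offset;
    arrow::io::FixedSizeBufferWriter buf(data_buffer);
    ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, total_bytes, &buf));
    ARROW_CHECK_OK(buf.Tell(&offset));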
diff --git a/python/pyarrow/tensorflow/plasma_op.cc b/python/pyarrow/tensorflow/plasma_op.cc
index 15ae0dc..a341d5a 100644
--- a/python/pyarrow/tensorflow/plasma_op.cc
+++ b/python/pyarrow/tensorflow/plasma_op.cc
@@ -77,8 +77,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
     if (!connected_) {
       VLOG(1) << "Connecting to Plasma...";
       ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
-                                     plasma_manager_socket_name_,
-                                     plasma::kPlasmaDefaultReleaseDelay));
+                                     plasma_manager_socket_name_, 0));
       VLOG(1) << "Connected!";
       connected_ = true;
     }
@@ -141,7 +140,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
     std::vector<int64_t> shape = {total_bytes / byte_width};
 
     arrow::io::MockOutputStream mock;
-    ARROW_CHECK_OK(arrow::py::WriteTensorHeader(arrow_dtype, shape, 0, &mock));
+    ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, 0, &mock));
     int64_t header_size = mock.GetExtentBytesWritten();
 
     std::shared_ptr<Buffer> data_buffer;
@@ -153,15 +152,21 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
 
     int64_t offset;
     arrow::io::FixedSizeBufferWriter buf(data_buffer);
-    ARROW_CHECK_OK(arrow::py::WriteTensorHeader(arrow_dtype, shape, total_bytes, &buf));
+    ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, total_bytes, &buf));
     ARROW_CHECK_OK(buf.Tell(&offset));
 
     uint8_t* data = reinterpret_cast<uint8_t*>(data_buffer->mutable_data() + offset);
 
-    auto wrapped_callback = [this, context, done, data_buffer, object_id]() {
+    auto wrapped_callback = [this, context, done, data_buffer, data, object_id]() {
       {
         tf::mutex_lock lock(mu_);
         ARROW_CHECK_OK(client_.Seal(object_id));
+        ARROW_CHECK_OK(client_.Release(object_id));
+#ifdef GOOGLE_CUDA
+        auto orig_stream = context->op_device_context()->stream();
+        auto stream_executor = orig_stream->parent();
+        CHECK(stream_executor->HostMemoryUnregister(static_cast<void*>(data)));
+#endif
       }
       context->SetStatus(tensorflow::Status::OK());
       done();
@@ -244,8 +249,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
     if (!connected_) {
       VLOG(1) << "Connecting to Plasma...";
       ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
-                                     plasma_manager_socket_name_,
-                                     plasma::kPlasmaDefaultReleaseDelay));
+                                     plasma_manager_socket_name_, 0));
       VLOG(1) << "Connected!";
       connected_ = true;
     }
@@ -284,25 +288,39 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
                                  /*timeout_ms=*/-1, &object_buffer));
     }
 
-    std::shared_ptr<arrow::Tensor> tensor;
-    ARROW_CHECK_OK(arrow::py::ReadTensor(object_buffer.data, &tensor));
+    std::shared_ptr<arrow::Tensor> ndarray;
+    ARROW_CHECK_OK(arrow::py::NdarrayFromBuffer(object_buffer.data, &ndarray));
 
-    int64_t byte_width = get_byte_width(*tensor->type());
-    const int64_t size_in_bytes = tensor->data()->size();
+    int64_t byte_width = get_byte_width(*ndarray->type());
+    const int64_t size_in_bytes = ndarray->data()->size();
 
     tf::TensorShape shape({static_cast<int64_t>(size_in_bytes / byte_width)});
 
-    const float* plasma_data = reinterpret_cast<const float*>(tensor->raw_data());
+    const float* plasma_data = reinterpret_cast<const float*>(ndarray->raw_data());
 
     tf::Tensor* output_tensor = nullptr;
     OP_REQUIRES_OK_ASYNC(context, context->allocate_output(0, shape, &output_tensor),
                          done);
 
+    auto wrapped_callback = [this, context, done, plasma_data, object_id]() {
+      {
+        tf::mutex_lock lock(mu_);
+        ARROW_CHECK_OK(client_.Release(object_id));
+#ifdef GOOGLE_CUDA
+        auto orig_stream = context->op_device_context()->stream();
+        auto stream_executor = orig_stream->parent();
+        CHECK(stream_executor->HostMemoryUnregister(
+            const_cast<void*>(static_cast<const void*>(plasma_data))));
+#endif
+      }
+      done();
+    };
+
     if (std::is_same<Device, CPUDevice>::value) {
       std::memcpy(
           reinterpret_cast<void*>(const_cast<char*>(output_tensor->tensor_data().data())),
           plasma_data, size_in_bytes);
-      done();
+      wrapped_callback();
     } else {
 #ifdef GOOGLE_CUDA
       auto orig_stream = context->op_device_context()->stream();
@@ -319,8 +337,6 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
       }
 
       // Important.  See note in T2P op.
-      // We don't check the return status since the host memory might've been
-      // already registered (e.g., the TensorToPlasmaOp might've been run).
       CHECK(stream_executor->HostMemoryRegister(
           const_cast<void*>(static_cast<const void*>(plasma_data)),
           static_cast<tf::uint64>(size_in_bytes)));
@@ -341,7 +357,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
       CHECK(orig_stream->ThenWaitFor(h2d_stream).ok());
 
       context->device()->tensorflow_gpu_device_info()->event_mgr->ThenExecute(
-          h2d_stream, std::move(done));
+          h2d_stream, std::move(wrapped_callback));
 #endif
     }
   }
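
The shape of the fix is the same in both kernels: fold the `Release` (and, under CUDA, the `HostMemoryUnregister` that now balances every `HostMemoryRegister`) into a single callback, run it inline on the CPU path, and hand it to the event manager on the GPU path so it fires only after the device copy has completed. Schematically (condensed from the op above):

    auto wrapped_callback = [this, context, done, plasma_data, object_id]() {
      {
        tf::mutex_lock lock(mu_);
        // Drop the client-side reference taken when the object was fetched.
        ARROW_CHECK_OK(client_.Release(object_id));
    #ifdef GOOGLE_CUDA
        // Unpin the host memory that was registered before the async copy.
        auto stream_executor = context->op_device_context()->stream()->parent();
        CHECK(stream_executor->HostMemoryUnregister(
            const_cast<void*>(static_cast<const void*>(plasma_data))));
    #endif
      }
      done();
    };
    
    // CPU path: the memcpy is synchronous, so clean up inline.
    wrapped_callback();
    
    // GPU path: defer cleanup until the h2d_stream has drained.
    // event_mgr->ThenExecute(h2d_stream, std::move(wrapped_callback));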
diff --git a/python/pyarrow/tests/test_plasma_tf_op.py b/python/pyarrow/tests/test_plasma_tf_op.py
index b7e1afa..d9bf915 100644
--- a/python/pyarrow/tests/test_plasma_tf_op.py
+++ b/python/pyarrow/tests/test_plasma_tf_op.py
@@ -70,7 +70,6 @@ def run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,
     # Try getting the data from Python
     plasma_object_id = plasma.ObjectID(object_id)
     obj = client.get(plasma_object_id)
-    obj = obj.to_numpy()
 
     # Deserialized Tensor should be 64-byte aligned.
     assert obj.ctypes.data % 64 == 0
@@ -100,3 +99,7 @@ def test_plasma_tf_op(use_gpu=False):
                       np.int8, np.int16, np.int32, np.int64]:
             run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,
                                            client, use_gpu, dtype)
+
+        # Make sure the objects have been released.
+        for _, info in client.list().items():
+            assert info['ref_count'] == 0