You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/12/01 08:27:03 UTC
[arrow] branch master updated: ARROW-3920: [plasma] Fix reference counting in custom tensorflow plasma operator.
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a667fca ARROW-3920: [plasma] Fix reference counting in custom tensorflow plasma operator.
a667fca is described below
commit a667fca3b71772886bb2595986266d2039823dcc
Author: Robert Nishihara <ro...@gmail.com>
AuthorDate: Sat Dec 1 00:26:51 2018 -0800
ARROW-3920: [plasma] Fix reference counting in custom tensorflow plasma operator.
This fixes an issue where `Release` was never called in the Plasma TensorFlow operators, so the operators never dropped their references to Plasma objects.
Note that I also changed the release delay passed to `client_.Connect` in both Plasma operators from `plasma::kPlasmaDefaultReleaseDelay` to 0.
Author: Robert Nishihara <ro...@gmail.com>
Author: Philipp Moritz <pc...@gmail.com>
Closes #3061 from robertnishihara/extrareleaseinplasmaop and squashes the following commits:
c10956692 <Philipp Moritz> add include guards
f89d5df8c <Philipp Moritz> lint
4836342e0 <Philipp Moritz> unregister memory
e3b3864ef <Robert Nishihara> Linting
b948ce0f4 <Robert Nishihara> Add test.
75f2bd99c <Robert Nishihara> Remove logging statement.
f04a7d26d <Robert Nishihara> Fix
574c03532 <Robert Nishihara> Fix ndarray/tensor confusion in plasma op.
06985cd1c <Robert Nishihara> Have plasma op deserialize as numpy array.
a2a9c36b3 <Robert Nishihara> Add release call into wrapped_callback.
0db9154bd <Robert Nishihara> Change release delay to 0.
f4340946c <Robert Nishihara> Add Release call in plasma op.
---
cpp/src/arrow/python/deserialize.cc | 17 +++++------
cpp/src/arrow/python/deserialize.h | 6 ++--
cpp/src/arrow/python/serialize.cc | 14 ++++-----
cpp/src/arrow/python/serialize.h | 6 ++--
python/pyarrow/tensorflow/plasma_op.cc | 48 ++++++++++++++++++++-----------
python/pyarrow/tests/test_plasma_tf_op.py | 5 +++-
6 files changed, 58 insertions(+), 38 deletions(-)
diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc
index 452d8dd..f1a7eab 100644
--- a/cpp/src/arrow/python/deserialize.cc
+++ b/cpp/src/arrow/python/deserialize.cc
@@ -361,7 +361,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
ipc::Message message(metadata, body);
- RETURN_NOT_OK(ReadTensor(message, &tensor));
+ RETURN_NOT_OK(ipc::ReadTensor(message, &tensor));
out->tensors.emplace_back(std::move(tensor));
}
@@ -375,7 +375,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
ipc::Message message(metadata, body);
- RETURN_NOT_OK(ReadTensor(message, &tensor));
+ RETURN_NOT_OK(ipc::ReadTensor(message, &tensor));
out->ndarrays.emplace_back(std::move(tensor));
}
@@ -389,19 +389,20 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
return Status::OK();
}
-Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out) {
- if (object.tensors.size() != 1) {
- return Status::Invalid("Object is not a Tensor");
+Status DeserializeNdarray(const SerializedPyObject& object,
+ std::shared_ptr<Tensor>* out) {
+ if (object.ndarrays.size() != 1) {
+ return Status::Invalid("Object is not an Ndarray");
}
- *out = object.tensors[0];
+ *out = object.ndarrays[0];
return Status::OK();
}
-Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
+Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
io::BufferReader reader(src);
SerializedPyObject object;
RETURN_NOT_OK(ReadSerializedObject(&reader, &object));
- return DeserializeTensor(object, out);
+ return DeserializeNdarray(object, out);
}
} // namespace py
diff --git a/cpp/src/arrow/python/deserialize.h b/cpp/src/arrow/python/deserialize.h
index a0286b1..754765a 100644
--- a/cpp/src/arrow/python/deserialize.h
+++ b/cpp/src/arrow/python/deserialize.h
@@ -76,15 +76,15 @@ ARROW_EXPORT
Status DeserializeObject(PyObject* context, const SerializedPyObject& object,
PyObject* base, PyObject** out);
-/// \brief Reconstruct Tensor from Arrow-serialized representation
+/// \brief Reconstruct Ndarray from Arrow-serialized representation
/// \param[in] object Object to deserialize
/// \param[out] out The deserialized tensor
/// \return Status
ARROW_EXPORT
-Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out);
+Status DeserializeNdarray(const SerializedPyObject& object, std::shared_ptr<Tensor>* out);
ARROW_EXPORT
-Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out);
+Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out);
} // namespace py
} // namespace arrow
diff --git a/cpp/src/arrow/python/serialize.cc b/cpp/src/arrow/python/serialize.cc
index 2655280..7911557 100644
--- a/cpp/src/arrow/python/serialize.cc
+++ b/cpp/src/arrow/python/serialize.cc
@@ -752,23 +752,23 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject
return Status::OK();
}
-Status SerializeTensor(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
+Status SerializeNdarray(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
std::shared_ptr<Array> array;
SequenceBuilder builder;
- RETURN_NOT_OK(builder.AppendTensor(static_cast<int32_t>(out->tensors.size())));
- out->tensors.push_back(tensor);
+ RETURN_NOT_OK(builder.AppendNdarray(static_cast<int32_t>(out->ndarrays.size())));
+ out->ndarrays.push_back(tensor);
RETURN_NOT_OK(builder.Finish(nullptr, nullptr, nullptr, nullptr, &array));
out->batch = MakeBatch(array);
return Status::OK();
}
-Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
- const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
- io::OutputStream* dst) {
+Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
+ const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
+ io::OutputStream* dst) {
auto empty_tensor = std::make_shared<Tensor>(
dtype, std::make_shared<Buffer>(nullptr, tensor_num_bytes), shape);
SerializedPyObject serialized_tensor;
- RETURN_NOT_OK(SerializeTensor(empty_tensor, &serialized_tensor));
+ RETURN_NOT_OK(SerializeNdarray(empty_tensor, &serialized_tensor));
return serialized_tensor.WriteTo(dst);
}
diff --git a/cpp/src/arrow/python/serialize.h b/cpp/src/arrow/python/serialize.h
index f11cb5a..2759d0c 100644
--- a/cpp/src/arrow/python/serialize.h
+++ b/cpp/src/arrow/python/serialize.h
@@ -103,9 +103,9 @@ Status SerializeTensor(std::shared_ptr<Tensor> tensor, py::SerializedPyObject* o
/// \param[in] dst The OutputStream to write the Tensor header to
/// \return Status
ARROW_EXPORT
-Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
- const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
- io::OutputStream* dst);
+Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
+ const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
+ io::OutputStream* dst);
} // namespace py
diff --git a/python/pyarrow/tensorflow/plasma_op.cc b/python/pyarrow/tensorflow/plasma_op.cc
index 15ae0dc..a341d5a 100644
--- a/python/pyarrow/tensorflow/plasma_op.cc
+++ b/python/pyarrow/tensorflow/plasma_op.cc
@@ -77,8 +77,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
if (!connected_) {
VLOG(1) << "Connecting to Plasma...";
ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
- plasma_manager_socket_name_,
- plasma::kPlasmaDefaultReleaseDelay));
+ plasma_manager_socket_name_, 0));
VLOG(1) << "Connected!";
connected_ = true;
}
@@ -141,7 +140,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
std::vector<int64_t> shape = {total_bytes / byte_width};
arrow::io::MockOutputStream mock;
- ARROW_CHECK_OK(arrow::py::WriteTensorHeader(arrow_dtype, shape, 0, &mock));
+ ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, 0, &mock));
int64_t header_size = mock.GetExtentBytesWritten();
std::shared_ptr<Buffer> data_buffer;
@@ -153,15 +152,21 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
int64_t offset;
arrow::io::FixedSizeBufferWriter buf(data_buffer);
- ARROW_CHECK_OK(arrow::py::WriteTensorHeader(arrow_dtype, shape, total_bytes, &buf));
+ ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, total_bytes, &buf));
ARROW_CHECK_OK(buf.Tell(&offset));
uint8_t* data = reinterpret_cast<uint8_t*>(data_buffer->mutable_data() + offset);
- auto wrapped_callback = [this, context, done, data_buffer, object_id]() {
+ auto wrapped_callback = [this, context, done, data_buffer, data, object_id]() {
{
tf::mutex_lock lock(mu_);
ARROW_CHECK_OK(client_.Seal(object_id));
+ ARROW_CHECK_OK(client_.Release(object_id));
+#ifdef GOOGLE_CUDA
+ auto orig_stream = context->op_device_context()->stream();
+ auto stream_executor = orig_stream->parent();
+ CHECK(stream_executor->HostMemoryUnregister(static_cast<void*>(data)));
+#endif
}
context->SetStatus(tensorflow::Status::OK());
done();
@@ -244,8 +249,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
if (!connected_) {
VLOG(1) << "Connecting to Plasma...";
ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
- plasma_manager_socket_name_,
- plasma::kPlasmaDefaultReleaseDelay));
+ plasma_manager_socket_name_, 0));
VLOG(1) << "Connected!";
connected_ = true;
}
@@ -284,25 +288,39 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
/*timeout_ms=*/-1, &object_buffer));
}
- std::shared_ptr<arrow::Tensor> tensor;
- ARROW_CHECK_OK(arrow::py::ReadTensor(object_buffer.data, &tensor));
+ std::shared_ptr<arrow::Tensor> ndarray;
+ ARROW_CHECK_OK(arrow::py::NdarrayFromBuffer(object_buffer.data, &ndarray));
- int64_t byte_width = get_byte_width(*tensor->type());
- const int64_t size_in_bytes = tensor->data()->size();
+ int64_t byte_width = get_byte_width(*ndarray->type());
+ const int64_t size_in_bytes = ndarray->data()->size();
tf::TensorShape shape({static_cast<int64_t>(size_in_bytes / byte_width)});
- const float* plasma_data = reinterpret_cast<const float*>(tensor->raw_data());
+ const float* plasma_data = reinterpret_cast<const float*>(ndarray->raw_data());
tf::Tensor* output_tensor = nullptr;
OP_REQUIRES_OK_ASYNC(context, context->allocate_output(0, shape, &output_tensor),
done);
+ auto wrapped_callback = [this, context, done, plasma_data, object_id]() {
+ {
+ tf::mutex_lock lock(mu_);
+ ARROW_CHECK_OK(client_.Release(object_id));
+#ifdef GOOGLE_CUDA
+ auto orig_stream = context->op_device_context()->stream();
+ auto stream_executor = orig_stream->parent();
+ CHECK(stream_executor->HostMemoryUnregister(
+ const_cast<void*>(static_cast<const void*>(plasma_data))));
+#endif
+ }
+ done();
+ };
+
if (std::is_same<Device, CPUDevice>::value) {
std::memcpy(
reinterpret_cast<void*>(const_cast<char*>(output_tensor->tensor_data().data())),
plasma_data, size_in_bytes);
- done();
+ wrapped_callback();
} else {
#ifdef GOOGLE_CUDA
auto orig_stream = context->op_device_context()->stream();
@@ -319,8 +337,6 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
}
// Important. See note in T2P op.
- // We don't check the return status since the host memory might've been
- // already registered (e.g., the TensorToPlasmaOp might've been run).
CHECK(stream_executor->HostMemoryRegister(
const_cast<void*>(static_cast<const void*>(plasma_data)),
static_cast<tf::uint64>(size_in_bytes)));
@@ -341,7 +357,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
CHECK(orig_stream->ThenWaitFor(h2d_stream).ok());
context->device()->tensorflow_gpu_device_info()->event_mgr->ThenExecute(
- h2d_stream, std::move(done));
+ h2d_stream, std::move(wrapped_callback));
#endif
}
}
diff --git a/python/pyarrow/tests/test_plasma_tf_op.py b/python/pyarrow/tests/test_plasma_tf_op.py
index b7e1afa..d9bf915 100644
--- a/python/pyarrow/tests/test_plasma_tf_op.py
+++ b/python/pyarrow/tests/test_plasma_tf_op.py
@@ -70,7 +70,6 @@ def run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,
# Try getting the data from Python
plasma_object_id = plasma.ObjectID(object_id)
obj = client.get(plasma_object_id)
- obj = obj.to_numpy()
# Deserialized Tensor should be 64-byte aligned.
assert obj.ctypes.data % 64 == 0
@@ -100,3 +99,7 @@ def test_plasma_tf_op(use_gpu=False):
np.int8, np.int16, np.int32, np.int64]:
run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,
client, use_gpu, dtype)
+
+ # Make sure the objects have been released.
+ for _, info in client.list().items():
+ assert info['ref_count'] == 0