You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/07/13 17:02:13 UTC

[arrow] 18/43: ARROW-5775: [C++] Fix thread-unsafe cached data

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch maint-0.14.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit fa3f132e9439c41e72a23ffd334a345ff95c8b0f
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Fri Jul 5 09:08:47 2019 +0200

    ARROW-5775: [C++] Fix thread-unsafe cached data
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #4791 from pitrou/ARROW-5775-boxed-fields and squashes the following commits:
    
    9c2a33319 <Antoine Pitrou> Add "inline"
    6fe0b860d <Antoine Pitrou> ARROW-5775:  Fix thread-unsafe cached data
---
 cpp/CMakeLists.txt                     |  1 +
 cpp/src/arrow/array.cc                 | 24 +++++++-------
 cpp/src/arrow/array.h                  |  3 --
 cpp/src/arrow/python/deserialize.cc    |  2 +-
 cpp/src/arrow/record_batch.cc          | 11 ++++---
 cpp/src/arrow/util/atomic_shared_ptr.h | 57 ++++++++++++++++++++++++++++++++++
 6 files changed, 77 insertions(+), 21 deletions(-)

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 6d56718..d802016 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -50,6 +50,7 @@ endif()
 message(STATUS "Arrow version: "
                "${ARROW_VERSION_MAJOR}.${ARROW_VERSION_MINOR}.${ARROW_VERSION_PATCH} "
                "(full: '${ARROW_VERSION}')")
+message(STATUS "Arrow SO version: ${ARROW_SO_VERSION} (full: ${ARROW_FULL_SO_VERSION})")
 
 set(ARROW_SOURCE_DIR ${PROJECT_SOURCE_DIR})
 set(ARROW_BINARY_DIR ${PROJECT_BINARY_DIR})
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 0f63aba..bc38559 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -31,6 +31,7 @@
 #include "arrow/status.h"
 #include "arrow/type.h"
 #include "arrow/type_traits.h"
+#include "arrow/util/atomic_shared_ptr.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/decimal.h"
@@ -530,7 +531,8 @@ const StructType* StructArray::struct_type() const {
 }
 
 std::shared_ptr<Array> StructArray::field(int i) const {
-  if (!boxed_fields_[i]) {
+  std::shared_ptr<Array> result = internal::atomic_load(&boxed_fields_[i]);
+  if (!result) {
     std::shared_ptr<ArrayData> field_data;
     if (data_->offset != 0 || data_->child_data[i]->length != data_->length) {
       field_data = std::make_shared<ArrayData>(
@@ -538,9 +540,10 @@ std::shared_ptr<Array> StructArray::field(int i) const {
     } else {
       field_data = data_->child_data[i];
     }
-    boxed_fields_[i] = MakeArray(field_data);
+    result = MakeArray(field_data);
+    internal::atomic_store(&boxed_fields_[i], result);
   }
-  return boxed_fields_[i];
+  return result;
 }
 
 std::shared_ptr<Array> StructArray::GetFieldByName(const std::string& name) const {
@@ -709,7 +712,8 @@ Status UnionArray::MakeSparse(const Array& type_ids,
 }
 
 std::shared_ptr<Array> UnionArray::child(int i) const {
-  if (!boxed_fields_[i]) {
+  std::shared_ptr<Array> result = internal::atomic_load(&boxed_fields_[i]);
+  if (!result) {
     std::shared_ptr<ArrayData> child_data = data_->child_data[i]->Copy();
     if (mode() == UnionMode::SPARSE) {
       // Sparse union: need to adjust child if union is sliced
@@ -719,16 +723,10 @@ std::shared_ptr<Array> UnionArray::child(int i) const {
         *child_data = child_data->Slice(data_->offset, data_->length);
       }
     }
-    boxed_fields_[i] = MakeArray(child_data);
+    result = MakeArray(child_data);
+    internal::atomic_store(&boxed_fields_[i], result);
   }
-  return boxed_fields_[i];
-}
-
-const Array* UnionArray::UnsafeChild(int i) const {
-  if (!boxed_fields_[i]) {
-    boxed_fields_[i] = MakeArray(data_->child_data[i]);
-  }
-  return boxed_fields_[i].get();
+  return result;
 }
 
 // ----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 2a1ce7a..256bbdc 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -1042,9 +1042,6 @@ class ARROW_EXPORT UnionArray : public Array {
   // For dense unions, the returned array is unchanged.
   std::shared_ptr<Array> child(int pos) const;
 
-  /// Only use this while the UnionArray is in scope
-  const Array* UnsafeChild(int pos) const;
-
  protected:
   void SetData(const std::shared_ptr<ArrayData>& data);
 
diff --git a/cpp/src/arrow/python/deserialize.cc b/cpp/src/arrow/python/deserialize.cc
index 5e6e135..45f7d61 100644
--- a/cpp/src/arrow/python/deserialize.cc
+++ b/cpp/src/arrow/python/deserialize.cc
@@ -235,7 +235,7 @@ Status DeserializeSequence(PyObject* context, const Array& array, int64_t start_
       int64_t offset = value_offsets[i];
       uint8_t type = type_ids[i];
       PyObject* value;
-      RETURN_NOT_OK(GetValue(context, *data.UnsafeChild(type), offset,
+      RETURN_NOT_OK(GetValue(context, *data.child(type), offset,
                              python_types[type_ids[i]], base, blobs, &value));
       RETURN_NOT_OK(set_item(result.obj(), i - start_idx, value));
     }
diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc
index 1f266df..f83a6cd 100644
--- a/cpp/src/arrow/record_batch.cc
+++ b/cpp/src/arrow/record_batch.cc
@@ -18,6 +18,7 @@
 #include "arrow/record_batch.h"
 
 #include <algorithm>
+#include <atomic>
 #include <cstdlib>
 #include <memory>
 #include <string>
@@ -27,6 +28,7 @@
 #include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
+#include "arrow/util/atomic_shared_ptr.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/stl.h"
 
@@ -85,11 +87,12 @@ class SimpleRecordBatch : public RecordBatch {
   }
 
   std::shared_ptr<Array> column(int i) const override {
-    if (!boxed_columns_[i]) {
-      boxed_columns_[i] = MakeArray(columns_[i]);
+    std::shared_ptr<Array> result = internal::atomic_load(&boxed_columns_[i]);
+    if (!result) {
+      result = MakeArray(columns_[i]);
+      internal::atomic_store(&boxed_columns_[i], result);
     }
-    DCHECK(boxed_columns_[i]);
-    return boxed_columns_[i];
+    return result;
   }
 
   std::shared_ptr<ArrayData> column_data(int i) const override { return columns_[i]; }
diff --git a/cpp/src/arrow/util/atomic_shared_ptr.h b/cpp/src/arrow/util/atomic_shared_ptr.h
new file mode 100644
index 0000000..9f3152b
--- /dev/null
+++ b/cpp/src/arrow/util/atomic_shared_ptr.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <utility>
+
+namespace arrow {
+namespace internal {
+
+#if !defined(__clang__) && defined(__GNUC__) && __GNUC__ < 5
+
+// The atomic shared_ptr operations only appeared in gcc 5. On gcc 4.x, emulate
+// them with plain (non-atomic) copy and assignment, which are not thread-safe.
+
+template <class T>
+inline std::shared_ptr<T> atomic_load(const std::shared_ptr<T>* p) {
+  return *p;
+}
+
+template <class T>
+inline void atomic_store(std::shared_ptr<T>* p, std::shared_ptr<T> r) {
+  *p = r;
+}
+
+#else
+
+template <class T>
+inline std::shared_ptr<T> atomic_load(const std::shared_ptr<T>* p) {
+  return std::atomic_load(p);
+}
+
+template <class T>
+inline void atomic_store(std::shared_ptr<T>* p, std::shared_ptr<T> r) {
+  std::atomic_store(p, std::move(r));
+}
+
+#endif
+
+}  // namespace internal
+}  // namespace arrow