Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/21 12:28:39 UTC

[GitHub] [arrow] paleolimbot commented on a change in pull request #12467: ARROW-15471: [R] ExtensionType support in R

paleolimbot commented on a change in pull request #12467:
URL: https://github.com/apache/arrow/pull/12467#discussion_r829146168



##########
File path: r/R/arrow-tabular.R
##########
@@ -98,6 +98,40 @@ ArrowTabular <- R6Class("ArrowTabular",
   )
 )
 
+tabular_as_data_frame_common <- function(x, base) {

Review comment:
       I'm worried about this implementation because it's unintuitive...this gets used by `Table$to_data_frame()` and `RecordBatch$to_data_frame()` because both of those call into C++ to do their thing, but the C++ implementation doesn't know about extension types (maybe it should?). Pretty much everywhere else we avoid looping over columns in R, but that might be better than adding complexity at the C++ level?

##########
File path: r/src/extension.cpp
##########
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+
+#if defined(ARROW_R_WITH_ARROW)
+
+#include <thread>
+
+#include <arrow/array.h>
+#include <arrow/extension_type.h>
+#include <arrow/type.h>
+
+// A wrapper around arrow::ExtensionType that allows R to register extension
+// types whose Deserialize, ExtensionEquals, and Serialize methods are
+// meaningfully handled at the R level. At the C++ level, the type is
+// already serialized to minimize calls to R from C++.
+//
+// Using a std::shared_ptr<> to wrap a cpp11::sexp type is unusual, but we
+// need it here to avoid calling the copy constructor from another thread,
+// since this might call into the R API. If we don't do this, we get crashes
+// when reading a multi-file Dataset.

Review comment:
       It's true that I need to use `std::shared_ptr<cpp11::environment>` to store the `r6_class_` field here instead of `cpp11::environment` to avoid a crash, but I'm not entirely sure I'm using `std::shared_ptr` correctly.
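
As a rough illustration of the `std::shared_ptr` point, here is a minimal standalone sketch (not code from this PR). `CountingHandle`, `ByValueHolder`, `SharedHolder`, and the two helper functions are hypothetical names; `CountingHandle` merely stands in for a type like `cpp11::environment` whose copy constructor has a side effect. Under those assumptions, it shows that copying a holder that keeps the handle behind a `std::shared_ptr` does not invoke the handle's copy constructor, whereas a by-value member does.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for cpp11::environment: a handle whose copy
// constructor has a side effect we want to avoid on non-R threads.
struct CountingHandle {
  static int copies;  // number of copy-constructor calls so far
  CountingHandle() = default;
  CountingHandle(const CountingHandle&) { ++copies; }
};
int CountingHandle::copies = 0;

// Holds the handle by value: copying the holder copies the handle.
struct ByValueHolder {
  CountingHandle handle;
};

// Holds the handle behind a shared_ptr: copying the holder only bumps the
// reference count and never runs the handle's copy constructor.
struct SharedHolder {
  std::shared_ptr<CountingHandle> handle = std::make_shared<CountingHandle>();
};

// Returns how many handle copies were triggered by copying a holder once.
template <typename Holder>
int CopiesTriggeredByOneCopy() {
  Holder original;
  const int before = CountingHandle::copies;
  Holder duplicate = original;  // copy the holder
  (void)duplicate;
  return CountingHandle::copies - before;
}

// Usage: the by-value holder copies the handle, the shared holder does not.
void DemonstrateCopies() {
  assert(CopiesTriggeredByOneCopy<ByValueHolder>() == 1);
  assert(CopiesTriggeredByOneCopy<SharedHolder>() == 0);
}
```

One caveat with this pattern: the wrapped object's destructor still runs on whichever thread drops the last reference, so the sketch only addresses the copy side of the problem.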

##########
File path: r/src/extension.cpp
##########
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "./arrow_types.h"
+
+#if defined(ARROW_R_WITH_ARROW)
+
+#include <thread>
+
+#include <arrow/array.h>
+#include <arrow/extension_type.h>
+#include <arrow/type.h>
+
+// A wrapper around arrow::ExtensionType that allows R to register extension
+// types whose Deserialize, ExtensionEquals, and Serialize methods are
+// meaningfully handled at the R level. At the C++ level, the type is
+// already serialized to minimize calls to R from C++.
+//
+// Using a std::shared_ptr<> to wrap a cpp11::sexp type is unusual, but we
+// need it here to avoid calling the copy constructor from another thread,
+// since this might call into the R API. If we don't do this, we get crashes
+// when reading a multi-file Dataset.
+class RExtensionType : public arrow::ExtensionType {
+ public:
+  RExtensionType(const std::shared_ptr<arrow::DataType> storage_type,
+                 std::string extension_name, std::string extension_metadata,
+                 std::shared_ptr<cpp11::environment> r6_class,
+                 std::thread::id creation_thread)
+      : arrow::ExtensionType(storage_type),
+        extension_name_(extension_name),
+        extension_metadata_(extension_metadata),
+        r6_class_(r6_class),
+        creation_thread_(creation_thread) {}
+
+  std::string extension_name() const { return extension_name_; }
+
+  bool ExtensionEquals(const arrow::ExtensionType& other) const;
+
+  std::shared_ptr<arrow::Array> MakeArray(std::shared_ptr<arrow::ArrayData> data) const;
+
+  arrow::Result<std::shared_ptr<arrow::DataType>> Deserialize(
+      std::shared_ptr<arrow::DataType> storage_type,
+      const std::string& serialized_data) const;
+
+  std::string Serialize() const { return extension_metadata_; }
+
+  std::string ToString() const;
+
+  std::unique_ptr<RExtensionType> Clone() const;
+
+  cpp11::environment r6_class() const { return *r6_class_; }
+
+  cpp11::environment r6_instance(std::shared_ptr<arrow::DataType> storage_type,
+                                 const std::string& serialized_data) const;
+
+  cpp11::environment r6_instance() const {
+    return r6_instance(storage_type(), Serialize());
+  }
+
+ private:
+  std::string extension_name_;
+  std::string extension_metadata_;
+  std::string cached_to_string_;
+  std::shared_ptr<cpp11::environment> r6_class_;
+  std::thread::id creation_thread_;
+
+  arrow::Status assert_r_thread() const {
+    if (std::this_thread::get_id() == creation_thread_) {
+      return arrow::Status::OK();
+    } else {
+      return arrow::Status::ExecutionError("RExtensionType <", extension_name_,
+                                           "> attempted to call into R ",
+                                           "from a non-R thread");
+    }
+  }
+};
+
+bool RExtensionType::ExtensionEquals(const arrow::ExtensionType& other) const {
+  // Avoid materializing the R6 instance if at all possible, since this is slow
+  // and in some cases not possible due to threading
+  if (other.extension_name() != extension_name()) {
+    return false;
+  }
+
+  if (other.Serialize() == Serialize()) {
+    return true;
+  }
+
+  // With any ambiguity, we need to materialize the R6 instance and call its
+  // ExtensionEquals method. We can't do this on the non-R thread.
+  arrow::Status is_r_thread = assert_r_thread();
+  if (!is_r_thread.ok()) {
+    throw std::runtime_error(is_r_thread.message());
+  }
+
+  cpp11::environment instance = r6_instance();
+  cpp11::function instance_ExtensionEquals(instance[".ExtensionEquals"]);
+
+  std::shared_ptr<DataType> other_shared =
+      ValueOrStop(other.Deserialize(other.storage_type(), other.Serialize()));
+  cpp11::sexp other_r6 = cpp11::to_r6<DataType>(other_shared, "ExtensionType");
+
+  cpp11::logicals result(instance_ExtensionEquals(other_r6));
+  return cpp11::as_cpp<bool>(result);
+}
+
+std::shared_ptr<arrow::Array> RExtensionType::MakeArray(
+    std::shared_ptr<arrow::ArrayData> data) const {
+  std::shared_ptr<arrow::ArrayData> new_data = data->Copy();
+  std::unique_ptr<RExtensionType> cloned = Clone();
+  new_data->type = std::shared_ptr<RExtensionType>(cloned.release());
+  return std::make_shared<arrow::ExtensionArray>(new_data);
+}
+
+arrow::Result<std::shared_ptr<arrow::DataType>> RExtensionType::Deserialize(
+    std::shared_ptr<arrow::DataType> storage_type,
+    const std::string& serialized_data) const {
+  std::unique_ptr<RExtensionType> cloned = Clone();
+  cloned->storage_type_ = storage_type;
+  cloned->extension_metadata_ = serialized_data;
+
+  // We probably should create an ephemeral R6 instance here, which will call
+  // the R6 instance's .Deserialize() method, possibly erroring when the metadata is
+  // invalid or the deserialized values are invalid. When there is an error it will be
+  // confusing, since it will only occur when the result surfaces to R
+  // (which might be much later). Unfortunately, the Deserialize() method gets
+  // called from other threads frequently (e.g., when reading a multi-file Dataset),
+  // and we get crashes if we try this. As a compromise, we call this method when we can
+  // to maximize the likelihood an error is surfaced.
+  if (assert_r_thread().ok()) {
+    cloned->r6_instance();
+  }

Review comment:
       This is the main threading concern...the `Deserialize()` method gets called from other threads frequently, but unless the metadata has been passed through an R6 instance in R, we don't know whether it is valid or not.
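
The compromise described in this comment (validate only when on the creating thread, otherwise defer) can be sketched in isolation. This is a simplified standalone illustration, not the PR's code: `MainThreadGuard`, `ValidateMetadata`, `Outcome`, `DeserializeChecked`, and `DeserializeOnWorker` are hypothetical names, and `ValidateMetadata` is a trivial placeholder for the call into R.

```cpp
#include <cassert>
#include <string>
#include <thread>

// Hypothetical guard: remembers the thread it was created on and reports
// whether the calling thread is that same thread.
class MainThreadGuard {
 public:
  MainThreadGuard() : owner_(std::this_thread::get_id()) {}
  bool OnOwnerThread() const { return std::this_thread::get_id() == owner_; }

 private:
  std::thread::id owner_;
};

// Placeholder for the validation that would call into R (assumption:
// here, non-empty metadata simply counts as valid).
bool ValidateMetadata(const std::string& metadata) { return !metadata.empty(); }

enum class Outcome { kValid, kInvalid, kDeferred };

// Validates eagerly on the owner thread; on any other thread the check is
// skipped, so invalid metadata goes unnoticed until later.
Outcome DeserializeChecked(const MainThreadGuard& guard,
                           const std::string& metadata) {
  if (!guard.OnOwnerThread()) {
    return Outcome::kDeferred;
  }
  return ValidateMetadata(metadata) ? Outcome::kValid : Outcome::kInvalid;
}

// Runs DeserializeChecked on a freshly spawned worker thread.
Outcome DeserializeOnWorker(const MainThreadGuard& guard,
                            const std::string& metadata) {
  Outcome result = Outcome::kValid;
  std::thread worker([&] { result = DeserializeChecked(guard, metadata); });
  assert(worker.get_id() != std::this_thread::get_id());
  worker.join();
  return result;
}
```

With a guard created on the main thread, `DeserializeChecked(guard, "")` reports `Outcome::kInvalid`, while `DeserializeOnWorker(guard, "")` reports `Outcome::kDeferred`. That second case is exactly the gap raised above: invalid metadata seen on a worker thread is not caught at deserialization time.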

##########
File path: r/R/extension.R
##########
@@ -0,0 +1,437 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#' @include arrow-package.R
+#' @title class arrow::ExtensionArray
+#'
+#' @usage NULL
+#' @format NULL
+#' @docType class
+#'
+#' @section Methods:
+#'
+#' The `ExtensionArray` class inherits from `Array`, but also provides
+#' access to the underlying storage of the extension.
+#'
+#' - `$storage()`: Returns the underlying [Array] used to store
+#'   values.
+#'
+#' The `ExtensionArray` is not intended to be subclassed for extension
+#' types.
+#'
+#' @rdname ExtensionArray
+#' @name ExtensionArray
+ExtensionArray <- R6Class("ExtensionArray",
+  inherit = Array,
+  public = list(
+    storage = function() {
+      ExtensionArray__storage(self)
+    },
+
+    as_vector = function() {
+      self$type$.array_as_vector(self)
+    }
+  )
+)
+
+ExtensionArray$create <- function(x, type) {
+  assert_is(type, "ExtensionType")
+  if (inherits(x, "ExtensionArray") && type$Equals(x$type)) {
+    return(x)
+  }
+
+  storage <- Array$create(x, type = type$storage_type())
+  type$WrapArray(storage)
+}
+
+#' @include arrow-package.R
+#' @title class arrow::ExtensionType
+#'
+#' @usage NULL
+#' @format NULL
+#' @docType class
+#'
+#' @section Methods:
+#'
+#' The `ExtensionType` class inherits from `DataType`, but also defines
+#' extra methods specific to extension types:
+#'
+#' - `$storage_type()`: Returns the underlying [DataType] used to store
+#'   values.
+#' - `$storage_id()`: Returns the [Type] identifier corresponding to the
+#'   `$storage_type()`.
+#' - `$extension_name()`: Returns the extension name.
+#' - `$Serialize()`: Returns the serialized version of the extension metadata
+#'   as a [raw()] vector.
+#' - `$WrapArray(array)`: Wraps a storage [Array] into an [ExtensionArray]
+#'   with this extension type.
+#'
+#' In addition, subclasses may override the following methods to customize
+#' the behaviour of extension classes.
+#'
+#' - `$.Deserialize(storage_type, extension_name, extension_metadata)`:
+#'   This method is called when a new [ExtensionType]
+#'   is initialized and is responsible for parsing and validating
+#'   the serialized `extension_metadata` (a [raw()] vector)
+#'   such that its contents can be inspected by fields and/or methods
+#'   of the R6 ExtensionType subclass. Implementations must also check the
+#'   `storage_type` to make sure it is compatible with the extension type.
+#' - `$.array_as_vector(extension_array)`: Convert an [Array] to an R
+#'   vector. This method is called by [as.vector()] on [ExtensionArray]
+#'   objects or when a [RecordBatch] containing an [ExtensionArray] is
+#'   converted to a [data.frame()]. The default method returns the converted
+#'   storage array.
+#' - `$.chunked_array_as_vector(chunked_array)`: Convert a [ChunkedArray]
+#'   to an R vector. This method is called by [as.vector()] on a [ChunkedArray]
+#'   whose type matches this extension type or when a [Table] containing
+#'   such a column is converted to a [data.frame()]. The default method
+#'   returns the converted version of the equivalent storage arrays
+#'   as a [ChunkedArray].
+#' - `$.ToString()`: Return a string representation that will be printed
+#'   to the console when this type or an Array of this type is printed.
+#'
+#' @rdname ExtensionType
+#' @name ExtensionType
+#' @export
+ExtensionType <- R6Class("ExtensionType",
+  inherit = DataType,
+  public = list(
+
+    # In addition to the initialization that occurs for all
+    # ArrowObject instances, we call .Deserialize(), which can
+    # be overridden to populate custom fields
+    initialize = function(xp) {
+      super$initialize(xp)
+      self$.Deserialize(
+        self$storage_type(),
+        self$extension_name(),
+        self$Serialize()
+      )
+    },
+
+    # Because of how C++ shared_ptr<> objects are converted to R objects,
+    # the initial object that is instantiated will be of this class
+    # (ExtensionType), but the R6Class object that was registered is
+    # available from C++. We need this in order to produce the correct
+    # R6 subclass when a shared_ptr<ExtensionType> is returned to R.
+    r6_class = function() {
+      ExtensionType__r6_class(self)
+    },
+
+    storage_type = function() {
+      ExtensionType__storage_type(self)
+    },
+
+    storage_id = function() {
+      self$storage_type()$id
+    },
+
+    extension_name = function() {
+      ExtensionType__extension_name(self)
+    },
+
+    Serialize = function() {
+      ExtensionType__Serialize(self)
+    },
+
+    MakeArray = function(data) {
+      assert_is(data, "ArrayData")
+      ExtensionType__MakeArray(self, data)
+    },
+
+    WrapArray = function(array) {
+      assert_is(array, "Array")
+      self$MakeArray(array$data())
+    },
+

Review comment:
       I made up the "dot prefix means protected method" thing here...I don't know if there is a convention for "protected"-style methods in R6 but would be happy to use it if it exists.



