Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/13 12:02:09 UTC

[GitHub] [arrow] rtpsw opened a new pull request, #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

rtpsw opened a new pull request, #13375:
URL: https://github.com/apache/arrow/pull/13375

   See https://issues.apache.org/jira/browse/ARROW-16823


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901093244


##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,57 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+namespace {
+
+compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                               std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}
+
+}  // namespace
+
+static DeclarationFactory MakeWriteDeclarationFactory(
+    const WriteOptionsFactory& write_options_factory) {
+  return [&write_options_factory](compute::Declaration input,
+                                  std::vector<std::string> names) {
+    compute::Declaration projected = ProjectByNamesDeclaration(input, names);
+    std::shared_ptr<compute::ExecNodeOptions> options = write_options_factory();
+    return compute::Declaration::Sequence({std::move(projected), {"write", options}});

Review Comment:
   In a recent PR, I added `names` to `ConsumingSinkNodeOptions` for a similar reason. Given your comment here, I suspect it may be useful to refactor this names-feature (perhaps into its own options instance to be included in more complex options instances). If you agree, then the JIRA should be for this feature.
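
As an illustration of this suggestion, here is a minimal sketch of a names-only options piece that richer sink options could embed. Every type in it (`OutputNamesOptions`, `ConsumingSinkOptionsSketch`, `WriteSinkOptionsSketch`, `ResolveNames`) is hypothetical and not part of Arrow. The real `ConsumingSinkNodeOptions` in this PR stores its `names` directly.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical reusable bundle of output field names. Sink option types could
// embed it instead of each one declaring its own `names` member.
struct OutputNamesOptions {
  std::vector<std::string> names;  // empty means "keep the input names"

  bool empty() const { return names.empty(); }
};

// Hypothetical stand-ins for sink option types that embed the shared piece.
struct ConsumingSinkOptionsSketch {
  int consumer_id = 0;  // placeholder for the real consumer object
  OutputNamesOptions output_names;
};

struct WriteSinkOptionsSketch {
  std::string base_dir;
  OutputNamesOptions output_names;
};

// One helper resolves the final names for any sink that embeds the bundle.
std::vector<std::string> ResolveNames(const OutputNamesOptions& opts,
                                      const std::vector<std::string>& input_names) {
  return opts.empty() ? input_names : opts.names;
}
```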



[GitHub] [arrow] lidavidm commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
lidavidm commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r898237692


##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);

Review Comment:
   Ok. I suppose constness isn't a big deal here. The API seems a little complicated for what's going on but I suppose having the lock makes it hard to factor things out conveniently.



[GitHub] [arrow] vibhatha commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1157099561

   I think we have two important pieces discussed here. _One_ is how Substrait-UDF usage benefits, and the _second_ is how the function registry usage must be modified. Since the function registry usage is an important piece for the first task, should we address it first and then move on to the Substrait-UDF part? Just a thought. We could test the usage of the temporary function registry further.
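
A test of a temporary (nested) function registry could check that lookups fall through to the parent, while additions, including overwrites, stay in the child. This matches the doc comment that a nested registry "never changes its parent". The sketch below works under that assumption. `NestedRegistrySketch` is a hypothetical stand-in: it maps names to `int`s instead of holding Arrow `Function` objects, and it returns `bool` instead of `Status`.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical, simplified model of a nested registry: lookups fall back to
// the parent, while additions only ever touch the child.
class NestedRegistrySketch {
 public:
  explicit NestedRegistrySketch(const NestedRegistrySketch* parent = nullptr)
      : parent_(parent) {}

  // Fails if the name is taken here or in any ancestor and overwriting is off.
  bool Add(const std::string& name, int impl, bool allow_overwrite = false) {
    if (!allow_overwrite && Contains(name)) return false;
    entries_[name] = impl;  // never writes into parent_
    return true;
  }

  bool Contains(const std::string& name) const {
    if (entries_.count(name) > 0) return true;
    return parent_ != nullptr && parent_->Contains(name);
  }

  // Returns -1 if the name is not found anywhere in the chain.
  int Get(const std::string& name) const {
    auto it = entries_.find(name);
    if (it != entries_.end()) return it->second;
    return parent_ != nullptr ? parent_->Get(name) : -1;
  }

 private:
  const NestedRegistrySketch* parent_;
  std::map<std::string, int> entries_;
};
```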


[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r907719221


##########
cpp/src/arrow/engine/substrait/extension_set.h:
##########
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <set>

Review Comment:
   Probably a leftover; I'll check.



[GitHub] [arrow] westonpace commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r907431339


##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -40,22 +41,81 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 
 /// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
 ///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
 /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// The output of each top-level Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of the single Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<compute::ExecPlan> DeserializePlan(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);

Review Comment:
   For both this and the other `DeserializePlan` method it doesn't make too much sense for the user to have to provide an entire factory.
   
   Instead could we change `const WriteOptionsFactory& write_options_factory` to `dataset::WriteNodeOptions write_options`?
   
   We should also change the other `DeserializePlan` to use `std::shared_ptr<compute::SinkNodeConsumer>` instead of `const ConsumerFactory&` but that could be part of a future cleanup.
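
Below is a minimal sketch of the suggested value-taking overload. It assumes the overload can wrap the options in a copying lambda and reuse the factory-based path internally. `WriteOptionsSketch`, `WriteOptionsFactorySketch` and `CountWriteNodes` are hypothetical stand-ins for `dataset::WriteNodeOptions`, `WriteOptionsFactory` and `DeserializePlans`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// Hypothetical stand-in for dataset::WriteNodeOptions.
struct WriteOptionsSketch {
  std::string base_dir;
};

using WriteOptionsFactorySketch =
    std::function<std::shared_ptr<WriteOptionsSketch>()>;

// Factory-style entry point: calls the factory once per top-level relation.
int CountWriteNodes(int num_relations, const WriteOptionsFactorySketch& factory) {
  int made = 0;
  for (int i = 0; i < num_relations; ++i) {
    std::shared_ptr<WriteOptionsSketch> opts = factory();
    if (opts) ++made;
  }
  return made;
}

// Value-style entry point: the caller passes plain options, and each relation
// receives its own copy, so the factory becomes an internal detail.
int CountWriteNodes(int num_relations, WriteOptionsSketch options) {
  return CountWriteNodes(num_relations, [options]() {
    return std::make_shared<WriteOptionsSketch>(options);
  });
}
```

With this shape, callers that have a single set of options never need to write a lambda. The factory form is still available to the rare caller who needs per-relation variation.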



##########
cpp/src/arrow/engine/substrait/extension_set.cc:
##########
@@ -315,6 +315,11 @@ struct ExtensionIdRegistryImpl : ExtensionIdRegistry {
     return Status::OK();
   }
 
+  Status RegisterFunction(std::string uri, std::string name,

Review Comment:
   These id/string APIs confuse me so I apologize if this is off base but I think these two `RegisterFunction` methods are backwards.  It seems like it makes more sense to just change primary `RegisterFunction` to take in strings instead of an `Id`.  The first thing the function does is make a value-copy of the `Id` so I don't see any benefit to passing in an `Id`.
   
   Then, if we need it, we could add a helper overload that goes the other way, accepting an `Id` and calling `to_string` on each of the pieces.
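
Below is a minimal sketch of the suggested direction. The primary overload takes owning strings, and an `Id`-based overload converts each piece and delegates. `IdSketch` and `ExtensionRegistrySketch` are hypothetical. The sketch assumes, as the `to_string` remark suggests, that `Id` holds non-owning string views. It uses `std::string_view` and the `std::string` constructor where Arrow code would call `to_string`.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical stand-in for an extension Id that views (does not own) its strings.
struct IdSketch {
  std::string_view uri;
  std::string_view name;
};

class ExtensionRegistrySketch {
 public:
  // Primary overload: takes owning strings by value and moves them into storage.
  bool RegisterFunction(std::string uri, std::string name,
                        std::string arrow_function_name) {
    auto key = std::make_pair(std::move(uri), std::move(name));
    return functions_.emplace(std::move(key), std::move(arrow_function_name)).second;
  }

  // Convenience overload: turn each Id piece into an owning string, then delegate.
  bool RegisterFunction(IdSketch id, std::string arrow_function_name) {
    return RegisterFunction(std::string(id.uri), std::string(id.name),
                            std::move(arrow_function_name));
  }

  const std::string* Lookup(const std::string& uri, const std::string& name) const {
    auto it = functions_.find({uri, name});
    return it == functions_.end() ? nullptr : &it->second;
  }

 private:
  std::map<std::pair<std::string, std::string>, std::string> functions_;
};
```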



##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,57 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+namespace {
+
+compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                               std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}
+
+}  // namespace
+
+static DeclarationFactory MakeWriteDeclarationFactory(
+    const WriteOptionsFactory& write_options_factory) {
+  return [&write_options_factory](compute::Declaration input,
+                                  std::vector<std::string> names) {
+    compute::Declaration projected = ProjectByNamesDeclaration(input, names);
+    std::shared_ptr<compute::ExecNodeOptions> options = write_options_factory();
+    return compute::Declaration::Sequence({std::move(projected), {"write", options}});

Review Comment:
   I agree with you.  Every "sink" node (including writes) should probably allow the user to provide a schema for the outgoing batches as we convert from ExecBatch to RecordBatch at the edge.  This would allow the user to supply custom names as well as custom field and schema-level metadata.
   
   The only thing they can't specify is the field types (those have to match what is already the output schema for the node).  So we can either:
   
    * Use schema and throw an error if the user-supplied schema's types don't match
    * Create a new type that is just the names & metadata and not types.
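
The two options above can be sketched with a hypothetical simplified schema model (`FieldSketch`, `FieldLabelSketch`) rather than Arrow's `Schema`/`Field`:

- Option 1 accepts a full schema and rejects type mismatches.
- Option 2 accepts only names and metadata, and takes the types from the node's existing output.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical simplified field: name, type tag and key/value metadata.
struct FieldSketch {
  std::string name;
  std::string type;
  std::map<std::string, std::string> metadata;
};

// Option 2: the user supplies names and metadata only, no types.
struct FieldLabelSketch {
  std::string name;
  std::map<std::string, std::string> metadata;
};

// Option 1: the user supplies a full schema; reject it if any type disagrees
// with what the node already produces.
bool ValidateUserSchema(const std::vector<FieldSketch>& node_output,
                        const std::vector<FieldSketch>& user_schema) {
  if (node_output.size() != user_schema.size()) return false;
  for (std::size_t i = 0; i < node_output.size(); ++i) {
    if (node_output[i].type != user_schema[i].type) return false;
  }
  return true;
}

// Option 2: apply labels onto the node's output. Types always come from the
// node, so only a field-count mismatch can be reported.
bool ApplyLabels(std::vector<FieldSketch>* node_output,
                 const std::vector<FieldLabelSketch>& labels) {
  if (node_output->size() != labels.size()) return false;
  for (std::size_t i = 0; i < labels.size(); ++i) {
    (*node_output)[i].name = labels[i].name;
    (*node_output)[i].metadata = labels[i].metadata;
  }
  return true;
}
```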



##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,45 @@ namespace substrait {
 
 /// \brief Retrieve a RecordBatchReader from a Substrait plan.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer);
+    const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+    compute::FunctionRegistry* func_registry = NULLPTR);
 
 /// \brief Get a Serialized Plan from a Substrait JSON plan.
 /// This is a helper method for Python tests.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
     const std::string& substrait_json);
 
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+/// including a no-op consumer of the sink output
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const ExtensionIdRegistry* registry);
+
 /// \brief Make a nested registry with the default registry as parent.
 /// See arrow::engine::nested_extension_id_registry for details.
 ARROW_ENGINE_EXPORT std::shared_ptr<ExtensionIdRegistry> MakeExtensionIdRegistry();
 
+/// \brief Register a function manually.
+///
+/// Register an arrow function name by an ID, defined by a URI and a name, on a given
+/// extension-id-registry.
+///
+/// \param[in] registry an extension-id-registry to use
+/// \param[in] id_uri a URI of the ID to register by
+/// \param[in] id_name a name of the ID to register by
+/// \param[in] arrow_function_name name of arrow function to register
+ARROW_ENGINE_EXPORT Status RegisterFunction(ExtensionIdRegistry& registry,
+                                            const std::string& id_uri,
+                                            const std::string& id_name,
+                                            const std::string& arrow_function_name);

Review Comment:
   A future PR sounds fine.



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -229,6 +229,20 @@ class ARROW_EXPORT SinkNodeConsumer {
   virtual Future<> Finish() = 0;
 };
 
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {

Review Comment:
   > AFAIU, a Substrait plan must have some consumer for proper execution. Up to now, only a `ConsumingSinkNodeOptions` was provided. A `NullSinkNodeConsumer` is useful with the writing capability added here.
   
   But you also added a custom `DeserializePlans` that takes in a `WriteOptionsFactory` and doesn't require a consumer.  So if the user is only doing writes then they should use that method and the null consumer shouldn't be needed.
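
For reference, a null consumer amounts to little more than the sketch below: it accepts and drops every batch. `SinkConsumerSketch`, `BatchSketch` and `NullConsumerSketch` are hypothetical stand-ins. Arrow's `SinkNodeConsumer` works with `ExecBatch`, `Status` and `Future<>`. The `dropped()` counter exists only to make the behavior observable.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-ins for ExecBatch and SinkNodeConsumer.
struct BatchSketch {
  std::vector<int> values;
};

class SinkConsumerSketch {
 public:
  virtual ~SinkConsumerSketch() = default;
  virtual bool Consume(const BatchSketch& batch) = 0;  // true means "OK"
  virtual bool Finish() = 0;
};

// Accepts every batch and discards it. This is only useful when an API insists
// on a consumer even though the real work (e.g. a write) happens elsewhere.
class NullConsumerSketch : public SinkConsumerSketch {
 public:
  bool Consume(const BatchSketch&) override {
    ++dropped_;
    return true;
  }
  bool Finish() override { return true; }
  int dropped() const { return dropped_; }

 private:
  int dropped_ = 0;
};
```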



##########
cpp/src/arrow/engine/substrait/extension_set.h:
##########
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <set>

Review Comment:
   Is this needed?



[GitHub] [arrow] github-actions[bot] commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1153829207

   https://issues.apache.org/jira/browse/ARROW-16823


[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r897546757


##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);
+
+  /// \brief Add a new function to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
   Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);
 
-  /// \brief Add aliases for the given function name. Returns Status::KeyError if the
-  /// function with the given name is not registered
+  /// \brief Check whether an alias can be added for the given function name.
+  ///
+  /// \returns Status::KeyError if the function with the given name is not registered.
+  Status CanAddAlias(const std::string& target_name, const std::string& source_name);

Review Comment:
   My reason for using `Status` as the return type is that the caller gets an explanation when not OK.
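
A minimal sketch of the difference: a `bool` only says "no", while a `Status`-like value also carries the reason. `StatusSketch` and this free-function `CanAddAlias` are hypothetical stand-ins for Arrow's `Status` and the proposed `FunctionRegistry::CanAddAlias`. The error text is illustrative.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>

// Hypothetical minimal Status-like type: ok() plus a message explaining failure.
struct StatusSketch {
  bool ok_ = true;
  std::string message;

  static StatusSketch OK() { return {}; }
  static StatusSketch KeyError(std::string msg) { return {false, std::move(msg)}; }
  bool ok() const { return ok_; }
};

// Mirrors the documented contract: a KeyError if the function being aliased
// (source_name) is not registered. The caller learns *why* the check failed.
StatusSketch CanAddAlias(const std::set<std::string>& registered,
                         const std::string& target_name,
                         const std::string& source_name) {
  (void)target_name;  // the alias name itself is not checked in this sketch
  if (registered.count(source_name) == 0) {
    return StatusSketch::KeyError("No function registered with name: " + source_name);
  }
  return StatusSketch::OK();
}
```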



[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901092558


##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,57 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+namespace {
+
+compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                               std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}
+
+}  // namespace
+
+static DeclarationFactory MakeWriteDeclarationFactory(
+    const WriteOptionsFactory& write_options_factory) {
+  return [&write_options_factory](compute::Declaration input,
+                                  std::vector<std::string> names) {
+    compute::Declaration projected = ProjectByNamesDeclaration(input, names);
+    std::shared_ptr<compute::ExecNodeOptions> options = write_options_factory();
+    return compute::Declaration::Sequence({std::move(projected), {"write", options}});
+  };
+}
+
+static Result<std::vector<compute::Declaration>> DeserializePlans(

Review Comment:
   Yes, it is internal. I'll move it to an anonymous namespace. It is not exposed because the type `DeclarationFactory` of one of its arguments isn't. There are variations of `DeserializePlans` with `ConsumerFactory` and `WriteOptionsFactory` that I believe should be sufficient for a while.
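
The arrangement described here can be sketched in two parts:

- An internal worker sits in an anonymous namespace and takes the unexported factory type.
- Public entry points adapt their own arguments into that factory.

All names are hypothetical, and plain strings stand in for `compute::Declaration`.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

namespace {

// Hypothetical internal factory type. Because it lives in an anonymous
// namespace, it cannot appear in a public header.
using DeclFactorySketch = std::function<std::string(const std::string& relation)>;

// Internal worker shared by the public entry points.
std::vector<std::string> DeserializeAll(const std::vector<std::string>& relations,
                                        const DeclFactorySketch& make_decl) {
  std::vector<std::string> out;
  for (const auto& rel : relations) out.push_back(make_decl(rel));
  return out;
}

}  // namespace

// Public entry points: each wraps its own kind of sink around every relation.
std::vector<std::string> DeserializeToConsumers(
    const std::vector<std::string>& relations) {
  return DeserializeAll(relations, [](const std::string& rel) {
    return rel + " -> consuming_sink";
  });
}

std::vector<std::string> DeserializeToWrites(const std::vector<std::string>& relations) {
  return DeserializeAll(relations, [](const std::string& rel) {
    return rel + " -> project -> write";
  });
}
```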



[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r898206705


##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);

Review Comment:
   Sorry, looks like I confused you. The flag I meant is [`bool add` of `DoAddFunction`](https://github.com/apache/arrow/pull/13375/files#diff-703810f407203fb49563d6b8dc719052ee0fe9828559dc29bc667040cf3ebbbaR158) - compare the [invocation in `CanAddFunction`](https://github.com/apache/arrow/pull/13375/files#diff-703810f407203fb49563d6b8dc719052ee0fe9828559dc29bc667040cf3ebbbaR45) with [that in `AddFunction`](https://github.com/apache/arrow/pull/13375/files#diff-703810f407203fb49563d6b8dc719052ee0fe9828559dc29bc667040cf3ebbbaR52).
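
The shared-worker pattern referred to here can be sketched as follows. Both public methods funnel into one private `DoAddFunction`, which performs the checks under a single lock. As in the quoted code, `CanAddFunction` passes `add=false` and `AddFunction` passes `add=true`, and only the latter mutates state. `RegistrySketch` is a hypothetical simplification: it stores names instead of `Function` objects, returns `bool` instead of `Status`, and omits the parent registry.

```cpp
#include <cassert>
#include <mutex>
#include <set>
#include <string>

// Hypothetical simplified registry: one locked worker with a `bool add` flag.
class RegistrySketch {
 public:
  bool CanAddFunction(const std::string& name, bool allow_overwrite = false) {
    return DoAddFunction(name, allow_overwrite, /*add=*/false);
  }

  bool AddFunction(const std::string& name, bool allow_overwrite = false) {
    return DoAddFunction(name, allow_overwrite, /*add=*/true);
  }

  bool Contains(const std::string& name) {
    std::lock_guard<std::mutex> guard(lock_);
    return names_.count(name) > 0;
  }

 private:
  // Validation is identical for both paths; the flag only controls mutation.
  bool DoAddFunction(const std::string& name, bool allow_overwrite, bool add) {
    std::lock_guard<std::mutex> guard(lock_);
    if (!allow_overwrite && names_.count(name) > 0) return false;
    if (add) names_.insert(name);
    return true;
  }

  std::mutex lock_;
  std::set<std::string> names_;
};
```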



[GitHub] [arrow] rtpsw commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1156891212

   > @vibhatha, I'm not up to date on Acero/Substrait progress anymore. Are the changes here reasonable?
   
   I have [some explanation here](https://issues.apache.org/jira/browse/ARROW-16823?focusedCommentId=17554780&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17554780), in case it helps. The TBD parts are expected in an upcoming PR (or two) I'll prepare.


[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r897547126


##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,53 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+static compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                                      std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}

Review Comment:
   Done.



[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r897718175


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -115,20 +210,42 @@ std::unique_ptr<FunctionRegistry> FunctionRegistry::Make() {
   return std::unique_ptr<FunctionRegistry>(new FunctionRegistry());
 }
 
-FunctionRegistry::FunctionRegistry() { impl_.reset(new FunctionRegistryImpl()); }
+std::unique_ptr<FunctionRegistry> FunctionRegistry::Make(FunctionRegistry* parent) {
+  return std::unique_ptr<FunctionRegistry>(new FunctionRegistry(

Review Comment:
   Instead, can we use `arrow::internal::make_unique`?
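The sketch below shows one possible reason a factory like `FunctionRegistry::Make` might spell out `new`. `std::make_unique` constructs the object from outside the class, so it cannot call a private constructor. A helper modeled on it, such as `arrow::internal::make_unique`, presumably has the same limitation.

Whether this applies here depends on `FunctionRegistry`'s constructors being private, which is an assumption. `Registry` below is hypothetical.

```cpp
#include <cassert>
#include <memory>

// Hypothetical registry with private constructors and static factories.
class Registry {
 public:
  static std::unique_ptr<Registry> Make() {
    return std::unique_ptr<Registry>(new Registry(nullptr));
  }

  static std::unique_ptr<Registry> Make(Registry* parent) {
    // std::make_unique<Registry>(parent) would not compile here: it invokes the
    // constructor from outside the class, where the constructor is inaccessible.
    return std::unique_ptr<Registry>(new Registry(parent));
  }

  Registry* parent() const { return parent_; }

 private:
  explicit Registry(Registry* parent) : parent_(parent) {}

  Registry* parent_;
};
```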





[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896743013


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -34,59 +34,72 @@ namespace compute {
 
 class FunctionRegistry::FunctionRegistryImpl {
  public:
-  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
-#ifndef NDEBUG
-    // This validates docstrings extensively, so don't waste time on it
-    // in release builds.
-    RETURN_NOT_OK(function->Validate());
-#endif
+  explicit FunctionRegistryImpl(FunctionRegistryImpl* parent = NULLPTR)
+      : parent_(parent) {}
+  ~FunctionRegistryImpl() {}
 
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
+    }
+    return DoAddFunction(function, allow_overwrite, /*add=*/false);
+  }
 
-    const std::string& name = function->name();
-    auto it = name_to_function_.find(name);
-    if (it != name_to_function_.end() && !allow_overwrite) {
-      return Status::KeyError("Already have a function registered with name: ", name);
+  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
     }
-    name_to_function_[name] = std::move(function);
-    return Status::OK();
+    return DoAddFunction(function, allow_overwrite, /*add=*/true);
+  }
+
+  Status CanAddAlias(const std::string& target_name, const std::string& source_name) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));
+    }
+    return DoAddAlias(target_name, source_name, /*add=*/false);
   }
 
   Status AddAlias(const std::string& target_name, const std::string& source_name) {
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));

Review Comment:
   Gotcha. Sorry I missed this one.
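Below is a simplified, single-threaded sketch of the nested-registry semantics in this diff:

- Checks walk the parent chain.
- Additions and aliases only ever modify the child.
- Lookups fall back to the parent.

`NestedRegistry`, the `int` function payload and the minimal `Status` struct are hypothetical stand-ins for the Arrow types. The mutex from the real `FunctionRegistryImpl` is omitted.

```cpp
#include <cassert>
#include <map>
#include <string>

// Minimal stand-in for arrow::Status: an empty message means OK.
struct Status {
  std::string error;
  bool ok() const { return error.empty(); }
};

class NestedRegistry {
 public:
  explicit NestedRegistry(NestedRegistry* parent = nullptr) : parent_(parent) {}

  // Validation only: checks the parent chain first, never mutates anything.
  Status CanAddFunction(const std::string& name, bool allow_overwrite) const {
    if (parent_ != nullptr) {
      Status st = parent_->CanAddFunction(name, allow_overwrite);
      if (!st.ok()) return st;
    }
    if (!allow_overwrite && functions_.count(name) > 0) {
      return Status{"Already have a function registered with name: " + name};
    }
    return Status{};
  }

  // Validates against this registry and its parents, then inserts locally only.
  Status AddFunction(const std::string& name, int impl, bool allow_overwrite = false) {
    Status st = CanAddFunction(name, allow_overwrite);
    if (!st.ok()) return st;
    functions_[name] = impl;
    return Status{};
  }

  // Looks up locally first, then falls back to the parent.
  const int* GetFunction(const std::string& name) const {
    auto it = functions_.find(name);
    if (it != functions_.end()) return &it->second;
    return parent_ != nullptr ? parent_->GetFunction(name) : nullptr;
  }

  // The source may live in a parent; the target must be free at every level.
  Status AddAlias(const std::string& target, const std::string& source) {
    const int* func = GetFunction(source);
    if (func == nullptr) return Status{"No function registered with name: " + source};
    Status st = CanAddFunction(target, /*allow_overwrite=*/false);
    if (!st.ok()) return st;
    functions_[target] = *func;
    return Status{};
  }

 private:
  NestedRegistry* parent_;
  std::map<std::string, int> functions_;
};
```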





[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896730323


##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,53 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+static compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                                      std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}

Review Comment:
   Okay, maybe we can put them in an anonymous namespace?
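Below is a sketch of the suggestion, using a standalone analogue of `ProjectByNamesDeclaration`. The helper lives in an anonymous namespace. That gives it internal linkage, just as `static` does, but it covers a whole group of helpers at once.

`Column` and `ProjectByNames` are hypothetical. Like the diff, the sketch maps output name `i` to input field `i` and treats an empty name list as "no projection". Returning `std::nullopt` when there are more names than input fields is a choice made only for this sketch.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

namespace {

// One output column of a projection: which input field it reads
// (FieldRef(i) in the diff) and the name it is given.
struct Column {
  int input_index;
  std::string name;
};

// Internal linkage: like a `static` function, this helper is invisible to
// other translation units, but the namespace can hold any number of helpers.
std::optional<std::vector<Column>> ProjectByNames(int num_input_fields,
                                                  const std::vector<std::string>& names) {
  const int names_size = static_cast<int>(names.size());
  if (names_size > num_input_fields) {
    return std::nullopt;  // more names than input fields (sketch's choice)
  }
  std::vector<Column> columns;  // stays empty for "no projection needed"
  for (int i = 0; i < names_size; ++i) {
    columns.push_back(Column{i, names[i]});
  }
  return columns;
}

}  // namespace
```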





[GitHub] [arrow] lidavidm commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
lidavidm commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896742518


##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);
+
+  /// \brief Add a new function to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
   Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);
 
-  /// \brief Add aliases for the given function name. Returns Status::KeyError if the
-  /// function with the given name is not registered
+  /// \brief Check whether an alias can be added for the given function name.
+  ///
+  /// \returns Status::KeyError if the function with the given name is not registered.
+  Status CanAddAlias(const std::string& target_name, const std::string& source_name);

Review Comment:
   Should this return `bool`?



##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -116,7 +119,7 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
         } else {
           return Status::NotImplemented(
               "substrait::ReadRel::LocalFiles::FileOrFiles::format "
-              "other than FILE_FORMAT_PARQUET");
+              "other than FILE_FORMAT_PARQUET and not recognized");

Review Comment:
   ```suggestion
                 "other than FILE_FORMAT_PARQUET is not recognized");
   ```



##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);

Review Comment:
   nit: take `const std::shared_ptr<Function>&` since this is a check? Also, I would expect this to return `bool` and be declared `const`
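Below is a sketch of the suggested signature on a hypothetical `Registry`. The check:

- takes `const std::shared_ptr<Function>&` (no copy, no reference-count bump);
- is `const`;
- returns `bool`.

The trade-off is that a `bool` cannot carry the explanatory message that the `Status::KeyError` in the diff does. `AddFunction` returning `bool` is likewise a simplification for the sketch.

```cpp
#include <cassert>
#include <memory>
#include <set>
#include <string>

struct Function {
  std::string name;
};

class Registry {
 public:
  // A pure query: borrows the pointer, leaves the registry untouched, and
  // answers yes or no.
  bool CanAddFunction(const std::shared_ptr<Function>& function,
                      bool allow_overwrite = false) const {
    return allow_overwrite || names_.count(function->name) == 0;
  }

  bool AddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false) {
    if (!CanAddFunction(function, allow_overwrite)) return false;
    names_.insert(function->name);
    return true;
  }

 private:
  std::set<std::string> names_;
};
```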



##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);
+
+  /// \brief Add a new function to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
   Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);
 
-  /// \brief Add aliases for the given function name. Returns Status::KeyError if the
-  /// function with the given name is not registered
+  /// \brief Check whether an alias can be added for the given function name.
+  ///
+  /// \returns Status::KeyError if the function with the given name is not registered.
+  Status CanAddAlias(const std::string& target_name, const std::string& source_name);
+
+  /// \brief Add alias for the given function name.
+  ///
+  /// \returns Status::KeyError if the function with the given name is not registered.
   Status AddAlias(const std::string& target_name, const std::string& source_name);
 
-  /// \brief Add a new function options type to the registry. Returns Status::KeyError if
-  /// a function options type with the same name is already registered
+  /// \brief Check whether a new function options type can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function options type with the same name is already

Review Comment:
   Note it's `\return` in Doxygen



##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,37 @@ namespace substrait {
 
 /// \brief Retrieve a RecordBatchReader from a Substrait plan.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer);
+    const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+    compute::FunctionRegistry* func_registry = NULLPTR);
 
 /// \brief Get a Serialized Plan from a Substrait JSON plan.
 /// This is a helper method for Python tests.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
     const std::string& substrait_json);
 
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(

Review Comment:
   nit: docstring?



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -68,7 +68,7 @@ class SubstraitExecutor {
                              compute::ExecContext exec_context)
       : plan_(std::move(plan)), exec_context_(exec_context) {}
 
-  ~SubstraitExecutor() { ARROW_CHECK_OK(this->Close()); }
+  ~SubstraitExecutor() { ARROW_UNUSED(this->Close()); }

Review Comment:
   Hmm, this should either `CHECK` or it should warn if not OK, but we shouldn't swallow the error.



##########
cpp/src/arrow/compute/registry_test.cc:
##########
@@ -85,5 +96,139 @@ TEST_F(TestRegistry, Basics) {
   ASSERT_EQ(func, f2);
 }
 
+INSTANTIATE_TEST_SUITE_P(
+    TestRegistry, TestRegistry,
+    testing::Values(
+        std::make_tuple(
+            static_cast<MakeFunctionRegistry>([]() { return FunctionRegistry::Make(); }),
+            []() { return 0; }, []() { return std::vector<std::string>{}; }, "default"),
+        std::make_tuple(
+            static_cast<MakeFunctionRegistry>([]() {
+              return FunctionRegistry::Make(GetFunctionRegistry());
+            }),
+            []() { return GetFunctionRegistry()->num_functions(); },
+            []() { return GetFunctionRegistry()->GetFunctionNames(); }, "nested")));
+
+TEST(TestRegistry, RegisterTempFunctions) {
+  auto default_registry = GetFunctionRegistry();
+  constexpr int rounds = 3;

Review Comment:
   nit: why is this test repeated?
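The question is about the intent of the test. As an assumption, a loop like this can exercise one property: a temporary nested registry leaves its parent untouched, so the same name can be registered again in a fresh nested registry each round.

The sketch uses a hypothetical `Registry` whose `num_functions()` sums the parent's count and its own, as `FunctionRegistryImpl::num_functions()` does in this PR. Summing assumes names are not duplicated across levels. If an overwrite let a child register a name the parent already has, such a sum would count it twice.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical nested registry that only tracks function names.
class Registry {
 public:
  explicit Registry(const Registry* parent = nullptr) : parent_(parent) {}

  // Adds `name` locally unless it is already visible here or in a parent.
  bool Add(const std::string& name) {
    if (Contains(name)) return false;
    local_.insert(name);
    return true;
  }

  bool Contains(const std::string& name) const {
    return local_.count(name) > 0 || (parent_ != nullptr && parent_->Contains(name));
  }

  // Parent's count plus local count, mirroring the shape of the PR's method.
  int num_functions() const {
    return (parent_ == nullptr ? 0 : parent_->num_functions()) +
           static_cast<int>(local_.size());
  }

 private:
  const Registry* parent_;
  std::set<std::string> local_;
};
```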





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r907736969


##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,57 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+namespace {
+
+compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                               std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}
+
+}  // namespace
+
+static DeclarationFactory MakeWriteDeclarationFactory(
+    const WriteOptionsFactory& write_options_factory) {
+  return [&write_options_factory](compute::Declaration input,
+                                  std::vector<std::string> names) {
+    compute::Declaration projected = ProjectByNamesDeclaration(input, names);
+    std::shared_ptr<compute::ExecNodeOptions> options = write_options_factory();
+    return compute::Declaration::Sequence({std::move(projected), {"write", options}});

Review Comment:
   Please create a JIRA for this. Perhaps there should be a hierarchy of options, in increasing order of detail: names only, names plus metadata, and schema. The names-only option is what's needed for the Substrait plan. If the caller provides a schema, then the types must match, as you say.
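Below is a sketch of two levels of the proposed hierarchy; the metadata level is left out.

- **Names only:** the plan's output types are kept and the names are applied by position.
- **Full schema from the caller:** every type must also match.

`Field` and both functions are hypothetical. In `Field`, a type-name string stands in for `arrow::DataType`.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

struct Field {
  std::string name;
  std::string type;  // hypothetical: a type name standing in for arrow::DataType
};

// Names only: keep the plan's types and apply the caller's names by position.
std::vector<Field> ApplyNames(std::vector<Field> plan_output,
                              const std::vector<std::string>& names) {
  for (std::size_t i = 0; i < names.size() && i < plan_output.size(); ++i) {
    plan_output[i].name = names[i];
  }
  return plan_output;
}

// Full schema: the caller supplies names and types, and every type must match.
bool SchemaMatches(const std::vector<Field>& plan_output,
                   const std::vector<Field>& requested) {
  if (plan_output.size() != requested.size()) return false;
  for (std::size_t i = 0; i < requested.size(); ++i) {
    if (plan_output[i].type != requested[i].type) return false;
  }
  return true;
}
```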





[GitHub] [arrow] rtpsw commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1171611084

   This is good to go from my point of view. I have an upcoming PyArrow UDF PR that will use the changes in this one.




[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r897341336


##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -68,7 +68,7 @@ class SubstraitExecutor {
                              compute::ExecContext exec_context)
       : plan_(std::move(plan)), exec_context_(exec_context) {}
 
-  ~SubstraitExecutor() { ARROW_CHECK_OK(this->Close()); }
+  ~SubstraitExecutor() { ARROW_UNUSED(this->Close()); }

Review Comment:
   I agree. Also note that the flag solution could still lead to a crash in the destructor if `Close()` fails there and was not called earlier. Besides avoiding a crash, the proposed `SubstraitExecutor` code ensures that `ExecuteSerializedPlan` either successfully calls `Close()` or reports an error back.
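Below is a sketch of the flag approach combined with the reviewer's "warn instead of swallowing" suggestion. It uses a hypothetical `Executor` and a minimal `Status` stand-in.

- `Close()` returns its error to the caller and sets a flag.
- The destructor closes only if the caller never called `Close()` itself, and in that case logs a warning instead of aborting.

Writing to `std::cerr` stands in for whatever logging facility would actually be used.

```cpp
#include <cassert>
#include <iostream>
#include <string>

// Minimal stand-in for arrow::Status: an empty message means OK.
struct Status {
  std::string error;
  bool ok() const { return error.empty(); }
};

// Hypothetical executor whose Close() may fail, e.g. because the plan failed.
class Executor {
 public:
  explicit Executor(bool fail_on_close) : fail_on_close_(fail_on_close) {}

  ~Executor() {
    if (!closed_) {
      Status st = Close();
      if (!st.ok()) {
        // Warn rather than CHECK-fail: an ordinary execution error should not
        // become a crash just because it surfaced in a destructor.
        std::cerr << "Warning: error while closing executor: " << st.error << "\n";
      }
    }
  }

  // Reports the outcome to the caller; the flag tells the destructor it is done.
  Status Close() {
    closed_ = true;
    return fail_on_close_ ? Status{"plan execution failed"} : Status{};
  }

  bool closed() const { return closed_; }

 private:
  bool fail_on_close_;
  bool closed_ = false;
};
```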





[GitHub] [arrow] rtpsw commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1155071043

   @vibhatha, I think this PR is ready for review. Are you the one to review it? 




[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896846352


##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -68,7 +68,7 @@ class SubstraitExecutor {
                              compute::ExecContext exec_context)
       : plan_(std::move(plan)), exec_context_(exec_context) {}
 
-  ~SubstraitExecutor() { ARROW_CHECK_OK(this->Close()); }
+  ~SubstraitExecutor() { ARROW_UNUSED(this->Close()); }

Review Comment:
   It gets checked [earlier here](https://github.com/apache/arrow/pull/13375/files#diff-0078df867c0f64dcfe79c149ee007f0933d164a32f122e28d79872a969b5be05R121). I moved the check there because I observed that the original code crashed (in this destructor) when an error occurred during plan execution. Such errors are a normal occurrence, and I wanted their explanations to surface.
   
   Regarding a warning in the destructor, I could add it if you still want it. Note that an error would get logged even if the user handled the error in the [earlier call to `ExecuteSerializedPlan`](https://github.com/apache/arrow/pull/13375/files#diff-0078df867c0f64dcfe79c149ee007f0933d164a32f122e28d79872a969b5be05R111), and I'm not sure this is what users would like.





[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896736948


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -98,14 +111,96 @@ class FunctionRegistry::FunctionRegistryImpl {
       const std::string& name) const {
     auto it = name_to_options_type_.find(name);
     if (it == name_to_options_type_.end()) {
+      if (parent_ != NULLPTR) {
+        return parent_->GetFunctionOptionsType(name);
+      }
       return Status::KeyError("No function options type registered with name: ", name);
     }
     return it->second;
   }
 
-  int num_functions() const { return static_cast<int>(name_to_function_.size()); }
+  int num_functions() const {
+    return (parent_ == NULLPTR ? 0 : parent_->num_functions()) +
+           static_cast<int>(name_to_function_.size());
+  }
 
  private:
+  // must not acquire mutex
+  Status CanAddFunctionName(const std::string& name, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(name, allow_overwrite));
+    }
+    if (!allow_overwrite) {
+      auto it = name_to_function_.find(name);
+      if (it != name_to_function_.end()) {
+        return Status::KeyError("Already have a function registered with name: ", name);
+      }
+    }
+    return Status::OK();
+  }
+
+  // must not acquire mutex
+  Status CanAddOptionsTypeName(const std::string& name, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddOptionsTypeName(name, allow_overwrite));
+    }
+    if (!allow_overwrite) {
+      auto it = name_to_options_type_.find(name);
+      if (it != name_to_options_type_.end()) {
+        return Status::KeyError(
+            "Already have a function options type registered with name: ", name);
+      }
+    }
+    return Status::OK();
+  }
+
+  Status DoAddFunction(std::shared_ptr<Function> function, bool allow_overwrite,
+                       bool add) {
+#ifndef NDEBUG
+    // This validates docstrings extensively, so don't waste time on it
+    // in release builds.
+    RETURN_NOT_OK(function->Validate());
+#endif
+
+    std::lock_guard<std::mutex> mutation_guard(lock_);
+
+    const std::string& name = function->name();
+    RETURN_NOT_OK(CanAddFunctionName(name, allow_overwrite));
+    if (add) {
+      name_to_function_[name] = std::move(function);
+    }
+    return Status::OK();
+  }
+
+  Status DoAddAlias(const std::string& target_name, const std::string& source_name,
+                    bool add) {
+    // source name must exist in this registry or the parent
+    // check outside mutex, in case GetFunction leads to mutex acquisition
+    ARROW_ASSIGN_OR_RAISE(auto func, GetFunction(source_name));
+
+    std::lock_guard<std::mutex> mutation_guard(lock_);
+
+    // target name must be available in this registry and the parent
+    RETURN_NOT_OK(CanAddFunctionName(target_name, /*allow_overwrite=*/false));

Review Comment:
   do we need to mention `/*allow_overwrite=*/` here?





[GitHub] [arrow] lidavidm commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
lidavidm commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896739252


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -34,59 +34,72 @@ namespace compute {
 
 class FunctionRegistry::FunctionRegistryImpl {
  public:
-  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
-#ifndef NDEBUG
-    // This validates docstrings extensively, so don't waste time on it
-    // in release builds.
-    RETURN_NOT_OK(function->Validate());
-#endif
+  explicit FunctionRegistryImpl(FunctionRegistryImpl* parent = NULLPTR)
+      : parent_(parent) {}
+  ~FunctionRegistryImpl() {}
 
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
+    }
+    return DoAddFunction(function, allow_overwrite, /*add=*/false);
+  }
 
-    const std::string& name = function->name();
-    auto it = name_to_function_.find(name);
-    if (it != name_to_function_.end() && !allow_overwrite) {
-      return Status::KeyError("Already have a function registered with name: ", name);
+  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
     }
-    name_to_function_[name] = std::move(function);
-    return Status::OK();
+    return DoAddFunction(function, allow_overwrite, /*add=*/true);
+  }
+
+  Status CanAddAlias(const std::string& target_name, const std::string& source_name) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));
+    }
+    return DoAddAlias(target_name, source_name, /*add=*/false);
   }
 
   Status AddAlias(const std::string& target_name, const std::string& source_name) {
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));

Review Comment:
   See https://google.github.io/styleguide/cppguide.html#Function_Argument_Comments; given that, I don't think any of these need to be changed.





[GitHub] [arrow] lidavidm commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
lidavidm commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896732718


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -34,59 +34,72 @@ namespace compute {
 
 class FunctionRegistry::FunctionRegistryImpl {
  public:
-  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
-#ifndef NDEBUG
-    // This validates docstrings extensively, so don't waste time on it
-    // in release builds.
-    RETURN_NOT_OK(function->Validate());
-#endif
+  explicit FunctionRegistryImpl(FunctionRegistryImpl* parent = NULLPTR)
+      : parent_(parent) {}
+  ~FunctionRegistryImpl() {}
 
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
+    }
+    return DoAddFunction(function, allow_overwrite, /*add=*/false);
+  }
 
-    const std::string& name = function->name();
-    auto it = name_to_function_.find(name);
-    if (it != name_to_function_.end() && !allow_overwrite) {
-      return Status::KeyError("Already have a function registered with name: ", name);
+  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
     }
-    name_to_function_[name] = std::move(function);
-    return Status::OK();
+    return DoAddFunction(function, allow_overwrite, /*add=*/true);
+  }
+
+  Status CanAddAlias(const std::string& target_name, const std::string& source_name) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));
+    }
+    return DoAddAlias(target_name, source_name, /*add=*/false);
   }
 
   Status AddAlias(const std::string& target_name, const std::string& source_name) {
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));

Review Comment:
   Our general style is to annotate literal parameters.
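Below is a small sketch of that convention, using a hypothetical `CanAddName` helper. A bare `false` at the call site says nothing about which parameter it sets, while `/*allow_overwrite=*/false` names it.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical check with a boolean flag parameter.
bool CanAddName(const std::set<std::string>& registered, const std::string& name,
                bool allow_overwrite) {
  return allow_overwrite || registered.count(name) == 0;
}

// Returns true when `name` is rejected without overwrite but accepted with it.
bool BlockedUnlessOverwriting(const std::set<std::string>& registered,
                              const std::string& name) {
  // Hard to read: which parameter does this `false` set?
  bool strict = CanAddName(registered, name, false);
  // Annotated literal: the parameter name is visible at the call site.
  bool lenient = CanAddName(registered, name, /*allow_overwrite=*/true);
  return !strict && lenient;
}
```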





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901081082


##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,45 @@ namespace substrait {
 
 /// \brief Retrieve a RecordBatchReader from a Substrait plan.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer);
+    const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+    compute::FunctionRegistry* func_registry = NULLPTR);
 
 /// \brief Get a Serialized Plan from a Substrait JSON plan.
 /// This is a helper method for Python tests.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
     const std::string& substrait_json);
 
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+/// including a no-op consumer of the sink output
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const ExtensionIdRegistry* registry);
+
 /// \brief Make a nested registry with the default registry as parent.
 /// See arrow::engine::nested_extension_id_registry for details.
 ARROW_ENGINE_EXPORT std::shared_ptr<ExtensionIdRegistry> MakeExtensionIdRegistry();
 
+/// \brief Register a function manually.
+///
+/// Register an arrow function name by an ID, defined by a URI and a name, on a given
+/// extension-id-registry.
+///
+/// \param[in] registry an extension-id-registry to use
+/// \param[in] id_uri a URI of the ID to register by
+/// \param[in] id_name a name of the ID to register by
+/// \param[in] arrow_function_name name of arrow function to register
+ARROW_ENGINE_EXPORT Status RegisterFunction(ExtensionIdRegistry& registry,
+                                            const std::string& id_uri,
+                                            const std::string& id_name,
+                                            const std::string& arrow_function_name);
+
+ARROW_ENGINE_EXPORT const std::string& default_extension_types_uri();

Review Comment:
   This is intended to be used from PyArrow code and simply returns [the constant `kArrowExtTypesUri`](https://github.com/apache/arrow/blob/4ade394a7a0fa22ecb8ef2e3b4dc8bb42e599274/cpp/src/arrow/engine/substrait/extension_set.h#L100-L102). This constant is used elsewhere in the Arrow Substrait code and is likely embedded in existing Substrait plans, so it should not (and need not) be changed.
   
   I believe YAML extension/inheritance is a separate issue that is not affected by this code change.
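A minimal sketch of the accessor pattern described above: a function that returns a reference to one constant, so that every caller (for example, a binding layer) sees the same stable value. The URI string and the `sketch::` names are placeholders; they are not the actual value of `kArrowExtTypesUri` nor Arrow's implementation.

```cpp
#include <cassert>
#include <string>

namespace sketch {

// Placeholder value (hypothetical); the real constant lives in extension_set.h.
constexpr const char kExtTypesUri[] = "urn:example:extension_types";

// Returns a reference to a single function-local static string. Every caller
// observes the same object, and its value always matches the constant above.
// Initialization of the static is thread-safe since C++11.
const std::string& default_extension_types_uri_sketch() {
  static const std::string uri(kExtTypesUri);
  return uri;
}

}  // namespace sketch
```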





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901165605


##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -44,18 +45,50 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(

Review Comment:
   Done.





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901095244


##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -44,18 +45,50 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan

Review Comment:
   I'll fix.





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901085767


##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,45 @@ namespace substrait {
 
 /// \brief Retrieve a RecordBatchReader from a Substrait plan.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer);
+    const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+    compute::FunctionRegistry* func_registry = NULLPTR);
 
 /// \brief Get a Serialized Plan from a Substrait JSON plan.
 /// This is a helper method for Python tests.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
     const std::string& substrait_json);
 
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+/// including a no-op consumer of the sink output
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const ExtensionIdRegistry* registry);
+
 /// \brief Make a nested registry with the default registry as parent.
 /// See arrow::engine::nested_extension_id_registry for details.
 ARROW_ENGINE_EXPORT std::shared_ptr<ExtensionIdRegistry> MakeExtensionIdRegistry();
 
+/// \brief Register a function manually.
+///
+/// Register an arrow function name by an ID, defined by a URI and a name, on a given
+/// extension-id-registry.
+///
+/// \param[in] registry an extension-id-registry to use
+/// \param[in] id_uri a URI of the ID to register by
+/// \param[in] id_name a name of the ID to register by
+/// \param[in] arrow_function_name name of arrow function to register
+ARROW_ENGINE_EXPORT Status RegisterFunction(ExtensionIdRegistry& registry,
+                                            const std::string& id_uri,
+                                            const std::string& id_name,
+                                            const std::string& arrow_function_name);

Review Comment:
   Generally, I added the functions in [cpp/src/arrow/engine/substrait/util.h](https://github.com/apache/arrow/pull/13375/files/cbf9c10c80b03e929b0880814aa03f960ab3517b#diff-2da5e8c8a425f3caf753bd2508931448d2b61ec4e3f3b26670c84ac03bf63e14) so that they can be used from PyArrow code in an upcoming PR. That PyArrow code will fully encapsulate the registration of functions embedded within a Substrait plan. Granted, non-embedded functions will still require manual registration, which could be improved as you suggested; let's do that in a later PR.
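To make the registration described in the `RegisterFunction` doc comment concrete, here is a self-contained toy model: an ID made of a URI and a name maps to an Arrow function name, and a nested registry falls back to its parent on lookup. `ToyExtensionIdRegistry` and its methods are hypothetical; they do not reproduce Arrow's `ExtensionIdRegistry` API.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical toy model of a nested extension-id registry; not Arrow's API.
class ToyExtensionIdRegistry {
 public:
  explicit ToyExtensionIdRegistry(const ToyExtensionIdRegistry* parent = nullptr)
      : parent_(parent) {}

  // Maps the ID (uri, name) to an Arrow function name. Returns false if the
  // ID is already visible here or through any parent.
  bool RegisterFunction(std::string uri, std::string name,
                        std::string arrow_function_name) {
    if (Lookup(uri, name)) return false;
    functions_.emplace(Id{std::move(uri), std::move(name)},
                       std::move(arrow_function_name));
    return true;
  }

  // Looks the ID up locally first, then along the parent chain.
  std::optional<std::string> Lookup(const std::string& uri,
                                    const std::string& name) const {
    auto it = functions_.find(Id{uri, name});
    if (it != functions_.end()) return it->second;
    if (parent_ != nullptr) return parent_->Lookup(uri, name);
    return std::nullopt;
  }

 private:
  using Id = std::pair<std::string, std::string>;  // (uri, name)
  const ToyExtensionIdRegistry* parent_;
  std::map<Id, std::string> functions_;
};
```

Registrations made on a nested registry stay local to it, while the parent's registrations remain visible through it.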





[GitHub] [arrow] rtpsw commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1164734789

   @westonpace, is this good to go?




[GitHub] [arrow] westonpace commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r907801495


##########
cpp/src/arrow/engine/substrait/extension_set.cc:
##########
@@ -315,6 +315,11 @@ struct ExtensionIdRegistryImpl : ExtensionIdRegistry {
     return Status::OK();
   }
 
+  Status RegisterFunction(std::string uri, std::string name,

Review Comment:
   I'm pretty sure the current configuration actually requires *more* copies and not fewer.  Consider the two implementations:
   
   A - Take in string_view, make a copy of it, and store the copy into `uris_` and `names_` (current impl)
   B - Take in string, move it into `uris_` and `names_` (proposed impl)
   
   If the caller no longer needs the string then they can do...
   
   ```
   RegisterFunction(std::move(uri), std::move(name), std::move(function_name));
   ```
   ...no copy would be needed for approach B but a copy would be needed for approach A.  If the caller needs a copy to be made (they still need to use uri or name) then they simply would not move and one copy would be made.
   
   That being said, the performance of one extra copy is probably negligible here, and, as you pointed out, this is how the code was before.  I don't think we have to fix this if you don't want to do so right now.
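The copy counting in this comment can be checked with a small sketch. `Tracked` is a hypothetical string wrapper that counts copies. `RegisterA` mirrors approach A: it borrows the argument, with a `const Tracked&` standing in for `std::string_view`, and always copies into storage. `RegisterB` mirrors approach B: it takes the argument by value and moves it into storage. Neither is the actual Arrow code.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical string wrapper that counts how many times it is copied.
struct Tracked {
  static inline int copies = 0;
  std::string value;

  explicit Tracked(std::string v) : value(std::move(v)) {}
  Tracked(const Tracked& other) : value(other.value) { ++copies; }
  Tracked(Tracked&&) noexcept = default;
  Tracked& operator=(const Tracked& other) {
    value = other.value;
    ++copies;
    return *this;
  }
  Tracked& operator=(Tracked&&) noexcept = default;
};

std::vector<Tracked> storage_a;
std::vector<Tracked> storage_b;

// Approach A: borrow the argument and always copy it into storage,
// even when the caller no longer needs it.
void RegisterA(const Tracked& uri) { storage_a.push_back(uri); }

// Approach B: take the argument by value and move it into storage.
// A caller passing std::move(x) pays zero copies; a caller passing an
// lvalue pays exactly one copy, when the parameter is constructed.
void RegisterB(Tracked uri) { storage_b.push_back(std::move(uri)); }
```

With approach B, the caller decides whether a copy happens, which is the point made above.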





[GitHub] [arrow] vibhatha commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1157097556

   > > @vibhatha, I'm not up to date on Acero/Substrait progress anymore. Are the changes here reasonable?
   > 
   > I have [some explanation here](https://issues.apache.org/jira/browse/ARROW-16823?focusedCommentId=17554780&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17554780), in case it helps. The TBD parts are expected in an upcoming PR (or two) I'll prepare.
   
   @rtpsw I added a comment to the JIRA. I'd appreciate your feedback to clarify the design and usage.




[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r897548067


##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,37 @@ namespace substrait {
 
 /// \brief Retrieve a RecordBatchReader from a Substrait plan.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer);
+    const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+    compute::FunctionRegistry* func_registry = NULLPTR);
 
 /// \brief Get a Serialized Plan from a Substrait JSON plan.
 /// This is a helper method for Python tests.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
     const std::string& substrait_json);
 
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(

Review Comment:
   Done.





[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896730323


##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,53 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+static compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                                      std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}

Review Comment:
   Okay, maybe we can put this in an anonymous namespace?
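For context on the suggestion: in C++, both `static` and an unnamed namespace give a function internal linkage, so it is not visible outside its translation unit. The unnamed namespace also works for helper classes, which `static` cannot mark, and it groups related helpers in one block. A minimal sketch follows. `NameTransform`, `MakePrefixer`, and `ApplyToNames` are hypothetical helpers, not the ones in `serde.cc`.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

namespace {

// Hypothetical helpers. Because they live in an unnamed namespace, their
// names cannot collide with identically named helpers in other .cc files.
using NameTransform = std::function<std::string(const std::string&)>;

NameTransform MakePrefixer(std::string prefix) {
  // Capture by value so the returned callable does not dangle.
  return [prefix](const std::string& name) { return prefix + name; };
}

std::vector<std::string> ApplyToNames(const std::vector<std::string>& names,
                                      const NameTransform& transform) {
  std::vector<std::string> out;
  out.reserve(names.size());
  for (const auto& name : names) out.push_back(transform(name));
  return out;
}

}  // namespace
```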





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896848148


##########
cpp/src/arrow/compute/registry_test.cc:
##########
@@ -85,5 +96,139 @@ TEST_F(TestRegistry, Basics) {
   ASSERT_EQ(func, f2);
 }
 
+INSTANTIATE_TEST_SUITE_P(
+    TestRegistry, TestRegistry,
+    testing::Values(
+        std::make_tuple(
+            static_cast<MakeFunctionRegistry>([]() { return FunctionRegistry::Make(); }),
+            []() { return 0; }, []() { return std::vector<std::string>{}; }, "default"),
+        std::make_tuple(
+            static_cast<MakeFunctionRegistry>([]() {
+              return FunctionRegistry::Make(GetFunctionRegistry());
+            }),
+            []() { return GetFunctionRegistry()->num_functions(); },
+            []() { return GetFunctionRegistry()->GetFunctionNames(); }, "nested")));
+
+TEST(TestRegistry, RegisterTempFunctions) {
+  auto default_registry = GetFunctionRegistry();
+  constexpr int rounds = 3;

Review Comment:
   The test uses additional rounds to make sure that there are no leftover registrations from a previous round.
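Here is a minimal sketch of the multi-round pattern. It uses a hypothetical toy registry instead of Arrow's `FunctionRegistry`. Each round creates a temporary nested registry, registers the same name, and checks that the base registry never sees it. A registration that leaked from one round would make the next round fail.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical toy registry with an optional parent; not Arrow's API.
struct ToyRegistry {
  const ToyRegistry* parent = nullptr;
  std::set<std::string> names;

  bool Contains(const std::string& name) const {
    return names.count(name) > 0 || (parent != nullptr && parent->Contains(name));
  }
  // Refuses to add a name already visible through the parent chain.
  bool Add(const std::string& name) {
    if (Contains(name)) return false;
    names.insert(name);
    return true;
  }
};

// Returns true if every round can register the same temporary name, which
// only holds if nothing registered in an earlier round is visible in `base`.
bool RunRounds(ToyRegistry& base, int rounds) {
  for (int round = 0; round < rounds; ++round) {
    ToyRegistry temp{&base, {}};
    if (!temp.Add("temp_func")) return false;  // leftover from a prior round
    if (base.Contains("temp_func")) return false;
  }  // each iteration's `temp` is destroyed here, discarding its names
  return true;
}
```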





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901077543


##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -229,6 +229,20 @@ class ARROW_EXPORT SinkNodeConsumer {
   virtual Future<> Finish() = 0;
 };
 
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {

Review Comment:
   See its use in `cpp/src/arrow/engine/substrait/util.cc` for deserializing (and later executing) Substrait plans without a consumer (e.g., plans that only write). I intend to use this utility from PyArrow code in an upcoming PR.
   
   AFAIU, a Substrait plan must have some consumer for proper execution. Until now, only `ConsumingSinkNodeOptions` was provided. A `NullSinkNodeConsumer` is useful with the writing capability added here.
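A no-op consumer accepts and discards every batch. The sketch below uses a simplified, self-contained stand-in for `SinkNodeConsumer`: plain `bool` replaces Arrow's `Status` and `Future<>`, and a toy `FakeBatch` replaces `ExecBatch`. All class and method names here are therefore assumptions and do not reflect Arrow's actual interface.

```cpp
#include <cassert>
#include <vector>

// Simplified stand-in for a batch (hypothetical).
struct FakeBatch {
  std::vector<int> values;
};

// Simplified stand-in for the consumer interface. Arrow's real
// SinkNodeConsumer uses Status, ExecBatch, and Future<> instead.
class SketchSinkConsumer {
 public:
  virtual ~SketchSinkConsumer() = default;
  virtual bool Consume(const FakeBatch& batch) = 0;
  virtual bool Finish() = 0;
};

// No-op consumer: gives the plan a terminal consumer while discarding all
// output, e.g. when the plan's real effect is a write. It counts batches
// only so that its behavior is observable.
class NullSketchConsumer : public SketchSinkConsumer {
 public:
  bool Consume(const FakeBatch&) override {
    ++batches_seen_;
    return true;
  }
  bool Finish() override { return true; }
  int batches_seen() const { return batches_seen_; }

 private:
  int batches_seen_ = 0;
};
```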





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901079847


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -116,7 +119,7 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
         } else {
           return Status::NotImplemented(
               "substrait::ReadRel::LocalFiles::FileOrFiles::format "
-              "other than FILE_FORMAT_PARQUET");
+              "other than FILE_FORMAT_PARQUET with an unrecognized file extension");

Review Comment:
   I'll fix.





[GitHub] [arrow] westonpace commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r907788223


##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -229,6 +229,20 @@ class ARROW_EXPORT SinkNodeConsumer {
   virtual Future<> Finish() = 0;
 };
 
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {

Review Comment:
   True, but the method on line 132 is itself never used, as far as I can tell. Also, the docs for that method mention plans with just writes, which seems incorrect since those plans would use `WriteNodeFactory`. If we're going to add new public methods, then it should be clear in which situations they are needed.





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r907730758


##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -40,22 +41,81 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 
 /// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
 ///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
 /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// The output of each top-level Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of the single Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<compute::ExecPlan> DeserializePlan(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);

Review Comment:
   I think changing from factories to instances would lead to trouble. `ConsumerFactory` (and `WriteOptionsFactory` too) is called once per `PlanRel` in the Substrait plan in order to get a distinct instance per invocation. This is the case even though the factory takes no arguments and therefore often returns equivalent (but distinct) instances. Using an instance instead of a factory would pretty much force reusing the same instance for every `PlanRel`, which isn't right. To avoid that reuse, the instance would have to support safe duplication, which would be more annoying for the caller to implement than a factory.
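A minimal sketch of the per-relation factory argument above. `Consumer`, `Factory`, `MakeConsumersPerRelation`, and `ShareOneConsumer` are hypothetical stand-ins, not Arrow APIs: calling a zero-argument factory once per relation gives each relation its own state, while handing over a single instance makes every relation mutate the same object.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical consumer with per-relation mutable state (not Arrow's SinkNodeConsumer).
struct Consumer {
  int batches_seen = 0;
};

// Same shape as a zero-argument factory such as ConsumerFactory.
using Factory = std::function<std::shared_ptr<Consumer>()>;

// Factory approach: the factory is invoked once per top-level relation,
// so each relation gets its own instance.
std::vector<std::shared_ptr<Consumer>> MakeConsumersPerRelation(const Factory& factory,
                                                                std::size_t num_relations) {
  std::vector<std::shared_ptr<Consumer>> consumers;
  for (std::size_t i = 0; i < num_relations; ++i) {
    consumers.push_back(factory());
  }
  return consumers;
}

// Instance approach: every relation ends up holding the same object.
std::vector<std::shared_ptr<Consumer>> ShareOneConsumer(
    const std::shared_ptr<Consumer>& consumer, std::size_t num_relations) {
  return std::vector<std::shared_ptr<Consumer>>(num_relations, consumer);
}
```

With the factory, updating one relation's consumer leaves the others untouched; with the shared instance, every relation observes the same counter.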





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r910427701


##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -229,6 +229,20 @@ class ARROW_EXPORT SinkNodeConsumer {
   virtual Future<> Finish() = 0;
 };
 
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {

Review Comment:
   I moved `NullSinkNodeConsumer` into `serde_test.cc`, which hopefully is acceptable.





[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896732396


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -34,59 +34,72 @@ namespace compute {
 
 class FunctionRegistry::FunctionRegistryImpl {
  public:
-  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
-#ifndef NDEBUG
-    // This validates docstrings extensively, so don't waste time on it
-    // in release builds.
-    RETURN_NOT_OK(function->Validate());
-#endif
+  explicit FunctionRegistryImpl(FunctionRegistryImpl* parent = NULLPTR)
+      : parent_(parent) {}
+  ~FunctionRegistryImpl() {}
 
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
+    }
+    return DoAddFunction(function, allow_overwrite, /*add=*/false);
+  }
 
-    const std::string& name = function->name();
-    auto it = name_to_function_.find(name);
-    if (it != name_to_function_.end() && !allow_overwrite) {
-      return Status::KeyError("Already have a function registered with name: ", name);
+  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
     }
-    name_to_function_[name] = std::move(function);
-    return Status::OK();
+    return DoAddFunction(function, allow_overwrite, /*add=*/true);
+  }
+
+  Status CanAddAlias(const std::string& target_name, const std::string& source_name) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));

Review Comment:
   Do we need to mention this in a comment? 





[GitHub] [arrow] lidavidm commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
lidavidm commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896961031


##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -68,7 +68,7 @@ class SubstraitExecutor {
                              compute::ExecContext exec_context)
       : plan_(std::move(plan)), exec_context_(exec_context) {}
 
-  ~SubstraitExecutor() { ARROW_CHECK_OK(this->Close()); }
+  ~SubstraitExecutor() { ARROW_UNUSED(this->Close()); }

Review Comment:
   In that case, `Close()` should probably be made idempotent, or we should have some flag indicating that we don't need to close.
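One way to get that behavior, sketched with a hypothetical `Executor` and `SimpleStatus` (stand-ins for `SubstraitExecutor` and `arrow::Status`, not their actual implementations): a private `closed_` flag makes a second `Close()` a no-op, so the destructor can call it unconditionally.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for arrow::Status.
struct SimpleStatus {
  bool ok;
  std::string message;
};

// Hypothetical executor (not Arrow's SubstraitExecutor).
class Executor {
 public:
  // Close() is idempotent, so calling it here is safe even after an explicit Close().
  ~Executor() { static_cast<void>(Close()); }

  SimpleStatus Close() {
    if (closed_) {
      return SimpleStatus{true, ""};  // already closed: nothing left to do
    }
    closed_ = true;
    ++teardowns_;  // stands in for the real teardown work
    return SimpleStatus{true, ""};
  }

  int teardowns() const { return teardowns_; }

 private:
  bool closed_ = false;
  int teardowns_ = 0;
};
```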





[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896736948


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -98,14 +111,96 @@ class FunctionRegistry::FunctionRegistryImpl {
       const std::string& name) const {
     auto it = name_to_options_type_.find(name);
     if (it == name_to_options_type_.end()) {
+      if (parent_ != NULLPTR) {
+        return parent_->GetFunctionOptionsType(name);
+      }
       return Status::KeyError("No function options type registered with name: ", name);
     }
     return it->second;
   }
 
-  int num_functions() const { return static_cast<int>(name_to_function_.size()); }
+  int num_functions() const {
+    return (parent_ == NULLPTR ? 0 : parent_->num_functions()) +
+           static_cast<int>(name_to_function_.size());
+  }
 
  private:
+  // must not acquire mutex
+  Status CanAddFunctionName(const std::string& name, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(name, allow_overwrite));
+    }
+    if (!allow_overwrite) {
+      auto it = name_to_function_.find(name);
+      if (it != name_to_function_.end()) {
+        return Status::KeyError("Already have a function registered with name: ", name);
+      }
+    }
+    return Status::OK();
+  }
+
+  // must not acquire mutex
+  Status CanAddOptionsTypeName(const std::string& name, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddOptionsTypeName(name, allow_overwrite));
+    }
+    if (!allow_overwrite) {
+      auto it = name_to_options_type_.find(name);
+      if (it != name_to_options_type_.end()) {
+        return Status::KeyError(
+            "Already have a function options type registered with name: ", name);
+      }
+    }
+    return Status::OK();
+  }
+
+  Status DoAddFunction(std::shared_ptr<Function> function, bool allow_overwrite,
+                       bool add) {
+#ifndef NDEBUG
+    // This validates docstrings extensively, so don't waste time on it
+    // in release builds.
+    RETURN_NOT_OK(function->Validate());
+#endif
+
+    std::lock_guard<std::mutex> mutation_guard(lock_);
+
+    const std::string& name = function->name();
+    RETURN_NOT_OK(CanAddFunctionName(name, allow_overwrite));
+    if (add) {
+      name_to_function_[name] = std::move(function);
+    }
+    return Status::OK();
+  }
+
+  Status DoAddAlias(const std::string& target_name, const std::string& source_name,
+                    bool add) {
+    // source name must exist in this registry or the parent
+    // check outside mutex, in case GetFunction leads to mutex acquisition
+    ARROW_ASSIGN_OR_RAISE(auto func, GetFunction(source_name));
+
+    std::lock_guard<std::mutex> mutation_guard(lock_);
+
+    // target name must be available in this registry and the parent
+    RETURN_NOT_OK(CanAddFunctionName(target_name, /*allow_overwrite=*/false));

Review Comment:
   Do we need to mention `/*allow_overwrite=*/` here?





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896138388


##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,53 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+static compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                                      std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}

Review Comment:
   In principle, exposing a function should bring a benefit that offsets the complexity it adds. What would that benefit be? It's not clear to me right now where this function would be used besides here, so my guess is that it would be better to defer until such a use comes up.





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r898289154


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -115,20 +210,42 @@ std::unique_ptr<FunctionRegistry> FunctionRegistry::Make() {
   return std::unique_ptr<FunctionRegistry>(new FunctionRegistry());
 }
 
-FunctionRegistry::FunctionRegistry() { impl_.reset(new FunctionRegistryImpl()); }
+std::unique_ptr<FunctionRegistry> FunctionRegistry::Make(FunctionRegistry* parent) {
+  return std::unique_ptr<FunctionRegistry>(new FunctionRegistry(

Review Comment:
   This fails to compile because `explicit FunctionRegistry(FunctionRegistryImpl* impl)` is private (see the compiler output below). It looks like the existing code works because `new FunctionRegistry(...)` is evaluated within the scope of `FunctionRegistry`, whereas `make_unique` calls the constructor from its own scope.
   ```
   In file included from /mnt/user1/tscontract/github/rtpsw/arrow/cpp/src/arrow/compute/registry.cc:19:
   /mnt/user1/tscontract/github/rtpsw/arrow/cpp/src/arrow/util/make_unique.h: In instantiation of ‘typename std::enable_if<(! std::is_array< <template-parameter-1-1> >::value), std::unique_ptr<T> >::type arrow::internal::make_unique(A&& ...) [with T = arrow::compute::FunctionRegistry; A = {arrow::compute::FunctionRegistry::FunctionRegistryImpl*}; typename std::enable_if<(! std::is_array< <template-parameter-1-1> >::value), std::unique_ptr<T> >::type = std::unique_ptr<arrow::compute::FunctionRegistry>]’:
   /mnt/user1/tscontract/github/rtpsw/arrow/cpp/src/arrow/compute/registry.cc:216:70:   required from here
   /mnt/user1/tscontract/github/rtpsw/arrow/cpp/src/arrow/util/make_unique.h:30:29: error: ‘arrow::compute::FunctionRegistry::FunctionRegistry(arrow::compute::FunctionRegistry::FunctionRegistryImpl*)’ is private within this context
      30 |   return std::unique_ptr<T>(new T(std::forward<A>(args)...));
         |                             ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
   /mnt/user1/tscontract/github/rtpsw/arrow/cpp/src/arrow/compute/registry.cc:221:1: note: declared private here
     221 | FunctionRegistry::FunctionRegistry(FunctionRegistryImpl* impl) { impl_.reset(impl); }
         | ^~~~~~~~~~~~~~~~
   ```
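A self-contained reproduction of the access issue, using a hypothetical `Registry` class and a `MyMakeUnique` helper shaped like the `make_unique` in the error message above: the private constructor is accessible to `new` inside the class's own static `Make`, but not from inside the free helper template.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Shaped like a pre-C++14 make_unique helper: it evaluates `new T(...)` in its own scope.
template <typename T, typename... A>
std::unique_ptr<T> MyMakeUnique(A&&... args) {
  return std::unique_ptr<T>(new T(std::forward<A>(args)...));
}

class Registry {
 public:
  static std::unique_ptr<Registry> Make(Registry* parent) {
    // return MyMakeUnique<Registry>(parent);  // error: constructor is private
    //                                         // within the context of MyMakeUnique
    // OK: this `new` is evaluated inside a member of Registry.
    return std::unique_ptr<Registry>(new Registry(parent));
  }

  Registry* parent() const { return parent_; }

 private:
  explicit Registry(Registry* parent) : parent_(parent) {}
  Registry* parent_;
};
```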





[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r898584716


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -115,20 +210,42 @@ std::unique_ptr<FunctionRegistry> FunctionRegistry::Make() {
   return std::unique_ptr<FunctionRegistry>(new FunctionRegistry());
 }
 
-FunctionRegistry::FunctionRegistry() { impl_.reset(new FunctionRegistryImpl()); }
+std::unique_ptr<FunctionRegistry> FunctionRegistry::Make(FunctionRegistry* parent) {
+  return std::unique_ptr<FunctionRegistry>(new FunctionRegistry(

Review Comment:
   I see. Maybe we can stick to what we had. cc @lidavidm





[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r895930793


##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,53 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+static compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                                      std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}

Review Comment:
   Could this be a util function? It doesn't seem to depend on Substrait. WDYT?





[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r895853513


##########
cpp/src/arrow/engine/substrait/plan_internal.cc:
##########
@@ -92,6 +92,9 @@ Status AddExtensionSetToPlan(const ExtensionSet& ext_set, substrait::Plan* plan)
 
 Result<ExtensionSet> GetExtensionSetFromPlan(const substrait::Plan& plan,
                                              const ExtensionIdRegistry* registry) {

Review Comment:
   Maybe, 
   ```suggestion
   Result<ExtensionSet> GetExtensionSetFromPlan(const substrait::Plan& plan,
                                                const ExtensionIdRegistry* registry = default_extension_id_registry()) {
   ```
   
   Is there a specific reason why we have to explicitly assign the default registry when `registry` is `NULLPTR`?
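One possible reason (an assumption, not stated in this thread): a C++ default argument only applies when the caller omits the argument, so a caller that forwards its own possibly-null `registry` parameter would pass null straight through. Resolving null inside the function covers both cases. A sketch with hypothetical `IdRegistry`, `DefaultIdRegistry`, `LookupRegistryId`, and `ForwardingCaller` names:

```cpp
#include <cassert>

// Hypothetical registry type (not Arrow's ExtensionIdRegistry).
struct IdRegistry {
  int id;
};

// Hypothetical process-wide default.
const IdRegistry* DefaultIdRegistry() {
  static const IdRegistry kDefault{0};
  return &kDefault;
}

// Null means "use the default", whether the argument was omitted or an
// explicit (possibly forwarded) null was passed.
int LookupRegistryId(const IdRegistry* registry = nullptr) {
  if (registry == nullptr) {
    registry = DefaultIdRegistry();
  }
  return registry->id;
}

// A caller forwarding its own null-defaulted parameter unchanged.
int ForwardingCaller(const IdRegistry* registry = nullptr) {
  return LookupRegistryId(registry);
}
```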





[GitHub] [arrow] lidavidm commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
lidavidm commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896962558


##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -68,7 +68,7 @@ class SubstraitExecutor {
                              compute::ExecContext exec_context)
       : plan_(std::move(plan)), exec_context_(exec_context) {}
 
-  ~SubstraitExecutor() { ARROW_CHECK_OK(this->Close()); }
+  ~SubstraitExecutor() { ARROW_UNUSED(this->Close()); }

Review Comment:
   I suppose idempotent is not the right word.
   
   I guess I can see the motivation for this behavior: we can just swallow the error, and it won't matter to a "well-behaved" application since that application will already have handled the error, and then we don't need to track any extra state. So I suppose this is OK.
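A sketch of the approach accepted here, using hypothetical `Resource`, `SwallowingExecutor`, and `SimpleStatus` types and assuming the underlying resource reports an error when finished twice: the executor keeps no flag of its own, and its destructor discards whatever the second `Close()` returns.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for arrow::Status.
struct SimpleStatus {
  bool ok;
  std::string message;
};

// Hypothetical underlying resource that rejects a second Finish().
class Resource {
 public:
  SimpleStatus Finish() {
    if (finished_) {
      return SimpleStatus{false, "already finished"};
    }
    finished_ = true;
    return SimpleStatus{true, ""};
  }

 private:
  bool finished_ = false;
};

// The executor itself tracks no "closed" state.
class SwallowingExecutor {
 public:
  explicit SwallowingExecutor(Resource* resource) : resource_(resource) {}

  // A well-behaved caller has already called Close() and handled its status;
  // any error from this second call is deliberately discarded.
  ~SwallowingExecutor() { static_cast<void>(Close()); }

  SimpleStatus Close() { return resource_->Finish(); }

 private:
  Resource* resource_;
};
```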





[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896732675


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -34,59 +34,72 @@ namespace compute {
 
 class FunctionRegistry::FunctionRegistryImpl {
  public:
-  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
-#ifndef NDEBUG
-    // This validates docstrings extensively, so don't waste time on it
-    // in release builds.
-    RETURN_NOT_OK(function->Validate());
-#endif
+  explicit FunctionRegistryImpl(FunctionRegistryImpl* parent = NULLPTR)
+      : parent_(parent) {}
+  ~FunctionRegistryImpl() {}
 
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
+    }
+    return DoAddFunction(function, allow_overwrite, /*add=*/false);

Review Comment:
   Do we need to mention this in a comment?





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r898200017


##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);

Review Comment:
   `DoAddFunction` either does the addition or dry-runs it, depending on the Boolean flag argument. I guess `Do` isn't ideal to describe this (yet I can't think of a better name) but `Try` would not be right. This dry-run behavior is why `CanAddFunction` and `AddFunction` are both implemented using a common function and differ only by the value of a flag.
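A simplified, single-threaded sketch of that dry-run pattern. `NameRegistry` and its methods are hypothetical (no mutex, no `Status`, integer payloads); both public entry points share one code path that validates against this registry and its parent chain, and it mutates only when the `add` flag is set.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical, single-threaded registry (not Arrow's FunctionRegistry).
class NameRegistry {
 public:
  explicit NameRegistry(NameRegistry* parent = nullptr) : parent_(parent) {}

  // Dry run: reports whether Add() would succeed without changing anything.
  bool CanAdd(const std::string& name, int value, bool allow_overwrite = false) {
    return DoAdd(name, value, allow_overwrite, /*add=*/false);
  }

  // Real addition: same checks as CanAdd(), then stores the entry locally.
  bool Add(const std::string& name, int value, bool allow_overwrite = false) {
    return DoAdd(name, value, allow_overwrite, /*add=*/true);
  }

  bool ContainsLocal(const std::string& name) const { return entries_.count(name) > 0; }

 private:
  // A name is available only if neither an ancestor nor this registry holds it
  // (unless overwriting is allowed). Parents are never modified.
  bool CanAddName(const std::string& name, bool allow_overwrite) const {
    if (parent_ != nullptr && !parent_->CanAddName(name, allow_overwrite)) {
      return false;
    }
    return allow_overwrite || entries_.count(name) == 0;
  }

  bool DoAdd(const std::string& name, int value, bool allow_overwrite, bool add) {
    if (!CanAddName(name, allow_overwrite)) {
      return false;
    }
    if (add) {
      entries_[name] = value;  // only the real addition mutates state
    }
    return true;
  }

  NameRegistry* parent_;
  std::map<std::string, int> entries_;
};
```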





[GitHub] [arrow] westonpace commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1158370666

   Sorry, I'm still catching up from being out earlier this week.  I'll take a look at this tomorrow.




[GitHub] [arrow] rtpsw commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1156905723

   For background on nested registries, see:
   - [a completed PR on nested extension-id-registries](https://github.com/apache/arrow/pull/13214)
   - [a pending PR on nested function-registries](https://github.com/apache/arrow/pull/13252), which I ended up merging here.




[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901078427


##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -229,6 +229,20 @@ class ARROW_EXPORT SinkNodeConsumer {
   virtual Future<> Finish() = 0;
 };
 
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {
+ public:
+  Status Init(const std::shared_ptr<Schema>&, BackpressureControl*) override {
+    return Status::OK();
+  }
+  Status Consume(ExecBatch exec_batch) override { return Status::OK(); }
+  Future<> Finish() override { return Status::OK(); }
+
+ public:
+  static std::shared_ptr<NullSinkNodeConsumer> Make() {
+    return std::make_shared<NullSinkNodeConsumer>();
+  }

Review Comment:
   This allows for conveniently passing `NullSinkNodeConsumer::Make` as an argument. But you're right that I haven't used it yet (in `DeserializePlans` of `cpp/src/arrow/engine/substrait/util.cc`); I'll fix that soon.
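A sketch of why a static `Make` is convenient: it can be passed directly wherever a zero-argument factory is expected, and `std::function` accepts it even though it returns a pointer to the derived type. `BatchConsumer`, `NullConsumer`, `BatchConsumerFactory`, and `CountConsumersCreated` are hypothetical, loosely modeled on `SinkNodeConsumer` and `ConsumerFactory`.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical consumer interface, loosely modeled on SinkNodeConsumer.
struct BatchConsumer {
  virtual ~BatchConsumer() = default;
  virtual bool Consume(int batch) = 0;
};

// Accepts and discards every batch.
struct NullConsumer : BatchConsumer {
  bool Consume(int /*batch*/) override { return true; }
  static std::shared_ptr<NullConsumer> Make() { return std::make_shared<NullConsumer>(); }
};

// Zero-argument factory type, loosely modeled on ConsumerFactory.
using BatchConsumerFactory = std::function<std::shared_ptr<BatchConsumer>()>;

// Hypothetical caller that requests one consumer per relation.
int CountConsumersCreated(const BatchConsumerFactory& factory, int num_relations) {
  int created = 0;
  for (int i = 0; i < num_relations; ++i) {
    if (factory() != nullptr) {
      ++created;
    }
  }
  return created;
}
```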





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r909881548


##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -40,22 +41,81 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 
 /// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
 ///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
 /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// The output of each top-level Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of the single Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<compute::ExecPlan> DeserializePlan(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);

Review Comment:
   Ah, I didn't notice it was the singular variation. The code currently has the singular variation invoke the plural one, which makes the change you're asking for somewhat inconvenient, but I'll try to make it for the sake of user convenience.
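A sketch of the layering described above, with hypothetical `Decl`, `DeserializeAll`, and `DeserializeOne` names; it throws an exception where Arrow would return a `Result`. The singular form calls the plural one and rejects plans that do not contain exactly one top-level relation.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for a deserialized declaration.
struct Decl {
  std::string relation;
};

// Plural form: one declaration per top-level relation.
std::vector<Decl> DeserializeAll(const std::vector<std::string>& relations) {
  std::vector<Decl> decls;
  for (const std::string& relation : relations) {
    decls.push_back(Decl{relation});
  }
  return decls;
}

// Singular form layered on the plural one.
Decl DeserializeOne(const std::vector<std::string>& relations) {
  std::vector<Decl> decls = DeserializeAll(relations);
  if (decls.size() != 1) {
    throw std::invalid_argument("expected exactly one top-level relation");
  }
  return decls.front();
}
```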




[GitHub] [arrow] westonpace commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r900418662


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -116,7 +119,7 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
         } else {
           return Status::NotImplemented(
               "substrait::ReadRel::LocalFiles::FileOrFiles::format "
-              "other than FILE_FORMAT_PARQUET");
+              "other than FILE_FORMAT_PARQUET with an unrecognized file extension");

Review Comment:
   ```suggestion
                 "other than FILE_FORMAT_PARQUET");
   ```
   
   Minor nit: In the near future I am hoping we will drop the `.ends_with` checks and stop using the file extension to determine the format. It's non-standard (i.e. not documented behavior in the Substrait spec) and a little hidden. The latest Substrait spec already supports specifying the IPC format.
   
   So I'd rather not add a clarification that we would soon turn around and remove.
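To make the nit concrete, here is a minimal, self-contained sketch of the two lookup strategies being discussed: a format declared explicitly in the plan (as the newer Substrait spec allows) versus a fallback on the path's file extension. The `FileFormat` enum, `EndsWith`, and `InferFormat` are hypothetical names for illustration, not the Arrow or Substrait API, and the specific extensions checked are assumptions.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Hypothetical stand-in for the formats a ReadRel might reference.
enum class FileFormat { kParquet, kIpc };

// Returns true if `s` ends with `suffix` (C++17 std::string has no ends_with).
inline bool EndsWith(const std::string& s, const std::string& suffix) {
  return s.size() >= suffix.size() &&
         s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
}

// Prefer the format declared in the plan; only fall back to the file extension
// (the non-standard behavior the review wants to drop) when none is declared.
inline std::optional<FileFormat> InferFormat(std::optional<FileFormat> declared,
                                             const std::string& path) {
  if (declared) return declared;
  // Assumed extensions, for illustration only.
  if (EndsWith(path, ".parquet")) return FileFormat::kParquet;
  if (EndsWith(path, ".arrow") || EndsWith(path, ".feather")) return FileFormat::kIpc;
  return std::nullopt;  // the caller would report NotImplemented
}
```

Dropping the extension fallback amounts to deleting the `EndsWith` branches, so an undeclared format becomes an error rather than a guess.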



##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,57 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+namespace {
+
+compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                               std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}
+
+}  // namespace
+
+static DeclarationFactory MakeWriteDeclarationFactory(
+    const WriteOptionsFactory& write_options_factory) {
+  return [&write_options_factory](compute::Declaration input,
+                                  std::vector<std::string> names) {
+    compute::Declaration projected = ProjectByNamesDeclaration(input, names);
+    std::shared_ptr<compute::ExecNodeOptions> options = write_options_factory();
+    return compute::Declaration::Sequence({std::move(projected), {"write", options}});
+  };
+}
+
+static Result<std::vector<compute::Declaration>> DeserializePlans(

Review Comment:
   Is this method internal? I don't see it in any .h file. If so, can we put it in the anonymous namespace?
   
   That being said, why not make it external and add it to serde.h? I've been hoping for a while that we can move away from forcing users to use the consuming sink node, so I think making this external would give new users more flexibility.
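For readers unfamiliar with the convention: a `static` free function and a function inside an unnamed namespace both have internal linkage, so either keeps a helper out of the library's exported symbols. The sketch below uses hypothetical helper names (`JoinNames`, `DescribeColumns`) to show the anonymous-namespace form, which can also hold helper types, something `static` cannot be applied to.

```cpp
#include <cassert>
#include <string>
#include <vector>

namespace {

// Internal linkage: visible only within this translation unit, like `static`.
std::string JoinNames(const std::vector<std::string>& names) {
  std::string out;
  for (const auto& name : names) {
    if (!out.empty()) out += ",";
    out += name;
  }
  return out;
}

}  // namespace

// An externally visible function (the kind one would declare in a header such
// as serde.h) can still call the internal helper.
std::string DescribeColumns(const std::vector<std::string>& names) {
  return "columns: " + JoinNames(names);
}
```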



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -229,6 +229,20 @@ class ARROW_EXPORT SinkNodeConsumer {
   virtual Future<> Finish() = 0;
 };
 
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {
+ public:
+  Status Init(const std::shared_ptr<Schema>&, BackpressureControl*) override {
+    return Status::OK();
+  }
+  Status Consume(ExecBatch exec_batch) override { return Status::OK(); }
+  Future<> Finish() override { return Status::OK(); }
+
+ public:
+  static std::shared_ptr<NullSinkNodeConsumer> Make() {
+    return std::make_shared<NullSinkNodeConsumer>();
+  }

Review Comment:
   Is this needed?  Is it for consistency with other nodes?  It seems callers could just call `make_shared` themselves.
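The question is whether a static `Make()` factory adds anything over calling `std::make_shared` directly. This self-contained sketch mirrors the shape of the diff using hypothetical `Consumer`/`NullConsumer` types (simplified stand-ins, not the real `SinkNodeConsumer`) and shows that both spellings yield an equivalent object, so the factory is purely a convenience.

```cpp
#include <cassert>
#include <memory>

// Hypothetical, simplified stand-in for compute::SinkNodeConsumer.
struct Consumer {
  virtual ~Consumer() = default;
  virtual bool Consume(int batch) = 0;
};

// A consumer that accepts every batch and does nothing with it.
struct NullConsumer : Consumer {
  bool Consume(int) override { return true; }
  // Optional convenience factory, as in the diff.
  static std::shared_ptr<NullConsumer> Make() {
    return std::make_shared<NullConsumer>();
  }
};

// A factory callback in the style of ConsumerFactory, written without the
// static helper.
std::shared_ptr<Consumer> MakeConsumer() { return std::make_shared<NullConsumer>(); }
```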



##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -229,6 +229,20 @@ class ARROW_EXPORT SinkNodeConsumer {
   virtual Future<> Finish() = 0;
 };
 
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {

Review Comment:
   Is this for benchmarking purposes?  Or some other reason?
   
   Would it instead make sense to have a no-op declaration factory that simply doesn't add any new nodes?
   
   I'm not really against this approach, just pondering the options.
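The two options can be compared with simplified, hypothetical types. `Decl` below is a toy stand-in for `compute::Declaration` that just records the node kinds in sequence; `DeclarationFactory` has the same shape as the alias in serde.cc. The null-consumer approach appends a sink that discards batches, while a no-op factory returns its input unchanged. With the no-op variant, the caller would presumably need to attach its own sink before running the plan.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Toy stand-in for compute::Declaration: the ordered node kinds in a sequence.
struct Decl {
  std::vector<std::string> nodes;
};

// Same shape as the DeclarationFactory alias in serde.cc.
using DeclarationFactory = std::function<Decl(Decl, std::vector<std::string>)>;

// Option 1: append a sink whose consumer ignores every batch.
DeclarationFactory MakeNullSinkFactory() {
  return [](Decl input, std::vector<std::string> /*names*/) {
    input.nodes.push_back("consuming_sink");
    return input;
  };
}

// Option 2: a no-op factory that leaves the declaration as deserialized.
DeclarationFactory MakeNoOpFactory() {
  return [](Decl input, std::vector<std::string> /*names*/) { return input; };
}
```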



##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -108,25 +109,50 @@ class SubstraitExecutor {
 }  // namespace
 
 Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer) {
-  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make());
+    const Buffer& substrait_buffer, const ExtensionIdRegistry* extid_registry,
+    compute::FunctionRegistry* func_registry) {
   // TODO(ARROW-15732)
   compute::ExecContext exec_context(arrow::default_memory_pool(),
-                                    ::arrow::internal::GetCpuThreadPool());
+                                    ::arrow::internal::GetCpuThreadPool(), func_registry);
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(&exec_context));
   SubstraitExecutor executor(std::move(plan), exec_context);
-  RETURN_NOT_OK(executor.Init(substrait_buffer));
+  RETURN_NOT_OK(executor.Init(substrait_buffer, extid_registry));
   ARROW_ASSIGN_OR_RAISE(auto sink_reader, executor.Execute());
+  // check closing here, not in destructor, to expose error to caller
+  RETURN_NOT_OK(executor.Close());
   return sink_reader;
 }
 
 Result<std::shared_ptr<Buffer>> SerializeJsonPlan(const std::string& substrait_json) {
   return engine::internal::SubstraitFromJSON("Plan", substrait_json);
 }
 
+Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buffer, const ExtensionIdRegistry* registry) {
+  return engine::DeserializePlans(
+      buffer, []() { return std::make_shared<compute::NullSinkNodeConsumer>(); },
+      registry);
+}
+
 std::shared_ptr<ExtensionIdRegistry> MakeExtensionIdRegistry() {
   return nested_extension_id_registry(default_extension_id_registry());
 }
 
+Status RegisterFunction(ExtensionIdRegistry& registry, const std::string& id_uri,
+                        const std::string& id_name,
+                        const std::string& arrow_function_name) {
+  const std::string& id_uri_sym = registry.AddExternalSymbol(id_uri);
+  const std::string& id_name_sym = registry.AddExternalSymbol(id_name);
+  const std::string& arrow_function_name_sym =
+      registry.AddExternalSymbol(arrow_function_name);

Review Comment:
   I'm pretty sure this is redundant because `RegisterFunction` already takes ownership of the Arrow function name (in `function_names_`).



##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,45 @@ namespace substrait {
 
 /// \brief Retrieve a RecordBatchReader from a Substrait plan.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer);
+    const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+    compute::FunctionRegistry* func_registry = NULLPTR);
 
 /// \brief Get a Serialized Plan from a Substrait JSON plan.
 /// This is a helper method for Python tests.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
     const std::string& substrait_json);
 
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+/// including a no-op consumer of the sink output
+///

Review Comment:
   Can you add a brief description here explaining why a user might want to do this? It could be something like "this can be useful for testing or benchmarking purposes".



##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,45 @@ namespace substrait {
 
 /// \brief Retrieve a RecordBatchReader from a Substrait plan.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer);
+    const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+    compute::FunctionRegistry* func_registry = NULLPTR);
 
 /// \brief Get a Serialized Plan from a Substrait JSON plan.
 /// This is a helper method for Python tests.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
     const std::string& substrait_json);
 
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+/// including a no-op consumer of the sink output
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const ExtensionIdRegistry* registry);
+
 /// \brief Make a nested registry with the default registry as parent.
 /// See arrow::engine::nested_extension_id_registry for details.
 ARROW_ENGINE_EXPORT std::shared_ptr<ExtensionIdRegistry> MakeExtensionIdRegistry();
 
+/// \brief Register a function manually.
+///
+/// Register an arrow function name by an ID, defined by a URI and a name, on a given
+/// extension-id-registry.
+///
+/// \param[in] registry an extension-id-registry to use
+/// \param[in] id_uri a URI of the ID to register by
+/// \param[in] id_name a name of the ID to register by
+/// \param[in] arrow_function_name name of arrow function to register
+ARROW_ENGINE_EXPORT Status RegisterFunction(ExtensionIdRegistry& registry,
+                                            const std::string& id_uri,
+                                            const std::string& id_name,
+                                            const std::string& arrow_function_name);
+
+ARROW_ENGINE_EXPORT const std::string& default_extension_types_uri();

Review Comment:
   Is the intention to also use this as the URI for extension functions?  I think I'm a fan of simpler URIs like: `https://arrow.apache.org/substrait/v1/extensions.yaml`
   
   Either way, you can probably safely ignore this comment until we fix the Substrait picture here to allow for YAML extension/inheritance.



##########
cpp/src/arrow/engine/substrait/extension_set.h:
##########
@@ -95,6 +96,17 @@ class ARROW_ENGINE_EXPORT ExtensionIdRegistry {
   virtual Status CanRegisterFunction(Id,
                                      const std::string& arrow_function_name) const = 0;
   virtual Status RegisterFunction(Id, std::string arrow_function_name) = 0;
+
+  /// \brief Add a symbol external to the plan yet used in an Id.
+  ///
+  /// This ensures the symbol, which is only viewed but not held by the Id, lives while
+  /// the extension set does. Symbols appearing in the Substrait plan are already held.
+  const std::string& AddExternalSymbol(const std::string& symbol) {
+    return *external_symbols.insert(symbol).first;
+  }

Review Comment:
   I think we should have...
   
   ```cpp
   // Registers the function and takes ownership of uri and name
   virtual Status RegisterFunction(std::string uri, std::string name,
                                   std::string arrow_function_name) = 0;
   // Registers the function without taking ownership of id
   virtual Status RegisterFunction(Id id, std::string arrow_function_name) = 0;
   ```
   
   Then `external_symbols` can be an implementation detail.
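Here is a minimal sketch of the proposed interface under simplifying assumptions: `Id` is modeled as a pair of `std::string_view`s (the real `ExtensionIdRegistry::Id` may differ), `Status` is replaced by `bool`, and `SimpleRegistry` is a hypothetical implementation. The owning overload copies the URI and name into private storage and then delegates to the non-owning overload, so callers never see the symbol storage.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <string_view>
#include <utility>

// Simplified Id: views into strings owned by someone else.
struct Id {
  std::string_view uri, name;
  bool operator<(const Id& o) const { return uri != o.uri ? uri < o.uri : name < o.name; }
};

class Registry {
 public:
  virtual ~Registry() = default;
  // Registers the function and takes ownership of uri and name.
  virtual bool RegisterFunction(std::string uri, std::string name,
                                std::string arrow_function_name) = 0;
  // Registers the function without taking ownership of id.
  virtual bool RegisterFunction(Id id, std::string arrow_function_name) = 0;
};

class SimpleRegistry : public Registry {
 public:
  bool RegisterFunction(std::string uri, std::string name,
                        std::string arrow_function_name) override {
    // std::set never relocates its elements, so views into them stay valid.
    const std::string& held_uri = *symbols_.insert(std::move(uri)).first;
    const std::string& held_name = *symbols_.insert(std::move(name)).first;
    return RegisterFunction(Id{held_uri, held_name}, std::move(arrow_function_name));
  }

  bool RegisterFunction(Id id, std::string arrow_function_name) override {
    // emplace reports false if the Id is already registered.
    return functions_.emplace(id, std::move(arrow_function_name)).second;
  }

  const std::string* Lookup(Id id) const {
    auto it = functions_.find(id);
    return it == functions_.end() ? nullptr : &it->second;
  }

 private:
  std::set<std::string> symbols_;        // implementation detail
  std::map<Id, std::string> functions_;  // Id -> Arrow function name
};
```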



##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,45 @@ namespace substrait {
 
 /// \brief Retrieve a RecordBatchReader from a Substrait plan.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer);
+    const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+    compute::FunctionRegistry* func_registry = NULLPTR);
 
 /// \brief Get a Serialized Plan from a Substrait JSON plan.
 /// This is a helper method for Python tests.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
     const std::string& substrait_json);
 
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+/// including a no-op consumer of the sink output
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const ExtensionIdRegistry* registry);
+
 /// \brief Make a nested registry with the default registry as parent.
 /// See arrow::engine::nested_extension_id_registry for details.
 ARROW_ENGINE_EXPORT std::shared_ptr<ExtensionIdRegistry> MakeExtensionIdRegistry();
 
+/// \brief Register a function manually.
+///
+/// Register an arrow function name by an ID, defined by a URI and a name, on a given
+/// extension-id-registry.
+///
+/// \param[in] registry an extension-id-registry to use
+/// \param[in] id_uri a URI of the ID to register by
+/// \param[in] id_name a name of the ID to register by
+/// \param[in] arrow_function_name name of arrow function to register
+ARROW_ENGINE_EXPORT Status RegisterFunction(ExtensionIdRegistry& registry,
+                                            const std::string& id_uri,
+                                            const std::string& id_name,
+                                            const std::string& arrow_function_name);

Review Comment:
   I'm not a big fan of the fact that we'd have to call `RegisterFunction` twice for every UDF (once for the Arrow registry and once for the Substrait registry). I'd like to someday get to a point where there is a special URI or convention so that any time the consumer encounters a URI like `https://arrow.apache.org/substrait/v1/custom.yaml`, it automatically converts the Substrait arguments to Arrow arguments using that convention and then calls the function registry.
   
   However, that can be for the future.  This seems like a reasonable approach for the moment.
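The future convention described above could look roughly like this sketch. Everything in it is hypothetical: `kArrowCustomUri` reuses the example URI from the comment, and `ResolveArrowFunction` is an invented helper. A function under that URI maps directly to the Arrow function of the same name, so only the Arrow function registry needs a registration; any other URI still goes through an explicit Substrait-side mapping.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Example URI from the review comment; not an established Arrow convention.
const std::string kArrowCustomUri = "https://arrow.apache.org/substrait/v1/custom.yaml";

// Explicit (uri, name) -> Arrow function name registrations.
using ExplicitMappings = std::map<std::pair<std::string, std::string>, std::string>;

// Resolves a Substrait (uri, name) pair to an Arrow function name.
std::optional<std::string> ResolveArrowFunction(const ExplicitMappings& explicit_mappings,
                                                const std::string& uri,
                                                const std::string& name) {
  // Convention: under the special URI, the Substrait name is the Arrow name.
  if (uri == kArrowCustomUri) return name;
  auto it = explicit_mappings.find(std::make_pair(uri, name));
  if (it == explicit_mappings.end()) return std::nullopt;
  return it->second;
}
```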



##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -44,18 +45,50 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+

Review Comment:
   For symmetry it might be nice to have a `DeserializePlan` which takes a single instance of `dataset::WriteNodeOptions` and returns an error if the underlying plan has multiple top-level relations.  I imagine this would be a useful convenience to have as well for callers.
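The suggested convenience overload mainly wraps the plural form and checks the relation count. Here is a sketch with simplified, hypothetical types: `Decl` stands in for `compute::Declaration`, and `SimpleResult` replaces `arrow::Result`. In Arrow itself, the error branch would presumably return `Status::Invalid(...)`.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for compute::Declaration.
struct Decl {
  std::string root;  // name of the top-level relation, for illustration
};

// Minimal Result stand-in: a value when ok, otherwise an error message.
template <typename T>
struct SimpleResult {
  bool ok;
  T value;
  std::string error;
};

// Unwraps the single declaration, or reports an error if the plan has zero or
// several top-level relations.
SimpleResult<Decl> ExpectSingleRelation(std::vector<Decl> decls) {
  if (decls.size() != 1) {
    return {false, Decl{}, "expected exactly one top-level relation, got " +
                               std::to_string(decls.size())};
  }
  return {true, std::move(decls.front()), ""};
}
```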



##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -44,18 +45,50 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(

Review Comment:
   This will need some unit test coverage.



##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -44,18 +45,50 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan

Review Comment:
   ```suggestion
   /// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
   ///
   /// The output of each top-level Substrait relation will be written to a filesystem.
   /// `write_options_factory` can be used to control write behavior.
   ///
   /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
   ```
   
   These overloads are likely to be a little confusing to users.  The docstring should clearly explain the difference between the two.
   
   We can expand on the docstring for the other overload a little too...
   
   ```
   The output of each top-level Substrait relation will be sent to a caller-supplied
   consumer function provided by `consumer_factory`.
   ```



##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,57 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+namespace {
+
+compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                               std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}
+
+}  // namespace
+
+static DeclarationFactory MakeWriteDeclarationFactory(
+    const WriteOptionsFactory& write_options_factory) {
+  return [&write_options_factory](compute::Declaration input,
+                                  std::vector<std::string> names) {
+    compute::Declaration projected = ProjectByNamesDeclaration(input, names);
+    std::shared_ptr<compute::ExecNodeOptions> options = write_options_factory();
+    return compute::Declaration::Sequence({std::move(projected), {"write", options}});

Review Comment:
   Using a project node for this seems like overkill.  Probably `WriteNodeOptions` should just include a new field which is an array of names (or maybe replace `custom_metadata` with `custom_schema`).  However, this change does not have to be part of this PR.  I can create a JIRA for this cleanup later.
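The project node in `ProjectByNamesDeclaration` effectively performs a positional rename: field `i` keeps its data and takes `names[i]`. That is a schema-level change rather than per-batch computation, which is why folding it into the write options is attractive. The sketch below shows the rename step on a hypothetical `Field` list rather than an Arrow `Schema`. One deliberate difference: the diff projects fields `0..names.size()-1`, so a shorter `names` list would silently drop trailing columns, whereas this sketch rejects a size mismatch.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical field description; the type is kept as a plain string.
struct Field {
  std::string name;
  std::string type;
};

// Positional rename: field i keeps its type and takes names[i]. An empty
// `names` keeps the original names, mirroring the early return in the diff.
std::optional<std::vector<Field>> RenameFields(std::vector<Field> fields,
                                               const std::vector<std::string>& names) {
  if (names.empty()) return fields;
  if (names.size() != fields.size()) return std::nullopt;  // assumed: mismatch is an error
  for (std::size_t i = 0; i < fields.size(); ++i) fields[i].name = names[i];
  return fields;
}
```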




[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901100555


##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -108,25 +109,50 @@ class SubstraitExecutor {
 }  // namespace
 
 Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer) {
-  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make());
+    const Buffer& substrait_buffer, const ExtensionIdRegistry* extid_registry,
+    compute::FunctionRegistry* func_registry) {
   // TODO(ARROW-15732)
   compute::ExecContext exec_context(arrow::default_memory_pool(),
-                                    ::arrow::internal::GetCpuThreadPool());
+                                    ::arrow::internal::GetCpuThreadPool(), func_registry);
+  ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(&exec_context));
   SubstraitExecutor executor(std::move(plan), exec_context);
-  RETURN_NOT_OK(executor.Init(substrait_buffer));
+  RETURN_NOT_OK(executor.Init(substrait_buffer, extid_registry));
   ARROW_ASSIGN_OR_RAISE(auto sink_reader, executor.Execute());
+  // check closing here, not in destructor, to expose error to caller
+  RETURN_NOT_OK(executor.Close());
   return sink_reader;
 }
 
 Result<std::shared_ptr<Buffer>> SerializeJsonPlan(const std::string& substrait_json) {
   return engine::internal::SubstraitFromJSON("Plan", substrait_json);
 }
 
+Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buffer, const ExtensionIdRegistry* registry) {
+  return engine::DeserializePlans(
+      buffer, []() { return std::make_shared<compute::NullSinkNodeConsumer>(); },
+      registry);
+}
+
 std::shared_ptr<ExtensionIdRegistry> MakeExtensionIdRegistry() {
   return nested_extension_id_registry(default_extension_id_registry());
 }
 
+Status RegisterFunction(ExtensionIdRegistry& registry, const std::string& id_uri,
+                        const std::string& id_name,
+                        const std::string& arrow_function_name) {
+  const std::string& id_uri_sym = registry.AddExternalSymbol(id_uri);
+  const std::string& id_name_sym = registry.AddExternalSymbol(id_name);
+  const std::string& arrow_function_name_sym =
+      registry.AddExternalSymbol(arrow_function_name);

Review Comment:
   I'll fix.




[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r907718160


##########
cpp/src/arrow/engine/substrait/extension_set.cc:
##########
@@ -315,6 +315,11 @@ struct ExtensionIdRegistryImpl : ExtensionIdRegistry {
     return Status::OK();
   }
 
+  Status RegisterFunction(std::string uri, std::string name,

Review Comment:
   The `Id` variation was there before my changes, and I suspect the idea of using `util::string_view` instead of `std::string` was to avoid copying twice: once when converting a passed `util::string_view` argument (but not a `std::string` one) to `std::string`, and again inside the registration code. So I believe the `util::string_view` variation should be the one that stays, and the code inside it needs to convert everything to a held `std::string`. Most likely not everything was converted this way in the original code; I recall getting a SIGSEGV when calling from Python before my changes.
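The fix described above (accept view arguments, but convert them to held `std::string`s before building view-based Ids) can be illustrated with a small hypothetical `SymbolTable`. This is not Arrow's class, and `std::string_view` stands in for `util::string_view`. The test mimics a caller, such as a Python binding, that reuses its buffer after the call returns.

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <unordered_set>

class SymbolTable {
 public:
  // Copies the symbol once into owned storage and returns a view into it.
  // References to elements of std::unordered_set stay valid across later
  // insertions (rehashing invalidates iterators, not element references).
  std::string_view Intern(std::string_view symbol) {
    return *symbols_.emplace(symbol).first;
  }

 private:
  std::unordered_set<std::string> symbols_;
};
```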




[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901095299


##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -44,18 +45,50 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+

Review Comment:
   I'll add.
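   The `registry` parameter documented in this hunk follows a "null means default" convention. A minimal sketch of resolving it once at the API boundary, so downstream code can assume a non-null pointer; `Registry` and `default_registry()` are hypothetical placeholders, not Arrow names.

```cpp
#include <cassert>
#include <string>

// Hypothetical registry type standing in for ExtensionIdRegistry.
struct Registry {
  std::string label;
};

// Hypothetical process-wide default instance.
const Registry* default_registry() {
  static const Registry instance{"default"};
  return &instance;
}

// Resolve the optional argument once: a null pointer selects the default,
// anything else is used as given.
const Registry* ResolveRegistry(const Registry* registry) {
  return registry != nullptr ? registry : default_registry();
}
```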





[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r895922931


##########
cpp/src/arrow/engine/substrait/plan_internal.cc:
##########
@@ -92,6 +92,9 @@ Status AddExtensionSetToPlan(const ExtensionSet& ext_set, substrait::Plan* plan)
 
 Result<ExtensionSet> GetExtensionSetFromPlan(const substrait::Plan& plan,
                                              const ExtensionIdRegistry* registry) {

Review Comment:
   Ah! I see your point. Mine was just a suggestion. Let's do what is best. Thanks for pointing that out. 





[GitHub] [arrow] vibhatha commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1157078543

   cc @westonpace could you please take a look?




[GitHub] [arrow] lidavidm commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
lidavidm commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r898201393


##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);

Review Comment:
   Ah… here I had thought `allow_overwrite` was meant for the user, not for the internal implementation. Can't we factor out an explicit check instead of relying on a flag?
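   One way to read the suggestion to factor out an explicit check is sketched below: a private predicate, called with the lock held, shared by a `const` check and the mutating add. Lookups in the nested registry fall back to the parent, and writes only ever touch the child. `NestedRegistry` and its members are hypothetical and simplified (functions are represented by strings); this is not Arrow's `FunctionRegistry` implementation.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical nested registry: lookups fall back to the parent, while
// additions only ever modify this registry (the parent is never changed).
class NestedRegistry {
 public:
  explicit NestedRegistry(const NestedRegistry* parent = nullptr) : parent_(parent) {}

  // Explicit, side-effect-free check: can `name` be added here?
  bool CanAdd(const std::string& name, bool allow_overwrite = false) const {
    std::lock_guard<std::mutex> lock(mutex_);
    return CanAddLocked(name, allow_overwrite);
  }

  // Adds the function; returns false if the check fails.
  bool Add(const std::string& name, std::string impl, bool allow_overwrite = false) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!CanAddLocked(name, allow_overwrite)) return false;
    functions_[name] = std::move(impl);
    return true;
  }

  // Returns the registered value, searching up the parent chain; "" if absent.
  std::string Lookup(const std::string& name) const {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      auto it = functions_.find(name);
      if (it != functions_.end()) return it->second;
    }
    return parent_ != nullptr ? parent_->Lookup(name) : std::string();
  }

 private:
  // Caller must hold mutex_. Shared by CanAdd and Add, so the check needs no
  // "dry run" flag on the mutating path.
  bool CanAddLocked(const std::string& name, bool allow_overwrite) const {
    if (allow_overwrite) return true;
    if (functions_.count(name) > 0) return false;
    return parent_ == nullptr || parent_->Lookup(name).empty();
  }

  const NestedRegistry* parent_;
  mutable std::mutex mutex_;
  std::map<std::string, std::string> functions_;
};
```

   With `allow_overwrite`, the child shadows the parent's entry instead of replacing it, which matches the documented promise that the parent never changes.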





[GitHub] [arrow] westonpace merged pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
westonpace merged PR #13375:
URL: https://github.com/apache/arrow/pull/13375




[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r910321990


##########
cpp/src/arrow/engine/substrait/extension_set.cc:
##########
@@ -315,6 +315,11 @@ struct ExtensionIdRegistryImpl : ExtensionIdRegistry {
     return Status::OK();
   }
 
+  Status RegisterFunction(std::string uri, std::string name,

Review Comment:
   Let's defer, then.





[GitHub] [arrow] vibhatha commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1153837157

   @rtpsw I did skim through the PR, interesting!
   What is required to get a functional test case to evaluate registering a UDF? I went through the JIRA, but it is not clear what is meant by 
   > registering a function (with an Id) external to the plan
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] rtpsw commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1153940711

   > @rtpsw I did skim through the PR, interesting!. What is required to get a functional test case to evaluate registering a UDF? I went through the JIRA, but it is not clear what is meant by
   > 
   > > registering a function (with an Id) external to the plan
   
   You're right, this isn't trivial. The issue is that `Id` is defined with two members of type `util::string_view`, which point to strings that must be held elsewhere while the plan is in scope. This is satisfied for an `Id` that has these fields pointing to strings parsed from the Substrait plan itself. However, if a user registers a function with an `Id` whose field values are external to the plan, then the strings they point to should be held while the plan is in scope, so it makes sense to keep the strings on the plan; this is what `AddExternalSymbol` is meant for.
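   The lifetime issue can be illustrated with a small sketch, assuming `std::string_view` in place of `util::string_view`. `Id`, `PlanSymbols`, and `MakeExternalId` are hypothetical; only the `AddExternalSymbol` idea (insert into a set and return a reference to the held copy) mirrors the snippet quoted later in this PR. A node-based container such as `std::unordered_set` is assumed because references to its elements stay valid when more elements are inserted, even across rehashing.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <unordered_set>

// Hypothetical mirror of an Id whose members only view strings held elsewhere.
struct Id {
  std::string_view uri;
  std::string_view name;
};

// Hypothetical holder playing the role of the plan's extension set. Symbols
// parsed from the plan are assumed to be owned already; external ones are
// copied into a node-based set so the Id's views never dangle.
class PlanSymbols {
 public:
  const std::string& AddExternalSymbol(const std::string& symbol) {
    // insert() does nothing for a duplicate and returns the existing element.
    return *external_symbols_.insert(symbol).first;
  }

  Id MakeExternalId(const std::string& uri, const std::string& name) {
    return Id{AddExternalSymbol(uri), AddExternalSymbol(name)};
  }

  std::size_t size() const { return external_symbols_.size(); }

 private:
  std::unordered_set<std::string> external_symbols_;
};
```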
   
   I intend to add test cases a bit later. This PR is an extraction from a larger project I'm working on for end-to-end (Ibis/Ibis-Substrait/PyArrow) support for Python UDFs.




[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r898279240


##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);

Review Comment:
   OK, I'll keep this code.





[GitHub] [arrow] rtpsw commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1156784745

   Right now I see only a minor change waiting for me to make. Let me know if you're still reviewing and I'll hold for your notification.




[GitHub] [arrow] rtpsw commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1159218051

   > I skipped over the changes to nested function registries since I already reviewed those (I think) in #13252 .
   
   I think there are a few additions here, so I'll try to rebase to make the diff clear.
   
   > This will enable custom non-embedded functions to be used in Substrait plans although I'd prefer it to be a bit more automatic (e.g. not requiring a second register call).
   
   I'll look into this.
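   A hypothetical sketch of what a one-call registration might look like: the UDF is added to a function table and mapped to a Substrait `(uri, name)` Id in the same step, and nothing changes if either entry already exists. Plain `std::map`s stand in for the function registry and the extension-id registry; `RegisterUdf` and both table types are assumptions, not existing Arrow APIs.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-ins: a function table keyed by Arrow function name and an
// Id table mapping a Substrait (uri, name) pair to that Arrow function name.
using FunctionTable = std::map<std::string, std::string>;
using IdTable = std::map<std::pair<std::string, std::string>, std::string>;

// Registers the UDF and its Id mapping together, so callers need only one
// call. Both preconditions are checked before either table is modified.
bool RegisterUdf(FunctionTable& functions, IdTable& ids, const std::string& uri,
                 const std::string& name, const std::string& arrow_name,
                 std::string impl) {
  auto id = std::make_pair(uri, name);
  if (functions.count(arrow_name) > 0 || ids.count(id) > 0) return false;
  functions.emplace(arrow_name, std::move(impl));
  ids.emplace(std::move(id), arrow_name);
  return true;
}
```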
   




[GitHub] [arrow] westonpace commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r907807493


##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -40,22 +41,81 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 
 /// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
 ///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
 /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// The output of each top-level Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of the single Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<compute::ExecPlan> DeserializePlan(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);

Review Comment:
   This would only be for the `DeserializePlan` methods, which should fail anyway if there is more than one `PlanRel`, so I'm not sure it is an issue. No copies of the options would need to be made.
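   The point about factory calls can be illustrated with a simplified sketch in which relations are plain strings. `WriteOptions`, `Declaration`, `DeserializeAll`, and `DeserializeOne` are hypothetical stand-ins for the Arrow types and functions in the diff. The plural form calls the factory once per relation; the singular form rejects anything other than exactly one relation, so it calls the factory at most once and never needs to copy options.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins for write options and a deserialized declaration.
struct WriteOptions {
  std::string base_dir;
};
struct Declaration {
  std::string relation;
  std::shared_ptr<WriteOptions> options;
};
using WriteOptionsFactory = std::function<std::shared_ptr<WriteOptions>()>;

// Plural form: the factory is invoked once per top-level relation, so each
// relation gets its own options object.
std::vector<Declaration> DeserializeAll(const std::vector<std::string>& relations,
                                        const WriteOptionsFactory& factory) {
  std::vector<Declaration> out;
  for (const auto& rel : relations) out.push_back({rel, factory()});
  return out;
}

// Singular form: anything other than exactly one relation is an error
// (signalled here by returning false) and the factory is never called.
bool DeserializeOne(const std::vector<std::string>& relations,
                    const WriteOptionsFactory& factory, Declaration* out) {
  if (relations.size() != 1) return false;
  *out = DeserializeAll(relations, factory).front();
  return true;
}
```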





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r897348892


##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);

Review Comment:
   It turns out that changing it to `const` causes a compilation error when the implementation calls `DoAddFunction`, which is designed to be called from both `AddFunction` and `CanAddFunction`. I wouldn't like to resolve this by using a `const_cast` or by creating `const` and non-`const` versions of `DoAddFunction` (perhaps with different names). Moreover, both versions would have to acquire a non-reentrant mutex, which would make it awkward to avoid code duplication.
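   The design described in this comment can be sketched as a single helper that takes an `add` flag, so the non-reentrant mutex is acquired in exactly one place. The class and member names are hypothetical. The sketch also shows why the check is hard to make `const`: it shares a helper that mutates when `add` is true.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical registry where checking and adding share one locked helper.
class FlatRegistry {
 public:
  // Non-const: it calls DoAdd, which may mutate when add == true. Making this
  // const would need a const_cast or a second, duplicated helper.
  bool CanAdd(const std::string& name, bool allow_overwrite = false) {
    return DoAdd(name, "", allow_overwrite, /*add=*/false);
  }

  bool Add(const std::string& name, std::string impl, bool allow_overwrite = false) {
    return DoAdd(name, std::move(impl), allow_overwrite, /*add=*/true);
  }

  std::size_t size() {
    std::lock_guard<std::mutex> lock(mutex_);
    return functions_.size();
  }

 private:
  bool DoAdd(const std::string& name, std::string impl, bool allow_overwrite,
             bool add) {
    // std::mutex is not reentrant, so callers must not already hold it.
    std::lock_guard<std::mutex> lock(mutex_);
    if (!allow_overwrite && functions_.count(name) > 0) return false;
    if (add) functions_[name] = std::move(impl);
    return true;
  }

  std::mutex mutex_;
  std::map<std::string, std::string> functions_;
};
```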





[GitHub] [arrow] lidavidm commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
lidavidm commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1156818130

   @vibhatha, I'm not up to date on Acero/Substrait progress anymore. Are the changes here reasonable?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] lidavidm commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
lidavidm commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r897868044


##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);

Review Comment:
   I suppose my question is: why does **Can**AddFunction call **Do**AddFunction? Unless you meant to make it **Try**AddFunction or something (I think that is the source of my confusion). In the latter case, the existing signature is fine.





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r897332451


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -116,7 +119,7 @@ Result<compute::Declaration> FromProto(const substrait::Rel& rel,
         } else {
           return Status::NotImplemented(
               "substrait::ReadRel::LocalFiles::FileOrFiles::format "
-              "other than FILE_FORMAT_PARQUET");
+              "other than FILE_FORMAT_PARQUET and not recognized");

Review Comment:
   What I meant by "and not recognized" is that the given file format (which is specified as other than Parquet) cannot be recognized automatically from the file extension. I'll make the error message clearer.
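   A hypothetical sketch of extension-based recognition, where an unrecognized extension corresponds to the `NotImplemented` path in the diff. The `FileFormat` enum and the particular extensions mapped here are assumptions for illustration, not the set the PR actually supports.

```cpp
#include <cassert>
#include <string>

// Hypothetical format enum; the real reader may support a different set.
enum class FileFormat { kParquet, kIpc, kUnknown };

// Guess the format from the path's extension. kUnknown stands for the
// "other than FILE_FORMAT_PARQUET and not recognized" error case.
FileFormat FormatFromPath(const std::string& path) {
  auto ends_with = [&path](const std::string& suffix) {
    return path.size() >= suffix.size() &&
           path.compare(path.size() - suffix.size(), suffix.size(), suffix) == 0;
  };
  if (ends_with(".parquet")) return FileFormat::kParquet;
  if (ends_with(".arrow") || ends_with(".feather")) return FileFormat::kIpc;
  return FileFormat::kUnknown;
}
```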





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r910679133


##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -40,22 +41,81 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 
 /// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
 ///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
 /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// The output of each top-level Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of the single Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<compute::ExecPlan> DeserializePlan(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);

Review Comment:
   Done.





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901095936


##########
cpp/src/arrow/engine/substrait/util.h:
##########
@@ -30,17 +31,45 @@ namespace substrait {
 
 /// \brief Retrieve a RecordBatchReader from a Substrait plan.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<RecordBatchReader>> ExecuteSerializedPlan(
-    const Buffer& substrait_buffer);
+    const Buffer& substrait_buffer, const ExtensionIdRegistry* registry = NULLPTR,
+    compute::FunctionRegistry* func_registry = NULLPTR);
 
 /// \brief Get a Serialized Plan from a Substrait JSON plan.
 /// This is a helper method for Python tests.
 ARROW_ENGINE_EXPORT Result<std::shared_ptr<Buffer>> SerializeJsonPlan(
     const std::string& substrait_json);
 
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+/// including a no-op consumer of the sink output
+///

Review Comment:
   I'll add.



[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r901100424


##########
cpp/src/arrow/engine/substrait/extension_set.h:
##########
@@ -95,6 +96,17 @@ class ARROW_ENGINE_EXPORT ExtensionIdRegistry {
   virtual Status CanRegisterFunction(Id,
                                      const std::string& arrow_function_name) const = 0;
   virtual Status RegisterFunction(Id, std::string arrow_function_name) = 0;
+
+  /// \brief Add a symbol external to the plan yet used in an Id.
+  ///
+  /// This ensures the symbol, which is only viewed but not held by the Id, lives while
+  /// the extension set does. Symbols appearing in the Substrait plan are already held.
+  const std::string& AddExternalSymbol(const std::string& symbol) {
+    return *external_symbols.insert(symbol).first;
+  }

Review Comment:
   I'll fix.
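`AddExternalSymbol` relies on two things: `insert` on a set returns an iterator to the stored element (whether or not it was newly inserted), and a node-based container keeps references to its elements valid until they are erased. The declaration of `external_symbols` is not shown in the diff, so the sketch below assumes an `std::unordered_set<std::string>`. `SymbolHolder` is a hypothetical stand-in for the registry, and the `std::string_view` plays the role of the non-owning view held by an `Id`:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <unordered_set>

// Hypothetical holder mirroring the AddExternalSymbol idea: the set owns the
// strings, and callers keep only non-owning views into them.
class SymbolHolder {
 public:
  // Returns a reference to the stored copy. Inserting a duplicate returns the
  // existing element rather than storing a second copy.
  const std::string& AddExternalSymbol(const std::string& symbol) {
    return *external_symbols_.insert(symbol).first;
  }

  std::size_t size() const { return external_symbols_.size(); }

 private:
  // Node-based container: rehashing invalidates iterators but not references
  // to elements, so views into stored strings stay valid.
  std::unordered_set<std::string> external_symbols_;
};
```

In use, the view outlives the caller's original string and survives many further insertions, because it points into the holder's own copy.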



[GitHub] [arrow] rtpsw commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1159665217

   Rebase done and pushed using `git push --force-with-lease origin HEAD`.


[GitHub] [arrow] westonpace commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r907806229


##########
cpp/src/arrow/engine/substrait/serde.cc:
##########
@@ -58,12 +58,57 @@ Result<compute::Declaration> DeserializeRelation(const Buffer& buf,
   return FromProto(rel, ext_set);
 }
 
-Result<std::vector<compute::Declaration>> DeserializePlans(
-    const Buffer& buf, const ConsumerFactory& consumer_factory,
+using DeclarationFactory = std::function<compute::Declaration(
+    compute::Declaration, std::vector<std::string> names)>;
+
+static DeclarationFactory MakeConsumingSinkDeclarationFactory(
+    const ConsumerFactory& consumer_factory) {
+  return [&consumer_factory](compute::Declaration input, std::vector<std::string> names) {
+    std::shared_ptr<compute::ExecNodeOptions> options =
+        std::make_shared<compute::ConsumingSinkNodeOptions>(
+            compute::ConsumingSinkNodeOptions{consumer_factory(), std::move(names)});
+    return compute::Declaration::Sequence(
+        {std::move(input), {"consuming_sink", options}});
+  };
+}
+
+namespace {
+
+compute::Declaration ProjectByNamesDeclaration(compute::Declaration input,
+                                               std::vector<std::string> names) {
+  int names_size = static_cast<int>(names.size());
+  if (names_size == 0) {
+    return input;
+  }
+  std::vector<compute::Expression> expressions;
+  for (int i = 0; i < names_size; i++) {
+    expressions.push_back(compute::field_ref(FieldRef(i)));
+  }
+  return compute::Declaration::Sequence(
+      {std::move(input),
+       {"project",
+        compute::ProjectNodeOptions{std::move(expressions), std::move(names)}}});
+}
+
+}  // namespace
+
+static DeclarationFactory MakeWriteDeclarationFactory(
+    const WriteOptionsFactory& write_options_factory) {
+  return [&write_options_factory](compute::Declaration input,
+                                  std::vector<std::string> names) {
+    compute::Declaration projected = ProjectByNamesDeclaration(input, names);
+    std::shared_ptr<compute::ExecNodeOptions> options = write_options_factory();
+    return compute::Declaration::Sequence({std::move(projected), {"write", options}});

Review Comment:
   ARROW-16915
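For context on the hunk above: `ProjectByNamesDeclaration` builds one `field_ref(FieldRef(i))` expression per requested name, so the project node keeps the first `names.size()` input columns in order and only renames them. With no names, it returns the input declaration unchanged. Below is a dependency-free sketch of that mapping. `Column` and `ProjectByNames` are hypothetical stand-ins for Arrow expressions and declarations, and the sketch signals "no projection" with an empty result:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in for one projected output column: the input column it
// reads and the name it is given.
struct Column {
  int source_index;
  std::string name;
};

// Mirrors the shape of ProjectByNamesDeclaration: output column i reads input
// column i and is renamed to names[i]. An empty name list yields no columns,
// standing in for "return the input unchanged".
std::vector<Column> ProjectByNames(const std::vector<std::string>& names) {
  std::vector<Column> columns;
  columns.reserve(names.size());
  for (int i = 0; i < static_cast<int>(names.size()); ++i) {
    columns.push_back(Column{i, names[i]});
  }
  return columns;
}
```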



[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r907730758


##########
cpp/src/arrow/engine/substrait/serde.h:
##########
@@ -40,22 +41,81 @@ using ConsumerFactory = std::function<std::shared_ptr<compute::SinkNodeConsumer>
 
 /// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
 ///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
 /// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
 /// message
 /// \param[in] consumer_factory factory function for generating the node that consumes
 /// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
 /// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
 /// Plan is returned here.
 /// \return a vector of ExecNode declarations, one for each toplevel relation in the
 /// Substrait Plan
 ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
     const Buffer& buf, const ConsumerFactory& consumer_factory,
-    ExtensionSet* ext_set_out = NULLPTR);
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
 
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of each top-level Substrait relation will be sent to a caller supplied
+/// consumer function provided by consumer_factory
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] consumer_factory factory function for generating the node that consumes
+/// the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return an ExecNode corresponding to the single toplevel relation in the Substrait
+/// Plan
 Result<compute::ExecPlan> DeserializePlan(const Buffer& buf,
                                           const ConsumerFactory& consumer_factory,
+                                          const ExtensionIdRegistry* registry = NULLPTR,
                                           ExtensionSet* ext_set_out = NULLPTR);
 
+/// Factory function type for generating the write options of a node consuming the batches
+/// produced by each toplevel Substrait relation when deserializing a Substrait Plan.
+using WriteOptionsFactory = std::function<std::shared_ptr<dataset::WriteNodeOptions>()>;
+
+/// \brief Deserializes a Substrait Plan message to a list of ExecNode declarations
+///
+/// The output of each top-level Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<std::vector<compute::Declaration>> DeserializePlans(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);
+
+/// \brief Deserializes a single-relation Substrait Plan message to an execution plan
+///
+/// The output of the single Substrait relation will be written to a filesystem.
+/// `write_options_factory` can be used to control write behavior.
+///
+/// \param[in] buf a buffer containing the protobuf serialization of a Substrait Plan
+/// message
+/// \param[in] write_options_factory factory function for generating the write options of
+/// a node consuming the batches produced by each toplevel Substrait relation
+/// \param[in] registry an extension-id-registry to use, or null for the default one.
+/// \param[out] ext_set_out if non-null, the extension mapping used by the Substrait
+/// Plan is returned here.
+/// \return a vector of ExecNode declarations, one for each toplevel relation in the
+/// Substrait Plan
+ARROW_ENGINE_EXPORT Result<compute::ExecPlan> DeserializePlan(
+    const Buffer& buf, const WriteOptionsFactory& write_options_factory,
+    const ExtensionIdRegistry* registry = NULLPTR, ExtensionSet* ext_set_out = NULLPTR);

Review Comment:
   I think changing from factories to instances would lead to trouble. `ConsumerFactory` (and `WriteOptionsFactory` too) is called once per `PlanRel` in the Substrait plan to get a distinct instance per invocation. This holds even though the factory takes no arguments and often returns equivalent instances. Using an instance instead of a factory would effectively force reuse of the same instance for every `PlanRel`, which isn't right. To avoid that reuse, the instance would have to support safe duplication, which would be more cumbersome for the caller to implement than a factory.
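To illustrate the factory argument, here is a minimal sketch that does not depend on Arrow. `Consumer`, `ConsumerFactory` and `BuildSinks` are hypothetical stand-ins, not Arrow's `SinkNodeConsumer` API. The factory takes no arguments but is invoked once per top-level relation, so each relation gets its own consumer state:

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>
#include <vector>

// Hypothetical stand-in for a sink consumer that accumulates per-relation state.
struct Consumer {
  std::size_t rows_seen = 0;
  void Consume(std::size_t num_rows) { rows_seen += num_rows; }
};

// Like the PR's ConsumerFactory: no arguments, returns a fresh instance.
using ConsumerFactory = std::function<std::shared_ptr<Consumer>()>;

// Hypothetical: attach one consumer per top-level relation by calling the
// factory once per relation.
std::vector<std::shared_ptr<Consumer>> BuildSinks(std::size_t num_relations,
                                                  const ConsumerFactory& factory) {
  std::vector<std::shared_ptr<Consumer>> sinks;
  sinks.reserve(num_relations);
  for (std::size_t i = 0; i < num_relations; ++i) {
    sinks.push_back(factory());
  }
  return sinks;
}
```

If a single `std::shared_ptr<Consumer>` were passed instead, both relations would write into the same `rows_seen` counter. That is the sharing problem described in the comment.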



[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r907744244


##########
cpp/src/arrow/compute/exec/options.h:
##########
@@ -229,6 +229,20 @@ class ARROW_EXPORT SinkNodeConsumer {
   virtual Future<> Finish() = 0;
 };
 
+class ARROW_EXPORT NullSinkNodeConsumer : public SinkNodeConsumer {

Review Comment:
   In the latest commit here, this `NullSinkNodeConsumer` is used in [line 132 of `util.cc`](https://github.com/apache/arrow/pull/13375/files#diff-0078df867c0f64dcfe79c149ee007f0933d164a32f122e28d79872a969b5be05R132). You're right that it's not for the reason I noted above.
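A consumer like `NullSinkNodeConsumer` is the null-object pattern applied to sinks: it accepts every batch and finishes immediately. This is handy when a plan runs only for its side effects and the sink output is not needed. The sketch below uses a hypothetical, simplified interface with `bool` results. Arrow's real `SinkNodeConsumer` differs; for example, `Finish()` returns `Future<>`, as the diff shows:

```cpp
#include <cassert>

// Hypothetical, simplified batch and consumer interface (not Arrow's signatures).
struct Batch {
  int num_rows;
};

class Consumer {
 public:
  virtual ~Consumer() = default;
  // Both return true on success.
  virtual bool Consume(const Batch& batch) = 0;
  virtual bool Finish() = 0;
};

// Null object: accepts and discards every batch.
class NullConsumer : public Consumer {
 public:
  bool Consume(const Batch&) override { return true; }
  bool Finish() override { return true; }
};

// Hypothetical driver: feeds num_batches batches to a consumer, then finishes it.
bool Drain(Consumer& consumer, int num_batches) {
  for (int i = 0; i < num_batches; ++i) {
    if (!consumer.Consume(Batch{10})) return false;
  }
  return consumer.Finish();
}
```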



[GitHub] [arrow] vibhatha commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1155084458

   > @vibhatha, I think this PR is ready for review. Are you the one to review it?
   
   @rtpsw I was reading it just now, but I won't be a major reviewer. I will read it closely and co-review certain parts.
   
   cc @westonpace @lidavidm, could you please take a look?


[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896733517


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -34,59 +34,72 @@ namespace compute {
 
 class FunctionRegistry::FunctionRegistryImpl {
  public:
-  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
-#ifndef NDEBUG
-    // This validates docstrings extensively, so don't waste time on it
-    // in release builds.
-    RETURN_NOT_OK(function->Validate());
-#endif
+  explicit FunctionRegistryImpl(FunctionRegistryImpl* parent = NULLPTR)
+      : parent_(parent) {}
+  ~FunctionRegistryImpl() {}
 
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
+    }
+    return DoAddFunction(function, allow_overwrite, /*add=*/false);
+  }
 
-    const std::string& name = function->name();
-    auto it = name_to_function_.find(name);
-    if (it != name_to_function_.end() && !allow_overwrite) {
-      return Status::KeyError("Already have a function registered with name: ", name);
+  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
     }
-    name_to_function_[name] = std::move(function);
-    return Status::OK();
+    return DoAddFunction(function, allow_overwrite, /*add=*/true);
+  }
+
+  Status CanAddAlias(const std::string& target_name, const std::string& source_name) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));
+    }
+    return DoAddAlias(target_name, source_name, /*add=*/false);
   }
 
   Status AddAlias(const std::string& target_name, const std::string& source_name) {
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));
+    }
+    return DoAddAlias(target_name, source_name, /*add=*/true);
+  }
 
-    auto it = name_to_function_.find(source_name);
-    if (it == name_to_function_.end()) {
-      return Status::KeyError("No function registered with name: ", source_name);
+  Status CanAddFunctionOptionsType(const FunctionOptionsType* options_type,
+                                   bool allow_overwrite = false) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionOptionsType(options_type, allow_overwrite));
     }
-    name_to_function_[target_name] = it->second;
-    return Status::OK();
+    return DoAddFunctionOptionsType(options_type, allow_overwrite, /*add=*/false);

Review Comment:
   Comment needed?



##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -34,59 +34,72 @@ namespace compute {
 
 class FunctionRegistry::FunctionRegistryImpl {
  public:
-  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
-#ifndef NDEBUG
-    // This validates docstrings extensively, so don't waste time on it
-    // in release builds.
-    RETURN_NOT_OK(function->Validate());
-#endif
+  explicit FunctionRegistryImpl(FunctionRegistryImpl* parent = NULLPTR)
+      : parent_(parent) {}
+  ~FunctionRegistryImpl() {}
 
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
+    }
+    return DoAddFunction(function, allow_overwrite, /*add=*/false);
+  }
 
-    const std::string& name = function->name();
-    auto it = name_to_function_.find(name);
-    if (it != name_to_function_.end() && !allow_overwrite) {
-      return Status::KeyError("Already have a function registered with name: ", name);
+  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
     }
-    name_to_function_[name] = std::move(function);
-    return Status::OK();
+    return DoAddFunction(function, allow_overwrite, /*add=*/true);
+  }
+
+  Status CanAddAlias(const std::string& target_name, const std::string& source_name) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));
+    }
+    return DoAddAlias(target_name, source_name, /*add=*/false);
   }
 
   Status AddAlias(const std::string& target_name, const std::string& source_name) {
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));
+    }
+    return DoAddAlias(target_name, source_name, /*add=*/true);
+  }
 
-    auto it = name_to_function_.find(source_name);
-    if (it == name_to_function_.end()) {
-      return Status::KeyError("No function registered with name: ", source_name);
+  Status CanAddFunctionOptionsType(const FunctionOptionsType* options_type,
+                                   bool allow_overwrite = false) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionOptionsType(options_type, allow_overwrite));
     }
-    name_to_function_[target_name] = it->second;
-    return Status::OK();
+    return DoAddFunctionOptionsType(options_type, allow_overwrite, /*add=*/false);
   }
 
   Status AddFunctionOptionsType(const FunctionOptionsType* options_type,
                                 bool allow_overwrite = false) {
-    std::lock_guard<std::mutex> mutation_guard(lock_);
-
-    const std::string name = options_type->type_name();
-    auto it = name_to_options_type_.find(name);
-    if (it != name_to_options_type_.end() && !allow_overwrite) {
-      return Status::KeyError(
-          "Already have a function options type registered with name: ", name);
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionOptionsType(options_type, allow_overwrite));
     }
-    name_to_options_type_[name] = options_type;
-    return Status::OK();
+    return DoAddFunctionOptionsType(options_type, allow_overwrite, /*add=*/true);

Review Comment:
   Comment needed?
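The pattern in these hunks is to ask the parent (recursively) whether a name is acceptable before mutating, and to insert only into the child registry. The header comment states that the parent never changes. A minimal sketch under that reading follows. The `Registry` here is hypothetical: it maps names to integers instead of `Function` objects, and locking is omitted:

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical nested registry: names map to ints instead of Function objects,
// and the mutex is omitted.
class Registry {
 public:
  explicit Registry(const Registry* parent = nullptr) : parent_(parent) {}

  // True if `name` may be added here: unless overwriting is allowed, it must
  // not clash with this registry or any ancestor.
  bool CanAdd(const std::string& name, bool allow_overwrite) const {
    if (parent_ != nullptr && !parent_->CanAdd(name, allow_overwrite)) return false;
    return allow_overwrite || entries_.count(name) == 0;
  }

  // Adds only to this registry; the parent is never modified.
  bool Add(const std::string& name, int value, bool allow_overwrite = false) {
    if (!CanAdd(name, allow_overwrite)) return false;
    entries_[name] = value;
    return true;
  }

  // True if `name` is registered directly here (ancestors are ignored).
  bool HasLocal(const std::string& name) const { return entries_.count(name) != 0; }

 private:
  const Registry* parent_;
  std::map<std::string, int> entries_;
};
```

With `allow_overwrite`, the child gets its own entry that shadows the parent's, and the parent's entry stays as it was.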



[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896737423


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -98,14 +111,96 @@ class FunctionRegistry::FunctionRegistryImpl {
       const std::string& name) const {
     auto it = name_to_options_type_.find(name);
     if (it == name_to_options_type_.end()) {
+      if (parent_ != NULLPTR) {
+        return parent_->GetFunctionOptionsType(name);
+      }
       return Status::KeyError("No function options type registered with name: ", name);
     }
     return it->second;
   }
 
-  int num_functions() const { return static_cast<int>(name_to_function_.size()); }
+  int num_functions() const {
+    return (parent_ == NULLPTR ? 0 : parent_->num_functions()) +
+           static_cast<int>(name_to_function_.size());
+  }
 
  private:
+  // must not acquire mutex
+  Status CanAddFunctionName(const std::string& name, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(name, allow_overwrite));
+    }
+    if (!allow_overwrite) {
+      auto it = name_to_function_.find(name);
+      if (it != name_to_function_.end()) {
+        return Status::KeyError("Already have a function registered with name: ", name);
+      }
+    }
+    return Status::OK();
+  }
+
+  // must not acquire mutex
+  Status CanAddOptionsTypeName(const std::string& name, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddOptionsTypeName(name, allow_overwrite));
+    }
+    if (!allow_overwrite) {
+      auto it = name_to_options_type_.find(name);
+      if (it != name_to_options_type_.end()) {
+        return Status::KeyError(
+            "Already have a function options type registered with name: ", name);
+      }
+    }
+    return Status::OK();
+  }
+
+  Status DoAddFunction(std::shared_ptr<Function> function, bool allow_overwrite,
+                       bool add) {
+#ifndef NDEBUG
+    // This validates docstrings extensively, so don't waste time on it
+    // in release builds.
+    RETURN_NOT_OK(function->Validate());
+#endif
+
+    std::lock_guard<std::mutex> mutation_guard(lock_);
+
+    const std::string& name = function->name();
+    RETURN_NOT_OK(CanAddFunctionName(name, allow_overwrite));
+    if (add) {
+      name_to_function_[name] = std::move(function);
+    }
+    return Status::OK();
+  }
+
+  Status DoAddAlias(const std::string& target_name, const std::string& source_name,
+                    bool add) {
+    // source name must exist in this registry or the parent
+    // check outside mutex, in case GetFunction leads to mutex acquisition
+    ARROW_ASSIGN_OR_RAISE(auto func, GetFunction(source_name));
+
+    std::lock_guard<std::mutex> mutation_guard(lock_);
+
+    // target name must be available in this registry and the parent
+    RETURN_NOT_OK(CanAddFunctionName(target_name, /*allow_overwrite=*/false));
+    if (add) {
+      name_to_function_[target_name] = func;
+    }
+    return Status::OK();
+  }
+
+  Status DoAddFunctionOptionsType(const FunctionOptionsType* options_type,
+                                  bool allow_overwrite, bool add) {
+    std::lock_guard<std::mutex> mutation_guard(lock_);
+
+    const std::string name = options_type->type_name();
+    RETURN_NOT_OK(CanAddOptionsTypeName(name, /*allow_overwrite=*/false));

Review Comment:
   same here.
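This hunk adds parent fallback to lookups, sums `num_functions()` across the chain, and makes aliases require a source that resolves here or in an ancestor and a target that is free everywhere. A minimal sketch of those rules, again with a hypothetical int-valued `Registry` and no locking (the PR resolves the source outside the mutex; this sketch has no mutex at all):

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical nested registry reduced to lookup, aliasing and counting.
class Registry {
 public:
  explicit Registry(const Registry* parent = nullptr) : parent_(parent) {}

  bool Add(const std::string& name, int value) {
    if (!NameIsFree(name)) return false;
    entries_[name] = value;
    return true;
  }

  // The source may be registered here or in any ancestor; the target must be
  // free here and in every ancestor. The alias is stored only in this registry.
  bool AddAlias(const std::string& target, const std::string& source) {
    const int* found = Get(source);
    if (found == nullptr) return false;
    const int value = *found;
    if (!NameIsFree(target)) return false;
    entries_[target] = value;
    return true;
  }

  // Local lookup first, then fall back to the parent chain.
  const int* Get(const std::string& name) const {
    auto it = entries_.find(name);
    if (it != entries_.end()) return &it->second;
    return parent_ != nullptr ? parent_->Get(name) : nullptr;
  }

  // Entries here plus all ancestors, like num_functions() in the diff.
  int size() const {
    return (parent_ == nullptr ? 0 : parent_->size()) +
           static_cast<int>(entries_.size());
  }

 private:
  bool NameIsFree(const std::string& name) const {
    if (parent_ != nullptr && !parent_->NameIsFree(name)) return false;
    return entries_.count(name) == 0;
  }

  const Registry* parent_;
  std::map<std::string, int> entries_;
};
```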



[GitHub] [arrow] vibhatha commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896732126


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -34,59 +34,72 @@ namespace compute {
 
 class FunctionRegistry::FunctionRegistryImpl {
  public:
-  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
-#ifndef NDEBUG
-    // This validates docstrings extensively, so don't waste time on it
-    // in release builds.
-    RETURN_NOT_OK(function->Validate());
-#endif
+  explicit FunctionRegistryImpl(FunctionRegistryImpl* parent = NULLPTR)
+      : parent_(parent) {}
+  ~FunctionRegistryImpl() {}
 
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
+    }
+    return DoAddFunction(function, allow_overwrite, /*add=*/false);
+  }
 
-    const std::string& name = function->name();
-    auto it = name_to_function_.find(name);
-    if (it != name_to_function_.end() && !allow_overwrite) {
-      return Status::KeyError("Already have a function registered with name: ", name);
+  Status AddFunction(std::shared_ptr<Function> function, bool allow_overwrite) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunction(function, allow_overwrite));
     }
-    name_to_function_[name] = std::move(function);
-    return Status::OK();
+    return DoAddFunction(function, allow_overwrite, /*add=*/true);
+  }
+
+  Status CanAddAlias(const std::string& target_name, const std::string& source_name) {
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));
+    }
+    return DoAddAlias(target_name, source_name, /*add=*/false);
   }
 
   Status AddAlias(const std::string& target_name, const std::string& source_name) {
-    std::lock_guard<std::mutex> mutation_guard(lock_);
+    if (parent_ != NULLPTR) {
+      RETURN_NOT_OK(parent_->CanAddFunctionName(target_name,
+                                                /*allow_overwrite=*/false));

Review Comment:
   Do we need to mention this in a comment, since it is just two fields?



[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r896833396


##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);

Review Comment:
   Making both `const` should be fine, so I'll try that. My reason for using `Status` as the return type is that the caller gets an explanation when the result is not OK.
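The case for returning `Status` rather than `bool` from `CanAddFunction` is that a failed check carries its reason. A minimal sketch of the difference, with a hypothetical `CheckResult` standing in for `arrow::Status` and the check made `const` as discussed:

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical stand-in for arrow::Status: either OK or an error with a message.
struct CheckResult {
  bool ok;
  std::string message;
  static CheckResult OK() { return {true, ""}; }
  static CheckResult KeyError(const std::string& msg) { return {false, msg}; }
};

// Hypothetical registry of names only.
class NameRegistry {
 public:
  // const: checking never mutates the registry.
  CheckResult CanAdd(const std::string& name) const {
    if (names_.count(name) != 0) {
      return CheckResult::KeyError("Already have a function registered with name: " +
                                   name);
    }
    return CheckResult::OK();
  }

  void Add(const std::string& name) { names_.insert(name); }

 private:
  std::set<std::string> names_;
};
```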



[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r895867777


##########
cpp/src/arrow/engine/substrait/plan_internal.cc:
##########
@@ -92,6 +92,9 @@ Status AddExtensionSetToPlan(const ExtensionSet& ext_set, substrait::Plan* plan)
 
 Result<ExtensionSet> GetExtensionSetFromPlan(const substrait::Plan& plan,
                                              const ExtensionIdRegistry* registry) {

Review Comment:
   I'll need to check it out, but I think the code-safety issue discussed in [ARROW-16811](https://issues.apache.org/jira/browse/ARROW-16811) could apply here, i.e., a developer calling this function may let the argument default instead of exposing it to the user.
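The failure mode described here can be sketched with hypothetical functions that are not part of Arrow. A defaulted registry parameter lets an intermediate function compile even when it forgets to forward its caller's registry, so the call silently falls back to the default:

```cpp
#include <cassert>
#include <string>

// Hypothetical registry identified only by name.
struct Registry {
  std::string name;
};

const Registry* DefaultRegistry() {
  static const Registry kDefault{"default"};
  return &kDefault;
}

// Low-level function with a defaulted registry argument (nullptr = default).
std::string ResolveWith(const Registry* registry = nullptr) {
  return (registry != nullptr ? registry : DefaultRegistry())->name;
}

// Bug: accepts a registry but forgets to forward it. This still compiles
// because ResolveWith's argument has a default.
std::string ForgetfulWrapper(const Registry* registry) {
  (void)registry;
  return ResolveWith();
}

// Correct wrapper: forwards the registry explicitly.
std::string ForwardingWrapper(const Registry* registry) {
  return ResolveWith(registry);
}
```

One possible remedy is to remove the default from the low-level signature, which turns `ForgetfulWrapper` into a compile error. Whether that is the right trade-off here is what the comment leaves open.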



[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r897341336


##########
cpp/src/arrow/engine/substrait/util.cc:
##########
@@ -68,7 +68,7 @@ class SubstraitExecutor {
                              compute::ExecContext exec_context)
       : plan_(std::move(plan)), exec_context_(exec_context) {}
 
-  ~SubstraitExecutor() { ARROW_CHECK_OK(this->Close()); }
+  ~SubstraitExecutor() { ARROW_UNUSED(this->Close()); }

Review Comment:
   I agree. Also note that the flag solution could still lead to a crash in the destructor if `Close()` fails there and was not called earlier. Besides avoiding a crash, the proposed `SubstraitExecutor` code ensures that `ExecuteSerializedPlan` calls `Close()`, reports an error back, or both.
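The split discussed here works as follows: the destructor may call `Close()` but must not abort on failure, while the normal path calls `Close()` explicitly and reports its result. A minimal sketch with a hypothetical `Executor` whose `Close()` returns `bool` in place of `arrow::Status`, plus a call counter for illustration:

```cpp
#include <cassert>
#include <string>

// Hypothetical executor; Close() returns false to signal failure.
class Executor {
 public:
  Executor(bool fail_on_close, int* close_calls)
      : fail_on_close_(fail_on_close), close_calls_(close_calls) {}

  // Best effort: a failure here is deliberately ignored rather than aborting,
  // in the spirit of ARROW_UNUSED(this->Close()) in the diff.
  ~Executor() { static_cast<void>(Close()); }

  bool Close() {
    ++*close_calls_;
    return !fail_on_close_;
  }

 private:
  bool fail_on_close_;
  int* close_calls_;
};

// Hypothetical caller: closes explicitly so that an error reaches the user
// instead of being swallowed by the destructor.
std::string RunAndReport(bool fail_on_close, int* close_calls) {
  Executor executor(fail_on_close, close_calls);
  if (!executor.Close()) return "close failed";
  return "ok";
}
```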



[GitHub] [arrow] vibhatha commented on pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
vibhatha commented on PR #13375:
URL: https://github.com/apache/arrow/pull/13375#issuecomment-1157078268

   > @vibhatha, I'm not up to date on Acero/Substrait progress anymore. Are the changes here reasonable?
   
   @lidavidm I am going to go through it again and check it against my knowledge of Acero/Substrait. But it would be better to have another review from @westonpace on this.


[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r898200017


##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);

Review Comment:
   `DoAddFunction` either performs the addition or dry-runs it, depending on the Boolean flag argument. I admit `Do` isn't ideal for describing this, but `Try` would not be right either. This dry-run feature is why `CanAddFunction` and `AddFunction` are both implemented using a common function and differ only in the value of a flag.
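   The shared-implementation idea can be sketched roughly as follows. This is a simplified, hypothetical illustration rather than the code in this PR. `Function`, `StatusCode`, `Registry`, and `Contains` are made-up stand-ins, and the real registry returns `arrow::Status` and handles more cases. Both public entry points run the same checks, and only the `add` flag decides whether the map is mutated.

```cpp
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins for arrow::compute::Function and arrow::Status.
struct Function {
  std::string name;
};
enum class StatusCode { kOK, kKeyError };

class Registry {
 public:
  // Dry run: performs every check but never modifies the registry.
  StatusCode CanAddFunction(std::shared_ptr<Function> function,
                            bool allow_overwrite = false) {
    return DoAddFunction(std::move(function), allow_overwrite, /*add=*/false);
  }

  // Same checks, then actually inserts the function.
  StatusCode AddFunction(std::shared_ptr<Function> function,
                         bool allow_overwrite = false) {
    return DoAddFunction(std::move(function), allow_overwrite, /*add=*/true);
  }

  bool Contains(const std::string& name) const { return functions_.count(name) > 0; }

 private:
  // Common implementation; `add` selects between a real addition and a dry run.
  StatusCode DoAddFunction(std::shared_ptr<Function> function, bool allow_overwrite,
                           bool add) {
    std::string name = function->name;
    if (!allow_overwrite && functions_.count(name) > 0) {
      return StatusCode::kKeyError;  // name already registered
    }
    if (add) {
      functions_[name] = std::move(function);
    }
    return StatusCode::kOK;
  }

  std::map<std::string, std::shared_ptr<Function>> functions_;
};
```

   Keeping the checks in one place means a dry run cannot drift out of sync with what a real addition would do.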





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r898200590


##########
cpp/src/arrow/compute/registry.cc:
##########
@@ -115,20 +210,42 @@ std::unique_ptr<FunctionRegistry> FunctionRegistry::Make() {
   return std::unique_ptr<FunctionRegistry>(new FunctionRegistry());
 }
 
-FunctionRegistry::FunctionRegistry() { impl_.reset(new FunctionRegistryImpl()); }
+std::unique_ptr<FunctionRegistry> FunctionRegistry::Make(FunctionRegistry* parent) {
+  return std::unique_ptr<FunctionRegistry>(new FunctionRegistry(

Review Comment:
   I'll try this.





[GitHub] [arrow] rtpsw commented on a diff in pull request #13375: ARROW-16823: [C++] Arrow Substrait enhancements for UDF

Posted by GitBox <gi...@apache.org>.
rtpsw commented on code in PR #13375:
URL: https://github.com/apache/arrow/pull/13375#discussion_r898200017


##########
cpp/src/arrow/compute/registry.h:
##########
@@ -47,35 +47,64 @@ class ARROW_EXPORT FunctionRegistry {
  public:
   ~FunctionRegistry();
 
-  /// \brief Construct a new registry. Most users only need to use the global
-  /// registry
+  /// \brief Construct a new registry.
+  ///
+  /// Most users only need to use the global registry.
   static std::unique_ptr<FunctionRegistry> Make();
 
-  /// \brief Add a new function to the registry. Returns Status::KeyError if a
-  /// function with the same name is already registered
+  /// \brief Construct a new nested registry with the given parent.
+  ///
+  /// Most users only need to use the global registry. The returned registry never changes
+  /// its parent, even when an operation allows overwriting.
+  static std::unique_ptr<FunctionRegistry> Make(FunctionRegistry* parent);
+
+  /// \brief Check whether a new function can be added to the registry.
+  ///
+  /// \returns Status::KeyError if a function with the same name is already registered.
+  Status CanAddFunction(std::shared_ptr<Function> function, bool allow_overwrite = false);

Review Comment:
   `DoAddFunction` either performs the addition or dry-runs it, depending on the Boolean flag argument. I admit `Do` isn't ideal for describing this (yet I can't think of a better name), but `Try` would not be right either. This dry-run feature is why `CanAddFunction` and `AddFunction` are both implemented using a common function and differ only in the value of a flag.
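   One place where a dry run is useful is the nested registry described in the diff above, which must never change its parent. The following hypothetical sketch assumes that design; `NestedRegistry`, `Function`, and `StatusCode` are illustrative names, not Arrow's API. Lookups fall back to the parent. An addition is first checked against the whole chain, since checking a parent must not modify it, and is then written only to the child's own map. With `allow_overwrite`, the child shadows the parent's entry instead of replacing it.

```cpp
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins for arrow::compute::Function and arrow::Status.
struct Function {
  std::string name;
};
enum class StatusCode { kOK, kKeyError };

class NestedRegistry {
 public:
  explicit NestedRegistry(NestedRegistry* parent = nullptr) : parent_(parent) {}

  // Dry run over this registry and all of its ancestors; modifies nothing.
  StatusCode CanAddFunction(const std::shared_ptr<Function>& function,
                            bool allow_overwrite = false) const {
    if (parent_ != nullptr) {
      StatusCode st = parent_->CanAddFunction(function, allow_overwrite);
      if (st != StatusCode::kOK) return st;
    }
    if (!allow_overwrite && functions_.count(function->name) > 0) {
      return StatusCode::kKeyError;
    }
    return StatusCode::kOK;
  }

  // Checks the whole chain first, then writes only to this registry.
  StatusCode AddFunction(std::shared_ptr<Function> function,
                         bool allow_overwrite = false) {
    StatusCode st = CanAddFunction(function, allow_overwrite);
    if (st != StatusCode::kOK) return st;
    std::string name = function->name;
    functions_[name] = std::move(function);  // parent_ is never touched
    return StatusCode::kOK;
  }

  // Local entries take precedence; otherwise defer to the parent.
  std::shared_ptr<Function> GetFunction(const std::string& name) const {
    auto it = functions_.find(name);
    if (it != functions_.end()) return it->second;
    if (parent_ != nullptr) return parent_->GetFunction(name);
    return nullptr;
  }

 private:
  NestedRegistry* parent_;  // not owned
  std::map<std::string, std::shared_ptr<Function>> functions_;
};
```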


