Posted to github@arrow.apache.org by "icexelloss (via GitHub)" <gi...@apache.org> on 2023/02/27 19:47:06 UTC

[GitHub] [arrow] icexelloss opened a new pull request, #34373: GH-34366: [Python] Test run_query with a registered scalar UDF

icexelloss opened a new pull request, #34373:
URL: https://github.com/apache/arrow/pull/34373

   
   ### Rationale for this change
   
   Acero can currently execute a registered UDF referenced from a Substrait plan; however, there are no tests for this.
   
   ### What changes are included in this PR?
   This PR adds a test that invokes a registered UDF via a Substrait plan.
   
   
   ### Are these changes tested?
   N/A
   
   
   ### Are there any user-facing changes?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] icexelloss commented on a diff in pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34373:
URL: https://github.com/apache/arrow/pull/34373#discussion_r1120191763


##########
python/pyarrow/tests/test_udf.py:
##########
@@ -613,3 +614,180 @@ def test_udt_datasource1_generator():
 def test_udt_datasource1_exception():
     with pytest.raises(RuntimeError, match='datasource1_exception'):
         _test_datasource1_udt(datasource1_exception)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"t": [1, 2, 3], "p": [4, 5, 6]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+  "extensionUris": [
+    {
+      "extensionUriAnchor": 1
+    },
+    {
+      "extensionUriAnchor": 2,
+      "uri": "urn:arrow:substrait_simple_extension_function"
+    }
+  ],
+  "extensions": [
+    {
+      "extensionFunction": {
+        "extensionUriReference": 2,
+        "functionAnchor": 1,
+        "name": "y=x+1"
+      }
+    }
+  ],
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "project": {

Review Comment:
   I manually removed the first projection; will push a revision soon.





[GitHub] [arrow] icexelloss commented on a diff in pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34373:
URL: https://github.com/apache/arrow/pull/34373#discussion_r1120266377


##########
python/pyarrow/tests/test_udf.py:
##########
@@ -613,3 +614,180 @@ def test_udt_datasource1_generator():
 def test_udt_datasource1_exception():
     with pytest.raises(RuntimeError, match='datasource1_exception'):
         _test_datasource1_udt(datasource1_exception)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"t": [1, 2, 3], "p": [4, 5, 6]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+  "extensionUris": [
+    {
+      "extensionUriAnchor": 1
+    },
+    {
+      "extensionUriAnchor": 2,
+      "uri": "urn:arrow:substrait_simple_extension_function"
+    }
+  ],
+  "extensions": [
+    {
+      "extensionFunction": {
+        "extensionUriReference": 2,
+        "functionAnchor": 1,
+        "name": "y=x+1"
+      }
+    }
+  ],
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "project": {
+            "common": {
+              "emit": {
+                "outputMapping": [
+                  2,
+                  3,
+                  4
+                ]
+              }
+            },
+            "input": {
+              "project": {
+                "common": {
+                  "emit": {
+                    "outputMapping": [
+                      2,
+                      3
+                    ]
+                  }
+                },
+                "input": {
+                  "read": {
+                    "baseSchema": {
+                      "names": [
+                        "t",
+                        "p"
+                      ],
+                      "struct": {
+                        "types": [
+                          {
+                            "i64": {
+                              "nullability": "NULLABILITY_REQUIRED"
+                            }
+                          },
+                          {
+                            "i64": {
+                              "nullability": "NULLABILITY_NULLABLE"
+                            }
+                          }
+                        ],
+                        "nullability": "NULLABILITY_REQUIRED"
+                      }
+                    },
+                    "namedTable": {
+                      "names": [
+                        "t1"
+                      ]
+                    }
+                  }
+                },
+                "expressions": [
+                  {
+                    "selection": {
+                      "directReference": {
+                        "structField": {}
+                      },
+                      "rootReference": {}
+                    }
+                  },
+                  {
+                    "selection": {
+                      "directReference": {
+                        "structField": {
+                          "field": 1
+                        }
+                      },
+                      "rootReference": {}
+                    }
+                  }
+                ]
+              }
+            },
+            "expressions": [
+              {
+                "selection": {
+                  "directReference": {
+                    "structField": {}
+                  },
+                  "rootReference": {}
+                }
+              },
+              {
+                "selection": {
+                  "directReference": {
+                    "structField": {
+                      "field": 1
+                    }
+                  },
+                  "rootReference": {}
+                }
+              },
+              {
+                "scalarFunction": {
+                  "functionReference": 1,
+                  "outputType": {
+                    "i64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  "arguments": [
+                    {
+                      "value": {
+                        "selection": {
+                          "directReference": {
+                            "structField": {
+                              "field": 1
+                            }
+                          },
+                          "rootReference": {}
+                        }
+                      }
+                    }
+                  ]
+                }
+              }
+            ]
+          }
+        },
+        "names": [
+          "t",
+          "p",
+          "p2"
+        ]
+      }
+    }
+  ]
+}
+    """
+
+    buf = pa._substrait._parse_json_plan(tobytes(substrait_query))
+    reader = pa.substrait.run_query(
+        buf, table_provider=table_provider, use_threads=use_threads)
+    res_tb = reader.read_all()
+
+    function, name = unary_func_fixture
+    expected_tb = test_table_1.add_column(2, 'p2', function(
+        mock_scalar_udf_context(10), test_table_1['p']))
+    res_tb = res_tb.rename_columns(['t', 'p', 'p2'])

Review Comment:
   Yeah there is an issue from a while back: https://github.com/apache/arrow/issues/33434





[GitHub] [arrow] icexelloss merged pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss merged PR #34373:
URL: https://github.com/apache/arrow/pull/34373




[GitHub] [arrow] westonpace commented on a diff in pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #34373:
URL: https://github.com/apache/arrow/pull/34373#discussion_r1119524825


##########
python/pyarrow/tests/test_udf.py:
##########
@@ -20,6 +20,7 @@
 
 import pyarrow as pa
 from pyarrow import compute as pc
+from pyarrow.lib import tobytes

Review Comment:
   Since we are in a pure-Python context (not a Cython context), I think you can just use:
   
   ```
   substrait_query = b"""
   ...
   ```
   
   Then you don't have to rely on `tobytes`. Even if that doesn't work, I think `substrait_query.encode()` is still preferable to `tobytes`.
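To illustrate the suggestion above: for plan text that contains only ASCII characters, a bytes literal and `str.encode()` produce identical bytes, so either could replace the `tobytes` call. The short plan text below is a hypothetical placeholder, not the plan from this PR.

```python
import json

# Hypothetical, trimmed-down plan text; the real test embeds a much larger plan.
plan_as_bytes_literal = b"""
{"relations": []}
"""

plan_as_text = """
{"relations": []}
"""
# str.encode() defaults to UTF-8, so it also accepts non-ASCII text.
plan_as_encoded_text = plan_as_text.encode()

# Both spellings yield the same bytes, and both parse as the same JSON.
assert plan_as_bytes_literal == plan_as_encoded_text
assert json.loads(plan_as_bytes_literal) == {"relations": []}
```

One trade-off worth noting: a `b"""..."""` literal may only contain ASCII characters (anything else is a `SyntaxError`), whereas `.encode()` has no such restriction if the plan ever includes non-ASCII names.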



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -613,3 +614,180 @@ def test_udt_datasource1_generator():
 def test_udt_datasource1_exception():
     with pytest.raises(RuntimeError, match='datasource1_exception'):
         _test_datasource1_udt(datasource1_exception)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"t": [1, 2, 3], "p": [4, 5, 6]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+  "extensionUris": [
+    {
+      "extensionUriAnchor": 1
+    },
+    {
+      "extensionUriAnchor": 2,
+      "uri": "urn:arrow:substrait_simple_extension_function"
+    }
+  ],
+  "extensions": [
+    {
+      "extensionFunction": {
+        "extensionUriReference": 2,
+        "functionAnchor": 1,
+        "name": "y=x+1"
+      }
+    }
+  ],
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "project": {

Review Comment:
   Why two project nodes?



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -613,3 +614,180 @@ def test_udt_datasource1_generator():
 def test_udt_datasource1_exception():
     with pytest.raises(RuntimeError, match='datasource1_exception'):
         _test_datasource1_udt(datasource1_exception)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"t": [1, 2, 3], "p": [4, 5, 6]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+  "extensionUris": [
+    {
+      "extensionUriAnchor": 1
+    },
+    {
+      "extensionUriAnchor": 2,
+      "uri": "urn:arrow:substrait_simple_extension_function"
+    }
+  ],
+  "extensions": [
+    {
+      "extensionFunction": {
+        "extensionUriReference": 2,
+        "functionAnchor": 1,
+        "name": "y=x+1"
+      }
+    }
+  ],
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "project": {
+            "common": {
+              "emit": {
+                "outputMapping": [
+                  2,
+                  3,
+                  4
+                ]
+              }
+            },
+            "input": {
+              "project": {
+                "common": {
+                  "emit": {
+                    "outputMapping": [
+                      2,
+                      3
+                    ]
+                  }
+                },
+                "input": {
+                  "read": {
+                    "baseSchema": {
+                      "names": [
+                        "t",
+                        "p"
+                      ],
+                      "struct": {
+                        "types": [
+                          {
+                            "i64": {
+                              "nullability": "NULLABILITY_REQUIRED"
+                            }
+                          },
+                          {
+                            "i64": {
+                              "nullability": "NULLABILITY_NULLABLE"
+                            }
+                          }
+                        ],
+                        "nullability": "NULLABILITY_REQUIRED"
+                      }
+                    },
+                    "namedTable": {
+                      "names": [
+                        "t1"
+                      ]
+                    }
+                  }
+                },
+                "expressions": [
+                  {
+                    "selection": {
+                      "directReference": {
+                        "structField": {}
+                      },
+                      "rootReference": {}
+                    }
+                  },
+                  {
+                    "selection": {
+                      "directReference": {
+                        "structField": {
+                          "field": 1
+                        }
+                      },
+                      "rootReference": {}
+                    }
+                  }
+                ]
+              }
+            },
+            "expressions": [
+              {
+                "selection": {
+                  "directReference": {
+                    "structField": {}
+                  },
+                  "rootReference": {}
+                }
+              },
+              {
+                "selection": {
+                  "directReference": {
+                    "structField": {
+                      "field": 1
+                    }
+                  },
+                  "rootReference": {}
+                }
+              },
+              {
+                "scalarFunction": {
+                  "functionReference": 1,
+                  "outputType": {
+                    "i64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  "arguments": [
+                    {
+                      "value": {
+                        "selection": {
+                          "directReference": {
+                            "structField": {
+                              "field": 1
+                            }
+                          },
+                          "rootReference": {}
+                        }
+                      }
+                    }
+                  ]
+                }
+              }
+            ]
+          }
+        },
+        "names": [
+          "t",
+          "p",
+          "p2"
+        ]
+      }
+    }
+  ]
+}
+    """
+
+    buf = pa._substrait._parse_json_plan(tobytes(substrait_query))
+    reader = pa.substrait.run_query(
+        buf, table_provider=table_provider, use_threads=use_threads)
+    res_tb = reader.read_all()
+
+    function, name = unary_func_fixture
+    expected_tb = test_table_1.add_column(2, 'p2', function(
+        mock_scalar_udf_context(10), test_table_1['p']))
+    res_tb = res_tb.rename_columns(['t', 'p', 'p2'])

Review Comment:
   This may explain the problem we are seeing in https://github.com/ibis-project/ibis-substrait/pull/414





[GitHub] [arrow] icexelloss commented on a diff in pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34373:
URL: https://github.com/apache/arrow/pull/34373#discussion_r1119317187


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -547,8 +547,9 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
         std::shared_ptr<Field> project_field;
         ARROW_ASSIGN_OR_RAISE(compute::Expression des_expr,
                               FromProto(expr, ext_set, conversion_options));
-        auto bound_expr = des_expr.Bind(*input.output_schema);
-        if (auto* expr_call = bound_expr->call()) {
+        ARROW_ASSIGN_OR_RAISE(

Review Comment:
   Added this for better error handling: previously it would segfault if the function could not be found; now it raises an error to the user, e.g.
   ```
   >   raise ArrowKeyError(message)
   E   pyarrow.lib.ArrowKeyError: No function registered with name: my_udf
   E   /home/icexelloss/workspace/arrow/cpp/src/arrow/compute/exec/expression.cc:534  GetFunction(call, exec_context)
   E   /home/icexelloss/workspace/arrow/cpp/src/arrow/engine/substrait/relation_internal.cc:550  des_expr.Bind(*input.output_schema)
   E   /home/icexelloss/workspace/arrow/cpp/src/arrow/engine/substrait/serde.cc:157  FromProto(plan_rel.has_root() ? plan_rel.root().input() : plan_rel.rel(), ext_set, conversion_options)
   E   /home/icexelloss/workspace/arrow/cpp/src/arrow/engine/substrait/serde.cc:200  DeserializePlans(buf, MakeNoSinkDeclarationFactory(), registry, ext_set_out, conversion_options)
   E   /home/icexelloss/workspace/arrow/cpp/src/arrow/engine/substrait/util.cc:47  DeserializePlan(substrait_buffer, registry, nullptr, conversion_options)
   ```





[GitHub] [arrow] icexelloss commented on pull request #34373: GH-34366: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #34373:
URL: https://github.com/apache/arrow/pull/34373#issuecomment-1446970502

   cc @westonpace 




[GitHub] [arrow] jorisvandenbossche commented on a diff in pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "jorisvandenbossche (via GitHub)" <gi...@apache.org>.
jorisvandenbossche commented on code in PR #34373:
URL: https://github.com/apache/arrow/pull/34373#discussion_r1119883354


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -547,8 +547,9 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
         std::shared_ptr<Field> project_field;
         ARROW_ASSIGN_OR_RAISE(compute::Expression des_expr,
                               FromProto(expr, ext_set, conversion_options));
-        auto bound_expr = des_expr.Bind(*input.output_schema);
-        if (auto* expr_call = bound_expr->call()) {
+        ARROW_ASSIGN_OR_RAISE(

Review Comment:
   Can you also add a test for this?





[GitHub] [arrow] ursabot commented on pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "ursabot (via GitHub)" <gi...@apache.org>.
ursabot commented on PR #34373:
URL: https://github.com/apache/arrow/pull/34373#issuecomment-1451635578

   Benchmark runs are scheduled for baseline = 4c1448e85011c24f2dde087dc75035c91be7afcd and contender = e8107bfa58ef5ad50c5c40d3f54bb7a96bdf2d0e. e8107bfa58ef5ad50c5c40d3f54bb7a96bdf2d0e is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/980e94a1766b46c286233f70f7071174...d887cfbdf62848939f55358e8b12ce9c/)
   [Finished :arrow_down:0.31% :arrow_up:0.0%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/a720d54bf2cf4b9ba564dbeacd6ed1bc...2471c94d2f0e42f39dfc67179cbe4533/)
   [Finished :arrow_down:0.26% :arrow_up:0.0%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/f372006c669a46ab817593a1eca4a10e...c7d8df823b7349b29b20f57893f79177/)
   [Failed :arrow_down:0.13% :arrow_up:0.0%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/239ae030ba4549ae866c5885cf2f064b...f311445f570043e29b42d9b5f17e1d1c/)
   Buildkite builds:
   [Finished] [`e8107bfa` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2459)
   [Finished] [`e8107bfa` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2489)
   [Finished] [`e8107bfa` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2457)
   [Failed] [`e8107bfa` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2480)
   [Finished] [`4c1448e8` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2458)
   [Finished] [`4c1448e8` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2488)
   [Finished] [`4c1448e8` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2456)
   [Finished] [`4c1448e8` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2479)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow] icexelloss commented on a diff in pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34373:
URL: https://github.com/apache/arrow/pull/34373#discussion_r1120260641


##########
python/pyarrow/tests/test_udf.py:
##########
@@ -613,3 +614,180 @@ def test_udt_datasource1_generator():
 def test_udt_datasource1_exception():
     with pytest.raises(RuntimeError, match='datasource1_exception'):
         _test_datasource1_udt(datasource1_exception)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"t": [1, 2, 3], "p": [4, 5, 6]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+  "extensionUris": [
+    {
+      "extensionUriAnchor": 1
+    },
+    {
+      "extensionUriAnchor": 2,
+      "uri": "urn:arrow:substrait_simple_extension_function"
+    }
+  ],
+  "extensions": [
+    {
+      "extensionFunction": {
+        "extensionUriReference": 2,
+        "functionAnchor": 1,
+        "name": "y=x+1"
+      }
+    }
+  ],
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "project": {

Review Comment:
   Updated and simplified the plan



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -20,6 +20,7 @@
 
 import pyarrow as pa
 from pyarrow import compute as pc
+from pyarrow.lib import tobytes

Review Comment:
   Fixed.





[GitHub] [arrow] icexelloss commented on pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #34373:
URL: https://github.com/apache/arrow/pull/34373#issuecomment-1448679649

   > Not related to this PR but I would love to move away from these large JSON blobs in tests. I'm open to ideas if you have them :)
   
   I think what we can do for now is to keep these JSON tests to a minimum and do heavier testing through the ibis -> Acero integration. This is basically what we are doing in our internal integration testing.
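One possible way to shrink the inline JSON blobs, sketched under assumptions: build the plan as a Python dict from small helpers and serialize it with `json.dumps(...).encode()`. The helpers `field_ref`, `i64` and `single_udf_project_plan` are hypothetical and not part of pyarrow; the field names mirror the JSON plan in this PR's test, but this exact plan shape has not been run through Acero here.

```python
import json


def field_ref(index):
    # Direct reference to column `index` of the input relation,
    # mirroring the "selection" expressions in the test's JSON plan.
    return {
        "selection": {
            "directReference": {"structField": {"field": index}},
            "rootReference": {},
        }
    }


def i64(nullable=True):
    # Substrait 64-bit integer type with the given nullability.
    nullability = "NULLABILITY_NULLABLE" if nullable else "NULLABILITY_REQUIRED"
    return {"i64": {"nullability": nullability}}


def single_udf_project_plan(table_name, names, types, udf_name, arg_index, out_name):
    # read(table_name) -> project(udf_name(column arg_index)); the UDF result
    # is appended after the input columns and named via the root "names".
    return {
        "extensionUris": [
            {"extensionUriAnchor": 1,
             "uri": "urn:arrow:substrait_simple_extension_function"}
        ],
        "extensions": [
            {"extensionFunction": {"extensionUriReference": 1,
                                   "functionAnchor": 1,
                                   "name": udf_name}}
        ],
        "relations": [{
            "root": {
                "input": {
                    "project": {
                        "input": {
                            "read": {
                                "baseSchema": {
                                    "names": list(names),
                                    "struct": {
                                        "types": list(types),
                                        "nullability": "NULLABILITY_REQUIRED",
                                    },
                                },
                                "namedTable": {"names": [table_name]},
                            }
                        },
                        "expressions": [{
                            "scalarFunction": {
                                "functionReference": 1,
                                "outputType": i64(),
                                "arguments": [{"value": field_ref(arg_index)}],
                            }
                        }],
                    }
                },
                "names": list(names) + [out_name],
            }
        }],
    }


plan = single_udf_project_plan(
    "t1", ["t", "p"], [i64(nullable=False), i64()], "y=x+1", 1, "p2")
plan_bytes = json.dumps(plan).encode()
```

The resulting `plan_bytes` would then go through the same `pa._substrait._parse_json_plan` call the test already uses; the helpers keep each test's plan down to a single readable call.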




[GitHub] [arrow] github-actions[bot] commented on pull request #34373: GH-34366: [Python] Test run_query with a registered scalar UDF

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #34373:
URL: https://github.com/apache/arrow/pull/34373#issuecomment-1446969178

   * Closes: #34366




[GitHub] [arrow] icexelloss commented on pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #34373:
URL: https://github.com/apache/arrow/pull/34373#issuecomment-1448969185

   @westonpace OK, the build is green now. Let me know if there are any changes you would like me to make.




[GitHub] [arrow] icexelloss commented on a diff in pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34373:
URL: https://github.com/apache/arrow/pull/34373#discussion_r1120170593


##########
python/pyarrow/tests/test_udf.py:
##########
@@ -613,3 +614,180 @@ def test_udt_datasource1_generator():
 def test_udt_datasource1_exception():
     with pytest.raises(RuntimeError, match='datasource1_exception'):
         _test_datasource1_udt(datasource1_exception)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"t": [1, 2, 3], "p": [4, 5, 6]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+  "extensionUris": [
+    {
+      "extensionUriAnchor": 1
+    },
+    {
+      "extensionUriAnchor": 2,
+      "uri": "urn:arrow:substrait_simple_extension_function"
+    }
+  ],
+  "extensions": [
+    {
+      "extensionFunction": {
+        "extensionUriReference": 2,
+        "functionAnchor": 1,
+        "name": "y=x+1"
+      }
+    }
+  ],
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "project": {

Review Comment:
   No very good reason; it's mostly because I was reusing my existing code to generate the JSON, and that code produces two projections:
   ```
   dt = ...
   dt = dt[['p', 't']]
   dt = dt.assign(p2=...)
   ```
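In Substrait, a `project` relation outputs all of its input fields followed by one field per expression, and `emit.outputMapping` then picks output fields by index. The stdlib-only sketch below models that rule with plain lists. `apply_project`, `field`, and `plus_one` are hypothetical helpers (the last one stands in for the registered `y=x+1` UDF), and the mappings `[2, 3]` and `[2, 3, 4]` are the ones used by the two nested projections in this test's plan.

```python
from typing import Callable, List, Sequence

Column = List[int]
Expr = Callable[[List[Column]], Column]


def field(i: int) -> Expr:
    """Direct reference to input field i (a Substrait `selection`)."""
    return lambda cols: cols[i]


def plus_one(i: int) -> Expr:
    """Hypothetical stand-in for the "y=x+1" UDF applied to field i."""
    return lambda cols: [v + 1 for v in cols[i]]


def apply_project(cols: List[Column], exprs: Sequence[Expr],
                  output_mapping: Sequence[int]) -> List[Column]:
    """Model of a project: input fields first, then one field per
    expression; emit.outputMapping selects the output by index."""
    full = list(cols) + [e(cols) for e in exprs]
    return [full[i] for i in output_mapping]


t, p = [1, 2, 3], [4, 5, 6]

# Inner project: [t, p] + [t, p]; emit [2, 3] keeps the copies, i.e. [t, p].
inner = apply_project([t, p], [field(0), field(1)], [2, 3])

# Outer project: [t, p] + [t, p, p+1]; emit [2, 3, 4] gives [t, p, p2].
outer = apply_project(inner, [field(0), field(1), plus_one(1)], [2, 3, 4])

# A single project does the same job: [t, p] + [p+1]; emit [0, 1, 2].
single = apply_project([t, p], [plus_one(1)], [0, 1, 2])
```

The inner projection only re-emits `t` and `p`, so one projection yields the same columns, which is the shape the simplified plan in `test_substrait.py` later uses.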
   





[GitHub] [arrow] icexelloss commented on pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #34373:
URL: https://github.com/apache/arrow/pull/34373#issuecomment-1448672740

   @westonpace I am not sure what this error is about:
   
   ```
   ==================================== ERRORS ====================================
   _ ERROR collecting opt/conda/envs/arrow/lib/python3.7/site-packages/pyarrow/tests/test_udf.py _
   ImportError while importing test module '/opt/conda/envs/arrow/lib/python3.7/site-packages/pyarrow/tests/test_udf.py'.
   Hint: make sure your test modules/packages have valid Python names.
   Traceback:
   opt/conda/envs/arrow/lib/python3.7/importlib/__init__.py:127: in import_module
       return _bootstrap._gcd_import(name[level:], package, level)
   opt/conda/envs/arrow/lib/python3.7/site-packages/pyarrow/tests/test_udf.py:22: in <module>
       import pyarrow._substrait
   E   ModuleNotFoundError: No module named 'pyarrow._substrait'
   ```
   from this:
   https://github.com/apache/arrow/actions/runs/4295311322/jobs/7485550126
   
   Any idea?




[GitHub] [arrow] github-actions[bot] commented on pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #34373:
URL: https://github.com/apache/arrow/pull/34373#issuecomment-1446986188

   :warning: GitHub issue #34333 **has been automatically assigned in GitHub** to PR creator.




[GitHub] [arrow] icexelloss commented on a diff in pull request #34373: GH-34366: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34373:
URL: https://github.com/apache/arrow/pull/34373#discussion_r1119235870


##########
python/pyarrow/tests/test_udf.py:
##########
@@ -613,3 +614,180 @@ def test_udt_datasource1_generator():
 def test_udt_datasource1_exception():
     with pytest.raises(RuntimeError, match='datasource1_exception'):
         _test_datasource1_udt(datasource1_exception)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"t": [1, 2, 3], "p": [4, 5, 6]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = """
+    {
+  "extensionUris": [
+    {
+      "extensionUriAnchor": 1
+    },
+    {
+      "extensionUriAnchor": 2,
+      "uri": "urn:arrow:substrait_simple_extension_function"
+    }
+  ],
+  "extensions": [
+    {
+      "extensionFunction": {
+        "extensionUriReference": 2,
+        "functionAnchor": 1,
+        "name": "y=x+1"
+      }
+    }
+  ],
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "project": {
+            "common": {
+              "emit": {
+                "outputMapping": [
+                  2,
+                  3,
+                  4
+                ]
+              }
+            },
+            "input": {
+              "project": {
+                "common": {
+                  "emit": {
+                    "outputMapping": [
+                      2,
+                      3
+                    ]
+                  }
+                },
+                "input": {
+                  "read": {
+                    "baseSchema": {
+                      "names": [
+                        "t",
+                        "p"
+                      ],
+                      "struct": {
+                        "types": [
+                          {
+                            "i64": {
+                              "nullability": "NULLABILITY_REQUIRED"
+                            }
+                          },
+                          {
+                            "i64": {
+                              "nullability": "NULLABILITY_NULLABLE"
+                            }
+                          }
+                        ],
+                        "nullability": "NULLABILITY_REQUIRED"
+                      }
+                    },
+                    "namedTable": {
+                      "names": [
+                        "t1"
+                      ]
+                    }
+                  }
+                },
+                "expressions": [
+                  {
+                    "selection": {
+                      "directReference": {
+                        "structField": {}
+                      },
+                      "rootReference": {}
+                    }
+                  },
+                  {
+                    "selection": {
+                      "directReference": {
+                        "structField": {
+                          "field": 1
+                        }
+                      },
+                      "rootReference": {}
+                    }
+                  }
+                ]
+              }
+            },
+            "expressions": [
+              {
+                "selection": {
+                  "directReference": {
+                    "structField": {}
+                  },
+                  "rootReference": {}
+                }
+              },
+              {
+                "selection": {
+                  "directReference": {
+                    "structField": {
+                      "field": 1
+                    }
+                  },
+                  "rootReference": {}
+                }
+              },
+              {
+                "scalarFunction": {
+                  "functionReference": 1,
+                  "outputType": {
+                    "i64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  "arguments": [
+                    {
+                      "value": {
+                        "selection": {
+                          "directReference": {
+                            "structField": {
+                              "field": 1
+                            }
+                          },
+                          "rootReference": {}
+                        }
+                      }
+                    }
+                  ]
+                }
+              }
+            ]
+          }
+        },
+        "names": [
+          "t",
+          "p",
+          "p2"
+        ]
+      }
+    }
+  ]
+}
+    """
+
+    buf = pa._substrait._parse_json_plan(tobytes(substrait_query))
+    reader = pa.substrait.run_query(
+        buf, table_provider=table_provider, use_threads=use_threads)
+    res_tb = reader.read_all()
+
+    function, name = unary_func_fixture
+    expected_tb = test_table_1.add_column(2, 'p2', function(
+        mock_scalar_udf_context(10), test_table_1['p']))
+    res_tb = res_tb.rename_columns(['t', 'p', 'p2'])

Review Comment:
   This is because of a known bug: Acero doesn't name the result columns correctly.
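The test works around this by overwriting the result's column names by position before comparing, so column order and values are still checked. A stdlib sketch of that comparison, modelling a table as a list of `(name, values)` pairs; the names `col_a`, `col_b`, `col_c` are made up and only stand in for whatever incorrect names Acero returns (the real test uses `Table.rename_columns` followed by `==`):

```python
from typing import List, Sequence, Tuple

Table = List[Tuple[str, List[int]]]


def rename_columns(table: Table, names: Sequence[str]) -> Table:
    """Replace column names by position, leaving the values untouched."""
    if len(names) != len(table):
        raise ValueError("expected %d names, got %d" % (len(table), len(names)))
    return [(new, values) for new, (_, values) in zip(names, table)]


# Hypothetical wrong names standing in for what the buggy naming produces.
result: Table = [("col_a", [1, 2, 3]), ("col_b", [4, 5, 6]), ("col_c", [5, 6, 7])]
expected: Table = [("t", [1, 2, 3]), ("p", [4, 5, 6]), ("p2", [5, 6, 7])]

renamed = rename_columns(result, [name for name, _ in expected])
```

Because the rename is positional, a wrong order or wrong values still fail the comparison; what it cannot catch is the naming bug itself, so it is best treated as a temporary workaround.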





[GitHub] [arrow] westonpace commented on a diff in pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #34373:
URL: https://github.com/apache/arrow/pull/34373#discussion_r1119523390


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -547,8 +547,9 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
         std::shared_ptr<Field> project_field;
         ARROW_ASSIGN_OR_RAISE(compute::Expression des_expr,
                               FromProto(expr, ext_set, conversion_options));
-        auto bound_expr = des_expr.Bind(*input.output_schema);
-        if (auto* expr_call = bound_expr->call()) {
+        ARROW_ASSIGN_OR_RAISE(

Review Comment:
   Good catch.
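Background on the change under review: `Expression::Bind` returns an `arrow::Result<Expression>`, so the old code called `bound_expr->call()` without first checking whether binding had failed, while `ARROW_ASSIGN_OR_RAISE` returns the error status from the enclosing function instead. A Python analogy of that difference follows; `Result`, `StatusError`, `assign_or_raise`, and `bind` are toy stand-ins, not Arrow APIs, and having `bind` fail on an unregistered function name is an assumption made only for illustration.

```python
from dataclasses import dataclass
from typing import Generic, Optional, Set, TypeVar

T = TypeVar("T")


class StatusError(Exception):
    """Toy stand-in for a non-OK arrow::Status returned to the caller."""


@dataclass
class Result(Generic[T]):
    """Toy model of arrow::Result<T>: holds a value or an error message."""
    value: Optional[T] = None
    error: Optional[str] = None

    def ok(self) -> bool:
        return self.error is None


def assign_or_raise(result: "Result[T]") -> T:
    """Analogue of ARROW_ASSIGN_OR_RAISE: unwrap, or propagate the error."""
    if not result.ok():
        raise StatusError(result.error)
    return result.value


def bind(function_name: str, registry: Set[str]) -> "Result[str]":
    """Toy Bind(): fails if the called function is not registered."""
    if function_name not in registry:
        return Result(error=f"no function registered as {function_name!r}")
    return Result(value=f"bound call to {function_name}")
```

The old code corresponds to reading `bind(...).value` without calling `ok()` first: here that silently yields `None`, whereas the C++ version dereferences a `Result` that holds no value.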





[GitHub] [arrow] icexelloss commented on pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on PR #34373:
URL: https://github.com/apache/arrow/pull/34373#issuecomment-1448858075

   > @westonpace I am not sure what this error is about:
   > 
   > ```
   > ==================================== ERRORS ====================================
   > _ ERROR collecting opt/conda/envs/arrow/lib/python3.7/site-packages/pyarrow/tests/test_udf.py _
   > ImportError while importing test module '/opt/conda/envs/arrow/lib/python3.7/site-packages/pyarrow/tests/test_udf.py'.
   > Hint: make sure your test modules/packages have valid Python names.
   > Traceback:
   > opt/conda/envs/arrow/lib/python3.7/importlib/__init__.py:127: in import_module
   >     return _bootstrap._gcd_import(name[level:], package, level)
   > opt/conda/envs/arrow/lib/python3.7/site-packages/pyarrow/tests/test_udf.py:22: in <module>
   >     import pyarrow._substrait
   > E   ModuleNotFoundError: No module named 'pyarrow._substrait'
   > ```
   > 
   > from this: https://github.com/apache/arrow/actions/runs/4295311322/jobs/7485550126
   > 
   > Any idea?
   > 
   > (This passes for me locally)
   
   I see what the issue is; let me try to reorganize the tests to work around it.
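The traceback above shows the failure happening at collection time: `test_udf.py` imports `pyarrow._substrait` at module level, and that extension module is typically only built when Arrow is compiled with Substrait support, so the whole file fails to import on CI jobs without it. A minimal stdlib sketch of the usual guard (the helper `optional_import` is hypothetical):

```python
import importlib
from types import ModuleType
from typing import Optional


def optional_import(name: str) -> Optional[ModuleType]:
    """Return the imported module, or None if it cannot be imported."""
    try:
        return importlib.import_module(name)
    except ImportError:  # ModuleNotFoundError is a subclass of ImportError
        return None


# pyarrow._substrait only exists when Arrow was built with Substrait support.
substrait = optional_import("pyarrow._substrait")
HAS_SUBSTRAIT = substrait is not None
```

With pytest, the same effect is available through `pytest.importorskip("pyarrow._substrait")` or a `pytest.mark.skipif(not HAS_SUBSTRAIT, reason=...)` marker. Moving the Substrait-dependent tests into `test_substrait.py`, as the later diffs in this thread do, should achieve the same result, since that module is presumably already set up to be skipped when Substrait is unavailable.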
   




[GitHub] [arrow] westonpace commented on a diff in pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on code in PR #34373:
URL: https://github.com/apache/arrow/pull/34373#discussion_r1120837528


##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -315,3 +320,246 @@ def table_provider(names, _):
     exec_message = "names for NamedTable not provided"
     with pytest.raises(ArrowInvalid, match=exec_message):
         substrait.run_query(buf, table_provider=table_provider)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})

Review Comment:
   ```suggestion
       test_table = pa.Table.from_pydict({"x": [1, 2, 3]})
   ```



##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -315,3 +320,246 @@ def table_provider(names, _):
     exec_message = "names for NamedTable not provided"
     with pytest.raises(ArrowInvalid, match=exec_message):
         substrait.run_query(buf, table_provider=table_provider)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1

Review Comment:
   ```suggestion
               return test_table
   ```



##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -315,3 +320,246 @@ def table_provider(names, _):
     exec_message = "names for NamedTable not provided"
     with pytest.raises(ArrowInvalid, match=exec_message):
         substrait.run_query(buf, table_provider=table_provider)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+
+    def table_provider(names, _):

Review Comment:
   You could even simplify to:
   
   ```
   def table_provider(_names, _schema):
       return test_table
   ```
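`run_query` calls `table_provider(names, schema)` for each named table in the plan, as the signature used in this thread suggests. With a single table, ignoring both arguments is enough; with several, a dict-backed provider keeps the lookup explicit. A sketch of both, where the factory `make_table_provider` is hypothetical and a plain dict stands in for `pa.Table` so it runs without pyarrow:

```python
from typing import Any, Callable, Dict, Sequence

# Signature implied by this thread: table_provider(names, schema) -> table.
TableProvider = Callable[[Sequence[str], Any], Any]


def make_table_provider(tables: Dict[str, Any]) -> TableProvider:
    """Hypothetical helper: look up a NamedTable by its first name."""
    def table_provider(names: Sequence[str], _schema: Any) -> Any:
        if not names:
            raise ValueError("No names provided")
        try:
            return tables[names[0]]
        except KeyError:
            raise ValueError(f"Unrecognized table name {names[0]!r}") from None
    return table_provider


test_table = {"x": [1, 2, 3]}  # stand-in for pa.Table.from_pydict(...)


def simple_provider(_names: Sequence[str], _schema: Any) -> Any:
    """The simplification suggested in review: one table, arguments ignored."""
    return test_table


provider = make_table_provider({"t1": test_table})
```

The simple version is fine for a one-table test; the dict-backed one only pays off if a plan reads several named tables.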



##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -315,3 +320,246 @@ def table_provider(names, _):
     exec_message = "names for NamedTable not provided"
     with pytest.raises(ArrowInvalid, match=exec_message):
         substrait.run_query(buf, table_provider=table_provider)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = b"""
+    {
+  "extensionUris": [
+    {
+      "extensionUriAnchor": 1
+    },
+    {
+      "extensionUriAnchor": 2,
+      "uri": "urn:arrow:substrait_simple_extension_function"
+    }
+  ],
+  "extensions": [
+    {
+      "extensionFunction": {
+        "extensionUriReference": 2,
+        "functionAnchor": 1,
+        "name": "y=x+1"
+      }
+    }
+  ],
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "project": {
+            "common": {
+              "emit": {
+                "outputMapping": [
+                  1,
+                  2,
+                ]
+              }
+            },
+            "input": {
+              "read": {
+                "baseSchema": {
+                  "names": [
+                    "t",
+                  ],
+                  "struct": {
+                    "types": [
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_REQUIRED"
+                        }
+                      },
+                    ],
+                    "nullability": "NULLABILITY_REQUIRED"
+                  }
+                },
+                "namedTable": {
+                  "names": [
+                    "t1"
+                  ]
+                }
+              }
+            },
+            "expressions": [
+              {
+                "selection": {
+                  "directReference": {
+                    "structField": {}
+                  },
+                  "rootReference": {}
+                }
+              },
+              {
+                "scalarFunction": {
+                  "functionReference": 1,
+                  "outputType": {
+                    "i64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  "arguments": [
+                    {
+                      "value": {
+                        "selection": {
+                          "directReference": {
+                            "structField": {}
+                          },
+                          "rootReference": {}
+                        }
+                      }
+                    }
+                  ]
+                }
+              }
+            ]
+          }
+        },
+        "names": [
+          "x",
+          "y",
+        ]
+      }
+    }
+  ]
+}
+    """
+
+    buf = pa._substrait._parse_json_plan(substrait_query)
+    reader = pa.substrait.run_query(
+        buf, table_provider=table_provider, use_threads=use_threads)
+    res_tb = reader.read_all()
+
+    function, name = unary_func_fixture
+    expected_tb = test_table_1.add_column(1, 'y', function(
+        mock_scalar_udf_context(10), test_table_1['x']))
+    res_tb = res_tb.rename_columns(['x', 'y'])
+    assert res_tb == expected_tb
+
+
+def test_udf_via_substrait_wrong_udf_name():
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})

Review Comment:
   ```suggestion
       test_table = pa.Table.from_pydict({"x": [1, 2, 3]})
   ```



##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -315,3 +320,246 @@ def table_provider(names, _):
     exec_message = "names for NamedTable not provided"
     with pytest.raises(ArrowInvalid, match=exec_message):
         substrait.run_query(buf, table_provider=table_provider)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = b"""
+    {
+  "extensionUris": [
+    {
+      "extensionUriAnchor": 1
+    },
+    {
+      "extensionUriAnchor": 2,
+      "uri": "urn:arrow:substrait_simple_extension_function"
+    }
+  ],
+  "extensions": [
+    {
+      "extensionFunction": {
+        "extensionUriReference": 2,
+        "functionAnchor": 1,
+        "name": "y=x+1"
+      }
+    }
+  ],
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "project": {
+            "common": {
+              "emit": {
+                "outputMapping": [
+                  1,
+                  2,
+                ]
+              }
+            },
+            "input": {
+              "read": {
+                "baseSchema": {
+                  "names": [
+                    "t",
+                  ],
+                  "struct": {
+                    "types": [
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_REQUIRED"
+                        }
+                      },
+                    ],
+                    "nullability": "NULLABILITY_REQUIRED"
+                  }
+                },
+                "namedTable": {
+                  "names": [
+                    "t1"
+                  ]
+                }
+              }
+            },
+            "expressions": [
+              {
+                "selection": {
+                  "directReference": {
+                    "structField": {}
+                  },
+                  "rootReference": {}
+                }
+              },
+              {
+                "scalarFunction": {
+                  "functionReference": 1,
+                  "outputType": {
+                    "i64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  "arguments": [
+                    {
+                      "value": {
+                        "selection": {
+                          "directReference": {
+                            "structField": {}
+                          },
+                          "rootReference": {}
+                        }
+                      }
+                    }
+                  ]
+                }
+              }
+            ]
+          }
+        },
+        "names": [
+          "x",
+          "y",
+        ]
+      }
+    }
+  ]
+}
+    """
+
+    buf = pa._substrait._parse_json_plan(substrait_query)
+    reader = pa.substrait.run_query(
+        buf, table_provider=table_provider, use_threads=use_threads)
+    res_tb = reader.read_all()
+
+    function, name = unary_func_fixture
+    expected_tb = test_table_1.add_column(1, 'y', function(
+        mock_scalar_udf_context(10), test_table_1['x']))
+    res_tb = res_tb.rename_columns(['x', 'y'])
+    assert res_tb == expected_tb
+
+
+def test_udf_via_substrait_wrong_udf_name():
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1

Review Comment:
   ```suggestion
               return test_table
   ```



##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -315,3 +320,246 @@ def table_provider(names, _):
     exec_message = "names for NamedTable not provided"
     with pytest.raises(ArrowInvalid, match=exec_message):
         substrait.run_query(buf, table_provider=table_provider)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = b"""
+    {
+  "extensionUris": [
+    {
+      "extensionUriAnchor": 1
+    },
+    {
+      "extensionUriAnchor": 2,
+      "uri": "urn:arrow:substrait_simple_extension_function"
+    }
+  ],
+  "extensions": [
+    {
+      "extensionFunction": {
+        "extensionUriReference": 2,
+        "functionAnchor": 1,
+        "name": "y=x+1"
+      }
+    }
+  ],
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "project": {
+            "common": {
+              "emit": {
+                "outputMapping": [
+                  1,
+                  2,
+                ]
+              }
+            },
+            "input": {
+              "read": {
+                "baseSchema": {
+                  "names": [
+                    "t",
+                  ],
+                  "struct": {
+                    "types": [
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_REQUIRED"
+                        }
+                      },
+                    ],
+                    "nullability": "NULLABILITY_REQUIRED"
+                  }
+                },
+                "namedTable": {
+                  "names": [
+                    "t1"
+                  ]
+                }
+              }
+            },
+            "expressions": [
+              {
+                "selection": {
+                  "directReference": {
+                    "structField": {}
+                  },
+                  "rootReference": {}
+                }
+              },
+              {
+                "scalarFunction": {
+                  "functionReference": 1,
+                  "outputType": {
+                    "i64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  "arguments": [
+                    {
+                      "value": {
+                        "selection": {
+                          "directReference": {
+                            "structField": {}
+                          },
+                          "rootReference": {}
+                        }
+                      }
+                    }
+                  ]
+                }
+              }
+            ]
+          }
+        },
+        "names": [
+          "x",
+          "y",
+        ]
+      }
+    }
+  ]
+}
+    """
+
+    buf = pa._substrait._parse_json_plan(substrait_query)
+    reader = pa.substrait.run_query(
+        buf, table_provider=table_provider, use_threads=use_threads)
+    res_tb = reader.read_all()
+
+    function, name = unary_func_fixture
+    expected_tb = test_table_1.add_column(1, 'y', function(
+        mock_scalar_udf_context(10), test_table_1['x']))

Review Comment:
   ```suggestion
           mock_scalar_udf_context(10), test_table['x']))
   ```



##########
python/pyarrow/tests/test_substrait.py:
##########
@@ -315,3 +320,246 @@ def table_provider(names, _):
     exec_message = "names for NamedTable not provided"
     with pytest.raises(ArrowInvalid, match=exec_message):
         substrait.run_query(buf, table_provider=table_provider)
+
+
+@pytest.mark.parametrize("use_threads", [True, False])
+def test_udf_via_substrait(unary_func_fixture, use_threads):
+    test_table_1 = pa.Table.from_pydict({"x": [1, 2, 3]})
+
+    def table_provider(names, _):
+        if not names:
+            raise Exception("No names provided")
+        elif names[0] == "t1":
+            return test_table_1
+        else:
+            raise Exception("Unrecognized table name")
+
+    substrait_query = b"""
+    {
+  "extensionUris": [
+    {
+      "extensionUriAnchor": 1
+    },
+    {
+      "extensionUriAnchor": 2,
+      "uri": "urn:arrow:substrait_simple_extension_function"
+    }
+  ],
+  "extensions": [
+    {
+      "extensionFunction": {
+        "extensionUriReference": 2,
+        "functionAnchor": 1,
+        "name": "y=x+1"
+      }
+    }
+  ],
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "project": {
+            "common": {
+              "emit": {
+                "outputMapping": [
+                  1,
+                  2,
+                ]
+              }
+            },
+            "input": {
+              "read": {
+                "baseSchema": {
+                  "names": [
+                    "t",
+                  ],
+                  "struct": {
+                    "types": [
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_REQUIRED"
+                        }
+                      },
+                    ],
+                    "nullability": "NULLABILITY_REQUIRED"
+                  }
+                },
+                "namedTable": {
+                  "names": [
+                    "t1"
+                  ]
+                }
+              }
+            },
+            "expressions": [
+              {
+                "selection": {
+                  "directReference": {
+                    "structField": {}
+                  },
+                  "rootReference": {}
+                }
+              },
+              {
+                "scalarFunction": {
+                  "functionReference": 1,
+                  "outputType": {
+                    "i64": {
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  },
+                  "arguments": [
+                    {
+                      "value": {
+                        "selection": {
+                          "directReference": {
+                            "structField": {}
+                          },
+                          "rootReference": {}
+                        }
+                      }
+                    }
+                  ]
+                }
+              }
+            ]
+          }
+        },
+        "names": [
+          "x",
+          "y",
+        ]
+      }
+    }
+  ]
+}
+    """
+
+    buf = pa._substrait._parse_json_plan(substrait_query)
+    reader = pa.substrait.run_query(
+        buf, table_provider=table_provider, use_threads=use_threads)
+    res_tb = reader.read_all()
+
+    function, name = unary_func_fixture
+    expected_tb = test_table_1.add_column(1, 'y', function(

Review Comment:
   ```suggestion
       expected_tb = test_table.add_column(1, 'y', function(
   ```





[GitHub] [arrow] icexelloss commented on a diff in pull request #34373: GH-34333: [Python] Test run_query with a registered scalar UDF

Posted by "icexelloss (via GitHub)" <gi...@apache.org>.
icexelloss commented on code in PR #34373:
URL: https://github.com/apache/arrow/pull/34373#discussion_r1120261021


##########
cpp/src/arrow/engine/substrait/relation_internal.cc:
##########
@@ -547,8 +547,9 @@ Result<DeclarationInfo> FromProto(const substrait::Rel& rel, const ExtensionSet&
         std::shared_ptr<Field> project_field;
         ARROW_ASSIGN_OR_RAISE(compute::Expression des_expr,
                               FromProto(expr, ext_set, conversion_options));
-        auto bound_expr = des_expr.Bind(*input.output_schema);
-        if (auto* expr_call = bound_expr->call()) {
+        ARROW_ASSIGN_OR_RAISE(

Review Comment:
   Add test for this.
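The new error path can be exercised with a plan whose extension function name is not registered. A stdlib-only sketch of one way to produce such a plan: build it as a Python dict and vary only `extensionFunction.name`. The helpers `field_ref` and `udf_plan` and the string `"not_a_registered_udf"` are hypothetical; the dict mirrors the single-projection plan from `test_substrait.py`, and `json.dumps` always emits strict JSON, so stray trailing commas like those in the hand-written string cannot creep in.

```python
import json
from typing import Any, Dict


def field_ref(index: int) -> Dict[str, Any]:
    """Substrait direct reference to input field `index`.

    Field 0 is written as an empty structField, as in the hand-written plans.
    """
    struct_field = {"field": index} if index else {}
    return {"selection": {"directReference": {"structField": struct_field},
                          "rootReference": {}}}


def udf_plan(function_name: str) -> Dict[str, Any]:
    """Read table "t1" and emit [x, function_name(x)] with one project."""
    i64_required = {"i64": {"nullability": "NULLABILITY_REQUIRED"}}
    read = {
        "baseSchema": {
            "names": ["x"],  # matches the column of the test table
            "struct": {"types": [i64_required],
                       "nullability": "NULLABILITY_REQUIRED"},
        },
        "namedTable": {"names": ["t1"]},
    }
    call = {"scalarFunction": {
        "functionReference": 1,
        "outputType": {"i64": {"nullability": "NULLABILITY_NULLABLE"}},
        "arguments": [{"value": field_ref(0)}],
    }}
    return {
        "extensionUris": [
            {"extensionUriAnchor": 1},
            {"extensionUriAnchor": 2,
             "uri": "urn:arrow:substrait_simple_extension_function"},
        ],
        "extensions": [{"extensionFunction": {
            "extensionUriReference": 2,
            "functionAnchor": 1,
            "name": function_name,
        }}],
        "relations": [{"root": {
            "input": {"project": {
                # Input [x] plus expressions [x, f(x)]; keep fields 1 and 2.
                "common": {"emit": {"outputMapping": [1, 2]}},
                "input": {"read": read},
                "expressions": [field_ref(0), call],
            }},
            "names": ["x", "y"],
        }}],
    }


good_plan_bytes = json.dumps(udf_plan("y=x+1")).encode("utf-8")
# Hypothetical unregistered name for a "wrong UDF name" test.
bad_plan_bytes = json.dumps(udf_plan("not_a_registered_udf")).encode("utf-8")
```

Either byte string would take the place of the hand-written `substrait_query` passed to `pa._substrait._parse_json_plan` in the tests above.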


