Posted to github@arrow.apache.org by "jorisvandenbossche (via GitHub)" <gi...@apache.org> on 2023/03/01 13:31:49 UTC

[GitHub] [arrow] jorisvandenbossche opened a new pull request, #34401: GH-33976: [Python] Refactor the internal usage of ExecPlan to use new Declaration/ExecNodeOptions bindings

jorisvandenbossche opened a new pull request, #34401:
URL: https://github.com/apache/arrow/pull/34401

   This PR refactors our current custom Cython implementation of the Table/Dataset.filter/join/group_by/sort_by methods to use the new bindings for Declaration/ExecNodeOptions (https://github.com/apache/arrow/pull/34102).
   
   Note: this is blocked until https://github.com/apache/arrow/pull/34102 is merged. For now, this PR still contains the changes from that PR as its first commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] jorisvandenbossche commented on pull request #34401: GH-33976: [Python] Refactor the internal usage of ExecPlan to use new Acero Declaration/ExecNodeOptions bindings

Posted by "jorisvandenbossche (via GitHub)" <gi...@apache.org>.
jorisvandenbossche commented on PR #34401:
URL: https://github.com/apache/arrow/pull/34401#issuecomment-1479625933

   With the last commit, I renamed `_exec_plan.pyx` to `_exec_plan.py`, because it no longer contains any Cython. That makes GitHub show it as a deleted file plus a new file, though. To see the diff within the file itself, select all but the last commit: https://github.com/apache/arrow/pull/34401/files/11204cb7ff4071b4799bb4f064e8cc9f116351c8




[GitHub] [arrow] github-actions[bot] commented on pull request #34401: GH-33976: [Python] Refactor the internal usage of ExecPlan to use new Declaration/ExecNodeOptions bindings

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #34401:
URL: https://github.com/apache/arrow/pull/34401#issuecomment-1450160288

   * Closes: #33976




[GitHub] [arrow] ursabot commented on pull request #34401: GH-33976: [Python] Refactor the internal usage of ExecPlan to use new Acero Declaration/ExecNodeOptions bindings

Posted by "ursabot (via GitHub)" <gi...@apache.org>.
ursabot commented on PR #34401:
URL: https://github.com/apache/arrow/pull/34401#issuecomment-1487313756

   Benchmark runs are scheduled for baseline = c74540f2fff92c84d231fbab16a32ef26e85f251 and contender = 0ead719bdcc90bdeee4d225f7bb9dab1c458eccf. 0ead719bdcc90bdeee4d225f7bb9dab1c458eccf is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Finished :arrow_down:0.0% :arrow_up:0.0%] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/a7b878a7908b4a28a60981404221afea...5f1c24a0f91b4fc393ba243969780208/)
   [Finished :arrow_down:0.12% :arrow_up:0.06%] [test-mac-arm](https://conbench.ursa.dev/compare/runs/fd52726201f345a2abd045835f9d9bff...33553e5726a946e385906b16635c973a/)
   [Finished :arrow_down:0.26% :arrow_up:0.0%] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/99b8a8b9938f4051a76e6dc1e31c4a20...7607c4a79305488d881b40a08652336e/)
   [Finished :arrow_down:0.25% :arrow_up:0.0%] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/04053f03f13145938df9e5788362ce51...92f178f54c124f5c9d96d9fdc662e12c/)
   Buildkite builds:
   [Finished] [`0ead719b` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2589)
   [Finished] [`0ead719b` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2619)
   [Finished] [`0ead719b` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2587)
   [Finished] [`0ead719b` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2610)
   [Finished] [`c74540f2` ec2-t3-xlarge-us-east-2](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ec2-t3-xlarge-us-east-2/builds/2588)
   [Finished] [`c74540f2` test-mac-arm](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-test-mac-arm/builds/2618)
   [Finished] [`c74540f2` ursa-i9-9960x](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-i9-9960x/builds/2586)
   [Finished] [`c74540f2` ursa-thinkcentre-m75q](https://buildkite.com/apache-arrow/arrow-bci-benchmark-on-ursa-thinkcentre-m75q/builds/2609)
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   




[GitHub] [arrow] jorisvandenbossche commented on a diff in pull request #34401: GH-33976: [Python] Refactor the internal usage of ExecPlan to use new Acero Declaration/ExecNodeOptions bindings

Posted by "jorisvandenbossche (via GitHub)" <gi...@apache.org>.
jorisvandenbossche commented on code in PR #34401:
URL: https://github.com/apache/arrow/pull/34401#discussion_r1150200250


##########
python/pyarrow/tests/test_exec_plan.py:
##########
@@ -209,44 +209,27 @@ def test_filter_table_errors():
     })
 
     with pytest.raises(pa.ArrowTypeError):
-        ep._filter_table(
-            t, pc.divide(pc.field("a"), pc.scalar(2)),
-            output_type=pa.Table

Review Comment:
   Since `output_type=pa.Table` was the only value we ever passed for this keyword in `_filter_table`, I removed the keyword and simplified the tests.





[GitHub] [arrow] jorisvandenbossche commented on a diff in pull request #34401: GH-33976: [Python] Refactor the internal usage of ExecPlan to use new Declaration/ExecNodeOptions bindings

Posted by "jorisvandenbossche (via GitHub)" <gi...@apache.org>.
jorisvandenbossche commented on code in PR #34401:
URL: https://github.com/apache/arrow/pull/34401#discussion_r1124455798


##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -235,163 +237,121 @@ def _perform_join(join_type, left_operand not None, left_keys,
     -------
     result_table : Table or InMemoryDataset
     """
-    cdef:
-        vector[CFieldRef] c_left_keys
-        vector[CFieldRef] c_right_keys
-        vector[CFieldRef] c_left_columns
-        vector[CFieldRef] c_right_columns
-        vector[CDeclaration] c_decl_plan
-        vector[CExpression] c_projections
-        vector[c_string] c_projected_col_names
-        CJoinType c_join_type
-
     # Prepare left and right tables Keys to send them to the C++ function
     left_keys_order = {}
-    if isinstance(left_keys, str):
+    if not isinstance(left_keys, (tuple, list)):
         left_keys = [left_keys]
     for idx, key in enumerate(left_keys):
         left_keys_order[key] = idx
-        c_left_keys.push_back(CFieldRef(<c_string>tobytes(key)))
 
     right_keys_order = {}
-    if isinstance(right_keys, str):
+    if not isinstance(right_keys, (list, tuple)):
         right_keys = [right_keys]
     for idx, key in enumerate(right_keys):
         right_keys_order[key] = idx
-        c_right_keys.push_back(CFieldRef(<c_string>tobytes(key)))
 
     # By default expose all columns on both left and right table
-    if isinstance(left_operand, Table):
-        left_columns = left_operand.column_names
-    elif isinstance(left_operand, Dataset):
-        left_columns = left_operand.schema.names
-    else:
-        raise TypeError("Unsupported left join member type")
-
-    if isinstance(right_operand, Table):
-        right_columns = right_operand.column_names
-    elif isinstance(right_operand, Dataset):
-        right_columns = right_operand.schema.names
-    else:
-        raise TypeError("Unsupported right join member type")
+    left_columns = left_operand.schema.names
+    right_columns = right_operand.schema.names
 
     # Pick the join type
-    if join_type == "left semi":
-        c_join_type = CJoinType_LEFT_SEMI
-        right_columns = []
-    elif join_type == "right semi":
-        c_join_type = CJoinType_RIGHT_SEMI
-        left_columns = []
-    elif join_type == "left anti":
-        c_join_type = CJoinType_LEFT_ANTI
+    if join_type == "left semi" or join_type == "left anti":
         right_columns = []
-    elif join_type == "right anti":
-        c_join_type = CJoinType_RIGHT_ANTI
+    elif join_type == "right semi" or join_type == "right anti":
         left_columns = []
-    elif join_type == "inner":
-        c_join_type = CJoinType_INNER
-        right_columns = [
-            col for col in right_columns if col not in right_keys_order
-        ]
-    elif join_type == "left outer":
-        c_join_type = CJoinType_LEFT_OUTER
+    elif join_type == "inner" or join_type == "left outer":
         right_columns = [
             col for col in right_columns if col not in right_keys_order
         ]
     elif join_type == "right outer":
-        c_join_type = CJoinType_RIGHT_OUTER
         left_columns = [
             col for col in left_columns if col not in left_keys_order
         ]
-    elif join_type == "full outer":
-        c_join_type = CJoinType_FULL_OUTER
-    else:
-        raise ValueError("Unsupported join type")
 
     # Turn the columns to vectors of FieldRefs
     # and set aside indices of keys.
     left_column_keys_indices = {}
     for idx, colname in enumerate(left_columns):
-        c_left_columns.push_back(CFieldRef(<c_string>tobytes(colname)))
         if colname in left_keys:
             left_column_keys_indices[colname] = idx
     right_column_keys_indices = {}
     for idx, colname in enumerate(right_columns):
-        c_right_columns.push_back(CFieldRef(<c_string>tobytes(colname)))
         if colname in right_keys:
             right_column_keys_indices[colname] = idx
 
     # Add the join node to the execplan
-    if coalesce_keys:
-        c_decl_plan.push_back(
-            CDeclaration(tobytes("hashjoin"), CHashJoinNodeOptions(
-                c_join_type, c_left_keys, c_right_keys,
-                c_left_columns, c_right_columns,
-                _true,
-                <c_string>tobytes(left_suffix or ""),
-                <c_string>tobytes(right_suffix or "")
-            ))
-        )
-        if join_type == "full outer":
-            # In case of full outer joins, the join operation will output all columns
-            # so that we can coalesce the keys and exclude duplicates in a subsequent projection.
-            left_columns_set = set(left_columns)
-            right_columns_set = set(right_columns)
-            # Where the right table columns start.
-            right_operand_index = len(left_columns)
-            for idx, col in enumerate(left_columns + right_columns):
-                if idx < len(left_columns) and col in left_column_keys_indices:
-                    # Include keys only once and coalesce left+right table keys.
-                    c_projected_col_names.push_back(tobytes(col))
-                    # Get the index of the right key that is being paired
-                    # with this left key. We do so by retrieving the name
-                    # of the right key that is in the same position in the provided keys
-                    # and then looking up the index for that name in the right table.
-                    right_key_index = right_column_keys_indices[right_keys[left_keys_order[col]]]
-                    c_projections.push_back(Expression.unwrap(
-                        Expression._call("coalesce", [
-                            Expression._field(idx), Expression._field(
-                                right_operand_index+right_key_index)
-                        ])
-                    ))
-                elif idx >= right_operand_index and col in right_column_keys_indices:
-                    # Do not include right table keys. As they would lead to duplicated keys.
-                    continue
-                else:
-                    # For all the other columns incude them as they are.
-                    # Just recompute the suffixes that the join produced as the projection
-                    # would lose them otherwise.
-                    if left_suffix and idx < right_operand_index and col in right_columns_set:
-                        col += left_suffix
-                    if right_suffix and idx >= right_operand_index and col in left_columns_set:
-                        col += right_suffix
-                    c_projected_col_names.push_back(tobytes(col))
-                    c_projections.push_back(
-                        Expression.unwrap(Expression._field(idx)))
-            c_decl_plan.push_back(
-                CDeclaration(tobytes("project"), CProjectNodeOptions(
-                    c_projections, c_projected_col_names))
-            )
-    else:
-        c_decl_plan.push_back(
-            CDeclaration(tobytes("hashjoin"), CHashJoinNodeOptions(
-                c_join_type, c_left_keys, c_right_keys,
-                _true,
-                <c_string>tobytes(left_suffix or ""),
-                <c_string>tobytes(right_suffix or "")
-            ))
+    # TEMP
+    if isinstance(left_operand, Dataset):
+        left_operand = left_operand.to_table()
+    if isinstance(right_operand, Dataset):
+        right_operand = right_operand.to_table()

Review Comment:
   This is just a temporary hack: for now there is only a TableSourceNodeOptions, so I need to add a proper ScanNodeOptions in another precursor PR.





[GitHub] [arrow] jorisvandenbossche merged pull request #34401: GH-33976: [Python] Refactor the internal usage of ExecPlan to use new Acero Declaration/ExecNodeOptions bindings

Posted by "jorisvandenbossche (via GitHub)" <gi...@apache.org>.
jorisvandenbossche merged PR #34401:
URL: https://github.com/apache/arrow/pull/34401

