You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "jorisvandenbossche (via GitHub)" <gi...@apache.org> on 2023/03/03 13:29:50 UTC

[GitHub] [arrow] jorisvandenbossche commented on a diff in pull request #34401: GH-33976: [Python] Refactor the internal usage of ExecPlan to use new Declaration/ExecNodeOptions bindings

jorisvandenbossche commented on code in PR #34401:
URL: https://github.com/apache/arrow/pull/34401#discussion_r1124455798


##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -235,163 +237,121 @@ def _perform_join(join_type, left_operand not None, left_keys,
     -------
     result_table : Table or InMemoryDataset
     """
-    cdef:
-        vector[CFieldRef] c_left_keys
-        vector[CFieldRef] c_right_keys
-        vector[CFieldRef] c_left_columns
-        vector[CFieldRef] c_right_columns
-        vector[CDeclaration] c_decl_plan
-        vector[CExpression] c_projections
-        vector[c_string] c_projected_col_names
-        CJoinType c_join_type
-
     # Prepare left and right tables Keys to send them to the C++ function
     left_keys_order = {}
-    if isinstance(left_keys, str):
+    if not isinstance(left_keys, (tuple, list)):
         left_keys = [left_keys]
     for idx, key in enumerate(left_keys):
         left_keys_order[key] = idx
-        c_left_keys.push_back(CFieldRef(<c_string>tobytes(key)))
 
     right_keys_order = {}
-    if isinstance(right_keys, str):
+    if not isinstance(right_keys, (list, tuple)):
         right_keys = [right_keys]
     for idx, key in enumerate(right_keys):
         right_keys_order[key] = idx
-        c_right_keys.push_back(CFieldRef(<c_string>tobytes(key)))
 
     # By default expose all columns on both left and right table
-    if isinstance(left_operand, Table):
-        left_columns = left_operand.column_names
-    elif isinstance(left_operand, Dataset):
-        left_columns = left_operand.schema.names
-    else:
-        raise TypeError("Unsupported left join member type")
-
-    if isinstance(right_operand, Table):
-        right_columns = right_operand.column_names
-    elif isinstance(right_operand, Dataset):
-        right_columns = right_operand.schema.names
-    else:
-        raise TypeError("Unsupported right join member type")
+    left_columns = left_operand.schema.names
+    right_columns = right_operand.schema.names
 
     # Pick the join type
-    if join_type == "left semi":
-        c_join_type = CJoinType_LEFT_SEMI
-        right_columns = []
-    elif join_type == "right semi":
-        c_join_type = CJoinType_RIGHT_SEMI
-        left_columns = []
-    elif join_type == "left anti":
-        c_join_type = CJoinType_LEFT_ANTI
+    if join_type == "left semi" or join_type == "left anti":
         right_columns = []
-    elif join_type == "right anti":
-        c_join_type = CJoinType_RIGHT_ANTI
+    elif join_type == "right semi" or join_type == "right anti":
         left_columns = []
-    elif join_type == "inner":
-        c_join_type = CJoinType_INNER
-        right_columns = [
-            col for col in right_columns if col not in right_keys_order
-        ]
-    elif join_type == "left outer":
-        c_join_type = CJoinType_LEFT_OUTER
+    elif join_type == "inner" or join_type == "left outer":
         right_columns = [
             col for col in right_columns if col not in right_keys_order
         ]
     elif join_type == "right outer":
-        c_join_type = CJoinType_RIGHT_OUTER
         left_columns = [
             col for col in left_columns if col not in left_keys_order
         ]
-    elif join_type == "full outer":
-        c_join_type = CJoinType_FULL_OUTER
-    else:
-        raise ValueError("Unsupported join type")
 
     # Turn the columns to vectors of FieldRefs
     # and set aside indices of keys.
     left_column_keys_indices = {}
     for idx, colname in enumerate(left_columns):
-        c_left_columns.push_back(CFieldRef(<c_string>tobytes(colname)))
         if colname in left_keys:
             left_column_keys_indices[colname] = idx
     right_column_keys_indices = {}
     for idx, colname in enumerate(right_columns):
-        c_right_columns.push_back(CFieldRef(<c_string>tobytes(colname)))
         if colname in right_keys:
             right_column_keys_indices[colname] = idx
 
     # Add the join node to the execplan
-    if coalesce_keys:
-        c_decl_plan.push_back(
-            CDeclaration(tobytes("hashjoin"), CHashJoinNodeOptions(
-                c_join_type, c_left_keys, c_right_keys,
-                c_left_columns, c_right_columns,
-                _true,
-                <c_string>tobytes(left_suffix or ""),
-                <c_string>tobytes(right_suffix or "")
-            ))
-        )
-        if join_type == "full outer":
-            # In case of full outer joins, the join operation will output all columns
-            # so that we can coalesce the keys and exclude duplicates in a subsequent projection.
-            left_columns_set = set(left_columns)
-            right_columns_set = set(right_columns)
-            # Where the right table columns start.
-            right_operand_index = len(left_columns)
-            for idx, col in enumerate(left_columns + right_columns):
-                if idx < len(left_columns) and col in left_column_keys_indices:
-                    # Include keys only once and coalesce left+right table keys.
-                    c_projected_col_names.push_back(tobytes(col))
-                    # Get the index of the right key that is being paired
-                    # with this left key. We do so by retrieving the name
-                    # of the right key that is in the same position in the provided keys
-                    # and then looking up the index for that name in the right table.
-                    right_key_index = right_column_keys_indices[right_keys[left_keys_order[col]]]
-                    c_projections.push_back(Expression.unwrap(
-                        Expression._call("coalesce", [
-                            Expression._field(idx), Expression._field(
-                                right_operand_index+right_key_index)
-                        ])
-                    ))
-                elif idx >= right_operand_index and col in right_column_keys_indices:
-                    # Do not include right table keys. As they would lead to duplicated keys.
-                    continue
-                else:
-                    # For all the other columns incude them as they are.
-                    # Just recompute the suffixes that the join produced as the projection
-                    # would lose them otherwise.
-                    if left_suffix and idx < right_operand_index and col in right_columns_set:
-                        col += left_suffix
-                    if right_suffix and idx >= right_operand_index and col in left_columns_set:
-                        col += right_suffix
-                    c_projected_col_names.push_back(tobytes(col))
-                    c_projections.push_back(
-                        Expression.unwrap(Expression._field(idx)))
-            c_decl_plan.push_back(
-                CDeclaration(tobytes("project"), CProjectNodeOptions(
-                    c_projections, c_projected_col_names))
-            )
-    else:
-        c_decl_plan.push_back(
-            CDeclaration(tobytes("hashjoin"), CHashJoinNodeOptions(
-                c_join_type, c_left_keys, c_right_keys,
-                _true,
-                <c_string>tobytes(left_suffix or ""),
-                <c_string>tobytes(right_suffix or "")
-            ))
+    # TEMP
+    if isinstance(left_operand, Dataset):
+        left_operand = left_operand.to_table()
+    if isinstance(right_operand, Dataset):
+        right_operand = right_operand.to_table()

Review Comment:
   This is just a temporary hack: for now there is only a TableSourceNodeOptions, so Datasets are materialized to tables here. I need to add a proper ScanNodeOptions in a separate precursor PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org