Posted to github@arrow.apache.org by "jorisvandenbossche (via GitHub)" <gi...@apache.org> on 2023/02/27 11:54:37 UTC

[GitHub] [arrow] jorisvandenbossche commented on a diff in pull request #34234: GH-34235: [Python] Add `join_asof` binding

jorisvandenbossche commented on code in PR #34234:
URL: https://github.com/apache/arrow/pull/34234#discussion_r1118630021


##########
python/pyarrow/_dataset.pyx:
##########
@@ -514,6 +514,53 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+    def join_asof(self, right_dataset, on, by, tolerance, right_on=None, right_by=None):
+        """
+        Perform an asof join between this dataset and another one.
+
+        Result of the join will be a new dataset, where further
+        operations can be applied.
+
+        Parameters
+        ----------
+        right_dataset : dataset
+            The dataset to join to the current one, acting as the right dataset
+            in the join operation.
+        on : str
+            The column from current dataset that should be used as the on key
+            of the join operation left side.
+        by : str or list[str]
+            The columns from current dataset that should be used as the by keys
+            of the join operation left side.
+        tolerance : int

Review Comment:
   If the "on" key is a timestamp column, what value can be used here? (not an int?)
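
One possible reading, assuming the tolerance has to be an integer count of the timestamp column's unit (the `_exec_plan.pyx` docstring says it is interpreted in the same units as the "on" key): a `timedelta` would be converted by hand before the call. The helper name `tolerance_in_unit` and the unit table are hypothetical and not part of pyarrow.

```python
from datetime import timedelta

# Hypothetical helper, not a pyarrow API: express a timedelta as an integer
# number of ticks of a timestamp unit ("s", "ms", "us" or "ns").
_TICKS_PER_SECOND = {"s": 1, "ms": 10**3, "us": 10**6, "ns": 10**9}


def tolerance_in_unit(delta: timedelta, unit: str) -> int:
    # Work in whole microseconds (timedelta's resolution) to avoid float error.
    micros = delta // timedelta(microseconds=1)
    ticks, remainder = divmod(micros * _TICKS_PER_SECOND[unit], 10**6)
    if remainder:
        raise ValueError(f"{delta!r} is not a whole number of {unit!r} ticks")
    return ticks


# A one-minute look-back window on a timestamp("ms") column.
print(tolerance_in_unit(timedelta(minutes=-1), "ms"))  # -60000
```

If that reading is right, the docstring should probably say so explicitly, since `tolerance : int` alone does not tell the user which unit applies.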



##########
python/pyarrow/_exec_plan.pyx:
##########
@@ -390,6 +390,120 @@ def _perform_join(join_type, left_operand not None, left_keys,
     return result_table
 
 
+def _perform_join_asof(left_operand not None, left_on, left_by,
+                       right_operand not None, right_on, right_by,
+                       tolerance, output_type=Table):
+    """
+    Perform asof join of two tables or datasets.
+
+    The result will be an output table with the result of the join operation
+
+    Parameters
+    ----------
+    left_operand : Table or Dataset
+        The left operand for the join operation.
+    left_on : str
+        The left key (or keys) on which the join operation should be performed.
+    left_by: str or list[str]
+        The left key (or keys) on which the join operation should be performed.
+    right_operand : Table or Dataset
+        The right operand for the join operation.
+    right_on : str or list[str]
+        The right key (or keys) on which the join operation should be performed.
+    right_by: str or list[str]
+        The right key (or keys) on which the join operation should be performed.
+    tolerance : int
+        The tolerance to use for the asof join. The tolerance is interpreted in
+        the same units as the "on" key.
+    output_type: Table or InMemoryDataset
+        The output type for the exec plan result.
+
+    Returns
+    -------
+    result_table : Table or InMemoryDataset
+    """
+    cdef:
+        vector[CFieldRef] c_left_by
+        vector[CFieldRef] c_right_by
+        CAsofJoinKeys c_left_keys
+        CAsofJoinKeys c_right_keys
+        vector[CAsofJoinKeys] c_input_keys
+        vector[CDeclaration] c_decl_plan
+
+    # Prepare left AsofJoinNodeOption::Keys
+    if isinstance(left_by, str):
+        left_by = [left_by]
+    for key in left_by:
+        c_left_by.push_back(CFieldRef(<c_string>tobytes(key)))
+
+    c_left_keys.on_key = CFieldRef(<c_string>tobytes(left_on))
+    c_left_keys.by_key = c_left_by
+
+    c_input_keys.push_back(c_left_keys)
+
+    # Prepare right AsofJoinNodeOption::Keys
+    right_by_order = {}
+    if isinstance(right_by, str):
+        right_by = [right_by]
+    for key in right_by:
+        c_right_by.push_back(CFieldRef(<c_string>tobytes(key)))
+
+    c_right_keys.on_key = CFieldRef(<c_string>tobytes(right_on))
+    c_right_keys.by_key = c_right_by
+
+    c_input_keys.push_back(c_right_keys)
+
+    # By default expose all columns on both left and right table
+    if isinstance(left_operand, Table):
+        left_columns = left_operand.column_names
+    elif isinstance(left_operand, Dataset):
+        left_columns = left_operand.schema.names
+    else:
+        raise TypeError("Unsupported left join member type")

Review Comment:
   The `left_columns`/`right_columns` variables are not really used, except for checking for column collisions?
   What happens if we don't check for this here in Cython and there actually is a column collision? Does the C++ implementation raise an error for that as well?
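
For reference, the Python-side check in question amounts to something like the sketch below. It assumes a collision means a non-key right column whose name also exists on the left; `find_column_collisions` is a hypothetical name, and whether the C++ asof join node reports the same condition is exactly the open question.

```python
def find_column_collisions(left_columns, right_columns, right_on, right_by):
    # Assumption: the right "on"/"by" keys are not repeated in the output,
    # so only the remaining right columns can clash with left columns.
    right_keys = {right_on, *right_by}
    left = set(left_columns)
    return [name for name in right_columns
            if name not in right_keys and name in left]


collisions = find_column_collisions(
    left_columns=["colA", "col2", "colVals"],
    right_columns=["colB", "col3", "colVals"],
    right_on="colB",
    right_by=["col3"],
)
print(collisions)  # ['colVals']
```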
   
   



##########
python/pyarrow/table.pxi:
##########
@@ -4993,6 +4993,78 @@ cdef class Table(_PandasConvertible):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=Table)
 
+    def join_asof(self, right_table, on, by, tolerance, right_on=None, right_by=None):
+        """
+        Perform an asof join between this table and another one.
+
+        Result of the join will be a new dataset, where further
+        operations can be applied.
+
+        Parameters
+        ----------
+        right_table : Table
+            The table to join to the current one, acting as the right table
+            in the join operation.
+        on : str
+            The column from current table that should be used as the on key
+            of the join operation left side.
+        by : str or list[str]
+            The columns from current table that should be used as the by keys
+            of the join operation left side.
+        tolerance : int
+            The tolerance for inexact "on" key matching. A right row is considered
+            a match with the left row ``right.on - left.on <= tolerance``. The
+            ``tolerance`` may be:
+                - negative, in which case a past-as-of-join occurs;
+                - or positive, in which case a future-as-of-join occurs;
+                - or zero, in which case an exact-as-of-join occurs.
+
+            The tolerance is interpreted in the same units as the "on" key.
+        right_on : str or list[str], default None
+            The columns from the right_table that should be used as the on key
+            on the join operation right side.
+            When ``None`` use the same key name as the left table.
+        right_by : str or list[str], default None
+            The columns from the right_table that should be used as by keys
+            on the join operation right side.
+            When ``None`` use the same key names as the left table.
+
+        Returns
+        -------
+        Table
+
+        Example
+        --------
+        >>> import pandas as pd
+        >>> import pyarrow as pa
+        >>> df1 = pd.DataFrame({'id': [1, 2, 3],
+        ...                     'year': [2020, 2022, 2019]})
+        >>> df2 = pd.DataFrame({'id': [3, 4],
+        ...                     'year': [2020, 2021],
+        ...                     'n_legs': [5, 100],
+        ...                     'animal': ["Brittle stars", "Centipede"]})
+        >>> t1 = pa.Table.from_pandas(df1).sort_by('year')
+        >>> t2 = pa.Table.from_pandas(df2).sort_by('year')
+
+        >>> t1.join_asof(t2, 'year', 'id', 1).combine_chunks().sort_by('year')

Review Comment:
   Can you use explicit keyword names (except for the first table argument)? I think that will make it easier to understand what those arguments are.
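
To make the mapping concrete, the sketch below binds the positional call from the docstring example against a stub that only copies the signature from the diff. The stub is a stand-in for `Table.join_asof` and performs no join.

```python
import inspect


# Stub with the signature from the diff above; it stands in for
# Table.join_asof and does not perform a join.
def join_asof(self, right_table, on, by, tolerance,
              right_on=None, right_by=None):
    raise NotImplementedError


# Bind the positional call from the example: t1.join_asof(t2, 'year', 'id', 1)
bound = inspect.signature(join_asof).bind("t1", "t2", "year", "id", 1)
keywords = {name: value for name, value in bound.arguments.items()
            if name not in ("self", "right_table")}
print(keywords)  # {'on': 'year', 'by': 'id', 'tolerance': 1}
```

With keywords, the docstring call would read `t1.join_asof(t2, on='year', by='id', tolerance=1)`.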



##########
python/pyarrow/tests/test_dataset.py:
##########
@@ -4839,6 +4839,57 @@ def test_dataset_join_collisions(tempdir):
     ], names=["colA", "colB", "colVals", "colB_r", "colVals_r"])
 
 
+@pytest.mark.dataset
+def test_dataset_join_asof(tempdir):
+    t1 = pa.Table.from_pydict({
+        "colA": [1, 1, 5, 6, 7],
+        "col2": ["a", "b", "a", "b", "f"]
+    })
+    ds.write_dataset(t1, tempdir / "t1", format="ipc")
+    ds1 = ds.dataset(tempdir / "t1", format="ipc")
+
+    t2 = pa.Table.from_pydict({
+        "colB": [2, 9, 15],
+        "col3": ["a", "b", "g"],
+        "colC": [1., 3., 5.]
+    })
+    ds.write_dataset(t2, tempdir / "t2", format="ipc")
+    ds2 = ds.dataset(tempdir / "t2", format="ipc")
+
+    result = ds1.join_asof(ds2, "colA", "col2", 1, "colB", "col3")

Review Comment:
   Also here, can you use keyword names instead of positional arguments (except for the first argument) to make it a bit more readable?



##########
python/pyarrow/tests/test_table.py:
##########
@@ -2297,6 +2297,49 @@ def test_table_join_many_columns():
     })
 
 
+@pytest.mark.dataset
+def test_table_join_asof():
+    t1 = pa.Table.from_pydict({
+        "colA": [1, 1, 5, 6, 7],
+        "col2": ["a", "b", "a", "b", "f"]
+    })
+
+    t2 = pa.Table.from_pydict({
+        "colB": [2, 9, 15],
+        "col3": ["a", "b", "g"],
+        "colC": [1., 3., 5.]
+    })
+
+    r = t1.join_asof(t2, "colA", "col2", 1, "colB", "col3")

Review Comment:
   Some additional test case ideas to ensure good coverage:
   
   - A test where the left/right column names are the same, so that `right_on`/`right_by` can be omitted
   - A test where `by` is a list of columns instead of a single one
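
Expected values for both cases could be worked out against a small pure-Python model before writing the pyarrow tests. The sketch below is not pyarrow code: `asof_join_model` is a hypothetical name, rows are plain dicts, and it assumes that a right row matches when `right.on - left.on` lies between 0 and `tolerance` (inclusive, in either direction) and that the closest matching row wins.

```python
def asof_join_model(left, right, *, on, by, tolerance,
                    right_on=None, right_by=None):
    # A single "by" name becomes a one-element list, and the right keys
    # default to the left ones when they are not given.
    by = [by] if isinstance(by, str) else list(by)
    right_on = on if right_on is None else right_on
    if right_by is None:
        right_by = by
    else:
        right_by = [right_by] if isinstance(right_by, str) else list(right_by)

    # Assumed matching window: [tolerance, 0] for a negative tolerance,
    # [0, tolerance] for a positive one, exact match for zero.
    low, high = min(0, tolerance), max(0, tolerance)
    right_keys = {right_on, *right_by}
    payload = [name for name in (right[0] if right else {})
               if name not in right_keys]

    result = []
    for lrow in left:
        best = None  # (absolute distance, matching right row)
        for rrow in right:
            if [rrow[k] for k in right_by] != [lrow[k] for k in by]:
                continue
            diff = rrow[right_on] - lrow[on]
            if low <= diff <= high and (best is None or abs(diff) < best[0]):
                best = (abs(diff), rrow)
        out = dict(lrow)
        for name in payload:
            out[name] = best[1][name] if best else None
        result.append(out)
    return result


left = [
    {"time": 1, "key": "a", "venue": "x"},
    {"time": 5, "key": "a", "venue": "y"},
    {"time": 6, "key": "b", "venue": "x"},
]
right = [
    {"time": 2, "key": "a", "venue": "x", "price": 1.0},
    {"time": 5, "key": "b", "venue": "x", "price": 3.0},
]

# Same key names on both sides and a list of "by" columns:
# right_on/right_by are omitted.
rows = asof_join_model(left, right, on="time", by=["key", "venue"], tolerance=-1)
print([row["price"] for row in rows])  # [None, None, 3.0]
```

Only the third left row finds a match here, because its right-hand counterpart lies one step in the past and the tolerance is negative.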



##########
python/pyarrow/_dataset.pyx:
##########
@@ -514,6 +514,53 @@ cdef class Dataset(_Weakrefable):
                                               use_threads=use_threads, coalesce_keys=coalesce_keys,
                                               output_type=InMemoryDataset)
 
+    def join_asof(self, right_dataset, on, by, tolerance, right_on=None, right_by=None):
+        """
+        Perform an asof join between this dataset and another one.
+
+        Result of the join will be a new dataset, where further
+        operations can be applied.
+
+        Parameters
+        ----------
+        right_dataset : dataset
+            The dataset to join to the current one, acting as the right dataset
+            in the join operation.
+        on : str
+            The column from current dataset that should be used as the on key
+            of the join operation left side.
+        by : str or list[str]
+            The columns from current dataset that should be used as the by keys
+            of the join operation left side.
+        tolerance : int
+            The tolerance for inexact "on" key matching. A right row is considered
+            a match with the left row `right.on - left.on <= tolerance`. The
+            `tolerance` may be:
+                - negative, in which case a past-as-of-join occurs;
+                - or positive, in which case a future-as-of-join occurs;
+                - or zero, in which case an exact-as-of-join occurs.

Review Comment:
   ```suggestion
   
               - negative, in which case a past-as-of-join occurs;
               - or positive, in which case a future-as-of-join occurs;
               - or zero, in which case an exact-as-of-join occurs.
   ```
   
   Formatting nitpick: reStructuredText is quite strict about this. A list should not be indented, and it requires a blank line before and after.


