You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by am...@apache.org on 2022/04/13 14:42:48 UTC

[arrow] branch master updated: ARROW-15526: [Python] Support for Dataset.join

This is an automated email from the ASF dual-hosted git repository.

amolina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b1fc905d0 ARROW-15526: [Python] Support for Dataset.join
4b1fc905d0 is described below

commit 4b1fc905d0a75306176dfef34d90109804358440
Author: Alessandro Molina <am...@turbogears.org>
AuthorDate: Wed Apr 13 16:42:40 2022 +0200

    ARROW-15526: [Python] Support for Dataset.join
    
    Closes #12841 from amol-/ARROW-15526
    
    Authored-by: Alessandro Molina <am...@turbogears.org>
    Signed-off-by: Alessandro Molina <am...@turbogears.org>
---
 python/pyarrow/_dataset.pyx          | 50 +++++++++++++++++++-
 python/pyarrow/_exec_plan.pyx        | 20 +++++---
 python/pyarrow/table.pxi             |  3 +-
 python/pyarrow/tests/test_dataset.py | 90 ++++++++++++++++++++++++++++++++++++
 4 files changed, 154 insertions(+), 9 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index de30f530c7..4bc5f2cfd6 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -27,7 +27,7 @@ import warnings
 
 import pyarrow as pa
 from pyarrow.lib cimport *
-from pyarrow.lib import ArrowTypeError, frombytes, tobytes
+from pyarrow.lib import ArrowTypeError, frombytes, tobytes, _pc
 from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow._compute cimport Expression, _bind
 from pyarrow._fs cimport FileSystem, FileInfo, FileSelector
@@ -623,6 +623,54 @@ cdef class FileSystemDataset(Dataset):
         """The FileFormat of this source."""
         return FileFormat.wrap(self.filesystem_dataset.format())
 
+    def join(self, right_dataset, keys, right_keys=None, join_type="left outer",
+             left_suffix=None, right_suffix=None, coalesce_keys=True,
+             use_threads=True):
+        """
+        Perform a join between this dataset and another one.
+
+        Result of the join will be a new dataset, where further
+        operations can be applied.
+
+        Parameters
+        ----------
+        right_dataset : dataset
+            The dataset to join to the current one, acting as the right dataset
+            in the join operation.
+        keys : str or list[str]
+            The columns from current dataset that should be used as keys
+            of the join operation left side.
+        right_keys : str or list[str], default None
+            The columns from the right_dataset that should be used as keys
+            on the join operation right side.
+            When ``None`` use the same key names as the left dataset.
+        join_type : str, default "left outer"
+            The kind of join that should be performed, one of
+            ("left semi", "right semi", "left anti", "right anti",
+            "inner", "left outer", "right outer", "full outer")
+        left_suffix : str, default None
+            Which suffix to add to left column names. This prevents confusion
+            when the columns in left and right datasets have colliding names.
+        right_suffix : str, default None
+            Which suffix to add to the right column names. This prevents confusion
+            when the columns in left and right datasets have colliding names.
+        coalesce_keys : bool, default True
+            If the duplicated keys should be omitted from one of the sides
+            in the join result.
+        use_threads : bool, default True
+            Whether to use multithreading or not.
+
+        Returns
+        -------
+        InMemoryDataset
+        """
+        if right_keys is None:
+            right_keys = keys
+        return _pc()._exec_plan._perform_join(join_type, self, keys, right_dataset, right_keys,
+                                              left_suffix=left_suffix, right_suffix=right_suffix,
+                                              use_threads=use_threads, coalesce_keys=coalesce_keys,
+                                              output_type=InMemoryDataset)
+
 
 cdef class FileWriteOptions(_Weakrefable):
 
diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx
index 39d36815bd..763329583a 100644
--- a/python/pyarrow/_exec_plan.pyx
+++ b/python/pyarrow/_exec_plan.pyx
@@ -28,9 +28,10 @@ from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
 from pyarrow.includes.libarrow_dataset cimport *
 from pyarrow.lib cimport (Table, pyarrow_unwrap_table, pyarrow_wrap_table)
-from pyarrow.lib import tobytes, _pc
+from pyarrow.lib import tobytes
 from pyarrow._compute cimport Expression, _true
 from pyarrow._dataset cimport Dataset
+from pyarrow._dataset import InMemoryDataset
 
 Initialize()  # Initialise support for Datasets in ExecPlan
 
@@ -45,7 +46,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
         The sources from which the ExecPlan should fetch data.
         In most cases this is only one, unless the first node of the
         plan is able to get data from multiple different sources.
-    output_type : Table or Dataset
+    output_type : Table or InMemoryDataset
         In which format the output should be provided.
     plan : vector[CDeclaration]
         The nodes of the plan that should be applied to the sources
@@ -147,10 +148,12 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
     deref(c_exec_plan).StartProducing()
 
     # Convert output to the expected one.
+    c_out_table = GetResultValue(
+        CTable.FromRecordBatchReader(c_recordbatchreader.get()))
     if output_type == Table:
-        c_out_table = GetResultValue(
-            CTable.FromRecordBatchReader(c_recordbatchreader.get()))
         output = pyarrow_wrap_table(c_out_table)
+    elif output_type == InMemoryDataset:
+        output = InMemoryDataset(pyarrow_wrap_table(c_out_table))
     else:
         raise TypeError("Unsupported output type")
 
@@ -162,7 +165,8 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
 def _perform_join(join_type, left_operand not None, left_keys,
                   right_operand not None, right_keys,
                   left_suffix=None, right_suffix=None,
-                  use_threads=True, coalesce_keys=False):
+                  use_threads=True, coalesce_keys=False,
+                  output_type=Table):
     """
     Perform join of two tables or datasets.
 
@@ -191,6 +195,8 @@ def _perform_join(join_type, left_operand not None, left_keys,
     coalesce_keys : bool, default False
         If the duplicated keys should be omitted from one of the sides
         in the join result.
+    output_type : Table or InMemoryDataset
+        The output type for the exec plan result.
 
     Returns
     -------
@@ -338,7 +344,7 @@ def _perform_join(join_type, left_operand not None, left_keys,
         )
 
     result_table = execplan([left_operand, right_operand],
-                            output_type=Table,
-                            plan=c_decl_plan)
+                            plan=c_decl_plan,
+                            output_type=output_type)
 
     return result_table
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index 2a1af697b8..c4e8326652 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -4724,7 +4724,8 @@ cdef class Table(_PandasConvertible):
             right_keys = keys
         return _pc()._exec_plan._perform_join(join_type, self, keys, right_table, right_keys,
                                               left_suffix=left_suffix, right_suffix=right_suffix,
-                                              use_threads=use_threads, coalesce_keys=coalesce_keys)
+                                              use_threads=use_threads, coalesce_keys=coalesce_keys,
+                                              output_type=Table)
 
     def group_by(self, keys):
         """Declare a grouping over the columns of the table.
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index b4564abef5..511f8206cc 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -4310,3 +4310,93 @@ def test_dataset_null_to_dictionary_cast(tempdir, dataset_reader):
     )
     table = dataset_reader.to_table(fsds)
     assert table.schema == schema
+
+
+@pytest.mark.dataset
+def test_dataset_join(tempdir):
+    t1 = pa.table({
+        "colA": [1, 2, 6],
+        "col2": ["a", "b", "f"]
+    })
+    ds.write_dataset(t1, tempdir / "t1", format="parquet")
+    ds1 = ds.dataset(tempdir / "t1")
+
+    t2 = pa.table({
+        "colB": [99, 2, 1],
+        "col3": ["Z", "B", "A"]
+    })
+    ds.write_dataset(t2, tempdir / "t2", format="parquet")
+    ds2 = ds.dataset(tempdir / "t2")
+
+    result = ds1.join(ds2, "colA", "colB")
+    assert result.to_table() == pa.table({
+        "colA": [1, 2, 6],
+        "col2": ["a", "b", "f"],
+        "col3": ["A", "B", None]
+    })
+
+    result = ds1.join(ds2, "colA", "colB", join_type="full outer")
+    assert result.to_table().sort_by("colA") == pa.table({
+        "colA": [1, 2, 6, 99],
+        "col2": ["a", "b", "f", None],
+        "col3": ["A", "B", None, "Z"]
+    })
+
+
+@pytest.mark.dataset
+def test_dataset_join_unique_key(tempdir):
+    t1 = pa.table({
+        "colA": [1, 2, 6],
+        "col2": ["a", "b", "f"]
+    })
+    ds.write_dataset(t1, tempdir / "t1", format="parquet")
+    ds1 = ds.dataset(tempdir / "t1")
+
+    t2 = pa.table({
+        "colA": [99, 2, 1],
+        "col3": ["Z", "B", "A"]
+    })
+    ds.write_dataset(t2, tempdir / "t2", format="parquet")
+    ds2 = ds.dataset(tempdir / "t2")
+
+    result = ds1.join(ds2, "colA")
+    assert result.to_table() == pa.table({
+        "colA": [1, 2, 6],
+        "col2": ["a", "b", "f"],
+        "col3": ["A", "B", None]
+    })
+
+    result = ds1.join(ds2, "colA", join_type="full outer", right_suffix="_r")
+    assert result.to_table().sort_by("colA") == pa.table({
+        "colA": [1, 2, 6, 99],
+        "col2": ["a", "b", "f", None],
+        "col3": ["A", "B", None, "Z"]
+    })
+
+
+@pytest.mark.dataset
+def test_dataset_join_collisions(tempdir):
+    t1 = pa.table({
+        "colA": [1, 2, 6],
+        "colB": [10, 20, 60],
+        "colVals": ["a", "b", "f"]
+    })
+    ds.write_dataset(t1, tempdir / "t1", format="parquet")
+    ds1 = ds.dataset(tempdir / "t1")
+
+    t2 = pa.table({
+        "colA": [99, 2, 1],
+        "colB": [99, 20, 10],
+        "colVals": ["Z", "B", "A"]
+    })
+    ds.write_dataset(t2, tempdir / "t2", format="parquet")
+    ds2 = ds.dataset(tempdir / "t2")
+
+    result = ds1.join(ds2, "colA", join_type="full outer", right_suffix="_r")
+    assert result.to_table().sort_by("colA") == pa.table([
+        [1, 2, 6, 99],
+        [10, 20, 60, None],
+        ["a", "b", "f", None],
+        [10, 20, None, 99],
+        ["A", "B", None, "Z"],
+    ], names=["colA", "colB", "colVals", "colB_r", "colVals_r"])