You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/11/09 10:58:53 UTC

[arrow-datafusion] branch master updated: DataFrame supports intersect and update readme (#1258)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d28a2ac  DataFrame supports intersect and update readme (#1258)
d28a2ac is described below

commit d28a2ac95392f2941cf8eef9afa23378f9224dda
Author: Carlos <wx...@gmail.com>
AuthorDate: Tue Nov 9 18:58:45 2021 +0800

    DataFrame supports intersect and update readme (#1258)
---
 README.md                                  |  4 ++--
 datafusion/src/dataframe.rs                | 15 +++++++++++++++
 datafusion/src/execution/dataframe_impl.rs | 23 +++++++++++++++++++++++
 3 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index ea946f4..a1c850d 100644
--- a/README.md
+++ b/README.md
@@ -218,8 +218,8 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
 - [ ] Set Operations
   - [x] UNION ALL
   - [x] UNION
-  - [ ] INTERSECT
-  - [ ] INTERSECT ALL
+  - [x] INTERSECT
+  - [x] INTERSECT ALL
   - [ ] EXCEPT
   - [ ] EXCEPT ALL
 - [x] Joins
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index 4bfd720..7f4a3e5 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -375,4 +375,19 @@ pub trait DataFrame: Send + Sync {
     /// # }
     /// ```
     fn registry(&self) -> Arc<dyn FunctionRegistry>;
+
+    /// Calculate the intersection of two [`DataFrame`]s. Both [`DataFrame`]s must have exactly the same schema.
+    ///
+    /// ```
+    /// # use datafusion::prelude::*;
+    /// # use datafusion::error::Result;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = ExecutionContext::new();
+    /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
+    /// let df = df.intersect(df.clone())?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    fn intersect(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>>;
 }
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index a313cc1..c202e19 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -231,6 +231,15 @@ impl DataFrame for DataFrameImpl {
                 .build()?,
         )))
     }
+
+    fn intersect(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>> {
+        let left_plan = self.to_logical_plan(); // receiver's plan: left input
+        let right_plan = dataframe.to_logical_plan(); // argument's plan: right input
+        Ok(Arc::new(DataFrameImpl::new(
+            self.ctx_state.clone(), // result is built on a clone of this DataFrame's `ctx_state`
+            &LogicalPlanBuilder::intersect(left_plan, right_plan, true)?, // `true`: the `intersect` test expects the SQL `INTERSECT ALL` plan
+        )))
+    }
 }
 
 #[cfg(test)]
@@ -438,6 +447,20 @@ mod tests {
         task.await.expect("task completed successfully");
     }
 
+    #[tokio::test]
+    async fn intersect() -> Result<()> {
+        let df = test_table().await?.select_columns(&["c1", "c3"])?;
+        let plan = df.intersect(df.clone())?; // intersect the DataFrame with a clone of itself
+        let result = plan.to_logical_plan();
+        let expected = create_plan(
+            "SELECT c1, c3 FROM aggregate_test_100
+            INTERSECT ALL SELECT c1, c3 FROM aggregate_test_100",
+        )
+        .await?;
+        assert_same_plan(&result, &expected); // equality of the plans' `{:?}` formatting
+        Ok(())
+    }
+
     /// Compare the formatted string representation of two plans for equality
     fn assert_same_plan(plan1: &LogicalPlan, plan2: &LogicalPlan) {
         assert_eq!(format!("{:?}", plan1), format!("{:?}", plan2));