You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/11/09 10:58:53 UTC
[arrow-datafusion] branch master updated: DataFrame supports intersect and update readme (#1258)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d28a2ac DataFrame supports intersect and update readme (#1258)
d28a2ac is described below
commit d28a2ac95392f2941cf8eef9afa23378f9224dda
Author: Carlos <wx...@gmail.com>
AuthorDate: Tue Nov 9 18:58:45 2021 +0800
DataFrame supports intersect and update readme (#1258)
---
README.md | 4 ++--
datafusion/src/dataframe.rs | 15 +++++++++++++++
datafusion/src/execution/dataframe_impl.rs | 23 +++++++++++++++++++++++
3 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/README.md b/README.md
index ea946f4..a1c850d 100644
--- a/README.md
+++ b/README.md
@@ -218,8 +218,8 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
- [ ] Set Operations
- [x] UNION ALL
- [x] UNION
- - [ ] INTERSECT
- - [ ] INTERSECT ALL
+ - [x] INTERSECT
+ - [x] INTERSECT ALL
- [ ] EXCEPT
- [ ] EXCEPT ALL
- [x] Joins
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index 4bfd720..7f4a3e5 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -375,4 +375,19 @@ pub trait DataFrame: Send + Sync {
/// # }
/// ```
fn registry(&self) -> Arc<dyn FunctionRegistry>;
+
+ /// Calculate the intersection of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema. The `DataFrameImpl` implementation builds the same logical plan as SQL `INTERSECT ALL`.
+ ///
+ /// ```
+ /// # use datafusion::prelude::*;
+ /// # use datafusion::error::Result;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<()> {
+ /// let mut ctx = ExecutionContext::new();
+ /// let df = ctx.read_csv("tests/example.csv", CsvReadOptions::new()).await?;
+ /// let df = df.intersect(df.clone())?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ fn intersect(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>>;
}
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index a313cc1..c202e19 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -231,6 +231,15 @@ impl DataFrame for DataFrameImpl {
.build()?,
)))
}
+
+ fn intersect(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>> { // intersects this DataFrame's plan with `dataframe`'s plan
+ let left_plan = self.to_logical_plan(); // left input: this DataFrame
+ let right_plan = dataframe.to_logical_plan(); // right input: the argument
+ Ok(Arc::new(DataFrameImpl::new(
+ self.ctx_state.clone(), // the result shares this DataFrame's context state
+ &LogicalPlanBuilder::intersect(left_plan, right_plan, true)?, // NOTE(review): `true` presumably requests INTERSECT ALL (the unit test expects that plan) - confirm against the builder; `?` propagates a build error
+ )))
+ }
}
#[cfg(test)]
@@ -438,6 +447,20 @@ mod tests {
task.await.expect("task completed successfully");
}
+ #[tokio::test]
+ async fn intersect() -> Result<()> { // checks DataFrame::intersect against the plan produced for the SQL text below
+ let df = test_table().await?.select_columns(&["c1", "c3"])?; // project the test table down to c1 and c3
+ let plan = df.intersect(df.clone())?; // intersect the projected frame with itself
+ let result = plan.to_logical_plan();
+ let expected = create_plan( // reference plan obtained from the SQL string passed here
+ "SELECT c1, c3 FROM aggregate_test_100
+ INTERSECT ALL SELECT c1, c3 FROM aggregate_test_100",
+ )
+ .await?;
+ assert_same_plan(&result, &expected); // compares the Debug-formatted text of both plans
+ Ok(())
+ }
+
/// Compare the formatted string representation of two plans for equality
fn assert_same_plan(plan1: &LogicalPlan, plan2: &LogicalPlan) {
assert_eq!(format!("{:?}", plan1), format!("{:?}", plan2));