Posted to jira@arrow.apache.org by "Andrew Lamb (Jira)" <ji...@apache.org> on 2020/12/16 21:31:00 UTC

[jira] [Resolved] (ARROW-10844) [Rust] [DataFusion] join of two DataFrames is not possible

     [ https://issues.apache.org/jira/browse/ARROW-10844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Lamb resolved ARROW-10844.
---------------------------------
    Resolution: Fixed

Issue resolved by pull request 8911
[https://github.com/apache/arrow/pull/8911]

> [Rust] [DataFusion] join of two DataFrames is not possible
> ----------------------------------------------------------
>
>                 Key: ARROW-10844
>                 URL: https://issues.apache.org/jira/browse/ARROW-10844
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Rust, Rust - DataFusion
>    Affects Versions: 3.0.0
>            Reporter: Jorge Leitão
>            Assignee: Jorge Leitão
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 3.0.0
>
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> The complete failing test:
>  
> {code:java}
> use std::sync::Arc;
> use arrow::{array::{Int32Array, StringArray}, record_batch::RecordBatch};
> use arrow::datatypes::{DataType, Field, Schema};
> use datafusion::{datasource::MemTable, prelude::JoinType};
> use datafusion::error::Result;
> use datafusion::execution::context::ExecutionContext;
> #[tokio::test]
> async fn join() -> Result<()> {
>     let schema1 = Arc::new(Schema::new(vec![
>         Field::new("a", DataType::Utf8, false),
>         Field::new("b", DataType::Int32, false),
>     ]));
>     let schema2 = Arc::new(Schema::new(vec![
>         Field::new("a", DataType::Utf8, false),
>         Field::new("c", DataType::Int32, false),
>     ]));
>     // Data for the left table (registered below as "aa").
>     let batch1 = RecordBatch::try_new(
>         schema1.clone(),
>         vec![
>             Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
>             Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
>         ],
>     )?;
>     // Data for the right table (registered below as "aaa").
>     let batch2 = RecordBatch::try_new(
>         schema2.clone(),
>         vec![
>             Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
>             Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
>         ],
>     )?;
>     let mut ctx = ExecutionContext::new();
>     let table1 = MemTable::new(schema1, vec![vec![batch1]])?;
>     let table2 = MemTable::new(schema2, vec![vec![batch2]])?;
>     ctx.register_table("aa", Box::new(table1));
>     let df1 = ctx.table("aa")?;
>     ctx.register_table("aaa", Box::new(table2));
>     let df2 = ctx.table("aaa")?;
>     let a = df1.join(df2, JoinType::Inner, &["a"], &["a"])?;
>     let batches = a.collect().await?;
>     assert_eq!(batches.len(), 1);
>     Ok(())
> }
> {code}
>  
> When DataFrames are created via `ctx.table`, each receives a clone of the {{ExecutionContextState}}. If the context later registers a new table, that table is not part of the state held by the earlier DataFrame. On a join, the left DataFrame's state is passed to the newly created DataFrame and is then used in collect(). Because the right side refers to a table that is absent from the left side's state, execution fails.
>  
> We may need an Arc<Mutex<{{ExecutionContextState}}>> to share a common mutable state across multiple DataFrames. Alternatively, we could stop requiring tables to be registered in the context before DataFrames can use them.
> Note that the current example in the `DataFrame::join` docs works because the table is registered for both DataFrames.
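
The difference between cloning the state and sharing it can be illustrated with a small standalone sketch. It does not use DataFusion: `State`, `SnapshotFrame` and `SharedFrame` are hypothetical stand-ins for `ExecutionContextState` and the DataFrame, and the state is reduced to a set of table names. It only models which tables a frame can see, under the assumption that the description above is accurate.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `ExecutionContextState`: only the registered table names.
#[derive(Clone, Default)]
struct State {
    tables: HashSet<String>,
}

/// Hypothetical frame that holds a clone (snapshot) of the state taken at creation time.
struct SnapshotFrame {
    state: State,
}

/// Hypothetical frame that shares one mutable state with the context.
struct SharedFrame {
    state: Arc<Mutex<State>>,
}

impl SnapshotFrame {
    fn sees(&self, table: &str) -> bool {
        self.state.tables.contains(table)
    }
}

impl SharedFrame {
    fn sees(&self, table: &str) -> bool {
        self.state.lock().unwrap().tables.contains(table)
    }
}

/// Register "aa", create the left frame from a clone, then register "aaa".
/// Returns whether the left frame sees ("aa", "aaa").
fn snapshot_visibility() -> (bool, bool) {
    let mut ctx = State::default();
    ctx.tables.insert("aa".to_string());
    let left = SnapshotFrame { state: ctx.clone() };
    // Registered after the clone was taken, so the left frame never learns about it.
    ctx.tables.insert("aaa".to_string());
    (left.sees("aa"), left.sees("aaa"))
}

/// Same sequence, but the left frame shares the state through Arc<Mutex<..>>.
fn shared_visibility() -> (bool, bool) {
    let ctx = Arc::new(Mutex::new(State::default()));
    ctx.lock().unwrap().tables.insert("aa".to_string());
    let left = SharedFrame { state: Arc::clone(&ctx) };
    // The later registration is visible through the shared handle.
    ctx.lock().unwrap().tables.insert("aaa".to_string());
    (left.sees("aa"), left.sees("aaa"))
}
```

In this model the snapshot frame sees "aa" but not "aaa", which mirrors the failing join, while the shared frame sees both. Whether the actual fix takes this shape is not stated here; see the linked pull request.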



--
This message was sent by Atlassian Jira
(v8.3.4#803005)