Posted to jira@arrow.apache.org by "Jorge Leitão (Jira)" <ji...@apache.org> on 2020/12/09 05:12:00 UTC
[jira] [Assigned] (ARROW-10844) [Rust] [DataFusion] join of two DataFrames is not possible
[ https://issues.apache.org/jira/browse/ARROW-10844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jorge Leitão reassigned ARROW-10844:
------------------------------------
Assignee: Jorge Leitão
> [Rust] [DataFusion] join of two DataFrames is not possible
> ----------------------------------------------------------
>
> Key: ARROW-10844
> URL: https://issues.apache.org/jira/browse/ARROW-10844
> Project: Apache Arrow
> Issue Type: Bug
> Components: Rust, Rust - DataFusion
> Affects Versions: 3.0.0
> Reporter: Jorge Leitão
> Assignee: Jorge Leitão
> Priority: Blocker
> Fix For: 3.0.0
>
>
> The complete failing test:
>
> {code:java}
> use std::sync::Arc;
> use arrow::{array::{Int32Array, StringArray}, record_batch::RecordBatch};
> use arrow::datatypes::{DataType, Field, Schema};
> use datafusion::{datasource::MemTable, prelude::JoinType};
> use datafusion::error::Result;
> use datafusion::execution::context::ExecutionContext;
> #[tokio::test]
> async fn join() -> Result<()> {
>     let schema1 = Arc::new(Schema::new(vec![
>         Field::new("a", DataType::Utf8, false),
>         Field::new("b", DataType::Int32, false),
>     ]));
>     let schema2 = Arc::new(Schema::new(vec![
>         Field::new("a", DataType::Utf8, false),
>         Field::new("c", DataType::Int32, false),
>     ]));
>     // define data.
>     let batch1 = RecordBatch::try_new(
>         schema1.clone(),
>         vec![
>             Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
>             Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
>         ],
>     )?;
>     // define data.
>     let batch2 = RecordBatch::try_new(
>         schema2.clone(),
>         vec![
>             Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
>             Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
>         ],
>     )?;
>     let mut ctx = ExecutionContext::new();
>     let table1 = MemTable::new(schema1, vec![vec![batch1]])?;
>     let table2 = MemTable::new(schema2, vec![vec![batch2]])?;
>     ctx.register_table("aa", Box::new(table1));
>     let df1 = ctx.table("aa")?;
>     ctx.register_table("aaa", Box::new(table2));
>     let df2 = ctx.table("aaa")?;
>     let a = df1.join(df2, JoinType::Inner, &["a"], &["a"])?;
>     let batches = a.collect().await?;
>     assert_eq!(batches.len(), 1);
>     Ok(())
> }
> {code}
>
> When DataFrames are created via `ctx.table`, they receive a clone of the {{ExecutionContextState}}. If at a later stage the context receives a new table, that table will not be part of the state held by the first DataFrame. On a join, the left DataFrame's state is passed to the newly created DataFrame, which is then used in collect(). Because the right side refers to a table that is not in the left side's state, the execution fails.
>
> We may need an Arc<Mutex<{{ExecutionContextState}}>> to share a common mutable state across multiple DataFrames. Alternatively, we could stop requiring tables to be registered in the context before DataFrames can use them.
> Note that the current example in `DataFrame::join` docs works because the table is registered for both DataFrames.
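
The mechanism described in the report can be modelled with the standard library alone. The sketch below is not DataFusion code: `State`, `Context`, `Frame`, and `run` are hypothetical stand-ins, and the `share` flag is an assumption added only to compare the two behaviours side by side. With `share == false`, each frame gets its own copy of the state (the behaviour described in the report); with `share == true`, every frame holds the same `Arc<Mutex<State>>`, as the first proposal suggests.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the context state: only the registered table names.
#[derive(Clone, Default)]
struct State {
    tables: HashSet<String>,
}

/// Hypothetical context. `share` selects how frames obtain their state.
struct Context {
    state: Arc<Mutex<State>>,
    share: bool,
}

/// Hypothetical DataFrame: the state it will execute against plus the tables it scans.
struct Frame {
    state: Arc<Mutex<State>>,
    scans: Vec<String>,
}

impl Context {
    fn new(share: bool) -> Self {
        Context { state: Arc::new(Mutex::new(State::default())), share }
    }

    fn register_table(&self, name: &str) {
        self.state.lock().unwrap().tables.insert(name.to_string());
    }

    fn table(&self, name: &str) -> Frame {
        let state = if self.share {
            // Shared: the frame sees tables registered later.
            Arc::clone(&self.state)
        } else {
            // Snapshot: a deep copy taken now; later registrations are invisible.
            let snapshot: State = self.state.lock().unwrap().clone();
            Arc::new(Mutex::new(snapshot))
        };
        Frame { state, scans: vec![name.to_string()] }
    }
}

impl Frame {
    /// The joined frame keeps the LEFT side's state, as described in the report.
    fn join(self, right: Frame) -> Frame {
        let mut scans = self.scans;
        scans.extend(right.scans);
        Frame { state: self.state, scans }
    }

    /// "Executes" the plan: every scanned table must exist in this frame's state.
    fn collect(&self) -> Result<usize, String> {
        let state = self.state.lock().unwrap();
        for name in &self.scans {
            if !state.tables.contains(name) {
                return Err(format!("no table named '{}'", name));
            }
        }
        Ok(self.scans.len())
    }
}

/// Mirrors the order of calls in the failing test above.
fn run(share: bool) -> Result<usize, String> {
    let ctx = Context::new(share);
    ctx.register_table("aa");
    let df1 = ctx.table("aa");
    ctx.register_table("aaa");
    let df2 = ctx.table("aaa");
    df1.join(df2).collect()
}
```

In this model, `run(false)` returns an error because the left frame's snapshot was taken before "aaa" was registered, while `run(true)` succeeds because both frames observe the same state. Whether a mutex-guarded shared state is the right trade-off for DataFusion itself is a separate design question that this sketch does not settle.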
--
This message was sent by Atlassian Jira
(v8.3.4#803005)