You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/14 12:35:42 UTC

[GitHub] [arrow] jorgecarleitao opened a new pull request #8911: ARROW-10844: [Rust] [DataFusion] Allow joins after a table registration

jorgecarleitao opened a new pull request #8911:
URL: https://github.com/apache/arrow/pull/8911


   This PR modifies to the `ExecutionContext` necessary to run joins where `register_table` is called between creation of DataFrame.
   
   The underlying issue is that the `ExecutionContextState` was not being shared between the `DataFrame`, thereby causing them to not share newly added tables.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb closed pull request #8911: ARROW-10844: [Rust] [DataFusion] Allow joins after a table registration

Posted by GitBox <gi...@apache.org>.
alamb closed pull request #8911:
URL: https://github.com/apache/arrow/pull/8911


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8911: ARROW-10844: [Rust] [DataFusion] Allow joins after a table registration

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #8911:
URL: https://github.com/apache/arrow/pull/8911#discussion_r543137903



##########
File path: rust/datafusion/tests/dataframe.rs
##########
@@ -15,182 +15,64 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::Int32Array;
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use arrow::error::Result as ArrowResult;
-use arrow::record_batch::RecordBatch;
-
-use datafusion::datasource::datasource::Statistics;
-use datafusion::error::{DataFusionError, Result};
-use datafusion::{datasource::TableProvider, physical_plan::collect};
-
-use datafusion::execution::context::ExecutionContext;
-use datafusion::logical_plan::{col, LogicalPlan, LogicalPlanBuilder};
-use datafusion::physical_plan::{
-    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
-};
-
-use futures::stream::Stream;
-use std::any::Any;
-use std::pin::Pin;
 use std::sync::Arc;
-use std::task::{Context, Poll};
-
-use async_trait::async_trait;
-
-//// Custom source dataframe tests ////
 
-struct CustomTableProvider;
-#[derive(Debug, Clone)]
-struct CustomExecutionPlan {
-    projection: Option<Vec<usize>>,
-}
-struct TestCustomRecordBatchStream {
-    /// the nb of batches of TEST_CUSTOM_RECORD_BATCH generated
-    nb_batch: i32,
-}
-macro_rules! TEST_CUSTOM_SCHEMA_REF {
-    () => {
-        Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::Int32, false),
-            Field::new("c2", DataType::Int32, false),
-        ]))
-    };
-}
-macro_rules! TEST_CUSTOM_RECORD_BATCH {
-    () => {
-        RecordBatch::try_new(
-            TEST_CUSTOM_SCHEMA_REF!(),
-            vec![
-                Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
-                Arc::new(Int32Array::from(vec![2, 12, 12, 120])),
-            ],
-        )
-    };
-}
-
-impl RecordBatchStream for TestCustomRecordBatchStream {
-    fn schema(&self) -> SchemaRef {
-        TEST_CUSTOM_SCHEMA_REF!()
-    }
-}
-
-impl Stream for TestCustomRecordBatchStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        if self.nb_batch > 0 {
-            self.get_mut().nb_batch -= 1;
-            Poll::Ready(Some(TEST_CUSTOM_RECORD_BATCH!()))
-        } else {
-            Poll::Ready(None)
-        }
-    }
-}
-
-#[async_trait]
-impl ExecutionPlan for CustomExecutionPlan {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-    fn schema(&self) -> SchemaRef {
-        let schema = TEST_CUSTOM_SCHEMA_REF!();
-        match &self.projection {
-            None => schema,
-            Some(p) => Arc::new(Schema::new(
-                p.iter().map(|i| schema.field(*i).clone()).collect(),
-            )),
-        }
-    }
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(1)
-    }
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![]
-    }
-    fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.is_empty() {
-            Ok(Arc::new(self.clone()))
-        } else {
-            Err(DataFusionError::Internal(
-                "Children cannot be replaced in CustomExecutionPlan".to_owned(),
-            ))
-        }
-    }
-    async fn execute(&self, _partition: usize) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
-    }
-}
-
-impl TableProvider for CustomTableProvider {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        TEST_CUSTOM_SCHEMA_REF!()
-    }
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::{
+    array::{Int32Array, StringArray},
+    record_batch::RecordBatch,
+};
 
-    fn scan(
-        &self,
-        projection: &Option<Vec<usize>>,
-        _batch_size: usize,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(CustomExecutionPlan {
-            projection: projection.clone(),
-        }))
-    }
+use datafusion::error::Result;
+use datafusion::{datasource::MemTable, prelude::JoinType};
 
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
-    }
-}
+use datafusion::execution::context::ExecutionContext;
 
 #[tokio::test]
-async fn custom_source_dataframe() -> Result<()> {
+async fn join() -> Result<()> {
+    let schema1 = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Int32, false),
+    ]));
+    let schema2 = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("c", DataType::Int32, false),
+    ]));
+
+    // define data.
+    let batch1 = RecordBatch::try_new(
+        schema1.clone(),
+        vec![
+            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
+            Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
+        ],
+    )?;
+    // define data.
+    let batch2 = RecordBatch::try_new(
+        schema2.clone(),
+        vec![
+            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
+            Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
+        ],
+    )?;
+
     let mut ctx = ExecutionContext::new();
 
-    let table = ctx.read_table(Arc::new(CustomTableProvider))?;
-    let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan())
-        .project(vec![col("c2")])?
-        .build()?;
+    let table1 = MemTable::try_new(schema1, vec![vec![batch1]])?;
+    let table2 = MemTable::try_new(schema2, vec![vec![batch2]])?;
+
+    ctx.register_table("aa", Box::new(table1));
 
-    let optimized_plan = ctx.optimize(&logical_plan)?;
-    match &optimized_plan {
-        LogicalPlan::Projection { input, .. } => match &**input {
-            LogicalPlan::TableScan {
-                table_schema,
-                projected_schema,
-                ..
-            } => {
-                assert_eq!(table_schema.fields().len(), 2);
-                assert_eq!(projected_schema.fields().len(), 1);
-            }
-            _ => panic!("input to projection should be TableScan"),
-        },
-        _ => panic!("expect optimized_plan to be projection"),
-    }
+    let df1 = ctx.table("aa")?;
 
-    let expected = "Projection: #c2\
-        \n  TableScan: projection=Some([1])";
-    assert_eq!(format!("{:?}", optimized_plan), expected);
+    ctx.register_table("aaa", Box::new(table2));

Review comment:
       So, this was not very obvious from the diff, but there was no test for joins. The `dataframe.rs` had a test for custom sources, which I moved to its own module, and wrote a new test from scratch for this issue alone, where we join two DataFrames. This issue is only observed when we use `join`, as otherwise there is never a scan for two separate tables, which is why I wrote it as a test using joins.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] alamb commented on a change in pull request #8911: ARROW-10844: [Rust] [DataFusion] Allow joins after a table registration

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #8911:
URL: https://github.com/apache/arrow/pull/8911#discussion_r542702983



##########
File path: rust/datafusion/tests/dataframe.rs
##########
@@ -15,182 +15,64 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::Int32Array;
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use arrow::error::Result as ArrowResult;
-use arrow::record_batch::RecordBatch;
-
-use datafusion::datasource::datasource::Statistics;
-use datafusion::error::{DataFusionError, Result};
-use datafusion::{datasource::TableProvider, physical_plan::collect};
-
-use datafusion::execution::context::ExecutionContext;
-use datafusion::logical_plan::{col, LogicalPlan, LogicalPlanBuilder};
-use datafusion::physical_plan::{
-    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
-};
-
-use futures::stream::Stream;
-use std::any::Any;
-use std::pin::Pin;
 use std::sync::Arc;
-use std::task::{Context, Poll};
-
-use async_trait::async_trait;
-
-//// Custom source dataframe tests ////
 
-struct CustomTableProvider;
-#[derive(Debug, Clone)]
-struct CustomExecutionPlan {
-    projection: Option<Vec<usize>>,
-}
-struct TestCustomRecordBatchStream {
-    /// the nb of batches of TEST_CUSTOM_RECORD_BATCH generated
-    nb_batch: i32,
-}
-macro_rules! TEST_CUSTOM_SCHEMA_REF {
-    () => {
-        Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::Int32, false),
-            Field::new("c2", DataType::Int32, false),
-        ]))
-    };
-}
-macro_rules! TEST_CUSTOM_RECORD_BATCH {
-    () => {
-        RecordBatch::try_new(
-            TEST_CUSTOM_SCHEMA_REF!(),
-            vec![
-                Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
-                Arc::new(Int32Array::from(vec![2, 12, 12, 120])),
-            ],
-        )
-    };
-}
-
-impl RecordBatchStream for TestCustomRecordBatchStream {
-    fn schema(&self) -> SchemaRef {
-        TEST_CUSTOM_SCHEMA_REF!()
-    }
-}
-
-impl Stream for TestCustomRecordBatchStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        if self.nb_batch > 0 {
-            self.get_mut().nb_batch -= 1;
-            Poll::Ready(Some(TEST_CUSTOM_RECORD_BATCH!()))
-        } else {
-            Poll::Ready(None)
-        }
-    }
-}
-
-#[async_trait]
-impl ExecutionPlan for CustomExecutionPlan {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-    fn schema(&self) -> SchemaRef {
-        let schema = TEST_CUSTOM_SCHEMA_REF!();
-        match &self.projection {
-            None => schema,
-            Some(p) => Arc::new(Schema::new(
-                p.iter().map(|i| schema.field(*i).clone()).collect(),
-            )),
-        }
-    }
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(1)
-    }
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![]
-    }
-    fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.is_empty() {
-            Ok(Arc::new(self.clone()))
-        } else {
-            Err(DataFusionError::Internal(
-                "Children cannot be replaced in CustomExecutionPlan".to_owned(),
-            ))
-        }
-    }
-    async fn execute(&self, _partition: usize) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
-    }
-}
-
-impl TableProvider for CustomTableProvider {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        TEST_CUSTOM_SCHEMA_REF!()
-    }
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::{
+    array::{Int32Array, StringArray},
+    record_batch::RecordBatch,
+};
 
-    fn scan(
-        &self,
-        projection: &Option<Vec<usize>>,
-        _batch_size: usize,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(CustomExecutionPlan {
-            projection: projection.clone(),
-        }))
-    }
+use datafusion::error::Result;
+use datafusion::{datasource::MemTable, prelude::JoinType};
 
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
-    }
-}
+use datafusion::execution::context::ExecutionContext;
 
 #[tokio::test]
-async fn custom_source_dataframe() -> Result<()> {
+async fn join() -> Result<()> {
+    let schema1 = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Int32, false),
+    ]));
+    let schema2 = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("c", DataType::Int32, false),
+    ]));
+
+    // define data.
+    let batch1 = RecordBatch::try_new(
+        schema1.clone(),
+        vec![
+            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
+            Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
+        ],
+    )?;
+    // define data.
+    let batch2 = RecordBatch::try_new(
+        schema2.clone(),
+        vec![
+            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
+            Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
+        ],
+    )?;
+
     let mut ctx = ExecutionContext::new();
 
-    let table = ctx.read_table(Arc::new(CustomTableProvider))?;
-    let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan())
-        .project(vec![col("c2")])?
-        .build()?;
+    let table1 = MemTable::try_new(schema1, vec![vec![batch1]])?;
+    let table2 = MemTable::try_new(schema2, vec![vec![batch2]])?;
+
+    ctx.register_table("aa", Box::new(table1));
 
-    let optimized_plan = ctx.optimize(&logical_plan)?;
-    match &optimized_plan {
-        LogicalPlan::Projection { input, .. } => match &**input {
-            LogicalPlan::TableScan {
-                table_schema,
-                projected_schema,
-                ..
-            } => {
-                assert_eq!(table_schema.fields().len(), 2);
-                assert_eq!(projected_schema.fields().len(), 1);
-            }
-            _ => panic!("input to projection should be TableScan"),
-        },
-        _ => panic!("expect optimized_plan to be projection"),
-    }
+    let df1 = ctx.table("aa")?;
 
-    let expected = "Projection: #c2\
-        \n  TableScan: projection=Some([1])";
-    assert_eq!(format!("{:?}", optimized_plan), expected);
+    ctx.register_table("aaa", Box::new(table2));

Review comment:
       Note for other reviewers: this is the key change -- namely that once you create a data frame from an ExecutionContext, the shared state can be changed and that change is seen across all objects that were created from the ExecutionContext (e.g register a table and have previously created data frames be able to use them)
   
   I recommend making an entirely separate test (`test_shared_context` or something) rather than re-using the join test (even if that new test is mostly copy paste). The rationale for a separate test is as documentation of the expected behavior to users and future developers --  That way it will be less likely that some future developer interprets this behavior (registering table after DF was made) as an unimportant implementation detail of the join test itself

##########
File path: rust/datafusion/tests/custom_sources.rs
##########
@@ -0,0 +1,198 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       this test is 👍 Nice work @jorgecarleitao 

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -92,7 +95,7 @@ use parquet::arrow::ArrowWriter;
 /// ```
 pub struct ExecutionContext {
     /// Internal state for the context
-    pub state: ExecutionContextState,
+    pub state: Arc<Mutex<ExecutionContextState>>,

Review comment:
       ```suggestion
       /// This state is shared by all DataFrame's created from
       /// this context, and thus modifications (e.g. registering a new table) will be visible
       /// to all such DataFrames
       pub state: Arc<Mutex<ExecutionContextState>>,
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] github-actions[bot] commented on pull request #8911: ARROW-10844: [Rust] [DataFusion] Allow joins after a table registration

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #8911:
URL: https://github.com/apache/arrow/pull/8911#issuecomment-744430320


   https://issues.apache.org/jira/browse/ARROW-10844


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] codecov-io commented on pull request #8911: ARROW-10844: [Rust] [DataFusion] Allow joins after a table registration

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #8911:
URL: https://github.com/apache/arrow/pull/8911#issuecomment-745816515


   # [Codecov](https://codecov.io/gh/apache/arrow/pull/8911?src=pr&el=h1) Report
   > Merging [#8911](https://codecov.io/gh/apache/arrow/pull/8911?src=pr&el=desc) (7283353) into [master](https://codecov.io/gh/apache/arrow/commit/26aef88b5e993ea64726e77cfa57e2a17031d934?el=desc) (26aef88) will **decrease** coverage by `0.00%`.
   > The diff coverage is `70.76%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/arrow/pull/8911/graphs/tree.svg?width=650&height=150&src=pr&token=LpTCFbqVT1)](https://codecov.io/gh/apache/arrow/pull/8911?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #8911      +/-   ##
   ==========================================
   - Coverage   83.26%   83.25%   -0.01%     
   ==========================================
     Files         195      196       +1     
     Lines       48085    48116      +31     
   ==========================================
   + Hits        40036    40059      +23     
   - Misses       8049     8057       +8     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow/pull/8911?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [rust/datafusion/src/execution/context.rs](https://codecov.io/gh/apache/arrow/pull/8911/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9leGVjdXRpb24vY29udGV4dC5ycw==) | `89.95% <43.90%> (-1.93%)` | :arrow_down: |
   | [rust/datafusion/tests/custom\_sources.rs](https://codecov.io/gh/apache/arrow/pull/8911/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3Rlc3RzL2N1c3RvbV9zb3VyY2VzLnJz) | `75.00% <75.00%> (ø)` | |
   | [rust/datafusion/src/execution/dataframe\_impl.rs](https://codecov.io/gh/apache/arrow/pull/8911/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3NyYy9leGVjdXRpb24vZGF0YWZyYW1lX2ltcGwucnM=) | `93.28% <100.00%> (+0.85%)` | :arrow_up: |
   | [rust/datafusion/tests/dataframe.rs](https://codecov.io/gh/apache/arrow/pull/8911/diff?src=pr&el=tree#diff-cnVzdC9kYXRhZnVzaW9uL3Rlc3RzL2RhdGFmcmFtZS5ycw==) | `100.00% <100.00%> (+25.00%)` | :arrow_up: |
   | [rust/parquet/src/arrow/record\_reader.rs](https://codecov.io/gh/apache/arrow/pull/8911/diff?src=pr&el=tree#diff-cnVzdC9wYXJxdWV0L3NyYy9hcnJvdy9yZWNvcmRfcmVhZGVyLnJz) | `96.25% <0.00%> (+1.71%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/arrow/pull/8911?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/arrow/pull/8911?src=pr&el=footer). Last update [3e71ea0...7283353](https://codecov.io/gh/apache/arrow/pull/8911?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org