You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/12/15 08:23:59 UTC

[GitHub] [arrow] jorgecarleitao commented on a change in pull request #8911: ARROW-10844: [Rust] [DataFusion] Allow joins after a table registration

jorgecarleitao commented on a change in pull request #8911:
URL: https://github.com/apache/arrow/pull/8911#discussion_r543137903



##########
File path: rust/datafusion/tests/dataframe.rs
##########
@@ -15,182 +15,64 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::Int32Array;
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use arrow::error::Result as ArrowResult;
-use arrow::record_batch::RecordBatch;
-
-use datafusion::datasource::datasource::Statistics;
-use datafusion::error::{DataFusionError, Result};
-use datafusion::{datasource::TableProvider, physical_plan::collect};
-
-use datafusion::execution::context::ExecutionContext;
-use datafusion::logical_plan::{col, LogicalPlan, LogicalPlanBuilder};
-use datafusion::physical_plan::{
-    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream,
-};
-
-use futures::stream::Stream;
-use std::any::Any;
-use std::pin::Pin;
 use std::sync::Arc;
-use std::task::{Context, Poll};
-
-use async_trait::async_trait;
-
-//// Custom source dataframe tests ////
 
-struct CustomTableProvider;
-#[derive(Debug, Clone)]
-struct CustomExecutionPlan {
-    projection: Option<Vec<usize>>,
-}
-struct TestCustomRecordBatchStream {
-    /// the nb of batches of TEST_CUSTOM_RECORD_BATCH generated
-    nb_batch: i32,
-}
-macro_rules! TEST_CUSTOM_SCHEMA_REF {
-    () => {
-        Arc::new(Schema::new(vec![
-            Field::new("c1", DataType::Int32, false),
-            Field::new("c2", DataType::Int32, false),
-        ]))
-    };
-}
-macro_rules! TEST_CUSTOM_RECORD_BATCH {
-    () => {
-        RecordBatch::try_new(
-            TEST_CUSTOM_SCHEMA_REF!(),
-            vec![
-                Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
-                Arc::new(Int32Array::from(vec![2, 12, 12, 120])),
-            ],
-        )
-    };
-}
-
-impl RecordBatchStream for TestCustomRecordBatchStream {
-    fn schema(&self) -> SchemaRef {
-        TEST_CUSTOM_SCHEMA_REF!()
-    }
-}
-
-impl Stream for TestCustomRecordBatchStream {
-    type Item = ArrowResult<RecordBatch>;
-
-    fn poll_next(
-        self: Pin<&mut Self>,
-        _cx: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        if self.nb_batch > 0 {
-            self.get_mut().nb_batch -= 1;
-            Poll::Ready(Some(TEST_CUSTOM_RECORD_BATCH!()))
-        } else {
-            Poll::Ready(None)
-        }
-    }
-}
-
-#[async_trait]
-impl ExecutionPlan for CustomExecutionPlan {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-    fn schema(&self) -> SchemaRef {
-        let schema = TEST_CUSTOM_SCHEMA_REF!();
-        match &self.projection {
-            None => schema,
-            Some(p) => Arc::new(Schema::new(
-                p.iter().map(|i| schema.field(*i).clone()).collect(),
-            )),
-        }
-    }
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(1)
-    }
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![]
-    }
-    fn with_new_children(
-        &self,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        if children.is_empty() {
-            Ok(Arc::new(self.clone()))
-        } else {
-            Err(DataFusionError::Internal(
-                "Children cannot be replaced in CustomExecutionPlan".to_owned(),
-            ))
-        }
-    }
-    async fn execute(&self, _partition: usize) -> Result<SendableRecordBatchStream> {
-        Ok(Box::pin(TestCustomRecordBatchStream { nb_batch: 1 }))
-    }
-}
-
-impl TableProvider for CustomTableProvider {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn schema(&self) -> SchemaRef {
-        TEST_CUSTOM_SCHEMA_REF!()
-    }
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::{
+    array::{Int32Array, StringArray},
+    record_batch::RecordBatch,
+};
 
-    fn scan(
-        &self,
-        projection: &Option<Vec<usize>>,
-        _batch_size: usize,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(CustomExecutionPlan {
-            projection: projection.clone(),
-        }))
-    }
+use datafusion::error::Result;
+use datafusion::{datasource::MemTable, prelude::JoinType};
 
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
-    }
-}
+use datafusion::execution::context::ExecutionContext;
 
 #[tokio::test]
-async fn custom_source_dataframe() -> Result<()> {
+async fn join() -> Result<()> {
+    let schema1 = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Int32, false),
+    ]));
+    let schema2 = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("c", DataType::Int32, false),
+    ]));
+
+    // define data.
+    let batch1 = RecordBatch::try_new(
+        schema1.clone(),
+        vec![
+            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
+            Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
+        ],
+    )?;
+    // define data.
+    let batch2 = RecordBatch::try_new(
+        schema2.clone(),
+        vec![
+            Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
+            Arc::new(Int32Array::from(vec![1, 10, 10, 100])),
+        ],
+    )?;
+
     let mut ctx = ExecutionContext::new();
 
-    let table = ctx.read_table(Arc::new(CustomTableProvider))?;
-    let logical_plan = LogicalPlanBuilder::from(&table.to_logical_plan())
-        .project(vec![col("c2")])?
-        .build()?;
+    let table1 = MemTable::try_new(schema1, vec![vec![batch1]])?;
+    let table2 = MemTable::try_new(schema2, vec![vec![batch2]])?;
+
+    ctx.register_table("aa", Box::new(table1));
 
-    let optimized_plan = ctx.optimize(&logical_plan)?;
-    match &optimized_plan {
-        LogicalPlan::Projection { input, .. } => match &**input {
-            LogicalPlan::TableScan {
-                table_schema,
-                projected_schema,
-                ..
-            } => {
-                assert_eq!(table_schema.fields().len(), 2);
-                assert_eq!(projected_schema.fields().len(), 1);
-            }
-            _ => panic!("input to projection should be TableScan"),
-        },
-        _ => panic!("expect optimized_plan to be projection"),
-    }
+    let df1 = ctx.table("aa")?;
 
-    let expected = "Projection: #c2\
-        \n  TableScan: projection=Some([1])";
-    assert_eq!(format!("{:?}", optimized_plan), expected);
+    ctx.register_table("aaa", Box::new(table2));

Review comment:
       This is not very obvious from the diff, but there was previously no test for joins. `dataframe.rs` had a test for custom sources, which I moved to its own module; I then wrote a new test from scratch for this issue alone, in which we join two DataFrames. The issue is only observed when we use `join`, because otherwise there is never a scan of two separate tables, which is why I wrote the test using joins.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org