You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/02/28 12:35:16 UTC

[arrow] branch master updated: ARROW-11719: [Rust][Datafusion] support creating memory table with merged schema

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new f7cf157  ARROW-11719: [Rust][Datafusion] support creating memory table with merged schema
f7cf157 is described below

commit f7cf15749f2df907994f48ef0bfbed3001bf145e
Author: Qingping Hou <da...@gmail.com>
AuthorDate: Sun Feb 28 07:34:06 2021 -0500

    ARROW-11719: [Rust][Datafusion] support creating memory table with merged schema
    
    * Added `contains` method for `arrow::datatypes::Schema` and
    `arrow::datatypes::Field`
    * Relax batch schema validation using `contains` check when creating a
    MemTable in datafusion
    
    Closes #9537 from houqp/qp_schema
    
    Authored-by: Qingping Hou <da...@gmail.com>
    Signed-off-by: Andrew Lamb <an...@nerdnetworks.org>
---
 rust/arrow/src/datatypes/field.rs        | 43 +++++++++++++++
 rust/arrow/src/datatypes/schema.rs       | 36 +++++++++++++
 rust/datafusion/src/datasource/memory.rs | 92 +++++++++++++++++++++++++++++++-
 3 files changed, 169 insertions(+), 2 deletions(-)

diff --git a/rust/arrow/src/datatypes/field.rs b/rust/arrow/src/datatypes/field.rs
index cd43510..11fc31d 100644
--- a/rust/arrow/src/datatypes/field.rs
+++ b/rust/arrow/src/datatypes/field.rs
@@ -488,6 +488,49 @@ impl Field {
 
         Ok(())
     }
+
+    /// Check to see if `self` is a superset of `other` field. Superset is defined as:
+    ///
+    /// * if nullability doesn't match, self needs to be nullable
+    /// * self.metadata is a superset of other.metadata
+    /// * all other fields are equal
+    pub fn contains(&self, other: &Field) -> bool {
+        if self.name != other.name
+            || self.data_type != other.data_type
+            || self.dict_id != other.dict_id
+            || self.dict_is_ordered != other.dict_is_ordered
+        {
+            return false;
+        }
+
+        if self.nullable != other.nullable && !self.nullable {
+            return false;
+        }
+
+        // make sure self.metadata is a superset of other.metadata
+        match (&self.metadata, &other.metadata) {
+            (None, Some(_)) => {
+                return false;
+            }
+            (Some(self_meta), Some(other_meta)) => {
+                for (k, v) in other_meta.iter() {
+                    match self_meta.get(k) {
+                        Some(s) => {
+                            if s != v {
+                                return false;
+                            }
+                        }
+                        None => {
+                            return false;
+                        }
+                    }
+                }
+            }
+            _ => {}
+        }
+
+        true
+    }
 }
 
 // TODO: improve display with crate https://crates.io/crates/derive_more ?
diff --git a/rust/arrow/src/datatypes/schema.rs b/rust/arrow/src/datatypes/schema.rs
index 1e9acf7..ad89b29 100644
--- a/rust/arrow/src/datatypes/schema.rs
+++ b/rust/arrow/src/datatypes/schema.rs
@@ -279,6 +279,42 @@ impl Schema {
             )),
         }
     }
+
+    /// Check to see if `self` is a superset of `other` schema. Here are the comparison rules:
+    ///
+    /// * `self` and `other` should contain the same number of fields
+    /// * for every field `f` in `other`, the field at the same index in `self` should be a
+    /// superset of `f`.
+    /// * self.metadata is a superset of other.metadata
+    ///
+    /// In other words, any record that conforms to `other` should also conform to `self`.
+    pub fn contains(&self, other: &Schema) -> bool {
+        if self.fields.len() != other.fields.len() {
+            return false;
+        }
+
+        for (i, field) in other.fields.iter().enumerate() {
+            if !self.fields[i].contains(field) {
+                return false;
+            }
+        }
+
+        // make sure self.metadata is a superset of other.metadata
+        for (k, v) in &other.metadata {
+            match self.metadata.get(k) {
+                Some(s) => {
+                    if s != v {
+                        return false;
+                    }
+                }
+                None => {
+                    return false;
+                }
+            }
+        }
+
+        true
+    }
 }
 
 impl fmt::Display for Schema {
diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs
index 1fc0eaa..0fafa0f 100644
--- a/rust/datafusion/src/datasource/memory.rs
+++ b/rust/datafusion/src/datasource/memory.rs
@@ -88,7 +88,7 @@ impl MemTable {
         if partitions
             .iter()
             .flatten()
-            .all(|batches| batches.schema() == schema)
+            .all(|batches| schema.contains(&batches.schema()))
         {
             let statistics = calculate_statistics(&schema, &partitions);
             debug!("MemTable statistics: {:?}", statistics);
@@ -220,6 +220,7 @@ mod tests {
     use arrow::array::Int32Array;
     use arrow::datatypes::{DataType, Field, Schema};
     use futures::StreamExt;
+    use std::collections::HashMap;
 
     #[tokio::test]
     async fn test_with_projection() -> Result<()> {
@@ -333,7 +334,7 @@ mod tests {
     }
 
     #[test]
-    fn test_schema_validation() -> Result<()> {
+    fn test_schema_validation_incompatible_column() -> Result<()> {
         let schema1 = Arc::new(Schema::new(vec![
             Field::new("a", DataType::Int32, false),
             Field::new("b", DataType::Int32, false),
@@ -365,4 +366,91 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_schema_validation_different_column_count() -> Result<()> {
+        let schema1 = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("c", DataType::Int32, false),
+        ]));
+
+        let schema2 = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, false),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            schema1,
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Int32Array::from(vec![7, 5, 9])),
+            ],
+        )?;
+
+        match MemTable::try_new(schema2, vec![vec![batch]]) {
+            Err(DataFusionError::Plan(e)) => assert_eq!(
+                "\"Mismatch between schema and batches\"",
+                format!("{:?}", e)
+            ),
+            _ => panic!("MemTable::new should have failed due to schema mismatch"),
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_merged_schema() -> Result<()> {
+        let mut metadata = HashMap::new();
+        metadata.insert("foo".to_string(), "bar".to_string());
+
+        let schema1 = Schema::new_with_metadata(
+            vec![
+                Field::new("a", DataType::Int32, false),
+                Field::new("b", DataType::Int32, false),
+                Field::new("c", DataType::Int32, false),
+            ],
+            // test for comparing metadata
+            metadata,
+        );
+
+        let schema2 = Schema::new(vec![
+            // test for comparing nullability
+            Field::new("a", DataType::Int32, true),
+            Field::new("b", DataType::Int32, false),
+            Field::new("c", DataType::Int32, false),
+        ]);
+
+        let merged_schema = Schema::try_merge(vec![schema1.clone(), schema2.clone()])?;
+
+        let batch1 = RecordBatch::try_new(
+            Arc::new(schema1),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Int32Array::from(vec![4, 5, 6])),
+                Arc::new(Int32Array::from(vec![7, 8, 9])),
+            ],
+        )?;
+
+        let batch2 = RecordBatch::try_new(
+            Arc::new(schema2),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Int32Array::from(vec![4, 5, 6])),
+                Arc::new(Int32Array::from(vec![7, 8, 9])),
+            ],
+        )?;
+
+        let provider =
+            MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?;
+
+        let exec = provider.scan(&None, 1024, &[])?;
+        let mut it = exec.execute(0).await?;
+        let batch1 = it.next().await.unwrap()?;
+        assert_eq!(3, batch1.schema().fields().len());
+        assert_eq!(3, batch1.num_columns());
+        assert_eq!(provider.statistics().num_rows, Some(6));
+
+        Ok(())
+    }
 }