Posted to github@arrow.apache.org by "e1ijah1 (via GitHub)" <gi...@apache.org> on 2023/05/22 03:33:25 UTC

[GitHub] [arrow-datafusion] e1ijah1 opened a new pull request, #6404: feat: support type cast in SchemaAdapter

e1ijah1 opened a new pull request, #6404:
URL: https://github.com/apache/arrow-datafusion/pull/6404

   # Which issue does this PR close?
   
   Closes #6381
   
   # Rationale for this change
   
   # What changes are included in this PR?
   
   This PR introduces a `map_schema` method on `SchemaAdapter` that maps the file schema to the table schema, and a `SchemaMapping` struct that holds that mapping together with the type conversions to apply to each batch.
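A minimal, std-only sketch of the mapping idea, to make the description concrete. `DataType`, `Field`, `can_cast` and its tiny cast table are simplified stand-ins (assumptions), not Arrow's types or its real `can_cast_types` rules. One design choice in this sketch: it records each column's index in the file schema, looked up by name, so a later per-batch step can still find the column when the file orders its columns differently from the table.

```rust
// Std-only model of mapping a file schema onto a table schema.
// `DataType`, `Field` and `can_cast` are simplified stand-ins (assumptions),
// not Arrow's real types or its full cast rules.

#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Utf8,
    Int32,
    UInt64,
    Float32,
    Float64,
    Date64,
}

#[derive(Debug)]
struct Field {
    name: String,
    data_type: DataType,
}

fn field(name: &str, data_type: DataType) -> Field {
    Field { name: name.to_string(), data_type }
}

/// Hypothetical, heavily reduced cast table: identity plus a few conversions.
fn can_cast(from: &DataType, to: &DataType) -> bool {
    use DataType::*;
    from == to || matches!((from, to), (Int32, UInt64) | (Float32, Float64) | (Int32, Float64))
}

/// For each table field, returns (index in the file schema, target type),
/// or an error if the field is missing or its type cannot be cast.
fn map_schema(table: &[Field], file: &[Field]) -> Result<Vec<(usize, DataType)>, String> {
    table
        .iter()
        .map(|t| -> Result<(usize, DataType), String> {
            // Look the column up by name rather than by position.
            let (file_idx, f) = file
                .iter()
                .enumerate()
                .find(|(_, f)| f.name == t.name)
                .ok_or_else(|| format!("File schema does not contain expected field {}", t.name))?;
            if can_cast(&f.data_type, &t.data_type) {
                Ok((file_idx, t.data_type.clone()))
            } else {
                Err(format!(
                    "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
                    t.name, f.data_type, t.data_type
                ))
            }
        })
        .collect()
}

fn main() {
    let table = vec![
        field("c1", DataType::Utf8),
        field("c2", DataType::UInt64),
        field("c3", DataType::Float64),
    ];
    // Same columns, in a different order and with some narrower types.
    let file = vec![
        field("c3", DataType::Float32),
        field("c1", DataType::Utf8),
        field("c2", DataType::Int32),
    ];
    // Prints [(1, Utf8), (2, UInt64), (0, Float64)]
    println!("{:?}", map_schema(&table, &file).unwrap());
}
```

Because the index refers to the file side, the per-batch step only has to cast `batch.column(file_idx)` to the stored target type.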
   
   # Are these changes tested?
   
   Yes. This PR adds two unit tests: `schema_adapter_map_schema` and `schema_mapping_map_batch`.
   
   # Are there any user-facing changes?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6404: feat: support type cast in SchemaAdapter

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6404:
URL: https://github.com/apache/arrow-datafusion/pull/6404#discussion_r1200663208


##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -450,6 +451,69 @@ impl SchemaAdapter {
             &options,
         )?)
     }
+
+    /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
+    ///
+    /// If the provided `file_schema` contains columns of a different type to the expected
+    /// `table_schema`, the method will attempt to cast the array data from the file schema
+    /// to the table schema where possible.
+    #[allow(dead_code)]
+    pub fn map_schema(&self, file_schema: &Schema) -> Result<SchemaMapping> {
+        let mut field_mappings = Vec::new();
+
+        for (idx, field) in self.table_schema.fields().iter().enumerate() {
+            match file_schema.field_with_name(field.name()) {
+                Ok(file_field) => {
+                    if can_cast_types(file_field.data_type(), field.data_type()) {
+                        field_mappings.push((idx, field.data_type().clone()))
+                    } else {
+                        return Err(DataFusionError::Plan(format!(
+                            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
+                            field.name(),
+                            file_field.data_type(),
+                            field.data_type()
+                        )));
+                    }
+                }
+                Err(_) => {
+                    return Err(DataFusionError::Plan(format!(
+                        "File schema does not contain expected field {}",
+                        field.name()
+                    )));
+                }
+            }
+        }
+        Ok(SchemaMapping {
+            table_schema: self.table_schema.clone(),
+            field_mappings,
+        })
+    }
+}
+
+/// The SchemaMapping struct holds a mapping from the file schema to the table schema
+/// and any necessary type conversions that need to be applied.
+pub struct SchemaMapping {
+    #[allow(dead_code)]
+    table_schema: SchemaRef,
+    #[allow(dead_code)]
+    field_mappings: Vec<(usize, DataType)>,
+}
+
+impl SchemaMapping {
+    /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
+    #[allow(dead_code)]
+    fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
+        let mut mapped_cols = Vec::with_capacity(self.field_mappings.len());
+
+        for (idx, data_type) in &self.field_mappings {
+            let array = batch.column(*idx);
+            let casted_array = arrow::compute::cast(array, data_type)?;
+            mapped_cols.push(casted_array);
+        }
+
+        let record_batch = RecordBatch::try_new(self.table_schema.clone(), mapped_cols)?;

Review Comment:
   I think this should use `try_new_with_options` to match `adapt_batch` above.
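For context on the suggestion: in arrow-rs, `RecordBatch::try_new` infers the row count from the first column, so it rejects an empty column list, whereas `try_new_with_options` can take the row count explicitly through `RecordBatchOptions`. If a mapping ever yields zero columns, only the latter preserves the input's row count. The std-only model below mimics that difference; `Batch`, `try_new` and `try_new_with_row_count` are hypothetical stand-ins, not Arrow APIs.

```rust
// Hypothetical model: `Batch` stands in for a record batch whose columns
// are plain vectors. This is not an Arrow API.
#[derive(Debug)]
struct Batch {
    columns: Vec<Vec<i64>>,
    num_rows: usize,
}

impl Batch {
    /// Column-only constructor: the row count must be inferred from the
    /// first column, so an empty column list is an error.
    fn try_new(columns: Vec<Vec<i64>>) -> Result<Batch, String> {
        let num_rows = columns
            .first()
            .map(|c| c.len())
            .ok_or_else(|| "cannot infer row count from zero columns".to_string())?;
        Batch::try_new_with_row_count(columns, num_rows)
    }

    /// Constructor with an explicit row count (the role `RecordBatchOptions`
    /// plays for `try_new_with_options`), so zero columns are allowed.
    fn try_new_with_row_count(columns: Vec<Vec<i64>>, num_rows: usize) -> Result<Batch, String> {
        if columns.iter().any(|c| c.len() != num_rows) {
            return Err("all columns must have num_rows entries".to_string());
        }
        Ok(Batch { columns, num_rows })
    }
}

fn main() {
    let input = Batch::try_new(vec![vec![9, 5]]).unwrap();
    // A mapping that keeps no columns must still report the input's rows.
    assert!(Batch::try_new(Vec::new()).is_err());
    let empty = Batch::try_new_with_row_count(Vec::new(), input.num_rows).unwrap();
    // Prints "0 columns, 2 rows"
    println!("{} columns, {} rows", empty.columns.len(), empty.num_rows);
}
```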



##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -1124,6 +1191,138 @@ mod tests {
         assert!(mapped.is_err());
     }
 
+    #[test]
+    fn schema_adapter_map_schema() {
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::UInt64, true),
+            Field::new("c3", DataType::Float64, true),
+        ]));
+
+        let adapter = SchemaAdapter::new(table_schema.clone());
+
+        // file schema matches table schema
+        let file_schema = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::UInt64, true),
+            Field::new("c3", DataType::Float64, true),
+        ]);
+
+        let mapping = adapter.map_schema(&file_schema).unwrap();
+
+        assert_eq!(
+            mapping.field_mappings,
+            vec![
+                (0, DataType::Utf8),
+                (1, DataType::UInt64),
+                (2, DataType::Float64),
+            ]
+        );
+        assert_eq!(mapping.table_schema, table_schema);
+
+        // file schema has columns of a different but castable type
+        let file_schema = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int32, true), // can be casted to UInt64
+            Field::new("c3", DataType::Float32, true), // can be casted to Float64
+        ]);
+
+        let mapping = adapter.map_schema(&file_schema).unwrap();
+
+        assert_eq!(
+            mapping.field_mappings,
+            vec![
+                (0, DataType::Utf8),
+                (1, DataType::UInt64),
+                (2, DataType::Float64),
+            ]
+        );
+        assert_eq!(mapping.table_schema, table_schema);
+
+        // file schema lacks necessary columns
+        let file_schema = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int32, true),
+        ]);
+
+        let mapping = adapter.map_schema(&file_schema);
+
+        assert!(
+            mapping.is_err(),
+            "Mapping should fail if a necessary column is missing."
+        );
+
+        // file schema has columns of a different and non-castable type
+        let file_schema = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int32, true),
+            Field::new("c3", DataType::Date64, true), // cannot be casted to Float64
+        ]);
+        let mapping = adapter.map_schema(&file_schema);
+
+        assert!(
+            mapping.is_err(),
+            "Mapping should fail if a column cannot be casted to the required type."
+        );
+    }
+
+    #[test]
+    fn schema_mapping_map_batch() {
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::UInt32, true),
+            Field::new("c3", DataType::Float64, true),
+        ]));
+
+        let adapter = SchemaAdapter::new(table_schema.clone());
+
+        let file_schema = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::UInt64, true),
+            Field::new("c3", DataType::Float32, true),
+        ]);
+
+        let mapping = adapter.map_schema(&file_schema).expect("map schema failed");
+
+        let c1 = StringArray::from(vec!["hello", "world"]);
+        let c2 = UInt64Array::from(vec![9_u64, 5_u64]);
+        let c3 = Float32Array::from(vec![2.0_f32, 7.0_f32]);
+        let batch = RecordBatch::try_new(
+            Arc::new(file_schema),
+            vec![Arc::new(c1), Arc::new(c2), Arc::new(c3)],
+        )
+        .unwrap();
+
+        let mapped_batch = mapping.map_batch(batch).unwrap();
+
+        assert_eq!(mapped_batch.schema(), table_schema);
+        assert_eq!(mapped_batch.num_columns(), 3);
+        assert_eq!(mapped_batch.num_rows(), 2);
+
+        let c1 = mapped_batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let c2 = mapped_batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<UInt32Array>()
+            .unwrap();
+        let c3 = mapped_batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<Float64Array>()
+            .unwrap();

Review Comment:
   ```suggestion
               .as_primitive::<Float64Type>();
   ```
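The suggestion replaces `.as_any().downcast_ref::<Float64Array>().unwrap()` with arrow-rs's `AsArray::as_primitive`, which performs the same checked downcast in one call and panics on a type mismatch (it needs `use arrow::array::AsArray;` and `arrow::datatypes::Float64Type` in scope). The std-only sketch below shows the same pattern with `std::any::Any`; `ColumnRef`, `AsTyped` and the array structs are hypothetical stand-ins, not Arrow types.

```rust
use std::any::{type_name, Any};
use std::sync::Arc;

// Hypothetical stand-ins for concrete array types.
#[derive(Debug, PartialEq)]
struct Float64Array(Vec<f64>);

#[derive(Debug, PartialEq)]
struct UInt32Array(Vec<u32>);

/// Dynamically typed column, loosely analogous to Arrow's `ArrayRef`.
type ColumnRef = Arc<dyn Any + Send + Sync>;

/// Folds `downcast_ref::<T>().unwrap()` into a single call with a clearer
/// panic message, in the spirit of `AsArray::as_primitive`.
trait AsTyped {
    fn as_typed<T: Any>(&self) -> &T;
}

impl AsTyped for ColumnRef {
    fn as_typed<T: Any>(&self) -> &T {
        // Deref through the Arc to the trait object before downcasting.
        (**self)
            .downcast_ref::<T>()
            .unwrap_or_else(|| panic!("column is not a {}", type_name::<T>()))
    }
}

fn main() {
    let columns: Vec<ColumnRef> = vec![
        Arc::new(UInt32Array(vec![9, 5])),
        Arc::new(Float64Array(vec![2.0, 7.0])),
    ];
    // Verbose form: (*columns[1]).downcast_ref::<Float64Array>().unwrap()
    let c3 = columns[1].as_typed::<Float64Array>();
    println!("{:?}", c3.0);
}
```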



##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -1124,6 +1191,138 @@ mod tests {
+        let c1 = mapped_batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();

Review Comment:
   ```suggestion
               .as_string::<i32>()
   ```
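The `i32` in `as_string::<i32>()` is the offset width: Arrow's `Utf8` type (`StringArray`) stores 32-bit offsets into a shared values buffer, while `LargeUtf8` uses 64-bit offsets and would need `as_string::<i64>()`. The simplified model below shows that layout; `StringColumn`, `from_strs` and `value` are hypothetical names, not the arrow-rs API.

```rust
use std::convert::{TryFrom, TryInto};

/// Simplified model of Arrow's variable-length string layout: all string
/// bytes live in one `values` buffer, and `offsets[i]..offsets[i + 1]`
/// delimits element `i`. `O` is the offset type (i32 for Utf8, i64 for LargeUtf8).
struct StringColumn<O> {
    offsets: Vec<O>,
    values: String,
}

impl StringColumn<i32> {
    /// Builds a Utf8-style column; panics if the bytes overflow i32 offsets.
    fn from_strs(items: &[&str]) -> Self {
        let mut offsets = vec![0i32];
        let mut values = String::new();
        for s in items {
            values.push_str(s);
            offsets.push(i32::try_from(values.len()).expect("values exceed i32 offsets"));
        }
        StringColumn { offsets, values }
    }
}

impl<O: Copy + TryInto<usize>> StringColumn<O> {
    fn len(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns element `i` by slicing the shared values buffer.
    fn value(&self, i: usize) -> &str {
        let start: usize = self.offsets[i].try_into().ok().expect("invalid offset");
        let end: usize = self.offsets[i + 1].try_into().ok().expect("invalid offset");
        &self.values[start..end]
    }
}

fn main() {
    let col = StringColumn::from_strs(&["hello", "world"]);
    // Prints "offsets = [0, 5, 10]"
    println!("offsets = {:?}", col.offsets);
    for i in 0..col.len() {
        println!("{}", col.value(i));
    }
}
```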



##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -1124,6 +1191,138 @@ mod tests {
+        // file schema lacks necessary columns
+        let file_schema = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int32, true),
+        ]);
+
+        let mapping = adapter.map_schema(&file_schema);
+
+        assert!(

Review Comment:
   We could perhaps do `unwrap_err().to_string()` and check the error message, to ensure that it is failing for the right reasons.
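A sketch of the suggested assertion style: convert the error to a string and check for a distinctive fragment, so the test fails if the mapping errors for an unexpected reason. `PlanError`, its "Error during planning: " prefix and `find_field` are hypothetical stand-ins assumed for illustration; `contains` is used rather than equality because a `Display` impl may add such a prefix.

```rust
use std::fmt;

/// Hypothetical error type whose `Display` output adds a prefix.
#[derive(Debug)]
enum PlanError {
    Plan(String),
}

impl fmt::Display for PlanError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PlanError::Plan(msg) => write!(f, "Error during planning: {}", msg),
        }
    }
}

/// Hypothetical lookup that fails the way `map_schema` does for a missing column.
fn find_field(file_fields: &[&str], name: &str) -> Result<usize, PlanError> {
    file_fields
        .iter()
        .position(|f| *f == name)
        .ok_or_else(|| PlanError::Plan(format!("File schema does not contain expected field {}", name)))
}

fn main() {
    let err = find_field(&["c1", "c2"], "c3").unwrap_err().to_string();
    // Checks why it failed, not just that it failed.
    assert!(
        err.contains("does not contain expected field c3"),
        "unexpected error: {}",
        err
    );
    println!("{}", err);
}
```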



##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -1124,6 +1191,138 @@ mod tests {
+        let c2 = mapped_batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<UInt32Array>()
+            .unwrap();

Review Comment:
   ```suggestion
               .as_primitive::<UInt32Type>();
   ```
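A note on the cast this test exercises (`UInt64` to `UInt32`): `arrow::compute::cast` applies Arrow's default cast options, which are "safe", so numeric values that do not fit the target type become null rather than producing an error. The test's values (9 and 5) fit, so `as_primitive::<UInt32Type>()` should see them unchanged. The std-only model below mimics that narrowing behavior, with `Option` standing in for nullable slots; `cast_u64_to_u32` is a hypothetical helper, not an Arrow function.

```rust
use std::convert::TryFrom;

/// Models a "safe" narrowing cast over a nullable column: `None` is a null
/// slot, and any value outside the u32 range also becomes null.
fn cast_u64_to_u32(values: &[Option<u64>]) -> Vec<Option<u32>> {
    values
        .iter()
        .map(|v| v.and_then(|x| u32::try_from(x).ok()))
        .collect()
}

fn main() {
    let input = [Some(9_u64), Some(5), Some(u64::MAX), None];
    // Prints [Some(9), Some(5), None, None]
    println!("{:?}", cast_u64_to_u32(&input));
}
```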




[GitHub] [arrow-datafusion] e1ijah1 commented on pull request #6404: feat: support type cast in SchemaAdapter

Posted by "e1ijah1 (via GitHub)" <gi...@apache.org>.
e1ijah1 commented on PR #6404:
URL: https://github.com/apache/arrow-datafusion/pull/6404#issuecomment-1558727801

   > This is really nice, thank you. Left some minor comments, but this is also good to go as is
   
   Thank you for taking the time to review my PR 🙏. I have updated the code. Please take another look.



[GitHub] [arrow-datafusion] tustvold merged pull request #6404: feat: support type cast in SchemaAdapter

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold merged PR #6404:
URL: https://github.com/apache/arrow-datafusion/pull/6404

