Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/07 12:11:14 UTC

[arrow-datafusion] branch main updated: Avoid per-batch field lookups in SchemaMapping (#6563)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f7f76d6e3 Avoid per-batch field lookups in SchemaMapping (#6563)
8f7f76d6e3 is described below

commit 8f7f76d6e3a2dc7c3e43374b39402ce2548fb70f
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Jun 7 13:11:06 2023 +0100

    Avoid per-batch field lookups in SchemaMapping (#6563)
---
 .../core/src/datasource/physical_plan/mod.rs       | 122 +++++++--------------
 1 file changed, 39 insertions(+), 83 deletions(-)
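
In short: the code removed below resolved each table column by name against the batch schema for every RecordBatch, while the new code resolves names once per file in map_schema, leaving only integer indexing on the per-batch path. A minimal standalone sketch of the resulting shape (simplified names, plain arrow APIs; an illustration, not the committed code verbatim):

    use std::sync::Arc;
    use arrow::array::{new_null_array, ArrayRef};
    use arrow::compute::cast;
    use arrow::datatypes::SchemaRef;
    use arrow::error::Result;
    use arrow::record_batch::{RecordBatch, RecordBatchOptions};

    /// Computed once per file: table field index -> column index in the
    /// projected file batch (None when the file lacks that column).
    struct Mapping {
        table_schema: SchemaRef,
        field_mappings: Vec<Option<usize>>,
    }

    impl Mapping {
        /// Hot path, called for every batch: no string lookups remain,
        /// only Vec indexing plus casting and null-filling.
        fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
            let cols = self
                .table_schema
                .fields()
                .iter()
                .zip(&self.field_mappings)
                .map(|(field, idx)| match idx {
                    Some(i) => cast(batch.column(*i), field.data_type()),
                    None => Ok(new_null_array(field.data_type(), batch.num_rows())),
                })
                .collect::<Result<Vec<ArrayRef>>>()?;
            // Explicit row count handles batches with no columns
            let options =
                RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
            RecordBatch::try_new_with_options(Arc::clone(&self.table_schema), cols, &options)
        }
    }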

diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index c78a39f7ae..fcb118569f 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -58,6 +58,7 @@ use crate::{
 use datafusion_common::tree_node::{TreeNode, VisitRecursion};
 use datafusion_physical_expr::expressions::Column;
 
+use arrow::compute::cast;
 use log::{debug, warn};
 use object_store::path::Path;
 use object_store::ObjectMeta;
@@ -425,46 +426,7 @@ impl SchemaAdapter {
         file_schema: &Schema,
     ) -> Option<usize> {
         let field = self.table_schema.field(index);
-        file_schema.index_of(field.name()).ok()
-    }
-
-    /// Re-order projected columns by index in record batch to match table schema column ordering. If the record
-    /// batch does not contain a column for an expected field, insert a null-valued column at the
-    /// required column index.
-    #[allow(dead_code)]
-    pub fn adapt_batch(
-        &self,
-        batch: RecordBatch,
-        projections: &[usize],
-    ) -> Result<RecordBatch> {
-        let batch_rows = batch.num_rows();
-
-        let batch_schema = batch.schema();
-
-        let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.columns().len());
-        let batch_cols = batch.columns().to_vec();
-
-        for field_idx in projections {
-            let table_field = &self.table_schema.fields()[*field_idx];
-            if let Some((batch_idx, _name)) =
-                batch_schema.column_with_name(table_field.name().as_str())
-            {
-                cols.push(batch_cols[batch_idx].clone());
-            } else {
-                cols.push(new_null_array(table_field.data_type(), batch_rows))
-            }
-        }
-
-        let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);
-
-        // Necessary to handle empty batches
-        let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-
-        Ok(RecordBatch::try_new_with_options(
-            projected_schema,
-            cols,
-            &options,
-        )?)
+        Some(file_schema.fields.find(field.name())?.0)
     }
 
     /// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
@@ -472,37 +434,43 @@ impl SchemaAdapter {
     /// If the provided `file_schema` contains columns of a different type to the expected
     /// `table_schema`, the method will attempt to cast the array data from the file schema
     /// to the table schema where possible.
+    ///
+    /// Returns a [`SchemaMapping`] that can be applied to the output batch
+    /// along with an ordered list of columns to project from the file
     pub fn map_schema(
         &self,
         file_schema: &Schema,
     ) -> Result<(SchemaMapping, Vec<usize>)> {
-        let mut field_mappings: Vec<bool> = vec![false; self.table_schema.fields().len()];
-        let mut mapped: Vec<usize> = vec![];
-
-        for (idx, field) in self.table_schema.fields().iter().enumerate() {
-            if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
-                if can_cast_types(
-                    file_schema.field(mapped_idx).data_type(),
-                    field.data_type(),
-                ) {
-                    field_mappings[idx] = true;
-                    mapped.push(mapped_idx);
-                } else {
-                    return Err(DataFusionError::Plan(format!(
-                        "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
-                        field.name(),
-                        file_schema.field(mapped_idx).data_type(),
-                        field.data_type()
-                    )));
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+        let mut field_mappings = vec![None; self.table_schema.fields().len()];
+
+        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+            if let Some((table_idx, table_field)) =
+                self.table_schema.fields().find(file_field.name())
+            {
+                match can_cast_types(file_field.data_type(), table_field.data_type()) {
+                    true => {
+                        field_mappings[table_idx] = Some(projection.len());
+                        projection.push(file_idx);
+                    }
+                    false => {
+                        return Err(DataFusionError::Plan(format!(
+                            "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
+                            file_field.name(),
+                            file_field.data_type(),
+                            table_field.data_type()
+                        )))
+                    }
                 }
             }
         }
+
         Ok((
             SchemaMapping {
                 table_schema: self.table_schema.clone(),
                 field_mappings,
             },
-            mapped,
+            projection,
         ))
     }
 }
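
A hedged usage sketch of the contract established here, mirroring the updated test at the bottom of this diff: the returned projection is applied with RecordBatch::project before map_batch. The helper name adapt_file_batches is hypothetical, and the sketch assumes it lives in the same module as SchemaAdapter, since map_batch below is crate-private:

    use arrow::datatypes::Schema;
    use arrow::record_batch::RecordBatch;
    use datafusion_common::Result;

    // Hypothetical helper; `SchemaAdapter` and `SchemaMapping` are the
    // types defined in this file.
    fn adapt_file_batches(
        adapter: &SchemaAdapter,
        file_schema: &Schema,
        batches: Vec<RecordBatch>,
    ) -> Result<Vec<RecordBatch>> {
        // Name resolution and cast validation happen once per file...
        let (mapping, projection) = adapter.map_schema(file_schema)?;
        batches
            .into_iter()
            .map(|batch| {
                // ...then each batch is narrowed to the mapped file columns
                // and reordered/cast/null-filled to the table schema.
                let projected = batch.project(&projection)?;
                mapping.map_batch(projected)
            })
            .collect()
    }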
@@ -513,9 +481,8 @@ impl SchemaAdapter {
 pub struct SchemaMapping {
     /// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
     table_schema: SchemaRef,
-    /// In `field_mappings`, a `true` value indicates that the corresponding field in `table_schema` exists in `file_schema`,
-    /// while a `false` value indicates that the corresponding field does not exist.
-    field_mappings: Vec<bool>,
+    /// Mapping from field index in `table_schema` to index in projected file_schema
+    field_mappings: Vec<Option<usize>>,
 }
 
 impl SchemaMapping {
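
For a concrete, hypothetical pair of schemas, the new representation works out as follows:

    // table_schema: [a: Int32, b: Utf8, c: Float64]
    // file_schema:  [c: Float64, a: Int32]
    //
    // map_schema walks the file fields in order:
    //   file 0 = "c" -> table index 2: field_mappings[2] = Some(0); projection = [0]
    //   file 1 = "a" -> table index 0: field_mappings[0] = Some(1); projection = [0, 1]
    //
    // Result: projection     = [0, 1]
    //         field_mappings = [Some(1), None, Some(0)]
    //         ("b" has no file column, so map_batch null-fills it per batch)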
@@ -523,34 +490,23 @@ impl SchemaMapping {
     fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
         let batch_rows = batch.num_rows();
         let batch_cols = batch.columns().to_vec();
-        let batch_schema = batch.schema();
 
         let cols = self
             .table_schema
             .fields()
             .iter()
-            .enumerate()
-            .map(|(idx, field)| {
-                if self.field_mappings[idx] {
-                    match batch_schema.index_of(field.name()) {
-                        Ok(batch_idx) => arrow::compute::cast(
-                            &batch_cols[batch_idx],
-                            field.data_type(),
-                        )
-                        .map_err(DataFusionError::ArrowError),
-                        Err(_) => Ok(new_null_array(field.data_type(), batch_rows)),
-                    }
-                } else {
-                    Ok(new_null_array(field.data_type(), batch_rows))
-                }
+            .zip(&self.field_mappings)
+            .map(|(field, file_idx)| match file_idx {
+                Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
+                None => Ok(new_null_array(field.data_type(), batch_rows)),
             })
-            .collect::<Result<Vec<_>>>()?;
+            .collect::<Result<Vec<_>, _>>()?;
 
         // Necessary to handle empty batches
         let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
 
-        let record_batch =
-            RecordBatch::try_new_with_options(self.table_schema.clone(), cols, &options)?;
+        let schema = self.table_schema.clone();
+        let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
         Ok(record_batch)
     }
 }
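
The retained with_row_count call covers a corner case worth spelling out: when cols is empty (a table schema with no fields), the row count cannot be inferred from column lengths. A minimal illustration using plain arrow APIs (an assumption about the motivating scenario, not taken from this commit):

    use std::sync::Arc;
    use arrow::datatypes::Schema;
    use arrow::record_batch::{RecordBatch, RecordBatchOptions};

    fn main() {
        // With no columns there is nothing to infer the row count from,
        // so it must be carried explicitly through the options.
        let options = RecordBatchOptions::new().with_row_count(Some(3));
        let batch = RecordBatch::try_new_with_options(
            Arc::new(Schema::empty()),
            vec![],
            &options,
        )
        .unwrap();
        assert_eq!(batch.num_rows(), 3);
    }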
@@ -1246,7 +1202,7 @@ mod tests {
         let indices = vec![1, 2, 4];
         let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
         let adapter = SchemaAdapter::new(schema);
-        let (mapping, _) = adapter.map_schema(&file_schema).unwrap();
+        let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();
 
         let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
         let c1 = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
@@ -1268,9 +1224,9 @@ mod tests {
             ],
         )
         .unwrap();
-
         let rows_num = batch.num_rows();
-        let mapped_batch = mapping.map_batch(batch).unwrap();
+        let projected = batch.project(&projection).unwrap();
+        let mapped_batch = mapping.map_batch(projected).unwrap();
 
         assert_eq!(
             mapped_batch.schema(),