You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/07 12:11:14 UTC
[arrow-datafusion] branch main updated: Avoid per-batch field lookups in SchemaMapping (#6563)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8f7f76d6e3 Avoid per-batch field lookups in SchemaMapping (#6563)
8f7f76d6e3 is described below
commit 8f7f76d6e3a2dc7c3e43374b39402ce2548fb70f
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Jun 7 13:11:06 2023 +0100
Avoid per-batch field lookups in SchemaMapping (#6563)
---
.../core/src/datasource/physical_plan/mod.rs | 122 +++++++--------------
1 file changed, 39 insertions(+), 83 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index c78a39f7ae..fcb118569f 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -58,6 +58,7 @@ use crate::{
use datafusion_common::tree_node::{TreeNode, VisitRecursion};
use datafusion_physical_expr::expressions::Column;
+use arrow::compute::cast;
use log::{debug, warn};
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -425,46 +426,7 @@ impl SchemaAdapter {
file_schema: &Schema,
) -> Option<usize> {
let field = self.table_schema.field(index);
- file_schema.index_of(field.name()).ok()
- }
-
- /// Re-order projected columns by index in record batch to match table schema column ordering. If the record
- /// batch does not contain a column for an expected field, insert a null-valued column at the
- /// required column index.
- #[allow(dead_code)]
- pub fn adapt_batch(
- &self,
- batch: RecordBatch,
- projections: &[usize],
- ) -> Result<RecordBatch> {
- let batch_rows = batch.num_rows();
-
- let batch_schema = batch.schema();
-
- let mut cols: Vec<ArrayRef> = Vec::with_capacity(batch.columns().len());
- let batch_cols = batch.columns().to_vec();
-
- for field_idx in projections {
- let table_field = &self.table_schema.fields()[*field_idx];
- if let Some((batch_idx, _name)) =
- batch_schema.column_with_name(table_field.name().as_str())
- {
- cols.push(batch_cols[batch_idx].clone());
- } else {
- cols.push(new_null_array(table_field.data_type(), batch_rows))
- }
- }
-
- let projected_schema = Arc::new(self.table_schema.clone().project(projections)?);
-
- // Necessary to handle empty batches
- let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
-
- Ok(RecordBatch::try_new_with_options(
- projected_schema,
- cols,
- &options,
- )?)
+ Some(file_schema.fields.find(field.name())?.0)
}
/// Creates a `SchemaMapping` that can be used to cast or map the columns from the file schema to the table schema.
@@ -472,37 +434,43 @@ impl SchemaAdapter {
/// If the provided `file_schema` contains columns of a different type to the expected
/// `table_schema`, the method will attempt to cast the array data from the file schema
/// to the table schema where possible.
+ ///
+ /// Returns a [`SchemaMapping`] that can be applied to the output batch
+ /// along with an ordered list of columns to project from the file
pub fn map_schema(
&self,
file_schema: &Schema,
) -> Result<(SchemaMapping, Vec<usize>)> {
- let mut field_mappings: Vec<bool> = vec![false; self.table_schema.fields().len()];
- let mut mapped: Vec<usize> = vec![];
-
- for (idx, field) in self.table_schema.fields().iter().enumerate() {
- if let Ok(mapped_idx) = file_schema.index_of(field.name().as_str()) {
- if can_cast_types(
- file_schema.field(mapped_idx).data_type(),
- field.data_type(),
- ) {
- field_mappings[idx] = true;
- mapped.push(mapped_idx);
- } else {
- return Err(DataFusionError::Plan(format!(
- "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
- field.name(),
- file_schema.field(mapped_idx).data_type(),
- field.data_type()
- )));
+ let mut projection = Vec::with_capacity(file_schema.fields().len());
+ let mut field_mappings = vec![None; self.table_schema.fields().len()];
+
+ for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+ if let Some((table_idx, table_field)) =
+ self.table_schema.fields().find(file_field.name())
+ {
+ match can_cast_types(file_field.data_type(), table_field.data_type()) {
+ true => {
+ field_mappings[table_idx] = Some(projection.len());
+ projection.push(file_idx);
+ }
+ false => {
+ return Err(DataFusionError::Plan(format!(
+ "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}",
+ file_field.name(),
+ file_field.data_type(),
+ table_field.data_type()
+ )))
+ }
}
}
}
+
Ok((
SchemaMapping {
table_schema: self.table_schema.clone(),
field_mappings,
},
- mapped,
+ projection,
))
}
}
@@ -513,9 +481,8 @@ impl SchemaAdapter {
pub struct SchemaMapping {
/// The schema of the table. This is the expected schema after conversion and it should match the schema of the query result.
table_schema: SchemaRef,
- /// In `field_mappings`, a `true` value indicates that the corresponding field in `table_schema` exists in `file_schema`,
- /// while a `false` value indicates that the corresponding field does not exist.
- field_mappings: Vec<bool>,
+ /// Mapping from field index in `table_schema` to index in projected file_schema
+ field_mappings: Vec<Option<usize>>,
}
impl SchemaMapping {
@@ -523,34 +490,23 @@ impl SchemaMapping {
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
let batch_rows = batch.num_rows();
let batch_cols = batch.columns().to_vec();
- let batch_schema = batch.schema();
let cols = self
.table_schema
.fields()
.iter()
- .enumerate()
- .map(|(idx, field)| {
- if self.field_mappings[idx] {
- match batch_schema.index_of(field.name()) {
- Ok(batch_idx) => arrow::compute::cast(
- &batch_cols[batch_idx],
- field.data_type(),
- )
- .map_err(DataFusionError::ArrowError),
- Err(_) => Ok(new_null_array(field.data_type(), batch_rows)),
- }
- } else {
- Ok(new_null_array(field.data_type(), batch_rows))
- }
+ .zip(&self.field_mappings)
+ .map(|(field, file_idx)| match file_idx {
+ Some(batch_idx) => cast(&batch_cols[*batch_idx], field.data_type()),
+ None => Ok(new_null_array(field.data_type(), batch_rows)),
})
- .collect::<Result<Vec<_>>>()?;
+ .collect::<Result<Vec<_>, _>>()?;
// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
- let record_batch =
- RecordBatch::try_new_with_options(self.table_schema.clone(), cols, &options)?;
+ let schema = self.table_schema.clone();
+ let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}
}
@@ -1246,7 +1202,7 @@ mod tests {
let indices = vec![1, 2, 4];
let schema = SchemaRef::from(table_schema.project(&indices).unwrap());
let adapter = SchemaAdapter::new(schema);
- let (mapping, _) = adapter.map_schema(&file_schema).unwrap();
+ let (mapping, projection) = adapter.map_schema(&file_schema).unwrap();
let id = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
let c1 = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);
@@ -1268,9 +1224,9 @@ mod tests {
],
)
.unwrap();
-
let rows_num = batch.num_rows();
- let mapped_batch = mapping.map_batch(batch).unwrap();
+ let projected = batch.project(&projection).unwrap();
+ let mapped_batch = mapping.map_batch(projected).unwrap();
assert_eq!(
mapped_batch.schema(),