Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/31 20:41:26 UTC

[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1709: Create SchemaAdapter trait to map table schema to file schemas

alamb commented on a change in pull request #1709:
URL: https://github.com/apache/arrow-datafusion/pull/1709#discussion_r796045906



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -477,44 +455,20 @@ fn read_partition(
         }
 
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-        let mapped_projections =
-            map_projections(&file_schema, &arrow_reader.get_schema()?, projection)?;
+        let adapted_projections =
+            schema_adapter.map_projections(&arrow_reader.get_schema()?, projection)?;
 
         let mut batch_reader =
-            arrow_reader.get_record_reader_by_columns(mapped_projections, batch_size)?;
+            arrow_reader.get_record_reader_by_columns(adapted_projections, batch_size)?;
         loop {
             match batch_reader.next() {
                 Some(Ok(batch)) => {
-                    let total_cols = &file_schema.fields().len();
-                    let batch_rows = batch.num_rows();
                     total_rows += batch.num_rows();
 
-                    let batch_schema = batch.schema();
-
-                    let mut cols: Vec<ArrayRef> = Vec::with_capacity(*total_cols);
-                    let batch_cols = batch.columns().to_vec();
-
-                    for field_idx in projection {
-                        let merged_field = &file_schema.fields()[*field_idx];
-                        if let Some((batch_idx, _name)) =
-                            batch_schema.column_with_name(merged_field.name().as_str())
-                        {
-                            cols.push(batch_cols[batch_idx].clone());
-                        } else {
-                            cols.push(new_null_array(
-                                merged_field.data_type(),
-                                batch_rows,
-                            ))
-                        }
-                    }
-
-                    let projected_schema = file_schema.clone().project(projection)?;
-
-                    let merged_batch =
-                        RecordBatch::try_new(Arc::new(projected_schema), cols)?;
+                    let adapted_batch = schema_adapter.adapt_batch(batch, projection)?;

Review comment:
       this is very nice
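
       The removed block above is essentially what `adapt_batch` now encapsulates: match the file's columns to the projected table columns by name and pad anything the file lacks with nulls. Below is a minimal free-standing sketch of that step, built only from the arrow calls already visible in the diff (`column_with_name`, `new_null_array`, `Schema::project`, `RecordBatch::try_new`); the function name and signature are illustrative, not the PR's actual `SchemaAdapter::adapt_batch`.

       use std::sync::Arc;

       use arrow::array::{new_null_array, ArrayRef};
       use arrow::datatypes::SchemaRef;
       use arrow::error::Result;
       use arrow::record_batch::RecordBatch;

       /// Illustrative stand-in for `SchemaAdapter::adapt_batch`: pad a file-level
       /// batch out to the projected table schema, filling columns the file does
       /// not contain with null arrays (columns are matched by name).
       fn adapt_batch_to_table_schema(
           table_schema: &SchemaRef,
           batch: RecordBatch,
           projection: &[usize],
       ) -> Result<RecordBatch> {
           let batch_schema = batch.schema();
           let mut cols: Vec<ArrayRef> = Vec::with_capacity(projection.len());

           for field_idx in projection {
               let table_field = &table_schema.fields()[*field_idx];
               if let Some((batch_idx, _)) =
                   batch_schema.column_with_name(table_field.name().as_str())
               {
                   // The file provides this column: reuse it as-is.
                   cols.push(batch.column(batch_idx).clone());
               } else {
                   // The file predates this column: fill it with nulls.
                   cols.push(new_null_array(table_field.data_type(), batch.num_rows()));
               }
           }

           let projected_schema = table_schema.project(projection)?;
           RecordBatch::try_new(Arc::new(projected_schema), cols)
       }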

##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -165,6 +169,87 @@ impl<'a> Display for FileGroupsDisplay<'a> {
     }
 }
 
+/// A utility which can adapt file-level record batches to a table schema that may have been
+/// obtained by merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from the table schema to
+///    the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to the expected column
+///    indexes and insert null-valued columns wherever the file schema was missing a column present
+///    in the table schema.
+#[derive(Clone, Debug)]
+pub(crate) struct SchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter {
+    pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
+        Self { table_schema }
+    }
+
+    /// Map projected column indexes to the file schema. This will fail if the table schema
+    /// and the file schema contain a field with the same name and different types.
+    pub fn map_projections(

Review comment:
       👍 
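
       The doc comment above describes the first stage: translating column indexes that refer to the merged table schema into indexes into a particular file's schema. Here is a rough sketch of what such a mapping could look like, derived only from the behavior the comment describes (match columns by name, fail on same name with a different type, skip columns the file lacks). The function name, signature, and error variant below are assumptions for illustration, not the PR's actual `SchemaAdapter::map_projections`.

       use arrow::datatypes::Schema;
       use arrow::error::{ArrowError, Result};

       /// Illustrative stand-in for `SchemaAdapter::map_projections`: translate
       /// projected column indexes from the (merged) table schema into indexes
       /// into a particular file's schema.
       fn map_projections_to_file(
           table_schema: &Schema,
           file_schema: &Schema,
           projection: &[usize],
       ) -> Result<Vec<usize>> {
           let mut mapped = Vec::with_capacity(projection.len());
           for table_idx in projection {
               let table_field = &table_schema.fields()[*table_idx];
               match file_schema.column_with_name(table_field.name().as_str()) {
                   // Same name and same type: read this column from the file.
                   Some((file_idx, file_field))
                       if file_field.data_type() == table_field.data_type() =>
                   {
                       mapped.push(file_idx)
                   }
                   // Same name but a different type: refuse to read the file.
                   Some((_, file_field)) => {
                       return Err(ArrowError::SchemaError(format!(
                           "column '{}' is {:?} in the file but {:?} in the table schema",
                           table_field.name(),
                           file_field.data_type(),
                           table_field.data_type(),
                       )))
                   }
                   // Column missing from the file: it is simply not read here;
                   // the batch-adaptation step fills it with nulls afterwards.
                   None => {}
               }
           }
           Ok(mapped)
       }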




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org