Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/30 13:28:41 UTC

[GitHub] [arrow-datafusion] thinkharderdev opened a new pull request #1709: Create SchemaAdapter trait to map table schema to file schemas

thinkharderdev opened a new pull request #1709:
URL: https://github.com/apache/arrow-datafusion/pull/1709


   # Which issue does this PR close?
   
   
   Closes #1669 
   
    # Rationale for this change
   
   Previously, we added the ability to merge Parquet files on read when they have mergeable schemas. This PR abstracts that functionality into a generic `SchemaAdapter` struct, which performs the two basic operations involved: mapping projected column indexes from the merged (table) schema to the file schema, and conforming each file-level record batch to the merged schema.
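
   For illustration, here is a minimal, self-contained sketch of those two operations written directly against the `arrow` crate. The struct and method names mirror the `SchemaAdapter` added in this PR, but the bodies are a simplified approximation rather than the exact implementation, and error handling is reduced to `ArrowError` for brevity.

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, ArrayRef};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result};
use arrow::record_batch::RecordBatch;

/// Simplified sketch: holds the merged table schema and conforms per-file data to it.
struct SchemaAdapter {
    table_schema: SchemaRef,
}

impl SchemaAdapter {
    fn new(table_schema: SchemaRef) -> Self {
        Self { table_schema }
    }

    /// Stage 1: translate projected column indexes in the table schema into
    /// column indexes in the file schema. Columns missing from the file are
    /// skipped here and filled with nulls later in `adapt_batch`.
    fn map_projections(
        &self,
        file_schema: &Schema,
        projection: &[usize],
    ) -> Result<Vec<usize>> {
        let mut mapped = Vec::with_capacity(projection.len());
        for idx in projection {
            let field = self.table_schema.field(*idx);
            if let Some((file_idx, file_field)) =
                file_schema.column_with_name(field.name())
            {
                if file_field.data_type() != field.data_type() {
                    return Err(ArrowError::SchemaError(format!(
                        "column {} has type {:?} in the file but {:?} in the table",
                        field.name(),
                        file_field.data_type(),
                        field.data_type()
                    )));
                }
                mapped.push(file_idx);
            }
        }
        Ok(mapped)
    }

    /// Stage 2: reorder the columns read from the file to match the projected
    /// table schema, inserting null-valued columns where the file had no data.
    fn adapt_batch(
        &self,
        batch: RecordBatch,
        projection: &[usize],
    ) -> Result<RecordBatch> {
        let batch_schema = batch.schema();
        let mut cols: Vec<ArrayRef> = Vec::with_capacity(projection.len());
        for idx in projection {
            let table_field = self.table_schema.field(*idx);
            match batch_schema.column_with_name(table_field.name()) {
                Some((batch_idx, _)) => cols.push(batch.column(batch_idx).clone()),
                None => cols.push(new_null_array(table_field.data_type(), batch.num_rows())),
            }
        }
        let projected_schema = self.table_schema.project(projection)?;
        RecordBatch::try_new(Arc::new(projected_schema), cols)
    }
}
```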
   
   This behavior was already supported for Avro/CSV/NdJson, since those formats map projections by column name rather than by index and insert null-valued columns for any columns missing from the file. Test cases were added to the relevant formats to verify this.
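
   To make that concrete, the short example below (schemas and values invented for illustration, using the `arrow` crate directly rather than the DataFusion readers) shows a batch read from a file that is missing one of the table's columns being conformed by name, with the missing column filled with nulls:

```rust
use std::sync::Arc;

use arrow::array::{new_null_array, Array, ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() -> arrow::error::Result<()> {
    // Merged table schema: the file below is missing column "b".
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]));

    // A batch as read from a file that only contains column "a".
    let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let file_batch = RecordBatch::try_new(
        file_schema,
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Conform the batch to the table schema by column name, filling the
    // missing column with nulls.
    let mut cols: Vec<ArrayRef> = Vec::new();
    for field in table_schema.fields() {
        match file_batch.schema().column_with_name(field.name()) {
            Some((idx, _)) => cols.push(file_batch.column(idx).clone()),
            None => cols.push(new_null_array(field.data_type(), file_batch.num_rows())),
        }
    }
    let adapted = RecordBatch::try_new(table_schema.clone(), cols)?;

    assert_eq!(adapted.num_columns(), 2);
    assert_eq!(adapted.column(1).null_count(), 3);
    Ok(())
}
```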
   
   # What changes are included in this PR?
   
   1. Add `SchemaAdapter` struct and use it in `ParquetExec`. 
   2. Add test cases for Avro/CSV/Json formats to verify existing behavior. 
   
   # Are there any user-facing changes?
   
   No
   
   There are no breaking changes to public APIs.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb merged pull request #1709: Create SchemaAdapter trait to map table schema to file schemas

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #1709:
URL: https://github.com/apache/arrow-datafusion/pull/1709


   





[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1709: Create SchemaAdapter trait to map table schema to file schemas

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1709:
URL: https://github.com/apache/arrow-datafusion/pull/1709#discussion_r796045906



##########
File path: datafusion/src/physical_plan/file_format/parquet.rs
##########
@@ -477,44 +455,20 @@ fn read_partition(
         }
 
         let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader));
-        let mapped_projections =
-            map_projections(&file_schema, &arrow_reader.get_schema()?, projection)?;
+        let adapted_projections =
+            schema_adapter.map_projections(&arrow_reader.get_schema()?, projection)?;
 
         let mut batch_reader =
-            arrow_reader.get_record_reader_by_columns(mapped_projections, batch_size)?;
+            arrow_reader.get_record_reader_by_columns(adapted_projections, batch_size)?;
         loop {
             match batch_reader.next() {
                 Some(Ok(batch)) => {
-                    let total_cols = &file_schema.fields().len();
-                    let batch_rows = batch.num_rows();
                     total_rows += batch.num_rows();
 
-                    let batch_schema = batch.schema();
-
-                    let mut cols: Vec<ArrayRef> = Vec::with_capacity(*total_cols);
-                    let batch_cols = batch.columns().to_vec();
-
-                    for field_idx in projection {
-                        let merged_field = &file_schema.fields()[*field_idx];
-                        if let Some((batch_idx, _name)) =
-                            batch_schema.column_with_name(merged_field.name().as_str())
-                        {
-                            cols.push(batch_cols[batch_idx].clone());
-                        } else {
-                            cols.push(new_null_array(
-                                merged_field.data_type(),
-                                batch_rows,
-                            ))
-                        }
-                    }
-
-                    let projected_schema = file_schema.clone().project(projection)?;
-
-                    let merged_batch =
-                        RecordBatch::try_new(Arc::new(projected_schema), cols)?;
+                    let adapted_batch = schema_adapter.adapt_batch(batch, projection)?;

Review comment:
       this is very nice

##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -165,6 +169,87 @@ impl<'a> Display for FileGroupsDisplay<'a> {
     }
 }
 
+/// A utility which can adapt file-level record batches to a table schema, which may itself have
+/// been obtained by merging multiple file-level schemas.
+///
+/// This is useful for enabling schema evolution in partitioned datasets.
+///
+/// This has to be done in two stages.
+///
+/// 1. Before reading the file, we have to map projected column indexes from the table schema to
+///    the file schema.
+///
+/// 2. After reading a record batch we need to map the read columns back to the expected column
+///    indexes and insert null-valued columns wherever the file schema was missing a column present
+///    in the table schema.
+#[derive(Clone, Debug)]
+pub(crate) struct SchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter {
+    pub(crate) fn new(table_schema: SchemaRef) -> SchemaAdapter {
+        Self { table_schema }
+    }
+
+    /// Map projected column indexes to the file schema. This will fail if the table schema
+    /// and the file schema contain a field with the same name and different types.
+    pub fn map_projections(

Review comment:
       👍 
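
As an aside on the failure mode described in the doc comment above (mapping fails when the table schema and a file schema share a column name but disagree on its type), a minimal standalone sketch of that check could look like the following; `check_compatible` is a hypothetical helper written for illustration, not part of this PR:

```rust
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::{ArrowError, Result};

/// Hypothetical helper: if the table schema and the file schema both contain a
/// field with a given name, their data types must agree, otherwise mapping
/// projections must fail.
fn check_compatible(table_schema: &Schema, file_schema: &Schema) -> Result<()> {
    for field in table_schema.fields() {
        if let Some((_, file_field)) = file_schema.column_with_name(field.name()) {
            if file_field.data_type() != field.data_type() {
                return Err(ArrowError::SchemaError(format!(
                    "column {} has type {:?} in the file but {:?} in the table",
                    field.name(),
                    file_field.data_type(),
                    field.data_type()
                )));
            }
        }
    }
    Ok(())
}

fn main() {
    let table = Schema::new(vec![Field::new("id", DataType::Int64, true)]);
    let file = Schema::new(vec![Field::new("id", DataType::Utf8, true)]);
    // Same column name, different type: mapping projections should fail.
    assert!(check_compatible(&table, &file).is_err());
}
```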



