Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/30 17:18:45 UTC

[GitHub] [arrow-ballista] thinkharderdev opened a new issue, #484: Ballista: Partition columns are duplicated in protobuf decoding.

thinkharderdev opened a new issue, #484:
URL: https://github.com/apache/arrow-ballista/issues/484

   **Describe the bug**
   To read a partitioned Parquet dataset while still allowing predicate pushdown on partition columns, I am manually constructing a table-scan logical plan over a manually constructed `ListingTable` that specifies the partition column(s). The `ListingTable` constructor adds the partition columns to the schema. The plan is then serialized and sent to the Ballista scheduler, which deserializes it and constructs a new `ListingTable`. That constructor adds the partition columns to the schema a second time, producing duplicate fields and an error when the `DFSchema` is constructed.
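   The failure can be reproduced in miniature without DataFusion. The sketch below is a simplified model, assuming (as the description above implies) that the scheduler passes the already-extended table schema back into the constructor as the file schema. Fields are reduced to plain names, and `table_schema_for` and `check_unique` are hypothetical stand-ins for the `ListingTable` constructor and the `DFSchema` duplicate-field check, not real DataFusion APIs.

   ```rust
   // Hypothetical model of the round trip; not DataFusion code.

   /// Mirrors the current constructor: partition columns are appended
   /// unconditionally to whatever schema is passed in.
   fn table_schema_for(file_fields: &[&str], partition_cols: &[&str]) -> Vec<String> {
       let mut fields: Vec<String> = file_fields.iter().map(|f| f.to_string()).collect();
       for part in partition_cols {
           fields.push(part.to_string());
       }
       fields
   }

   /// Stand-in for the schema validation: rejects repeated field names.
   fn check_unique(fields: &[String]) -> Result<(), String> {
       for (i, f) in fields.iter().enumerate() {
           if fields[..i].contains(f) {
               return Err(format!("duplicate field: {f}"));
           }
       }
       Ok(())
   }

   fn main() {
       let parts = ["my-partition-column"];

       // Client side: the table schema already includes the partition column.
       let client_schema = table_schema_for(&["id", "value"], &parts);
       assert!(check_unique(&client_schema).is_ok());

       // Scheduler side (assumed): the deserialized table schema is used as the
       // file schema, so the partition column is appended a second time.
       let as_str: Vec<&str> = client_schema.iter().map(String::as_str).collect();
       let scheduler_schema = table_schema_for(&as_str, &parts);

       // prints: Err("duplicate field: my-partition-column")
       println!("{:?}", check_unique(&scheduler_schema));
   }
   ```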
   
   **To Reproduce**
   Steps to reproduce the behavior:
   ```rust
   // Assume we have a partitioned Parquet table at this path.
   // `store` (an `Arc<dyn ObjectStore>`), `scheduler_url`, and `config` are
   // assumed to be set up elsewhere.
   let table_path = "/path/to/table";

   // Construct the ListingOptions, declaring the partition column
   let listing_options = ListingOptions {
       file_extension: String::new(),
       format: Arc::new(ParquetFormat::default()),
       table_partition_cols: vec!["my-partition-column".into()],
       collect_stat: true,
       target_partitions: 1,
   };

   // Infer the file schema from the Parquet files
   let schema = listing_options.infer_schema(store.clone(), table_path).await?;

   // Construct a ListingTable; the constructor appends the partition column to the schema
   let provider = ListingTable::new(
       store,
       table_path.to_string(),
       schema,
       listing_options,
   );

   // Create a table scan plan
   let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, Arc::new(provider), None)?
       .limit(10)?
       .build()?;

   // Create a DistributedQueryExec, which serializes the plan and sends it to the scheduler
   let query = Arc::new(DistributedQueryExec::new(scheduler_url, config, plan));

   // Execute the query (`plan` has been moved into `query`)
   let stream = query.execute(0).await?;
   ```
   
   **Expected behavior**
   This should work, and the planner should push a filter on `my-partition-column` down to the physical scan so that only Parquet files from the requested partitions are read.
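   For context, pruning on a partition column typically relies on a Hive-style directory layout in which each partition value is encoded in the path as `column=value`. The hypothetical sketch below illustrates what pushing an equality filter down to the scan is meant to achieve: files whose path encodes a different value are skipped without being opened. The paths and the `partition_value` and `prune` helpers are illustrative assumptions, not DataFusion code.

   ```rust
   /// Returns the value encoded for `column` in a Hive-style path segment
   /// such as `my-partition-column=a`, if present.
   fn partition_value<'a>(path: &'a str, column: &str) -> Option<&'a str> {
       path.split('/')
           .filter_map(|segment| segment.split_once('='))
           .find(|(name, _)| *name == column)
           .map(|(_, value)| value)
   }

   /// Keeps only the files whose partition value for `column` equals `wanted`,
   /// mimicking an equality filter pushed down to the scan.
   fn prune<'a>(files: &[&'a str], column: &str, wanted: &str) -> Vec<&'a str> {
       files
           .iter()
           .copied()
           .filter(|path| partition_value(path, column) == Some(wanted))
           .collect()
   }

   fn main() {
       let files = [
           "/path/to/table/my-partition-column=a/part-0.parquet",
           "/path/to/table/my-partition-column=b/part-0.parquet",
           "/path/to/table/my-partition-column=a/part-1.parquet",
       ];
       // Only the two files under `my-partition-column=a` are listed.
       for path in prune(&files, "my-partition-column", "a") {
           println!("{path}");
       }
   }
   ```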
   
   **Additional context**
   A simple way to fix this would be to check, in the `ListingTable` constructor, whether the partition columns are already included in the schema:
   
   ```rust
       pub fn new(
           object_store: Arc<dyn ObjectStore>,
           table_path: String,
           file_schema: SchemaRef,
           options: ListingOptions,
       ) -> Self {
           // Add the partition columns to the file schema
           let mut table_fields = file_schema.fields().clone();
           for part in &options.table_partition_cols {
               // Only add the partition column if it doesn't already exist
               if !table_fields.iter().any(|f| f.name() == part) {
                   table_fields.push(Field::new(
                       part,
                       DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
                       false,
                   ));
               }
           }
   
           Self {
               object_store,
               table_path,
               file_schema,
               table_schema: Arc::new(Schema::new(table_fields)),
               options,
           }
       }
   ```
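   The proposed check relies on adding the partition columns becoming idempotent: rebuilding the table from a schema that already contains them leaves it unchanged. A minimal model of that logic, with fields reduced to plain names and a hypothetical `add_partition_cols` helper standing in for the constructor body, might look like this:

   ```rust
   // Hypothetical, names-only model of the proposed constructor logic.

   /// Appends each partition column only if a field with that name is not
   /// already present.
   fn add_partition_cols(file_fields: &[String], partition_cols: &[String]) -> Vec<String> {
       let mut fields = file_fields.to_vec();
       for part in partition_cols {
           if !fields.iter().any(|f| f == part) {
               fields.push(part.clone());
           }
       }
       fields
   }

   fn main() {
       let file_fields = vec!["id".to_string(), "value".to_string()];
       let parts = vec!["my-partition-column".to_string()];

       let once = add_partition_cols(&file_fields, &parts);
       // Simulates the scheduler rebuilding the table from the already-extended schema.
       let twice = add_partition_cols(&once, &parts);

       assert_eq!(once, twice);
       println!("{:?}", twice);
   }
   ```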
   
   When I try this locally, it works in the sense that I no longer get a duplicate-field error, but I do get another error downstream. My guess is that this is because the partition column datatype is hard-coded (`DEFAULT_PARTITION_COLUMN_DATATYPE`), but I haven't debugged it fully.
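   One thing that may be worth checking (an assumption, not something the report confirms): with the name-only check, the incoming schema is still stored unchanged as `file_schema`, so on the scheduler the file schema would continue to list the partition column even though it does not exist in the Parquet files. A hypothetical variant that strips partition columns from the incoming schema before rebuilding the table schema is sketched below. Fields are modeled as `(name, type)` pairs, and `Ty`, `split_schema`, and the `default_ty` parameter are illustrative stand-ins rather than Arrow or DataFusion types.

   ```rust
   // Hypothetical model, not DataFusion code: `Ty` stands in for an Arrow
   // `DataType`, and a field is a (name, type) pair.
   #[derive(Clone, Debug, PartialEq)]
   enum Ty {
       Int64,
       Utf8,
   }

   type Field = (String, Ty);

   /// Returns (file_fields, table_fields). Partition columns are removed from
   /// the file fields and appended exactly once to the table fields, keeping
   /// the type they arrived with or falling back to `default_ty`.
   fn split_schema(
       incoming: &[Field],
       partition_cols: &[&str],
       default_ty: Ty,
   ) -> (Vec<Field>, Vec<Field>) {
       let file_fields: Vec<Field> = incoming
           .iter()
           .filter(|(name, _)| !partition_cols.contains(&name.as_str()))
           .cloned()
           .collect();

       let mut table_fields = file_fields.clone();
       for part in partition_cols {
           let ty = incoming
               .iter()
               .find(|(name, _)| name == part)
               .map(|(_, ty)| ty.clone())
               .unwrap_or_else(|| default_ty.clone());
           table_fields.push((part.to_string(), ty));
       }
       (file_fields, table_fields)
   }

   fn main() {
       let parts = ["my-partition-column"];
       // Client side: the partition column is not part of the Parquet files.
       let file = vec![("id".to_string(), Ty::Int64)];
       let (_, client_table) = split_schema(&file, &parts, Ty::Utf8);

       // Scheduler side: the already-extended schema comes back in.
       let (scheduler_file, scheduler_table) = split_schema(&client_table, &parts, Ty::Utf8);
       assert_eq!(scheduler_file, file);
       assert_eq!(scheduler_table, client_table);
       println!("{:?}", scheduler_table);
   }
   ```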
   

