Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/10/30 17:18:45 UTC
[GitHub] [arrow-ballista] thinkharderdev opened a new issue, #484: Ballista: Partition columns are duplicated in protobuf decoding.
thinkharderdev opened a new issue, #484:
URL: https://github.com/apache/arrow-ballista/issues/484
**Describe the bug**
I am trying to read a partitioned parquet dataset while still allowing predicate pushdown on partition columns. To do this, I manually construct a table scan logical plan on a manually constructed `ListingTable` that specifies the partition column(s). The `ListingTable` constructor adds the partition columns to the schema. The plan is then serialized and sent to the Ballista scheduler, which deserializes it and constructs a new `ListingTable`. That second constructor call adds the partition columns to the schema again, which results in an error when constructing the `DFSchema`.
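The double-append can be illustrated with a minimal, std-only model. This is a sketch under stated assumptions, not the actual DataFusion or Ballista code: `Table`, `round_trip`, and `duplicate_columns` are hypothetical names, and a schema is reduced to a list of column names.

```rust
use std::collections::HashSet;

// Simplified stand-in for the real types: a "schema" is just a list of column names.
// `Table`, `round_trip`, and `duplicate_columns` are hypothetical names for
// illustration only; they are not DataFusion or Ballista APIs.
#[derive(Debug, Clone, PartialEq)]
pub struct Table {
    pub schema: Vec<String>,
    pub partition_cols: Vec<String>,
}

impl Table {
    /// Mirrors the behavior described in the issue: the constructor always
    /// appends the partition columns to the schema it is given.
    pub fn new(schema: Vec<String>, partition_cols: Vec<String>) -> Self {
        let mut full = schema;
        full.extend(partition_cols.iter().cloned());
        Table { schema: full, partition_cols }
    }
}

/// Models serialize + deserialize: the receiver rebuilds the table from the
/// already-extended schema, so the constructor runs a second time.
pub fn round_trip(table: &Table) -> Table {
    Table::new(table.schema.clone(), table.partition_cols.clone())
}

/// Returns the column names that occur more than once, in first-seen order.
pub fn duplicate_columns(schema: &[String]) -> Vec<String> {
    let mut seen = HashSet::new();
    let mut dups = Vec::new();
    for name in schema {
        if !seen.insert(name.as_str()) && !dups.contains(name) {
            dups.push(name.clone());
        }
    }
    dups
}
```

In this model, a table built on the client has each partition column once, while the table rebuilt by `round_trip` has it twice, which is the condition that a duplicate-field check on the schema would reject.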
**To Reproduce**
Steps to reproduce the behavior:
```rust
// Assume we have a partitioned parquet table
let table_path = "/path/to/table";

// Construct a ListingOptions
let listing_options = ListingOptions {
    file_extension: String::new(),
    format: Arc::new(ParquetFormat::default()),
    table_partition_cols: vec!["my-partition-column".into()],
    collect_stat: true,
    target_partitions: 1,
};

// Infer the schema
let schema = listing_options.infer_schema(store.clone(), table_path).await?;

// Construct a ListingTable provider for the same path
let provider = ListingTable::new(
    store,
    table_path.to_string(),
    schema,
    listing_options,
);

// Create a table scan plan
let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, Arc::new(provider), None)?
    .limit(10)?
    .build()?;

// Create a DistributedQueryExec
let query = Arc::new(DistributedQueryExec::new(scheduler_url, config, plan));

// Execute the query (on the DistributedQueryExec, not the logical plan)
let stream = query.execute(0).await?;
```
**Expected behavior**
This should work, and the planner should push down a filter on `my-partition-column` to the physical scan so that we only read parquet files from the requested partitions.
**Additional context**
A simple way to fix this would be to check in the `ListingTable` constructor whether we already have the partition columns included in the schema:
```rust
pub fn new(
    object_store: Arc<dyn ObjectStore>,
    table_path: String,
    file_schema: SchemaRef,
    options: ListingOptions,
) -> Self {
    // Add the partition columns to the file schema
    let mut table_fields = file_schema.fields().clone();
    for part in &options.table_partition_cols {
        // Only add the partition column if it doesn't already exist
        if !table_fields.iter().any(|f| f.name() == part) {
            table_fields.push(Field::new(
                part,
                DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
                false,
            ));
        }
    }

    Self {
        object_store,
        table_path,
        file_schema,
        table_schema: Arc::new(Schema::new(table_fields)),
        options,
    }
}
```
When I try this locally, it works in the sense that I no longer get an error for duplicate fields, but I do get another error downstream. My guess is that this is because the partition column datatype is hard-coded, but I haven't debugged it fully.
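As a rough illustration of both points, here is a std-only sketch of a name-based guard. It is a simplified model, not the DataFusion implementation: `Field`, `DataType`, `DEFAULT_PARTITION_TYPE`, `with_partition_cols`, and `type_mismatches` are hypothetical stand-ins. It shows that the guard makes the append idempotent across a round trip, and that a check by name alone leaves an existing field's type untouched even when it differs from the hard-coded default. Whether such a mismatch causes the downstream error is not established here.

```rust
// Simplified stand-ins; `Field`, `DataType`, `with_partition_cols`, and
// `type_mismatches` are hypothetical names, not the DataFusion API.
#[derive(Debug, Clone, PartialEq)]
pub enum DataType {
    Utf8,
    Int64,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Field {
    pub name: String,
    pub data_type: DataType,
}

/// Assumed default, standing in for DEFAULT_PARTITION_COLUMN_DATATYPE.
pub const DEFAULT_PARTITION_TYPE: DataType = DataType::Utf8;

/// Appends each partition column only if no field with that name exists yet.
pub fn with_partition_cols(fields: &[Field], partition_cols: &[&str]) -> Vec<Field> {
    let mut out = fields.to_vec();
    for part in partition_cols {
        if !out.iter().any(|f| f.name == *part) {
            out.push(Field {
                name: part.to_string(),
                data_type: DEFAULT_PARTITION_TYPE,
            });
        }
    }
    out
}

/// Lists partition columns whose existing field type differs from the default.
pub fn type_mismatches(fields: &[Field], partition_cols: &[&str]) -> Vec<String> {
    fields
        .iter()
        .filter(|f| {
            partition_cols.iter().any(|p| f.name == *p)
                && f.data_type != DEFAULT_PARTITION_TYPE
        })
        .map(|f| f.name.clone())
        .collect()
}
```

Applying `with_partition_cols` to its own output returns the same fields, so a second constructor call after deserialization adds nothing. A helper like `type_mismatches` is one possible way to surface columns where the schema and the default type disagree.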