You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/12/23 10:24:02 UTC
[arrow-datafusion] branch master updated: Fix SortExec discards field metadata on the output schema (#1477)
This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 4012713 Fix SortExec discards field metadata on the output schema (#1477)
4012713 is described below
commit 401271377cd84dc1546827f66bda1b242860a6a8
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Dec 23 05:23:56 2021 -0500
Fix SortExec discards field metadata on the output schema (#1477)
---
datafusion/src/physical_plan/sort.rs | 64 ++++++++++++++++++++++++++++++------
1 file changed, 54 insertions(+), 10 deletions(-)
diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs
index e8898c1..dec9a91 100644
--- a/datafusion/src/physical_plan/sort.rs
+++ b/datafusion/src/physical_plan/sort.rs
@@ -29,7 +29,7 @@ use crate::physical_plan::{
};
pub use arrow::compute::SortOptions;
use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions};
-use arrow::datatypes::{Field, Schema, SchemaRef};
+use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, error::ArrowError};
@@ -201,15 +201,6 @@ fn sort_batch(
None,
)?;
- let schema = Arc::new(Schema::new(
- schema
- .fields()
- .iter()
- .zip(batch.columns().iter().map(|col| col.data_type()))
- .map(|(field, ty)| Field::new(field.name(), ty.clone(), field.is_nullable()))
- .collect::<Vec<_>>(),
- ));
-
// reorder all rows based on sorted indices
RecordBatch::try_new(
schema,
@@ -318,6 +309,8 @@ impl RecordBatchStream for SortStream {
#[cfg(test)]
mod tests {
+ use std::collections::{BTreeMap, HashMap};
+
use super::*;
use crate::datasource::object_store::local::LocalFileSystem;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -399,6 +392,57 @@ mod tests {
}
#[tokio::test]
+ async fn test_sort_metadata() -> Result<()> {
+ let field_metadata: BTreeMap<String, String> =
+ vec![("foo".to_string(), "bar".to_string())]
+ .into_iter()
+ .collect();
+ let schema_metadata: HashMap<String, String> =
+ vec![("baz".to_string(), "barf".to_string())]
+ .into_iter()
+ .collect();
+
+ let mut field = Field::new("field_name", DataType::UInt64, true);
+ field.set_metadata(Some(field_metadata.clone()));
+ let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone());
+ let schema = Arc::new(schema);
+
+ let data: ArrayRef =
+ Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::<UInt64Array>());
+
+ let batch = RecordBatch::try_new(schema.clone(), vec![data]).unwrap();
+ let input =
+ Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None).unwrap());
+
+ let sort_exec = Arc::new(SortExec::try_new(
+ vec![PhysicalSortExpr {
+ expr: col("field_name", &schema)?,
+ options: SortOptions::default(),
+ }],
+ input,
+ )?);
+
+ let result: Vec<RecordBatch> = collect(sort_exec).await?;
+
+ let expected_data: ArrayRef =
+ Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::<UInt64Array>());
+ let expected_batch =
+ RecordBatch::try_new(schema.clone(), vec![expected_data]).unwrap();
+
+ // Data is correct
+ assert_eq!(&vec![expected_batch], &result);
+
+ // explicitlty ensure the metadata is present
+ assert_eq!(
+ result[0].schema().fields()[0].metadata(),
+ &Some(field_metadata)
+ );
+ assert_eq!(result[0].schema().metadata(), &schema_metadata);
+
+ Ok(())
+ }
+
+ #[tokio::test]
async fn test_lex_sort_by_float() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Float32, true),