You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2021/02/24 13:31:35 UTC
[arrow] branch master updated: ARROW-11452: [Rust] Fix issue with
Parquet Arrow reader not following type path
This is an automated email from the ASF dual-hosted git repository.
nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new a83bc17 ARROW-11452: [Rust] Fix issue with Parquet Arrow reader not following type path
a83bc17 is described below
commit a83bc1792f53791ea2000a972a3c868d29b6f875
Author: Max Burke <ma...@urbanlogiq.com>
AuthorDate: Wed Feb 24 15:30:07 2021 +0200
ARROW-11452: [Rust] Fix issue with Parquet Arrow reader not following type path
Not sure where the test data file should go, but I've attached it.
[structs.parquet.zip](https://github.com/apache/arrow/files/5906689/structs.parquet.zip)
Closes #9390 from maxburke/ARROW-11452
Lead-authored-by: Max Burke <ma...@urbanlogiq.com>
Co-authored-by: Neville Dipale <ne...@gmail.com>
Signed-off-by: Neville Dipale <ne...@gmail.com>
---
cpp/submodules/parquet-testing | 2 +-
rust/parquet/src/arrow/array_reader.rs | 59 +++++++++++++++++++++++++++++-----
rust/parquet/src/arrow/arrow_reader.rs | 19 +++++++++++
rust/parquet/src/schema/types.rs | 4 +++
4 files changed, 75 insertions(+), 9 deletions(-)
diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index e31fe1a..8e7badc 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit e31fe1a02c9e9f271e4bfb8002d403c52f1ef8eb
+Subproject commit 8e7badc6a3817a02e06d17b5d8ab6b6dc356e890
diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs
index 7018785..dcdfbcb 100644
--- a/rust/parquet/src/arrow/array_reader.rs
+++ b/rust/parquet/src/arrow/array_reader.rs
@@ -1095,6 +1095,7 @@ where
for c in column_indices {
let column = parquet_schema.column(c).self_type() as *const Type;
+
leaves.insert(column, c);
let root = parquet_schema.get_column_root_ptr(c);
@@ -1395,12 +1396,11 @@ impl<'a> ArrayReaderBuilder {
self.file_reader.clone(),
)?);
- let arrow_type = self
- .arrow_schema
- .field_with_name(cur_type.name())
- .ok()
- .map(|f| f.data_type())
- .cloned();
+ let arrow_type: Option<ArrowType> = match self.get_arrow_field(&cur_type, context)
+ {
+ Some(f) => Some(f.data_type().clone()),
+ _ => None,
+ };
match cur_type.get_physical_type() {
PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::<BoolType>::new(
@@ -1631,9 +1631,13 @@ impl<'a> ArrayReaderBuilder {
let mut children_reader = Vec::with_capacity(cur_type.get_fields().len());
for child in cur_type.get_fields() {
+ let mut struct_context = context.clone();
if let Some(child_reader) = self.dispatch(child.clone(), context)? {
- let field = match self.arrow_schema.field_with_name(child.name()) {
- Ok(f) => f.to_owned(),
+ // TODO: this results in calling get_arrow_field twice, it could be reused
+ // from child_reader above, by making child_reader carry its `Field`
+ struct_context.path.append(vec![child.name().to_string()]);
+ let field = match self.get_arrow_field(child, &struct_context) {
+ Some(f) => f.clone(),
_ => Field::new(
child.name(),
child_reader.get_data_type().clone(),
@@ -1657,6 +1661,45 @@ impl<'a> ArrayReaderBuilder {
Ok(None)
}
}
+
+ fn get_arrow_field(
+ &self,
+ cur_type: &Type,
+ context: &'a ArrayReaderBuilderContext,
+ ) -> Option<&Field> {
+ let parts: Vec<&str> = context
+ .path
+ .parts()
+ .iter()
+ .map(|x| -> &str { x })
+ .collect::<Vec<&str>>();
+
+ // If the parts length is one it'll have the top level "schema" type. If
+ // it's two then it'll be a top-level type that we can get from the arrow
+ // schema directly.
+ if parts.len() <= 2 {
+ self.arrow_schema.field_with_name(cur_type.name()).ok()
+ } else {
+ // If it's greater than two then we need to traverse the type path
+ // until we find the actual field we're looking for.
+ let mut field: Option<&Field> = None;
+
+ for (i, part) in parts.iter().enumerate().skip(1) {
+ if i == 1 {
+ field = self.arrow_schema.field_with_name(part).ok();
+ } else if let Some(f) = field {
+ if let ArrowType::Struct(fields) = f.data_type() {
+ field = fields.iter().find(|f| f.name() == part)
+ } else {
+ field = None
+ }
+ } else {
+ field = None
+ }
+ }
+ field
+ }
+ }
}
#[cfg(test)]
diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs
index 288e043..7bbe8de 100644
--- a/rust/parquet/src/arrow/arrow_reader.rs
+++ b/rust/parquet/src/arrow/arrow_reader.rs
@@ -649,4 +649,23 @@ mod tests {
}
}
}
+
+ #[test]
+ fn test_read_structs() {
+ // This particular test file has columns of struct types where there is
+ // a column that has the same name as one of the struct fields
+ // (see: ARROW-11452)
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{}/nested_structs.rust.parquet", testdata);
+ let parquet_file_reader =
+ SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
+ let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+ let record_batch_reader = arrow_reader
+ .get_record_reader(60)
+ .expect("Failed to read into array!");
+
+ for batch in record_batch_reader {
+ batch.unwrap();
+ }
+ }
}
diff --git a/rust/parquet/src/schema/types.rs b/rust/parquet/src/schema/types.rs
index 27768fb..5c35e1c 100644
--- a/rust/parquet/src/schema/types.rs
+++ b/rust/parquet/src/schema/types.rs
@@ -561,6 +561,10 @@ impl ColumnPath {
pub fn append(&mut self, mut tail: Vec<String>) {
self.parts.append(&mut tail);
}
+
+ pub fn parts(&self) -> &[String] {
+ &self.parts
+ }
}
impl fmt::Display for ColumnPath {