You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ne...@apache.org on 2021/02/24 13:31:35 UTC

[arrow] branch master updated: ARROW-11452: [Rust] Fix issue with Parquet Arrow reader not following type path

This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new a83bc17  ARROW-11452: [Rust] Fix issue with Parquet Arrow reader not following type path
a83bc17 is described below

commit a83bc1792f53791ea2000a972a3c868d29b6f875
Author: Max Burke <ma...@urbanlogiq.com>
AuthorDate: Wed Feb 24 15:30:07 2021 +0200

    ARROW-11452: [Rust] Fix issue with Parquet Arrow reader not following type path
    
    Not sure where the test data file should go, but I've attached it.
    
    [structs.parquet.zip](https://github.com/apache/arrow/files/5906689/structs.parquet.zip)
    
    Closes #9390 from maxburke/ARROW-11452
    
    Lead-authored-by: Max Burke <ma...@urbanlogiq.com>
    Co-authored-by: Neville Dipale <ne...@gmail.com>
    Signed-off-by: Neville Dipale <ne...@gmail.com>
---
 cpp/submodules/parquet-testing         |  2 +-
 rust/parquet/src/arrow/array_reader.rs | 59 +++++++++++++++++++++++++++++-----
 rust/parquet/src/arrow/arrow_reader.rs | 19 +++++++++++
 rust/parquet/src/schema/types.rs       |  4 +++
 4 files changed, 75 insertions(+), 9 deletions(-)

diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index e31fe1a..8e7badc 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit e31fe1a02c9e9f271e4bfb8002d403c52f1ef8eb
+Subproject commit 8e7badc6a3817a02e06d17b5d8ab6b6dc356e890
diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs
index 7018785..dcdfbcb 100644
--- a/rust/parquet/src/arrow/array_reader.rs
+++ b/rust/parquet/src/arrow/array_reader.rs
@@ -1095,6 +1095,7 @@ where
 
     for c in column_indices {
         let column = parquet_schema.column(c).self_type() as *const Type;
+
         leaves.insert(column, c);
 
         let root = parquet_schema.get_column_root_ptr(c);
@@ -1395,12 +1396,11 @@ impl<'a> ArrayReaderBuilder {
             self.file_reader.clone(),
         )?);
 
-        let arrow_type = self
-            .arrow_schema
-            .field_with_name(cur_type.name())
-            .ok()
-            .map(|f| f.data_type())
-            .cloned();
+        let arrow_type: Option<ArrowType> = match self.get_arrow_field(&cur_type, context)
+        {
+            Some(f) => Some(f.data_type().clone()),
+            _ => None,
+        };
 
         match cur_type.get_physical_type() {
             PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::<BoolType>::new(
@@ -1631,9 +1631,13 @@ impl<'a> ArrayReaderBuilder {
         let mut children_reader = Vec::with_capacity(cur_type.get_fields().len());
 
         for child in cur_type.get_fields() {
+            let mut struct_context = context.clone();
             if let Some(child_reader) = self.dispatch(child.clone(), context)? {
-                let field = match self.arrow_schema.field_with_name(child.name()) {
-                    Ok(f) => f.to_owned(),
+                // TODO: this results in calling get_arrow_field twice, it could be reused
+                // from child_reader above, by making child_reader carry its `Field`
+                struct_context.path.append(vec![child.name().to_string()]);
+                let field = match self.get_arrow_field(child, &struct_context) {
+                    Some(f) => f.clone(),
                     _ => Field::new(
                         child.name(),
                         child_reader.get_data_type().clone(),
@@ -1657,6 +1661,45 @@ impl<'a> ArrayReaderBuilder {
             Ok(None)
         }
     }
+
+    fn get_arrow_field(
+        &self,
+        cur_type: &Type,
+        context: &'a ArrayReaderBuilderContext,
+    ) -> Option<&Field> {
+        let parts: Vec<&str> = context
+            .path
+            .parts()
+            .iter()
+            .map(|x| -> &str { x })
+            .collect::<Vec<&str>>();
+
+        // If the parts length is one it'll have the top level "schema" type. If
+        // it's two then it'll be a top-level type that we can get from the arrow
+        // schema directly.
+        if parts.len() <= 2 {
+            self.arrow_schema.field_with_name(cur_type.name()).ok()
+        } else {
+            // If it's greater than two then we need to traverse the type path
+            // until we find the actual field we're looking for.
+            let mut field: Option<&Field> = None;
+
+            for (i, part) in parts.iter().enumerate().skip(1) {
+                if i == 1 {
+                    field = self.arrow_schema.field_with_name(part).ok();
+                } else if let Some(f) = field {
+                    if let ArrowType::Struct(fields) = f.data_type() {
+                        field = fields.iter().find(|f| f.name() == part)
+                    } else {
+                        field = None
+                    }
+                } else {
+                    field = None
+                }
+            }
+            field
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/rust/parquet/src/arrow/arrow_reader.rs b/rust/parquet/src/arrow/arrow_reader.rs
index 288e043..7bbe8de 100644
--- a/rust/parquet/src/arrow/arrow_reader.rs
+++ b/rust/parquet/src/arrow/arrow_reader.rs
@@ -649,4 +649,23 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    fn test_read_structs() {
+        // This particular test file has columns of struct types where there is
+        // a column that has the same name as one of the struct fields
+        // (see: ARROW-11452)
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/nested_structs.rust.parquet", testdata);
+        let parquet_file_reader =
+            SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap();
+        let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(parquet_file_reader));
+        let record_batch_reader = arrow_reader
+            .get_record_reader(60)
+            .expect("Failed to read into array!");
+
+        for batch in record_batch_reader {
+            batch.unwrap();
+        }
+    }
 }
diff --git a/rust/parquet/src/schema/types.rs b/rust/parquet/src/schema/types.rs
index 27768fb..5c35e1c 100644
--- a/rust/parquet/src/schema/types.rs
+++ b/rust/parquet/src/schema/types.rs
@@ -561,6 +561,10 @@ impl ColumnPath {
     pub fn append(&mut self, mut tail: Vec<String>) {
         self.parts.append(&mut tail);
     }
+
+    pub fn parts(&self) -> &[String] {
+        &self.parts
+    }
 }
 
 impl fmt::Display for ColumnPath {