You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/14 14:45:22 UTC

[arrow-rs] branch master updated: Add option to skip decoding arrow metadata from parquet (#1459) (#1558)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d70193990 Add option to skip decoding arrow metadata from parquet (#1459) (#1558)
d70193990 is described below

commit d701939901a1a660d0cf2e0dc35cc3e8a92df54f
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Apr 14 15:45:15 2022 +0100

    Add option to skip decoding arrow metadata from parquet (#1459) (#1558)
    
    * Add option to skip decoding arrow metadata from parquet (#1459)
    
    Fix inference from null logical type (#1557)
    
    Replace some `&Option<T>` with `Option<&T>` (#1556)
    
    * Update parquet/src/arrow/arrow_reader.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Fmt
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 parquet/src/arrow/array_reader.rs     |   2 +-
 parquet/src/arrow/arrow_reader.rs     | 170 +++++++++++++++++++++++++++++++---
 parquet/src/arrow/schema.rs           |  36 +++----
 parquet/src/file/metadata.rs          |   4 +-
 parquet/src/file/serialized_reader.rs |   1 -
 5 files changed, 181 insertions(+), 32 deletions(-)

diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index 813970658..f54e6797e 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -2084,7 +2084,7 @@ mod tests {
             .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
             .unwrap();
 
-        let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), &None).unwrap();
+        let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), None).unwrap();
 
         let file = tempfile::tempfile().unwrap();
         let props = WriterProperties::builder()
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 141e49c77..80323e59f 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -30,7 +30,7 @@ use crate::arrow::schema::{
     parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
 };
 use crate::errors::{ParquetError, Result};
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{KeyValue, ParquetMetaData};
 use crate::file::reader::FileReader;
 
 /// Arrow reader api.
@@ -78,8 +78,35 @@ pub trait ArrowReader {
         T: IntoIterator<Item = usize>;
 }
 
+#[derive(Debug, Clone, Default)]
+pub struct ArrowReaderOptions {
+    skip_arrow_metadata: bool,
+}
+
+impl ArrowReaderOptions {
+    /// Create a new [`ArrowReaderOptions`] with the default settings
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Parquet files generated by some writers may contain embedded arrow
+    /// schema and metadata. This may not be correct or compatible with your system.
+    ///
+    /// For example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
+    ///
+
+    /// Set `skip_arrow_metadata` to true to skip decoding this
+    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
+        Self {
+            skip_arrow_metadata,
+        }
+    }
+}
+
 pub struct ParquetFileArrowReader {
     file_reader: Arc<dyn FileReader>,
+
+    options: ArrowReaderOptions,
 }
 
 impl ArrowReader for ParquetFileArrowReader {
@@ -87,10 +114,7 @@ impl ArrowReader for ParquetFileArrowReader {
 
     fn get_schema(&mut self) -> Result<Schema> {
         let file_metadata = self.file_reader.metadata().file_metadata();
-        parquet_to_arrow_schema(
-            file_metadata.schema_descr(),
-            file_metadata.key_value_metadata(),
-        )
+        parquet_to_arrow_schema(file_metadata.schema_descr(), self.get_kv_metadata())
     }
 
     fn get_schema_by_columns<T>(
@@ -106,13 +130,13 @@ impl ArrowReader for ParquetFileArrowReader {
             parquet_to_arrow_schema_by_columns(
                 file_metadata.schema_descr(),
                 column_indices,
-                file_metadata.key_value_metadata(),
+                self.get_kv_metadata(),
             )
         } else {
             parquet_to_arrow_schema_by_root_columns(
                 file_metadata.schema_descr(),
                 column_indices,
-                file_metadata.key_value_metadata(),
+                self.get_kv_metadata(),
             )
         }
     }
@@ -154,14 +178,41 @@ impl ArrowReader for ParquetFileArrowReader {
 }
 
 impl ParquetFileArrowReader {
+    /// Create a new [`ParquetFileArrowReader`]
     pub fn new(file_reader: Arc<dyn FileReader>) -> Self {
-        Self { file_reader }
+        Self {
+            file_reader,
+            options: Default::default(),
+        }
+    }
+
+    /// Create a new [`ParquetFileArrowReader`] with the provided [`ArrowReaderOptions`]
+    pub fn new_with_options(
+        file_reader: Arc<dyn FileReader>,
+        options: ArrowReaderOptions,
+    ) -> Self {
+        Self {
+            file_reader,
+            options,
+        }
     }
 
-    // Expose the reader metadata
+    /// Expose the reader metadata
     pub fn get_metadata(&mut self) -> ParquetMetaData {
         self.file_reader.metadata().clone()
     }
+
+    /// Returns the key value metadata, or `None` if [`ArrowReaderOptions::skip_arrow_metadata`] is set
+    fn get_kv_metadata(&self) -> Option<&Vec<KeyValue>> {
+        if self.options.skip_arrow_metadata {
+            return None;
+        }
+
+        self.file_reader
+            .metadata()
+            .file_metadata()
+            .key_value_metadata()
+    }
 }
 
 pub struct ParquetRecordBatchReader {
@@ -245,18 +296,22 @@ mod tests {
     use rand::{thread_rng, RngCore};
     use serde_json::json;
     use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject};
+    use tempfile::tempfile;
 
     use arrow::array::*;
-    use arrow::datatypes::{DataType as ArrowDataType, Field};
+    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
     use arrow::error::Result as ArrowResult;
     use arrow::record_batch::{RecordBatch, RecordBatchReader};
 
-    use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
+    use crate::arrow::arrow_reader::{
+        ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
+    };
     use crate::arrow::converter::{
         BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
         IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter,
     };
     use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
+    use crate::arrow::ArrowWriter;
     use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
     use crate::column::writer::get_typed_column_writer_mut;
     use crate::data_type::{
@@ -1238,4 +1293,97 @@ mod tests {
         let val = list.value(0);
         assert_eq!(val.len(), 0);
     }
+
+    #[test]
+    fn test_null_schema_inference() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/null_list.parquet", testdata);
+        let reader =
+            Arc::new(SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap());
+
+        let arrow_field = Field::new(
+            "emptylist",
+            ArrowDataType::List(Box::new(Field::new("item", ArrowDataType::Null, true))),
+            true,
+        );
+
+        let options = ArrowReaderOptions::default().with_skip_arrow_metadata(true);
+        let mut arrow_reader = ParquetFileArrowReader::new_with_options(reader, options);
+        let schema = arrow_reader.get_schema().unwrap();
+        assert_eq!(schema.fields().len(), 1);
+        assert_eq!(schema.field(0), &arrow_field);
+    }
+
+    #[test]
+    fn test_skip_metadata() {
+        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
+        let field = Field::new("col", col.data_type().clone(), true);
+
+        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
+
+        let metadata = [("key".to_string(), "value".to_string())]
+            .into_iter()
+            .collect();
+
+        let schema_with_metadata =
+            Arc::new(Schema::new(vec![field.with_metadata(Some(metadata))]));
+
+        assert_ne!(schema_with_metadata, schema_without_metadata);
+
+        let batch =
+            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef])
+                .unwrap();
+
+        let file = |version: WriterVersion| {
+            let props = WriterProperties::builder()
+                .set_writer_version(version)
+                .build();
+
+            let file = tempfile().unwrap();
+            let mut writer = ArrowWriter::try_new(
+                file.try_clone().unwrap(),
+                batch.schema(),
+                Some(props),
+            )
+            .unwrap();
+            writer.write(&batch).unwrap();
+            writer.close().unwrap();
+            file
+        };
+
+        let v1_reader = Arc::new(
+            SerializedFileReader::new(file(WriterVersion::PARQUET_1_0)).unwrap(),
+        );
+        let v2_reader = Arc::new(
+            SerializedFileReader::new(file(WriterVersion::PARQUET_2_0)).unwrap(),
+        );
+
+        let mut arrow_reader = ParquetFileArrowReader::new(v1_reader.clone());
+        assert_eq!(
+            &arrow_reader.get_schema().unwrap(),
+            schema_with_metadata.as_ref()
+        );
+
+        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
+        let mut arrow_reader =
+            ParquetFileArrowReader::new_with_options(v1_reader, options);
+        assert_eq!(
+            &arrow_reader.get_schema().unwrap(),
+            schema_without_metadata.as_ref()
+        );
+
+        let mut arrow_reader = ParquetFileArrowReader::new(v2_reader.clone());
+        assert_eq!(
+            &arrow_reader.get_schema().unwrap(),
+            schema_with_metadata.as_ref()
+        );
+
+        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
+        let mut arrow_reader =
+            ParquetFileArrowReader::new_with_options(v2_reader, options);
+        assert_eq!(
+            &arrow_reader.get_schema().unwrap(),
+            schema_without_metadata.as_ref()
+        );
+    }
 }
diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs
index 049d7c4f4..b0679e361 100644
--- a/parquet/src/arrow/schema.rs
+++ b/parquet/src/arrow/schema.rs
@@ -45,7 +45,7 @@ use crate::{
 /// to converting the Parquet schema column-wise
 pub fn parquet_to_arrow_schema(
     parquet_schema: &SchemaDescriptor,
-    key_value_metadata: &Option<Vec<KeyValue>>,
+    key_value_metadata: Option<&Vec<KeyValue>>,
 ) -> Result<Schema> {
     let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
     metadata
@@ -65,7 +65,7 @@ pub fn parquet_to_arrow_schema(
 pub fn parquet_to_arrow_schema_by_root_columns<T>(
     parquet_schema: &SchemaDescriptor,
     column_indices: T,
-    key_value_metadata: &Option<Vec<KeyValue>>,
+    key_value_metadata: Option<&Vec<KeyValue>>,
 ) -> Result<Schema>
 where
     T: IntoIterator<Item = usize>,
@@ -116,7 +116,7 @@ where
 pub fn parquet_to_arrow_schema_by_columns<T>(
     parquet_schema: &SchemaDescriptor,
     column_indices: T,
-    key_value_metadata: &Option<Vec<KeyValue>>,
+    key_value_metadata: Option<&Vec<KeyValue>>,
 ) -> Result<Schema>
 where
     T: IntoIterator<Item = usize>,
@@ -274,7 +274,7 @@ pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
 }
 
 fn parse_key_value_metadata(
-    key_value_metadata: &Option<Vec<KeyValue>>,
+    key_value_metadata: Option<&Vec<KeyValue>>,
 ) -> Option<HashMap<String, String>> {
     match key_value_metadata {
         Some(key_values) => {
@@ -690,6 +690,8 @@ impl ParquetTypeConverter<'_> {
                     t.unit
                 ))),
             },
+            // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
+            (Some(LogicalType::UNKNOWN(_)), _) => Ok(DataType::Null),
             (None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
             (None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
             (None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
@@ -1071,7 +1073,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
 
         let arrow_fields = vec![
             Field::new("boolean", DataType::Boolean, false),
@@ -1103,7 +1105,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
 
         let arrow_fields = vec![
             Field::new("binary", DataType::Binary, false),
@@ -1125,7 +1127,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
 
         let arrow_fields = vec![
             Field::new("boolean", DataType::Boolean, false),
@@ -1136,7 +1138,7 @@ mod tests {
         let converted_arrow_schema = parquet_to_arrow_schema_by_columns(
             &parquet_schema,
             vec![0usize, 1usize],
-            &None,
+            None,
         )
         .unwrap();
         assert_eq!(&arrow_fields, converted_arrow_schema.fields());
@@ -1340,7 +1342,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1419,7 +1421,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1535,7 +1537,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1573,7 +1575,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1623,7 +1625,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![0, 3, 4], &None)
+            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![0, 3, 4], None)
                 .unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
@@ -1674,7 +1676,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 4, 0], &None)
+            parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 4, 0], None)
                 .unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
@@ -1730,7 +1732,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+            parquet_to_arrow_schema(&parquet_schema, None).unwrap();
         let converted_fields = converted_arrow_schema.fields();
 
         assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1961,7 +1963,7 @@ mod tests {
 
         let key_value_metadata = vec![
             KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
-            KeyValue::new("baz".to_owned(), None)
+            KeyValue::new("baz".to_owned(), None),
         ];
 
         let mut expected_metadata: HashMap<String, String> = HashMap::new();
@@ -1969,7 +1971,7 @@ mod tests {
 
         let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
         let converted_arrow_schema =
-            parquet_to_arrow_schema(&parquet_schema, &Some(key_value_metadata)).unwrap();
+            parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
 
         assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
     }
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 899496f2b..e3de7fa7f 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -144,8 +144,8 @@ impl FileMetaData {
     }
 
     /// Returns key_value_metadata of this file.
-    pub fn key_value_metadata(&self) -> &Option<Vec<KeyValue>> {
-        &self.key_value_metadata
+    pub fn key_value_metadata(&self) -> Option<&Vec<KeyValue>> {
+        self.key_value_metadata.as_ref()
     }
 
     /// Returns Parquet ['Type`] that describes schema in this file.
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 4c10d26fa..012cbf1f9 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -817,7 +817,6 @@ mod tests {
             .metadata
             .file_metadata()
             .key_value_metadata()
-            .as_ref()
             .unwrap();
 
         assert_eq!(metadata.len(), 3);