You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/04/14 14:45:22 UTC
[arrow-rs] branch master updated: Add option to skip decoding arrow metadata from parquet (#1459) (#1558)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d70193990 Add option to skip decoding arrow metadata from parquet (#1459) (#1558)
d70193990 is described below
commit d701939901a1a660d0cf2e0dc35cc3e8a92df54f
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Apr 14 15:45:15 2022 +0100
Add option to skip decoding arrow metadata from parquet (#1459) (#1558)
* Add option to skip decoding arrow metadata from parquet (#1459)
Fix inference from null logical type (#1557)
Replace some `&Option<T>` with `Option<&T>` (#1556)
* Update parquet/src/arrow/arrow_reader.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* Fmt
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
parquet/src/arrow/array_reader.rs | 2 +-
parquet/src/arrow/arrow_reader.rs | 170 +++++++++++++++++++++++++++++++---
parquet/src/arrow/schema.rs | 36 +++----
parquet/src/file/metadata.rs | 4 +-
parquet/src/file/serialized_reader.rs | 1 -
5 files changed, 181 insertions(+), 32 deletions(-)
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index 813970658..f54e6797e 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -2084,7 +2084,7 @@ mod tests {
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();
- let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), &None).unwrap();
+ let arrow_schema = parquet_to_arrow_schema(schema.as_ref(), None).unwrap();
let file = tempfile::tempfile().unwrap();
let props = WriterProperties::builder()
diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs
index 141e49c77..80323e59f 100644
--- a/parquet/src/arrow/arrow_reader.rs
+++ b/parquet/src/arrow/arrow_reader.rs
@@ -30,7 +30,7 @@ use crate::arrow::schema::{
parquet_to_arrow_schema_by_columns, parquet_to_arrow_schema_by_root_columns,
};
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::ParquetMetaData;
+use crate::file::metadata::{KeyValue, ParquetMetaData};
use crate::file::reader::FileReader;
/// Arrow reader api.
@@ -78,8 +78,35 @@ pub trait ArrowReader {
T: IntoIterator<Item = usize>;
}
+#[derive(Debug, Clone, Default)]
+pub struct ArrowReaderOptions {
+ skip_arrow_metadata: bool,
+}
+
+impl ArrowReaderOptions {
+ /// Create a new [`ArrowReaderOptions`] with the default settings
+ fn new() -> Self {
+ Self::default()
+ }
+
+ /// Parquet files generated by some writers may contain embedded arrow
+ /// schema and metadata. This may not be correct or compatible with your system.
+ ///
+ /// For example: [ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
+ ///
+
+ /// Set `skip_arrow_metadata` to true, to skip decoding this
+ pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
+ Self {
+ skip_arrow_metadata,
+ }
+ }
+}
+
pub struct ParquetFileArrowReader {
file_reader: Arc<dyn FileReader>,
+
+ options: ArrowReaderOptions,
}
impl ArrowReader for ParquetFileArrowReader {
@@ -87,10 +114,7 @@ impl ArrowReader for ParquetFileArrowReader {
fn get_schema(&mut self) -> Result<Schema> {
let file_metadata = self.file_reader.metadata().file_metadata();
- parquet_to_arrow_schema(
- file_metadata.schema_descr(),
- file_metadata.key_value_metadata(),
- )
+ parquet_to_arrow_schema(file_metadata.schema_descr(), self.get_kv_metadata())
}
fn get_schema_by_columns<T>(
@@ -106,13 +130,13 @@ impl ArrowReader for ParquetFileArrowReader {
parquet_to_arrow_schema_by_columns(
file_metadata.schema_descr(),
column_indices,
- file_metadata.key_value_metadata(),
+ self.get_kv_metadata(),
)
} else {
parquet_to_arrow_schema_by_root_columns(
file_metadata.schema_descr(),
column_indices,
- file_metadata.key_value_metadata(),
+ self.get_kv_metadata(),
)
}
}
@@ -154,14 +178,41 @@ impl ArrowReader for ParquetFileArrowReader {
}
impl ParquetFileArrowReader {
+ /// Create a new [`ParquetFileArrowReader`]
pub fn new(file_reader: Arc<dyn FileReader>) -> Self {
- Self { file_reader }
+ Self {
+ file_reader,
+ options: Default::default(),
+ }
+ }
+
+ /// Create a new [`ParquetFileArrowReader`] with the provided [`ArrowReaderOptions`]
+ pub fn new_with_options(
+ file_reader: Arc<dyn FileReader>,
+ options: ArrowReaderOptions,
+ ) -> Self {
+ Self {
+ file_reader,
+ options,
+ }
}
- // Expose the reader metadata
+ /// Expose the reader metadata
pub fn get_metadata(&mut self) -> ParquetMetaData {
self.file_reader.metadata().clone()
}
+
+ /// Returns the key value metadata, returns `None` if [`ArrowReaderOptions::skip_arrow_metadata`] is enabled
+ fn get_kv_metadata(&self) -> Option<&Vec<KeyValue>> {
+ if self.options.skip_arrow_metadata {
+ return None;
+ }
+
+ self.file_reader
+ .metadata()
+ .file_metadata()
+ .key_value_metadata()
+ }
}
pub struct ParquetRecordBatchReader {
@@ -245,18 +296,22 @@ mod tests {
use rand::{thread_rng, RngCore};
use serde_json::json;
use serde_json::Value::{Array as JArray, Null as JNull, Object as JObject};
+ use tempfile::tempfile;
use arrow::array::*;
- use arrow::datatypes::{DataType as ArrowDataType, Field};
+ use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::{RecordBatch, RecordBatchReader};
- use crate::arrow::arrow_reader::{ArrowReader, ParquetFileArrowReader};
+ use crate::arrow::arrow_reader::{
+ ArrowReader, ArrowReaderOptions, ParquetFileArrowReader,
+ };
use crate::arrow::converter::{
BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter,
IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter,
};
use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
+ use crate::arrow::ArrowWriter;
use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
use crate::column::writer::get_typed_column_writer_mut;
use crate::data_type::{
@@ -1238,4 +1293,97 @@ mod tests {
let val = list.value(0);
assert_eq!(val.len(), 0);
}
+
+ #[test]
+ fn test_null_schema_inference() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{}/null_list.parquet", testdata);
+ let reader =
+ Arc::new(SerializedFileReader::try_from(File::open(&path).unwrap()).unwrap());
+
+ let arrow_field = Field::new(
+ "emptylist",
+ ArrowDataType::List(Box::new(Field::new("item", ArrowDataType::Null, true))),
+ true,
+ );
+
+ let options = ArrowReaderOptions::default().with_skip_arrow_metadata(true);
+ let mut arrow_reader = ParquetFileArrowReader::new_with_options(reader, options);
+ let schema = arrow_reader.get_schema().unwrap();
+ assert_eq!(schema.fields().len(), 1);
+ assert_eq!(schema.field(0), &arrow_field);
+ }
+
+ #[test]
+ fn test_skip_metadata() {
+ let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
+ let field = Field::new("col", col.data_type().clone(), true);
+
+ let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
+
+ let metadata = [("key".to_string(), "value".to_string())]
+ .into_iter()
+ .collect();
+
+ let schema_with_metadata =
+ Arc::new(Schema::new(vec![field.with_metadata(Some(metadata))]));
+
+ assert_ne!(schema_with_metadata, schema_without_metadata);
+
+ let batch =
+ RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef])
+ .unwrap();
+
+ let file = |version: WriterVersion| {
+ let props = WriterProperties::builder()
+ .set_writer_version(version)
+ .build();
+
+ let file = tempfile().unwrap();
+ let mut writer = ArrowWriter::try_new(
+ file.try_clone().unwrap(),
+ batch.schema(),
+ Some(props),
+ )
+ .unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+ file
+ };
+
+ let v1_reader = Arc::new(
+ SerializedFileReader::new(file(WriterVersion::PARQUET_1_0)).unwrap(),
+ );
+ let v2_reader = Arc::new(
+ SerializedFileReader::new(file(WriterVersion::PARQUET_2_0)).unwrap(),
+ );
+
+ let mut arrow_reader = ParquetFileArrowReader::new(v1_reader.clone());
+ assert_eq!(
+ &arrow_reader.get_schema().unwrap(),
+ schema_with_metadata.as_ref()
+ );
+
+ let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
+ let mut arrow_reader =
+ ParquetFileArrowReader::new_with_options(v1_reader, options);
+ assert_eq!(
+ &arrow_reader.get_schema().unwrap(),
+ schema_without_metadata.as_ref()
+ );
+
+ let mut arrow_reader = ParquetFileArrowReader::new(v2_reader.clone());
+ assert_eq!(
+ &arrow_reader.get_schema().unwrap(),
+ schema_with_metadata.as_ref()
+ );
+
+ let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
+ let mut arrow_reader =
+ ParquetFileArrowReader::new_with_options(v2_reader, options);
+ assert_eq!(
+ &arrow_reader.get_schema().unwrap(),
+ schema_without_metadata.as_ref()
+ );
+ }
}
diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs
index 049d7c4f4..b0679e361 100644
--- a/parquet/src/arrow/schema.rs
+++ b/parquet/src/arrow/schema.rs
@@ -45,7 +45,7 @@ use crate::{
/// to converting the Parquet schema column-wise
pub fn parquet_to_arrow_schema(
parquet_schema: &SchemaDescriptor,
- key_value_metadata: &Option<Vec<KeyValue>>,
+ key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema> {
let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
metadata
@@ -65,7 +65,7 @@ pub fn parquet_to_arrow_schema(
pub fn parquet_to_arrow_schema_by_root_columns<T>(
parquet_schema: &SchemaDescriptor,
column_indices: T,
- key_value_metadata: &Option<Vec<KeyValue>>,
+ key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema>
where
T: IntoIterator<Item = usize>,
@@ -116,7 +116,7 @@ where
pub fn parquet_to_arrow_schema_by_columns<T>(
parquet_schema: &SchemaDescriptor,
column_indices: T,
- key_value_metadata: &Option<Vec<KeyValue>>,
+ key_value_metadata: Option<&Vec<KeyValue>>,
) -> Result<Schema>
where
T: IntoIterator<Item = usize>,
@@ -274,7 +274,7 @@ pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
}
fn parse_key_value_metadata(
- key_value_metadata: &Option<Vec<KeyValue>>,
+ key_value_metadata: Option<&Vec<KeyValue>>,
) -> Option<HashMap<String, String>> {
match key_value_metadata {
Some(key_values) => {
@@ -690,6 +690,8 @@ impl ParquetTypeConverter<'_> {
t.unit
))),
},
+ // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#unknown-always-null
+ (Some(LogicalType::UNKNOWN(_)), _) => Ok(DataType::Null),
(None, ConvertedType::UINT_8) => Ok(DataType::UInt8),
(None, ConvertedType::UINT_16) => Ok(DataType::UInt16),
(None, ConvertedType::UINT_32) => Ok(DataType::UInt32),
@@ -1071,7 +1073,7 @@ mod tests {
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
- parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+ parquet_to_arrow_schema(&parquet_schema, None).unwrap();
let arrow_fields = vec![
Field::new("boolean", DataType::Boolean, false),
@@ -1103,7 +1105,7 @@ mod tests {
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
- parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+ parquet_to_arrow_schema(&parquet_schema, None).unwrap();
let arrow_fields = vec![
Field::new("binary", DataType::Binary, false),
@@ -1125,7 +1127,7 @@ mod tests {
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
- parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+ parquet_to_arrow_schema(&parquet_schema, None).unwrap();
let arrow_fields = vec![
Field::new("boolean", DataType::Boolean, false),
@@ -1136,7 +1138,7 @@ mod tests {
let converted_arrow_schema = parquet_to_arrow_schema_by_columns(
&parquet_schema,
vec![0usize, 1usize],
- &None,
+ None,
)
.unwrap();
assert_eq!(&arrow_fields, converted_arrow_schema.fields());
@@ -1340,7 +1342,7 @@ mod tests {
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
- parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+ parquet_to_arrow_schema(&parquet_schema, None).unwrap();
let converted_fields = converted_arrow_schema.fields();
assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1419,7 +1421,7 @@ mod tests {
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
- parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+ parquet_to_arrow_schema(&parquet_schema, None).unwrap();
let converted_fields = converted_arrow_schema.fields();
assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1535,7 +1537,7 @@ mod tests {
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
- parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+ parquet_to_arrow_schema(&parquet_schema, None).unwrap();
let converted_fields = converted_arrow_schema.fields();
assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1573,7 +1575,7 @@ mod tests {
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
- parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+ parquet_to_arrow_schema(&parquet_schema, None).unwrap();
let converted_fields = converted_arrow_schema.fields();
assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1623,7 +1625,7 @@ mod tests {
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
- parquet_to_arrow_schema_by_columns(&parquet_schema, vec![0, 3, 4], &None)
+ parquet_to_arrow_schema_by_columns(&parquet_schema, vec![0, 3, 4], None)
.unwrap();
let converted_fields = converted_arrow_schema.fields();
@@ -1674,7 +1676,7 @@ mod tests {
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
- parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 4, 0], &None)
+ parquet_to_arrow_schema_by_columns(&parquet_schema, vec![3, 4, 0], None)
.unwrap();
let converted_fields = converted_arrow_schema.fields();
@@ -1730,7 +1732,7 @@ mod tests {
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
- parquet_to_arrow_schema(&parquet_schema, &None).unwrap();
+ parquet_to_arrow_schema(&parquet_schema, None).unwrap();
let converted_fields = converted_arrow_schema.fields();
assert_eq!(arrow_fields.len(), converted_fields.len());
@@ -1961,7 +1963,7 @@ mod tests {
let key_value_metadata = vec![
KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
- KeyValue::new("baz".to_owned(), None)
+ KeyValue::new("baz".to_owned(), None),
];
let mut expected_metadata: HashMap<String, String> = HashMap::new();
@@ -1969,7 +1971,7 @@ mod tests {
let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type));
let converted_arrow_schema =
- parquet_to_arrow_schema(&parquet_schema, &Some(key_value_metadata)).unwrap();
+ parquet_to_arrow_schema(&parquet_schema, Some(&key_value_metadata)).unwrap();
assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
}
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 899496f2b..e3de7fa7f 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -144,8 +144,8 @@ impl FileMetaData {
}
/// Returns key_value_metadata of this file.
- pub fn key_value_metadata(&self) -> &Option<Vec<KeyValue>> {
- &self.key_value_metadata
+ pub fn key_value_metadata(&self) -> Option<&Vec<KeyValue>> {
+ self.key_value_metadata.as_ref()
}
/// Returns Parquet ['Type`] that describes schema in this file.
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 4c10d26fa..012cbf1f9 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -817,7 +817,6 @@ mod tests {
.metadata
.file_metadata()
.key_value_metadata()
- .as_ref()
.unwrap();
assert_eq!(metadata.len(), 3);