Posted to commits@arrow.apache.org by ne...@apache.org on 2020/09/25 16:02:33 UTC

[arrow] 02/03: ARROW-8423: [Rust] [Parquet] Serialize Arrow schema metadata

This is an automated email from the ASF dual-hosted git repository.

nevime pushed a commit to branch rust-parquet-arrow-writer
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 28b075d2481d3cf47542d96bff42e843845be4c6
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Tue Aug 18 18:39:37 2020 +0200

    ARROW-8423: [Rust] [Parquet] Serialize Arrow schema metadata
    
    This allows Arrow-specific schema metadata to be preserved when writing or reading Parquet files created from C++ or Rust.
    If the stored schema can't be deserialised, the normal Parquet-to-Arrow schema conversion is performed instead.
    
    Closes #7917 from nevi-me/ARROW-8243
    
    Authored-by: Neville Dipale <ne...@gmail.com>
    Signed-off-by: Neville Dipale <ne...@gmail.com>
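    
    As a usage sketch (not part of this commit; the file name and column are
    illustrative), a batch written through the new ArrowWriter embeds the
    serialized schema, and reading the file back recovers the exact Arrow
    schema instead of one re-derived from the Parquet types:
    
        use std::{rc::Rc, sync::Arc};
        
        use arrow::array::Int32Array;
        use arrow::datatypes::{DataType, Field, Schema};
        use arrow::record_batch::RecordBatch;
        use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
        use parquet::file::reader::SerializedFileReader;
        
        fn main() {
            // illustrative single-column schema
            let schema =
                Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
            )
            .unwrap();
        
            // writing stores the base64-encoded schema under the ARROW:schema key
            let file = std::fs::File::create("example.parquet").unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), None)
                    .unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
        
            // reading decodes that schema, so it round-trips exactly
            let file = std::fs::File::open("example.parquet").unwrap();
            let reader = SerializedFileReader::new(file).unwrap();
            let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader));
            assert_eq!(*schema, arrow_reader.get_schema().unwrap());
        }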
---
 rust/parquet/Cargo.toml                |   3 +-
 rust/parquet/src/arrow/arrow_writer.rs |  27 ++-
 rust/parquet/src/arrow/mod.rs          |   4 +
 rust/parquet/src/arrow/schema.rs       | 306 ++++++++++++++++++++++++++++-----
 rust/parquet/src/file/properties.rs    |   6 +-
 5 files changed, 290 insertions(+), 56 deletions(-)

diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml
index 50d7c34..60e43c9 100644
--- a/rust/parquet/Cargo.toml
+++ b/rust/parquet/Cargo.toml
@@ -40,6 +40,7 @@ zstd = { version = "0.5", optional = true }
 chrono = "0.4"
 num-bigint = "0.3"
 arrow = { path = "../arrow", version = "2.0.0-SNAPSHOT", optional = true }
+base64 = { version = "*", optional = true }
 
 [dev-dependencies]
 rand = "0.7"
@@ -52,4 +53,4 @@ arrow = { path = "../arrow", version = "2.0.0-SNAPSHOT" }
 serde_json = { version = "1.0", features = ["preserve_order"] }
 
 [features]
-default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd"]
+default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index 0c1c490..1ca8d50 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -24,6 +24,7 @@ use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
 use arrow_array::Array;
 
+use super::schema::add_encoded_arrow_schema_to_metadata;
 use crate::column::writer::ColumnWriter;
 use crate::errors::{ParquetError, Result};
 use crate::file::properties::WriterProperties;
@@ -53,17 +54,17 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
     pub fn try_new(
         writer: W,
         arrow_schema: SchemaRef,
-        props: Option<Rc<WriterProperties>>,
+        props: Option<WriterProperties>,
     ) -> Result<Self> {
         let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
-        let props = match props {
-            Some(props) => props,
-            None => Rc::new(WriterProperties::builder().build()),
-        };
+        // add the serialized Arrow schema to the writer's key-value metadata
+        let mut props = props.unwrap_or_else(|| WriterProperties::builder().build());
+        add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+
         let file_writer = SerializedFileWriter::new(
             writer.try_clone()?,
             schema.root_schema_ptr(),
-            props,
+            Rc::new(props),
         )?;
 
         Ok(Self {
@@ -495,7 +496,7 @@ mod tests {
     use arrow::record_batch::{RecordBatch, RecordBatchReader};
 
     use crate::arrow::{ArrowReader, ParquetFileArrowReader};
-    use crate::file::reader::SerializedFileReader;
+    use crate::file::{metadata::KeyValue, reader::SerializedFileReader};
     use crate::util::test_common::get_temp_file;
 
     #[test]
@@ -584,7 +585,7 @@ mod tests {
         )
         .unwrap();
 
-        let mut file = get_temp_file("test_arrow_writer.parquet", &[]);
+        let mut file = get_temp_file("test_arrow_writer_binary.parquet", &[]);
         let mut writer =
             ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
                 .unwrap();
@@ -674,8 +675,16 @@ mod tests {
         )
         .unwrap();
 
+        let props = WriterProperties::builder()
+            .set_key_value_metadata(Some(vec![KeyValue {
+                key: "test_key".to_string(),
+                value: Some("test_value".to_string()),
+            }]))
+            .build();
+
         let file = get_temp_file("test_arrow_writer_complex.parquet", &[]);
-        let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
+        let mut writer =
+            ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();
         writer.write(&batch).unwrap();
         writer.close().unwrap();
     }
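
A minimal sketch of the new call shape (file name and metadata values are
illustrative): callers now pass WriterProperties by value rather than as
Rc<WriterProperties>, and any caller-supplied key-value metadata is preserved
alongside the ARROW:schema entry the writer adds:

    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use parquet::arrow::ArrowWriter;
    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;

    fn main() {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        // caller-supplied metadata; the writer appends the encoded Arrow schema to it
        let props = WriterProperties::builder()
            .set_key_value_metadata(Some(vec![KeyValue {
                key: "origin".to_string(),
                value: Some("my-pipeline".to_string()),
            }]))
            .build();

        let file = std::fs::File::create("with_metadata.parquet").unwrap();
        let mut writer = ArrowWriter::try_new(file, schema, Some(props)).unwrap();
        writer.close().unwrap();
    }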
diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs
index c8739c2..2bdb07c 100644
--- a/rust/parquet/src/arrow/mod.rs
+++ b/rust/parquet/src/arrow/mod.rs
@@ -58,6 +58,10 @@ pub mod schema;
 
 pub use self::arrow_reader::ArrowReader;
 pub use self::arrow_reader::ParquetFileArrowReader;
+pub use self::arrow_writer::ArrowWriter;
 pub use self::schema::{
     arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
 };
+
+/// Schema metadata key used to store serialized Arrow IPC schema
+pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
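
As a hedged sketch of how this key can be inspected (file name illustrative,
assuming the FileReader metadata accessors of this crate version), the stored
value is readable straight from the file footer:

    use parquet::file::reader::{FileReader, SerializedFileReader};

    fn main() {
        let file = std::fs::File::open("example.parquet").unwrap();
        let reader = SerializedFileReader::new(file).unwrap();
        let metadata = reader.metadata();
        let file_metadata = metadata.file_metadata();
        if let Some(kvs) = file_metadata.key_value_metadata() {
            for kv in kvs {
                if kv.key == parquet::arrow::ARROW_SCHEMA_META_KEY {
                    // a base64-encoded, length-prefixed Arrow IPC schema message
                    println!("encoded Arrow schema: {:?}", kv.value);
                }
            }
        }
    }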
diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs
index aebb9e7..d4cfe1f 100644
--- a/rust/parquet/src/arrow/schema.rs
+++ b/rust/parquet/src/arrow/schema.rs
@@ -26,24 +26,33 @@
 use std::collections::{HashMap, HashSet};
 use std::rc::Rc;
 
+use arrow::datatypes::{DataType, DateUnit, Field, Schema, TimeUnit};
+
 use crate::basic::{LogicalType, Repetition, Type as PhysicalType};
 use crate::errors::{ParquetError::ArrowError, Result};
-use crate::file::metadata::KeyValue;
+use crate::file::{metadata::KeyValue, properties::WriterProperties};
 use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type, TypePtr};
 
-use arrow::datatypes::TimeUnit;
-use arrow::datatypes::{DataType, DateUnit, Field, Schema};
-
-/// Convert parquet schema to arrow schema including optional metadata.
+/// Convert Parquet schema to Arrow schema including optional metadata.
+/// Attempts to decode any existing Arrow schema metadata, falling back
+/// to converting the Parquet schema column-wise.
 pub fn parquet_to_arrow_schema(
     parquet_schema: &SchemaDescriptor,
-    metadata: &Option<Vec<KeyValue>>,
+    key_value_metadata: &Option<Vec<KeyValue>>,
 ) -> Result<Schema> {
-    parquet_to_arrow_schema_by_columns(
-        parquet_schema,
-        0..parquet_schema.columns().len(),
-        metadata,
-    )
+    let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
+    let arrow_schema_metadata = metadata
+        .remove(super::ARROW_SCHEMA_META_KEY)
+        .map(|encoded| get_arrow_schema_from_metadata(&encoded));
+
+    match arrow_schema_metadata {
+        Some(Some(schema)) => Ok(schema),
+        _ => parquet_to_arrow_schema_by_columns(
+            parquet_schema,
+            0..parquet_schema.columns().len(),
+            key_value_metadata,
+        ),
+    }
 }
 
 /// Convert parquet schema to arrow schema including optional metadata, only preserving some leaf columns.
@@ -81,6 +90,80 @@ where
         .map(|fields| Schema::new_with_metadata(fields, metadata))
 }
 
+/// Try to convert Arrow schema metadata into a schema
+fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option<Schema> {
+    let decoded = base64::decode(encoded_meta);
+    match decoded {
+        Ok(bytes) => {
+            let slice = if bytes.len() >= 8 && bytes[0..4] == [255u8; 4] {
+                &bytes[8..]
+            } else {
+                bytes.as_slice()
+            };
+            let message = arrow::ipc::get_root_as_message(slice);
+            message
+                .header_as_schema()
+                .map(arrow::ipc::convert::fb_to_schema)
+        }
+        Err(err) => {
+            // The C++ implementation returns an error if the schema can't be parsed.
+            // Rather than doing the same, we log the failure and fall back to computing the schema without the metadata
+            eprintln!(
+                "Unable to decode the encoded schema stored in {}, {:?}",
+                super::ARROW_SCHEMA_META_KEY,
+                err
+            );
+            None
+        }
+    }
+}
+
+/// Encodes the Arrow schema into the IPC format and base64-encodes the result
+fn encode_arrow_schema(schema: &Schema) -> String {
+    let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&schema);
+
+    // manually prepend the length to the schema, as arrow still writes the legacy IPC format
+    // TODO: change after addressing ARROW-9777
+    let schema_len = serialized_schema.len();
+    let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
+    len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
+    len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
+    len_prefix_schema.append(&mut serialized_schema);
+
+    base64::encode(&len_prefix_schema)
+}
+
+/// Mutates writer metadata by storing the encoded Arrow schema.
+/// If an Arrow schema entry already exists in the metadata, it is replaced.
+pub(crate) fn add_encoded_arrow_schema_to_metadata(
+    schema: &Schema,
+    props: &mut WriterProperties,
+) {
+    let encoded = encode_arrow_schema(schema);
+
+    let schema_kv = KeyValue {
+        key: super::ARROW_SCHEMA_META_KEY.to_string(),
+        value: Some(encoded),
+    };
+
+    let mut meta = props.key_value_metadata.clone().unwrap_or_default();
+    // check if ARROW:schema exists, and overwrite it
+    let schema_meta = meta
+        .iter()
+        .enumerate()
+        .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
+    match schema_meta {
+        Some((i, _)) => {
+            meta.remove(i);
+            meta.push(schema_kv);
+        }
+        None => {
+            meta.push(schema_kv);
+        }
+    }
+    props.key_value_metadata = Some(meta);
+}
+
 /// Convert arrow schema to parquet schema
 pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
     let fields: Result<Vec<TypePtr>> = schema
@@ -215,42 +298,48 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                 .with_logical_type(LogicalType::INTERVAL)
                 .with_repetition(repetition)
-                .with_length(3)
+                .with_length(12)
+                .build()
+        }
+        DataType::Binary | DataType::LargeBinary => {
+            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+                .with_repetition(repetition)
                 .build()
         }
-        DataType::Binary => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
-            .with_repetition(repetition)
-            .build(),
         DataType::FixedSizeBinary(length) => {
             Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
                 .with_repetition(repetition)
                 .with_length(*length)
                 .build()
         }
-        DataType::Utf8 => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
-            .with_logical_type(LogicalType::UTF8)
-            .with_repetition(repetition)
-            .build(),
-        DataType::List(dtype) | DataType::FixedSizeList(dtype, _) => {
-            Type::group_type_builder(name)
-                .with_fields(&mut vec![Rc::new(
-                    Type::group_type_builder("list")
-                        .with_fields(&mut vec![Rc::new({
-                            let list_field = Field::new(
-                                "element",
-                                *dtype.clone(),
-                                field.is_nullable(),
-                            );
-                            arrow_to_parquet_type(&list_field)?
-                        })])
-                        .with_repetition(Repetition::REPEATED)
-                        .build()?,
-                )])
-                .with_logical_type(LogicalType::LIST)
-                .with_repetition(Repetition::REQUIRED)
+        DataType::Utf8 | DataType::LargeUtf8 => {
+            Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+                .with_logical_type(LogicalType::UTF8)
+                .with_repetition(repetition)
                 .build()
         }
+        DataType::List(dtype)
+        | DataType::FixedSizeList(dtype, _)
+        | DataType::LargeList(dtype) => Type::group_type_builder(name)
+            .with_fields(&mut vec![Rc::new(
+                Type::group_type_builder("list")
+                    .with_fields(&mut vec![Rc::new({
+                        let list_field =
+                            Field::new("element", *dtype.clone(), field.is_nullable());
+                        arrow_to_parquet_type(&list_field)?
+                    })])
+                    .with_repetition(Repetition::REPEATED)
+                    .build()?,
+            )])
+            .with_logical_type(LogicalType::LIST)
+            .with_repetition(Repetition::REQUIRED)
+            .build(),
         DataType::Struct(fields) => {
+            if fields.is_empty() {
+                return Err(ArrowError(
+                    "Parquet does not support writing empty structs".to_string(),
+                ));
+            }
             // recursively convert children to types/nodes
             let fields: Result<Vec<TypePtr>> = fields
                 .iter()
@@ -267,9 +356,6 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
             let dict_field = Field::new(name, *value.clone(), field.is_nullable());
             arrow_to_parquet_type(&dict_field)
         }
-        DataType::LargeUtf8 | DataType::LargeBinary | DataType::LargeList(_) => {
-            Err(ArrowError("Large arrays not supported".to_string()))
-        }
     }
 }
 /// This struct is used to group methods and data structures used to convert parquet
@@ -555,12 +641,16 @@ impl ParquetTypeConverter<'_> {
 mod tests {
     use super::*;
 
-    use std::collections::HashMap;
+    use std::{collections::HashMap, convert::TryFrom, sync::Arc};
 
-    use arrow::datatypes::{DataType, DateUnit, Field, TimeUnit};
+    use arrow::datatypes::{DataType, DateUnit, Field, IntervalUnit, TimeUnit};
 
-    use crate::file::metadata::KeyValue;
-    use crate::schema::{parser::parse_message_type, types::SchemaDescriptor};
+    use crate::file::{metadata::KeyValue, reader::SerializedFileReader};
+    use crate::{
+        arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader},
+        schema::{parser::parse_message_type, types::SchemaDescriptor},
+        util::test_common::get_temp_file,
+    };
 
     #[test]
     fn test_flat_primitives() {
@@ -1195,6 +1285,17 @@ mod tests {
     }
 
     #[test]
+    #[should_panic(expected = "Parquet does not support writing empty structs")]
+    fn test_empty_struct_field() {
+        let arrow_fields = vec![Field::new("struct", DataType::Struct(vec![]), false)];
+        let arrow_schema = Schema::new(arrow_fields);
+        let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema);
+
+        assert!(converted_arrow_schema.is_err());
+        converted_arrow_schema.unwrap();
+    }
+
+    #[test]
     fn test_metadata() {
         let message_type = "
         message test_schema {
@@ -1216,4 +1317,123 @@ mod tests {
 
         assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
     }
+
+    #[test]
+    fn test_arrow_schema_roundtrip() -> Result<()> {
+        // This tests the roundtrip of an Arrow schema
+        // Fields that are commented out fail roundtrip tests or are unsupported by the writer
+        let metadata: HashMap<String, String> =
+            [("Key".to_string(), "Value".to_string())]
+                .iter()
+                .cloned()
+                .collect();
+
+        let schema = Schema::new_with_metadata(
+            vec![
+                Field::new("c1", DataType::Utf8, false),
+                Field::new("c2", DataType::Binary, false),
+                Field::new("c3", DataType::FixedSizeBinary(3), false),
+                Field::new("c4", DataType::Boolean, false),
+                Field::new("c5", DataType::Date32(DateUnit::Day), false),
+                Field::new("c6", DataType::Date64(DateUnit::Millisecond), false),
+                Field::new("c7", DataType::Time32(TimeUnit::Second), false),
+                Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
+                Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
+                Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
+                Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
+                Field::new(
+                    "c16",
+                    DataType::Timestamp(
+                        TimeUnit::Millisecond,
+                        Some(Arc::new("UTC".to_string())),
+                    ),
+                    false,
+                ),
+                Field::new(
+                    "c17",
+                    DataType::Timestamp(
+                        TimeUnit::Microsecond,
+                        Some(Arc::new("Africa/Johannesburg".to_string())),
+                    ),
+                    false,
+                ),
+                Field::new(
+                    "c18",
+                    DataType::Timestamp(TimeUnit::Nanosecond, None),
+                    false,
+                ),
+                Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
+                Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
+                Field::new("c21", DataType::List(Box::new(DataType::Boolean)), false),
+                Field::new(
+                    "c22",
+                    DataType::FixedSizeList(Box::new(DataType::Boolean), 5),
+                    false,
+                ),
+                Field::new(
+                    "c23",
+                    DataType::List(Box::new(DataType::List(Box::new(DataType::Struct(
+                        vec![
+                            Field::new("a", DataType::Int16, true),
+                            Field::new("b", DataType::Float64, false),
+                        ],
+                    ))))),
+                    true,
+                ),
+                Field::new(
+                    "c24",
+                    DataType::Struct(vec![
+                        Field::new("a", DataType::Utf8, false),
+                        Field::new("b", DataType::UInt16, false),
+                    ]),
+                    false,
+                ),
+                Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
+                Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
+                // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
+                // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
+                // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
+                // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
+                // Field::new_dict(
+                //     "c31",
+                //     DataType::Dictionary(
+                //         Box::new(DataType::Int32),
+                //         Box::new(DataType::Utf8),
+                //     ),
+                //     true,
+                //     123,
+                //     true,
+                // ),
+                Field::new("c32", DataType::LargeBinary, true),
+                Field::new("c33", DataType::LargeUtf8, true),
+                Field::new(
+                    "c34",
+                    DataType::LargeList(Box::new(DataType::LargeList(Box::new(
+                        DataType::Struct(vec![
+                            Field::new("a", DataType::Int16, true),
+                            Field::new("b", DataType::Float64, true),
+                        ]),
+                    )))),
+                    true,
+                ),
+            ],
+            metadata,
+        );
+
+        // write to an empty parquet file so that schema is serialized
+        let file = get_temp_file("test_arrow_schema_roundtrip.parquet", &[]);
+        let mut writer = ArrowWriter::try_new(
+            file.try_clone().unwrap(),
+            Arc::new(schema.clone()),
+            None,
+        )?;
+        writer.close()?;
+
+        // read file back
+        let parquet_reader = SerializedFileReader::try_from(file)?;
+        let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader));
+        let read_schema = arrow_reader.get_schema()?;
+        assert_eq!(schema, read_schema);
+        Ok(())
+    }
 }
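
To make the framing in encode_arrow_schema and get_arrow_schema_from_metadata
concrete, here is a standalone sketch (illustrative payload, not the commit's
code) of the 8-byte prefix those functions add and strip: a 0xFFFFFFFF
continuation marker, the flatbuffer length as a little-endian u32, then the
schema flatbuffer itself:

    // frame: [0xFF, 0xFF, 0xFF, 0xFF][len as u32, little-endian][flatbuffer]
    fn frame_legacy_ipc(flatbuffer: &[u8]) -> Vec<u8> {
        let mut framed = Vec::with_capacity(flatbuffer.len() + 8);
        framed.extend_from_slice(&[0xFF, 0xFF, 0xFF, 0xFF]);
        framed.extend_from_slice(&(flatbuffer.len() as u32).to_le_bytes());
        framed.extend_from_slice(flatbuffer);
        framed
    }

    // mirror of the reader: skip the 8-byte prefix when the marker is present
    fn unframe_legacy_ipc(bytes: &[u8]) -> &[u8] {
        if bytes.len() >= 8 && bytes[0..4] == [0xFF; 4] {
            &bytes[8..]
        } else {
            bytes
        }
    }

    fn main() {
        let payload = b"illustrative flatbuffer bytes";
        let framed = frame_legacy_ipc(payload);
        assert_eq!(unframe_legacy_ipc(&framed), &payload[..]);
    }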
diff --git a/rust/parquet/src/file/properties.rs b/rust/parquet/src/file/properties.rs
index 188d6ec..b62ce7b 100644
--- a/rust/parquet/src/file/properties.rs
+++ b/rust/parquet/src/file/properties.rs
@@ -89,8 +89,8 @@ pub type WriterPropertiesPtr = Rc<WriterProperties>;
 
 /// Writer properties.
 ///
-/// It is created as an immutable data structure, use [`WriterPropertiesBuilder`] to
-/// assemble the properties.
+/// All properties except the key-value metadata are immutable;
+/// use [`WriterPropertiesBuilder`] to assemble these properties.
 #[derive(Debug, Clone)]
 pub struct WriterProperties {
     data_pagesize_limit: usize,
@@ -99,7 +99,7 @@ pub struct WriterProperties {
     max_row_group_size: usize,
     writer_version: WriterVersion,
     created_by: String,
-    key_value_metadata: Option<Vec<KeyValue>>,
+    pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
     default_column_properties: ColumnProperties,
     column_properties: HashMap<ColumnPath, ColumnProperties>,
 }