Posted to commits@arrow.apache.org by ne...@apache.org on 2020/09/25 16:02:33 UTC
[arrow] 02/03: ARROW-8423: [Rust] [Parquet] Serialize Arrow schema metadata
This is an automated email from the ASF dual-hosted git repository.
nevime pushed a commit to branch rust-parquet-arrow-writer
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 28b075d2481d3cf47542d96bff42e843845be4c6
Author: Neville Dipale <ne...@gmail.com>
AuthorDate: Tue Aug 18 18:39:37 2020 +0200
ARROW-8423: [Rust] [Parquet] Serialize Arrow schema metadata
This will allow preserving Arrow-specific metadata when writing or reading Parquet files created from C++ or Rust.
If the schema can't be deserialized, the normal Parquet-to-Arrow schema conversion is performed instead.
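As an illustration of the resulting behaviour (not part of this commit), here is a
minimal round-trip sketch against the API in this diff; the file name and column
are invented for the example:

    use std::convert::TryFrom;
    use std::rc::Rc;
    use std::sync::Arc;

    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader};
    use parquet::file::reader::SerializedFileReader;

    fn main() {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        // writing embeds the base64-encoded Arrow IPC schema under the
        // "ARROW:schema" key in the file's key-value metadata
        let file = std::fs::File::create("example.parquet").unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // reading prefers the embedded Arrow schema over a column-wise conversion
        let file = std::fs::File::open("example.parquet").unwrap();
        let reader = SerializedFileReader::try_from(file).unwrap();
        let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(reader));
        assert_eq!(*schema, arrow_reader.get_schema().unwrap());
    }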
Closes #7917 from nevi-me/ARROW-8243
Authored-by: Neville Dipale <ne...@gmail.com>
Signed-off-by: Neville Dipale <ne...@gmail.com>
---
rust/parquet/Cargo.toml | 3 +-
rust/parquet/src/arrow/arrow_writer.rs | 27 ++-
rust/parquet/src/arrow/mod.rs | 4 +
rust/parquet/src/arrow/schema.rs | 306 ++++++++++++++++++++++++++++-----
rust/parquet/src/file/properties.rs | 6 +-
5 files changed, 290 insertions(+), 56 deletions(-)
diff --git a/rust/parquet/Cargo.toml b/rust/parquet/Cargo.toml
index 50d7c34..60e43c9 100644
--- a/rust/parquet/Cargo.toml
+++ b/rust/parquet/Cargo.toml
@@ -40,6 +40,7 @@ zstd = { version = "0.5", optional = true }
chrono = "0.4"
num-bigint = "0.3"
arrow = { path = "../arrow", version = "2.0.0-SNAPSHOT", optional = true }
+base64 = { version = "*", optional = true }
[dev-dependencies]
rand = "0.7"
@@ -52,4 +53,4 @@ arrow = { path = "../arrow", version = "2.0.0-SNAPSHOT" }
serde_json = { version = "1.0", features = ["preserve_order"] }
[features]
-default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd"]
+default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
diff --git a/rust/parquet/src/arrow/arrow_writer.rs b/rust/parquet/src/arrow/arrow_writer.rs
index 0c1c490..1ca8d50 100644
--- a/rust/parquet/src/arrow/arrow_writer.rs
+++ b/rust/parquet/src/arrow/arrow_writer.rs
@@ -24,6 +24,7 @@ use arrow::datatypes::{DataType as ArrowDataType, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow_array::Array;
+use super::schema::add_encoded_arrow_schema_to_metadata;
use crate::column::writer::ColumnWriter;
use crate::errors::{ParquetError, Result};
use crate::file::properties::WriterProperties;
@@ -53,17 +54,17 @@ impl<W: 'static + ParquetWriter> ArrowWriter<W> {
pub fn try_new(
writer: W,
arrow_schema: SchemaRef,
- props: Option<Rc<WriterProperties>>,
+ props: Option<WriterProperties>,
) -> Result<Self> {
let schema = crate::arrow::arrow_to_parquet_schema(&arrow_schema)?;
- let props = match props {
- Some(props) => props,
- None => Rc::new(WriterProperties::builder().build()),
- };
+ // add serialized arrow schema
+ let mut props = props.unwrap_or_else(|| WriterProperties::builder().build());
+ add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
+
let file_writer = SerializedFileWriter::new(
writer.try_clone()?,
schema.root_schema_ptr(),
- props,
+ Rc::new(props),
)?;
Ok(Self {
@@ -495,7 +496,7 @@ mod tests {
use arrow::record_batch::{RecordBatch, RecordBatchReader};
use crate::arrow::{ArrowReader, ParquetFileArrowReader};
- use crate::file::reader::SerializedFileReader;
+ use crate::file::{metadata::KeyValue, reader::SerializedFileReader};
use crate::util::test_common::get_temp_file;
#[test]
@@ -584,7 +585,7 @@ mod tests {
)
.unwrap();
- let mut file = get_temp_file("test_arrow_writer.parquet", &[]);
+ let mut file = get_temp_file("test_arrow_writer_binary.parquet", &[]);
let mut writer =
ArrowWriter::try_new(file.try_clone().unwrap(), Arc::new(schema), None)
.unwrap();
@@ -674,8 +675,16 @@ mod tests {
)
.unwrap();
+ let props = WriterProperties::builder()
+ .set_key_value_metadata(Some(vec![KeyValue {
+ key: "test_key".to_string(),
+ value: Some("test_value".to_string()),
+ }]))
+ .build();
+
let file = get_temp_file("test_arrow_writer_complex.parquet", &[]);
- let mut writer = ArrowWriter::try_new(file, Arc::new(schema), None).unwrap();
+ let mut writer =
+ ArrowWriter::try_new(file, Arc::new(schema), Some(props)).unwrap();
writer.write(&batch).unwrap();
writer.close().unwrap();
}
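A note on the signature change above: try_new now takes Option<WriterProperties>
by value rather than Option<Rc<WriterProperties>>, because the writer has to
mutate the properties to inject the encoded schema before wrapping them in an Rc.
User-supplied key-value metadata still gets through, as the amended test shows; a
minimal sketch of building such properties (key and value are placeholders):

    use parquet::file::metadata::KeyValue;
    use parquet::file::properties::WriterProperties;

    // user-supplied pairs are preserved; the writer only appends
    // (or replaces) the "ARROW:schema" entry
    let props = WriterProperties::builder()
        .set_key_value_metadata(Some(vec![KeyValue {
            key: "test_key".to_string(),
            value: Some("test_value".to_string()),
        }]))
        .build();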
diff --git a/rust/parquet/src/arrow/mod.rs b/rust/parquet/src/arrow/mod.rs
index c8739c2..2bdb07c 100644
--- a/rust/parquet/src/arrow/mod.rs
+++ b/rust/parquet/src/arrow/mod.rs
@@ -58,6 +58,10 @@ pub mod schema;
pub use self::arrow_reader::ArrowReader;
pub use self::arrow_reader::ParquetFileArrowReader;
+pub use self::arrow_writer::ArrowWriter;
pub use self::schema::{
arrow_to_parquet_schema, parquet_to_arrow_schema, parquet_to_arrow_schema_by_columns,
};
+
+/// Schema metadata key used to store serialized Arrow IPC schema
+pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
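This key matches the one the C++ implementation writes, so either library can
recover the schema. As a condensed sketch of the decode path implemented by
get_arrow_schema_from_metadata in schema.rs below (with a length guard added
here for illustration):

    /// Decode a base64 "ARROW:schema" metadata value back into an Arrow schema.
    fn decode_arrow_schema(encoded: &str) -> Option<arrow::datatypes::Schema> {
        let bytes = base64::decode(encoded).ok()?;
        // legacy IPC framing: 4 bytes of 0xFF (continuation marker) and a
        // 4-byte length precede the schema flatbuffer
        let slice = if bytes.len() > 8 && bytes[0..4] == [255u8; 4] {
            &bytes[8..]
        } else {
            bytes.as_slice()
        };
        arrow::ipc::get_root_as_message(slice)
            .header_as_schema()
            .map(arrow::ipc::convert::fb_to_schema)
    }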
diff --git a/rust/parquet/src/arrow/schema.rs b/rust/parquet/src/arrow/schema.rs
index aebb9e7..d4cfe1f 100644
--- a/rust/parquet/src/arrow/schema.rs
+++ b/rust/parquet/src/arrow/schema.rs
@@ -26,24 +26,33 @@
use std::collections::{HashMap, HashSet};
use std::rc::Rc;
+use arrow::datatypes::{DataType, DateUnit, Field, Schema, TimeUnit};
+
use crate::basic::{LogicalType, Repetition, Type as PhysicalType};
use crate::errors::{ParquetError::ArrowError, Result};
-use crate::file::metadata::KeyValue;
+use crate::file::{metadata::KeyValue, properties::WriterProperties};
use crate::schema::types::{ColumnDescriptor, SchemaDescriptor, Type, TypePtr};
-use arrow::datatypes::TimeUnit;
-use arrow::datatypes::{DataType, DateUnit, Field, Schema};
-
-/// Convert parquet schema to arrow schema including optional metadata.
+/// Convert Parquet schema to Arrow schema including optional metadata.
+/// Attempts to decode any existing Arrow schema metadata, falling back
+/// to converting the Parquet schema column-wise.
pub fn parquet_to_arrow_schema(
parquet_schema: &SchemaDescriptor,
- metadata: &Option<Vec<KeyValue>>,
+ key_value_metadata: &Option<Vec<KeyValue>>,
) -> Result<Schema> {
- parquet_to_arrow_schema_by_columns(
- parquet_schema,
- 0..parquet_schema.columns().len(),
- metadata,
- )
+ let mut metadata = parse_key_value_metadata(key_value_metadata).unwrap_or_default();
+ let arrow_schema_metadata = metadata
+ .remove(super::ARROW_SCHEMA_META_KEY)
+ .map(|encoded| get_arrow_schema_from_metadata(&encoded));
+
+ match arrow_schema_metadata {
+ Some(Some(schema)) => Ok(schema),
+ _ => parquet_to_arrow_schema_by_columns(
+ parquet_schema,
+ 0..parquet_schema.columns().len(),
+ key_value_metadata,
+ ),
+ }
}
/// Convert parquet schema to arrow schema including optional metadata, only preserving some leaf columns.
@@ -81,6 +90,80 @@ where
.map(|fields| Schema::new_with_metadata(fields, metadata))
}
+/// Try to convert Arrow schema metadata into a schema
+fn get_arrow_schema_from_metadata(encoded_meta: &str) -> Option<Schema> {
+ let decoded = base64::decode(encoded_meta);
+ match decoded {
+ Ok(bytes) => {
+ let slice = if bytes[0..4] == [255u8; 4] {
+ &bytes[8..]
+ } else {
+ bytes.as_slice()
+ };
+ let message = arrow::ipc::get_root_as_message(slice);
+ message
+ .header_as_schema()
+ .map(arrow::ipc::convert::fb_to_schema)
+ }
+ Err(err) => {
+ // The C++ implementation returns an error if the schema can't be parsed.
+ // Here we instead log the failure, then compute the schema without the metadata
+ eprintln!(
+ "Unable to decode the encoded schema stored in {}, {:?}",
+ super::ARROW_SCHEMA_META_KEY,
+ err
+ );
+ None
+ }
+ }
+}
+
+/// Encodes the Arrow schema into the IPC format and base64-encodes it
+fn encode_arrow_schema(schema: &Schema) -> String {
+ let mut serialized_schema = arrow::ipc::writer::schema_to_bytes(&schema);
+
+ // manually prepend the length to the schema, as arrow uses the legacy IPC format
+ // TODO: change after addressing ARROW-9777
+ let schema_len = serialized_schema.len();
+ let mut len_prefix_schema = Vec::with_capacity(schema_len + 8);
+ len_prefix_schema.append(&mut vec![255u8, 255, 255, 255]);
+ len_prefix_schema.append((schema_len as u32).to_le_bytes().to_vec().as_mut());
+ len_prefix_schema.append(&mut serialized_schema);
+
+ base64::encode(&len_prefix_schema)
+}
+
+/// Mutates writer metadata by storing the encoded Arrow schema.
+/// If an Arrow schema entry already exists in the metadata, it is replaced.
+pub(crate) fn add_encoded_arrow_schema_to_metadata(
+ schema: &Schema,
+ props: &mut WriterProperties,
+) {
+ let encoded = encode_arrow_schema(schema);
+
+ let schema_kv = KeyValue {
+ key: super::ARROW_SCHEMA_META_KEY.to_string(),
+ value: Some(encoded),
+ };
+
+ let mut meta = props.key_value_metadata.clone().unwrap_or_default();
+ // check if ARROW:schema exists, and overwrite it
+ let schema_meta = meta
+ .iter()
+ .enumerate()
+ .find(|(_, kv)| kv.key.as_str() == super::ARROW_SCHEMA_META_KEY);
+ match schema_meta {
+ Some((i, _)) => {
+ meta.remove(i);
+ meta.push(schema_kv);
+ }
+ None => {
+ meta.push(schema_kv);
+ }
+ }
+ props.key_value_metadata = Some(meta);
+}
+
/// Convert arrow schema to parquet schema
pub fn arrow_to_parquet_schema(schema: &Schema) -> Result<SchemaDescriptor> {
let fields: Result<Vec<TypePtr>> = schema
@@ -215,42 +298,48 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_logical_type(LogicalType::INTERVAL)
.with_repetition(repetition)
- .with_length(3)
+ .with_length(12)
+ .build()
+ }
+ DataType::Binary | DataType::LargeBinary => {
+ Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+ .with_repetition(repetition)
.build()
}
- DataType::Binary => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
- .with_repetition(repetition)
- .build(),
DataType::FixedSizeBinary(length) => {
Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_repetition(repetition)
.with_length(*length)
.build()
}
- DataType::Utf8 => Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
- .with_logical_type(LogicalType::UTF8)
- .with_repetition(repetition)
- .build(),
- DataType::List(dtype) | DataType::FixedSizeList(dtype, _) => {
- Type::group_type_builder(name)
- .with_fields(&mut vec![Rc::new(
- Type::group_type_builder("list")
- .with_fields(&mut vec![Rc::new({
- let list_field = Field::new(
- "element",
- *dtype.clone(),
- field.is_nullable(),
- );
- arrow_to_parquet_type(&list_field)?
- })])
- .with_repetition(Repetition::REPEATED)
- .build()?,
- )])
- .with_logical_type(LogicalType::LIST)
- .with_repetition(Repetition::REQUIRED)
+ DataType::Utf8 | DataType::LargeUtf8 => {
+ Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY)
+ .with_logical_type(LogicalType::UTF8)
+ .with_repetition(repetition)
.build()
}
+ DataType::List(dtype)
+ | DataType::FixedSizeList(dtype, _)
+ | DataType::LargeList(dtype) => Type::group_type_builder(name)
+ .with_fields(&mut vec![Rc::new(
+ Type::group_type_builder("list")
+ .with_fields(&mut vec![Rc::new({
+ let list_field =
+ Field::new("element", *dtype.clone(), field.is_nullable());
+ arrow_to_parquet_type(&list_field)?
+ })])
+ .with_repetition(Repetition::REPEATED)
+ .build()?,
+ )])
+ .with_logical_type(LogicalType::LIST)
+ .with_repetition(Repetition::REQUIRED)
+ .build(),
DataType::Struct(fields) => {
+ if fields.is_empty() {
+ return Err(ArrowError(
+ "Parquet does not support writing empty structs".to_string(),
+ ));
+ }
// recursively convert children to types/nodes
let fields: Result<Vec<TypePtr>> = fields
.iter()
@@ -267,9 +356,6 @@ fn arrow_to_parquet_type(field: &Field) -> Result<Type> {
let dict_field = Field::new(name, *value.clone(), field.is_nullable());
arrow_to_parquet_type(&dict_field)
}
- DataType::LargeUtf8 | DataType::LargeBinary | DataType::LargeList(_) => {
- Err(ArrowError("Large arrays not supported".to_string()))
- }
}
}
/// This struct is used to group methods and data structures used to convert parquet
@@ -555,12 +641,16 @@ impl ParquetTypeConverter<'_> {
mod tests {
use super::*;
- use std::collections::HashMap;
+ use std::{collections::HashMap, convert::TryFrom, sync::Arc};
- use arrow::datatypes::{DataType, DateUnit, Field, TimeUnit};
+ use arrow::datatypes::{DataType, DateUnit, Field, IntervalUnit, TimeUnit};
- use crate::file::metadata::KeyValue;
- use crate::schema::{parser::parse_message_type, types::SchemaDescriptor};
+ use crate::file::{metadata::KeyValue, reader::SerializedFileReader};
+ use crate::{
+ arrow::{ArrowReader, ArrowWriter, ParquetFileArrowReader},
+ schema::{parser::parse_message_type, types::SchemaDescriptor},
+ util::test_common::get_temp_file,
+ };
#[test]
fn test_flat_primitives() {
@@ -1195,6 +1285,17 @@ mod tests {
}
#[test]
+ #[should_panic(expected = "Parquet does not support writing empty structs")]
+ fn test_empty_struct_field() {
+ let arrow_fields = vec![Field::new("struct", DataType::Struct(vec![]), false)];
+ let arrow_schema = Schema::new(arrow_fields);
+ let converted_arrow_schema = arrow_to_parquet_schema(&arrow_schema);
+
+ assert!(converted_arrow_schema.is_err());
+ converted_arrow_schema.unwrap();
+ }
+
+ #[test]
fn test_metadata() {
let message_type = "
message test_schema {
@@ -1216,4 +1317,123 @@ mod tests {
assert_eq!(converted_arrow_schema.metadata(), &expected_metadata);
}
+
+ #[test]
+ fn test_arrow_schema_roundtrip() -> Result<()> {
+ // This tests the roundtrip of an Arrow schema
+ // Fields that are commented out fail roundtrip tests or are unsupported by the writer
+ let metadata: HashMap<String, String> =
+ [("Key".to_string(), "Value".to_string())]
+ .iter()
+ .cloned()
+ .collect();
+
+ let schema = Schema::new_with_metadata(
+ vec![
+ Field::new("c1", DataType::Utf8, false),
+ Field::new("c2", DataType::Binary, false),
+ Field::new("c3", DataType::FixedSizeBinary(3), false),
+ Field::new("c4", DataType::Boolean, false),
+ Field::new("c5", DataType::Date32(DateUnit::Day), false),
+ Field::new("c6", DataType::Date64(DateUnit::Millisecond), false),
+ Field::new("c7", DataType::Time32(TimeUnit::Second), false),
+ Field::new("c8", DataType::Time32(TimeUnit::Millisecond), false),
+ Field::new("c13", DataType::Time64(TimeUnit::Microsecond), false),
+ Field::new("c14", DataType::Time64(TimeUnit::Nanosecond), false),
+ Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
+ Field::new(
+ "c16",
+ DataType::Timestamp(
+ TimeUnit::Millisecond,
+ Some(Arc::new("UTC".to_string())),
+ ),
+ false,
+ ),
+ Field::new(
+ "c17",
+ DataType::Timestamp(
+ TimeUnit::Microsecond,
+ Some(Arc::new("Africa/Johannesburg".to_string())),
+ ),
+ false,
+ ),
+ Field::new(
+ "c18",
+ DataType::Timestamp(TimeUnit::Nanosecond, None),
+ false,
+ ),
+ Field::new("c19", DataType::Interval(IntervalUnit::DayTime), false),
+ Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false),
+ Field::new("c21", DataType::List(Box::new(DataType::Boolean)), false),
+ Field::new(
+ "c22",
+ DataType::FixedSizeList(Box::new(DataType::Boolean), 5),
+ false,
+ ),
+ Field::new(
+ "c23",
+ DataType::List(Box::new(DataType::List(Box::new(DataType::Struct(
+ vec![
+ Field::new("a", DataType::Int16, true),
+ Field::new("b", DataType::Float64, false),
+ ],
+ ))))),
+ true,
+ ),
+ Field::new(
+ "c24",
+ DataType::Struct(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::UInt16, false),
+ ]),
+ false,
+ ),
+ Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true),
+ Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true),
+ // Field::new("c27", DataType::Duration(TimeUnit::Second), false),
+ // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false),
+ // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false),
+ // Field::new("c30", DataType::Duration(TimeUnit::Nanosecond), false),
+ // Field::new_dict(
+ // "c31",
+ // DataType::Dictionary(
+ // Box::new(DataType::Int32),
+ // Box::new(DataType::Utf8),
+ // ),
+ // true,
+ // 123,
+ // true,
+ // ),
+ Field::new("c32", DataType::LargeBinary, true),
+ Field::new("c33", DataType::LargeUtf8, true),
+ Field::new(
+ "c34",
+ DataType::LargeList(Box::new(DataType::LargeList(Box::new(
+ DataType::Struct(vec![
+ Field::new("a", DataType::Int16, true),
+ Field::new("b", DataType::Float64, true),
+ ]),
+ )))),
+ true,
+ ),
+ ],
+ metadata,
+ );
+
+ // write to an empty parquet file so that the schema is serialized
+ let file = get_temp_file("test_arrow_schema_roundtrip.parquet", &[]);
+ let mut writer = ArrowWriter::try_new(
+ file.try_clone().unwrap(),
+ Arc::new(schema.clone()),
+ None,
+ )?;
+ writer.close()?;
+
+ // read file back
+ let parquet_reader = SerializedFileReader::try_from(file)?;
+ let mut arrow_reader = ParquetFileArrowReader::new(Rc::new(parquet_reader));
+ let read_schema = arrow_reader.get_schema()?;
+ assert_eq!(schema, read_schema);
+ Ok(())
+ }
}
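For reference, the framing applied by encode_arrow_schema above is simple enough
to state on its own; a standalone sketch (the helper name is ours, not the
commit's):

    /// Prepend the legacy IPC continuation marker and little-endian length
    /// to an already-serialized schema flatbuffer (see ARROW-9777).
    fn prefix_legacy_ipc(mut flatbuffer: Vec<u8>) -> Vec<u8> {
        let len = flatbuffer.len() as u32;
        let mut framed = Vec::with_capacity(flatbuffer.len() + 8);
        framed.extend_from_slice(&[0xFF; 4]);         // continuation marker
        framed.extend_from_slice(&len.to_le_bytes()); // payload length
        framed.append(&mut flatbuffer);
        framed
    }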
diff --git a/rust/parquet/src/file/properties.rs b/rust/parquet/src/file/properties.rs
index 188d6ec..b62ce7b 100644
--- a/rust/parquet/src/file/properties.rs
+++ b/rust/parquet/src/file/properties.rs
@@ -89,8 +89,8 @@ pub type WriterPropertiesPtr = Rc<WriterProperties>;
/// Writer properties.
///
-/// It is created as an immutable data structure, use [`WriterPropertiesBuilder`] to
-/// assemble the properties.
+/// All properties except the key-value metadata are immutable;
+/// use [`WriterPropertiesBuilder`] to assemble these properties.
#[derive(Debug, Clone)]
pub struct WriterProperties {
data_pagesize_limit: usize,
@@ -99,7 +99,7 @@ pub struct WriterProperties {
max_row_group_size: usize,
writer_version: WriterVersion,
created_by: String,
- key_value_metadata: Option<Vec<KeyValue>>,
+ pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
}