You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/04/06 19:10:49 UTC
[arrow-rs] branch master updated: Only require compatible batch schema in ArrowWriter (#4027)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 9bf99840b Only require compatible batch schema in ArrowWriter (#4027)
9bf99840b is described below
commit 9bf99840b135ae9a7ae365e114f4df1d30627998
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Thu Apr 6 20:10:43 2023 +0100
Only require compatible batch schema in ArrowWriter (#4027)
---
parquet/src/arrow/arrow_writer/mod.rs | 52 ++++++++++++++++++++++++++++++++++-
1 file changed, 51 insertions(+), 1 deletion(-)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 4cf54dc88..d026f971e 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -132,7 +132,10 @@ impl<W: Write> ArrowWriter<W> {
/// and drop any fully written `RecordBatch`
pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
// validate batch schema against writer's supplied schema
- if self.arrow_schema != batch.schema() {
+ let batch_schema = batch.schema();
+ if !(Arc::ptr_eq(&self.arrow_schema, &batch_schema)
+ || self.arrow_schema.contains(&batch_schema))
+ {
return Err(ParquetError::ArrowError(
"Record batch schema does not match writer schema".to_string(),
));
@@ -2358,4 +2361,51 @@ mod tests {
let actual = pretty_format_batches(&batches).unwrap().to_string();
assert_eq!(actual, expected);
}
+
+ #[test]
+ fn test_arrow_writer_metadata() {
+ let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
+ let file_schema = batch_schema.clone().with_metadata(
+ vec![("foo".to_string(), "bar".to_string())]
+ .into_iter()
+ .collect(),
+ );
+
+ let batch = RecordBatch::try_new(
+ Arc::new(batch_schema),
+ vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
+ )
+ .unwrap();
+
+ let mut buf = Vec::with_capacity(1024);
+ let mut writer =
+ ArrowWriter::try_new(&mut buf, Arc::new(file_schema), None).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+ }
+
+ #[test]
+ fn test_arrow_writer_nullable() {
+ let batch_schema = Schema::new(vec![Field::new("int32", DataType::Int32, false)]);
+ let file_schema = Schema::new(vec![Field::new("int32", DataType::Int32, true)]);
+ let file_schema = Arc::new(file_schema);
+
+ let batch = RecordBatch::try_new(
+ Arc::new(batch_schema),
+ vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4])) as _],
+ )
+ .unwrap();
+
+ let mut buf = Vec::with_capacity(1024);
+ let mut writer =
+ ArrowWriter::try_new(&mut buf, file_schema.clone(), None).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let mut read = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
+ let back = read.next().unwrap().unwrap();
+ assert_eq!(back.schema(), file_schema);
+ assert_ne!(back.schema(), batch.schema());
+ assert_eq!(back.column(0).as_ref(), batch.column(0).as_ref());
+ }
}