You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/26 17:47:39 UTC
[arrow-rs] branch active_release updated: Support arrow readers for
strings with DELTA_BYTE_ARRAY encoding (#709) (#718)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch active_release
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/active_release by this push:
new c8d492c Support arrow readers for strings with DELTA_BYTE_ARRAY encoding (#709) (#718)
c8d492c is described below
commit c8d492c8282d4c8045028a5555186227e8f40963
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Aug 26 13:47:33 2021 -0400
Support arrow readers for strings with DELTA_BYTE_ARRAY encoding (#709) (#718)
* Support arrow readers for strings with DELTA_BYTE_ARRAY encoding
* Review fixes
1. Move the slice initialization out of the loop;
2. Add tests for nulls;
3. Use `debug_assert` for assertions that guard against programming errors.
Co-authored-by: Ilya Biryukov <iu...@gmail.com>
---
parquet/src/arrow/arrow_array_reader.rs | 166 +++++++++++++++++++++++++++++++-
1 file changed, 164 insertions(+), 2 deletions(-)
diff --git a/parquet/src/arrow/arrow_array_reader.rs b/parquet/src/arrow/arrow_array_reader.rs
index 06f1efb..04de2d4 100644
--- a/parquet/src/arrow/arrow_array_reader.rs
+++ b/parquet/src/arrow/arrow_array_reader.rs
@@ -18,6 +18,8 @@
use super::array_reader::ArrayReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::Encoding;
+use crate::data_type::{ByteArray, ByteArrayType};
+use crate::decoding::{Decoder, DeltaByteArrayDecoder};
use crate::errors::{ParquetError, Result};
use crate::{
column::page::{Page, PageIterator},
@@ -485,7 +487,10 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
// Encoding::RLE => Box::new(RleValueDecoder::new()),
// Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
// Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
- // Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
+ Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayValueDecoder::new(
+ values_buffer,
+ num_values,
+ )?)),
e => return Err(nyi_err!("Encoding {} is not supported", e)),
}
}
@@ -1074,6 +1079,39 @@ impl ValueDecoder for VariableLenDictionaryDecoder {
}
}
+/// Adapts `DeltaByteArrayDecoder` to the `ValueDecoder` interface used by
+/// `ArrowArrayReader`, so byte-array columns written with the
+/// `DELTA_BYTE_ARRAY` encoding can be read into arrow arrays.
+pub(crate) struct DeltaByteArrayValueDecoder {
+    decoder: DeltaByteArrayDecoder<ByteArrayType>,
+}
+
+impl DeltaByteArrayValueDecoder {
+    /// Creates a decoder over `data`, which holds `num_values` encoded values.
+    ///
+    /// # Errors
+    /// Propagates any error returned by `DeltaByteArrayDecoder::set_data`.
+    pub fn new(data: ByteBufferPtr, num_values: usize) -> Result<Self> {
+        let mut decoder = DeltaByteArrayDecoder::new();
+        decoder.set_data(data, num_values)?;
+        Ok(Self { decoder })
+    }
+}
+
+impl ValueDecoder for DeltaByteArrayValueDecoder {
+    /// Decodes up to `num_values` values. Each value's bytes are passed to
+    /// `read_bytes` together with a count of 1.
+    ///
+    /// The request is clamped to the number of values still held by the
+    /// decoder. Returns the number of values actually decoded.
+    fn read_value_bytes(
+        &mut self,
+        mut num_values: usize,
+        read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize> {
+        // Never request more values than the decoder still holds.
+        num_values = std::cmp::min(num_values, self.decoder.values_left());
+        let mut values_read = 0;
+        // Single-slot buffer reused across iterations; values are decoded one at a time.
+        let mut buf = [ByteArray::new()];
+        while values_read < num_values {
+            let num_read = self.decoder.get(&mut buf)?;
+            // `num_values` was clamped above, so each `get` must yield exactly one value;
+            // anything else indicates a programming error.
+            debug_assert_eq!(num_read, 1);
+
+            read_bytes(buf[0].data(), 1);
+
+            values_read += 1;
+        }
+        Ok(values_read)
+    }
+}
+
use arrow::datatypes::ArrowPrimitiveType;
pub struct PrimitiveArrayConverter<T: ArrowPrimitiveType> {
@@ -1163,9 +1201,16 @@ impl ArrayConverter for StringArrayConverter {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+ use crate::basic::ConvertedType;
use crate::column::page::Page;
+ use crate::column::writer::ColumnWriter;
use crate::data_type::ByteArray;
use crate::data_type::ByteArrayType;
+ use crate::file::properties::WriterProperties;
+ use crate::file::reader::SerializedFileReader;
+ use crate::file::serialized_reader::SliceableCursor;
+ use crate::file::writer::{FileWriter, SerializedFileWriter, TryClone};
use crate::schema::parser::parse_message_type;
use crate::schema::types::SchemaDescriptor;
use crate::util::test_common::page_util::{
@@ -1177,7 +1222,8 @@ mod tests {
use arrow::array::{PrimitiveArray, StringArray};
use arrow::datatypes::Int32Type as ArrowInt32;
use rand::{distributions::uniform::SampleUniform, thread_rng, Rng};
- use std::sync::Arc;
+ use std::io::{Cursor, Seek, SeekFrom, Write};
+ use std::sync::{Arc, Mutex};
/// Iterator for testing reading empty columns
struct EmptyPageIterator {
@@ -1559,4 +1605,120 @@ mod tests {
array_reader.get_rep_levels()
);
}
+
+    /// In-memory sink for writing Parquet data. Intended only for use in tests.
+    ///
+    /// All clones share one `Cursor<Vec<u8>>` through `Arc<Mutex<_>>`. They
+    /// therefore share both the byte buffer and the stream position. A clone
+    /// handed to a writer (e.g. via `TryClone`) writes into the same buffer that
+    /// `consume` later returns.
+    #[derive(Clone)]
+    struct VecWriter {
+        data: Arc<Mutex<Cursor<Vec<u8>>>>,
+    }
+
+    impl VecWriter {
+        /// Creates a writer over an empty buffer, positioned at offset 0.
+        pub fn new() -> VecWriter {
+            VecWriter {
+                data: Arc::new(Mutex::new(Cursor::new(Vec::new()))),
+            }
+        }
+
+        /// Consumes the writer and returns all bytes written to it.
+        ///
+        /// # Panics
+        /// Panics if any other clone of this writer is still alive (the `Arc`
+        /// cannot be unwrapped) or if the mutex is poisoned.
+        pub fn consume(self) -> Vec<u8> {
+            // Unwrap Arc -> Mutex -> Cursor -> Vec<u8>.
+            Arc::try_unwrap(self.data)
+                .unwrap()
+                .into_inner()
+                .unwrap()
+                .into_inner()
+        }
+    }
+
+    /// Cloning never fails; the clone shares the underlying buffer.
+    impl TryClone for VecWriter {
+        fn try_clone(&self) -> std::io::Result<Self> {
+            Ok(self.clone())
+        }
+    }
+
+    /// Delegates to the shared cursor under its lock (panics if the lock is poisoned).
+    impl Seek for VecWriter {
+        fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
+            self.data.lock().unwrap().seek(pos)
+        }
+
+        fn stream_position(&mut self) -> std::io::Result<u64> {
+            self.data.lock().unwrap().stream_position()
+        }
+    }
+
+    /// Delegates to the shared cursor under its lock (panics if the lock is poisoned).
+    impl Write for VecWriter {
+        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+            self.data.lock().unwrap().write(buf)
+        }
+
+        fn flush(&mut self) -> std::io::Result<()> {
+            self.data.lock().unwrap().flush()
+        }
+    }
+
+    /// Round-trips nullable UTF8 strings written with `DELTA_BYTE_ARRAY`
+    /// encoding through the arrow reader. Checks that values and nulls come
+    /// back in their original positions.
+    #[test]
+    fn test_string_delta_byte_array() {
+        use crate::basic;
+        use crate::schema::types::Type;
+
+        let data = VecWriter::new();
+        // Schema: a single BYTE_ARRAY column "c" annotated as UTF8. Its repetition
+        // is left at the builder default, which must be nullable for the 0
+        // definition levels below to produce nulls.
+        let schema = Arc::new(
+            Type::group_type_builder("string_test")
+                .with_fields(&mut vec![Arc::new(
+                    Type::primitive_type_builder("c", basic::Type::BYTE_ARRAY)
+                        .with_converted_type(ConvertedType::UTF8)
+                        .build()
+                        .unwrap(),
+                )])
+                .build()
+                .unwrap(),
+        );
+        // Disable dictionary encoding so values are written with the configured
+        // DELTA_BYTE_ARRAY encoding instead.
+        let p = Arc::new(
+            WriterProperties::builder()
+                .set_dictionary_enabled(false)
+                .set_encoding(Encoding::DELTA_BYTE_ARRAY)
+                .build(),
+        );
+        // Write a few strings, interleaved with nulls.
+        let mut w = SerializedFileWriter::new(data.clone(), schema, p).unwrap();
+        let mut rg = w.next_row_group().unwrap();
+        let mut c = rg.next_column().unwrap().unwrap();
+        match &mut c {
+            ColumnWriter::ByteArrayColumnWriter(c) => {
+                // Definition levels: 1 = value present, 0 = null. The two values
+                // fill the two 1 slots, giving [null, "foo", null, null, "bar", null].
+                // Repetition levels are all 0 (no repeated fields).
+                c.write_batch(
+                    &[ByteArray::from("foo"), ByteArray::from("bar")],
+                    Some(&[0, 1, 0, 0, 1, 0]),
+                    Some(&[0, 0, 0, 0, 0, 0]),
+                )
+                .unwrap();
+            }
+            _ => panic!("unexpected column"),
+        };
+        rg.close_column(c).unwrap();
+        w.close_row_group(rg).unwrap();
+        w.close().unwrap();
+        // Release the writer's handle on `data` so `consume` can unwrap the shared Arc.
+        std::mem::drop(w);
+
+        // Check we can read them back.
+        let r = SerializedFileReader::new(SliceableCursor::new(Arc::new(data.consume())))
+            .unwrap();
+        let mut r = ParquetFileArrowReader::new(Arc::new(r));
+
+        // Project column 0 with a batch size of 1024, ample for the six rows written.
+        let batch = r
+            .get_record_reader_by_columns([0], 1024)
+            .unwrap()
+            .next()
+            .unwrap()
+            .unwrap();
+        assert_eq!(batch.columns().len(), 1);
+
+        let strings = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(
+            strings.into_iter().collect::<Vec<_>>(),
+            vec![None, Some("foo"), None, None, Some("bar"), None]
+        );
+    }
}