You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/08/26 17:47:39 UTC

[arrow-rs] branch active_release updated: Support arrow readers for strings with DELTA_BYTE_ARRAY encoding (#709) (#718)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch active_release
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/active_release by this push:
     new c8d492c  Support arrow readers for strings with DELTA_BYTE_ARRAY encoding (#709) (#718)
c8d492c is described below

commit c8d492c8282d4c8045028a5555186227e8f40963
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Aug 26 13:47:33 2021 -0400

    Support arrow readers for strings with DELTA_BYTE_ARRAY encoding (#709) (#718)
    
    * Support arrow readers for strings with DELTA_BYTE_ARRAY encoding
    
    * Review fixes
    
    1. move slice init out of the loop,
    2. add tests for nulls,
    3. use `debug_assert` for programming error assertion.
    
    Co-authored-by: Ilya Biryukov <iu...@gmail.com>
---
 parquet/src/arrow/arrow_array_reader.rs | 166 +++++++++++++++++++++++++++++++-
 1 file changed, 164 insertions(+), 2 deletions(-)

diff --git a/parquet/src/arrow/arrow_array_reader.rs b/parquet/src/arrow/arrow_array_reader.rs
index 06f1efb..04de2d4 100644
--- a/parquet/src/arrow/arrow_array_reader.rs
+++ b/parquet/src/arrow/arrow_array_reader.rs
@@ -18,6 +18,8 @@
 use super::array_reader::ArrayReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::Encoding;
+use crate::data_type::{ByteArray, ByteArrayType};
+use crate::decoding::{Decoder, DeltaByteArrayDecoder};
 use crate::errors::{ParquetError, Result};
 use crate::{
     column::page::{Page, PageIterator},
@@ -485,7 +487,10 @@ impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
             // Encoding::RLE => Box::new(RleValueDecoder::new()),
             // Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
             // Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
-            // Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
+            Encoding::DELTA_BYTE_ARRAY => Ok(Box::new(DeltaByteArrayValueDecoder::new(
+                values_buffer,
+                num_values,
+            )?)),
             e => return Err(nyi_err!("Encoding {} is not supported", e)),
         }
     }
@@ -1074,6 +1079,39 @@ impl ValueDecoder for VariableLenDictionaryDecoder {
     }
 }
 
+pub(crate) struct DeltaByteArrayValueDecoder {
+    decoder: DeltaByteArrayDecoder<ByteArrayType>,
+}
+
+impl DeltaByteArrayValueDecoder {
+    pub fn new(data: ByteBufferPtr, num_values: usize) -> Result<Self> {
+        let mut decoder = DeltaByteArrayDecoder::new();
+        decoder.set_data(data, num_values)?;
+        Ok(Self { decoder })
+    }
+}
+
+impl ValueDecoder for DeltaByteArrayValueDecoder {
+    fn read_value_bytes(
+        &mut self,
+        mut num_values: usize,
+        read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize> {
+        num_values = std::cmp::min(num_values, self.decoder.values_left());
+        let mut values_read = 0;
+        let mut buf = [ByteArray::new()];
+        while values_read < num_values {
+            let num_read = self.decoder.get(&mut buf)?;
+            debug_assert_eq!(num_read, 1);
+
+            read_bytes(buf[0].data(), 1);
+
+            values_read += 1;
+        }
+        Ok(values_read)
+    }
+}
+
 use arrow::datatypes::ArrowPrimitiveType;
 
 pub struct PrimitiveArrayConverter<T: ArrowPrimitiveType> {
@@ -1163,9 +1201,16 @@ impl ArrayConverter for StringArrayConverter {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+    use crate::basic::ConvertedType;
     use crate::column::page::Page;
+    use crate::column::writer::ColumnWriter;
     use crate::data_type::ByteArray;
     use crate::data_type::ByteArrayType;
+    use crate::file::properties::WriterProperties;
+    use crate::file::reader::SerializedFileReader;
+    use crate::file::serialized_reader::SliceableCursor;
+    use crate::file::writer::{FileWriter, SerializedFileWriter, TryClone};
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::SchemaDescriptor;
     use crate::util::test_common::page_util::{
@@ -1177,7 +1222,8 @@ mod tests {
     use arrow::array::{PrimitiveArray, StringArray};
     use arrow::datatypes::Int32Type as ArrowInt32;
     use rand::{distributions::uniform::SampleUniform, thread_rng, Rng};
-    use std::sync::Arc;
+    use std::io::{Cursor, Seek, SeekFrom, Write};
+    use std::sync::{Arc, Mutex};
 
     /// Iterator for testing reading empty columns
     struct EmptyPageIterator {
@@ -1559,4 +1605,120 @@ mod tests {
             array_reader.get_rep_levels()
         );
     }
+
+    /// Allows writing parquet data into memory. Intended only for use in tests.
+    #[derive(Clone)]
+    struct VecWriter {
+        data: Arc<Mutex<Cursor<Vec<u8>>>>,
+    }
+
+    impl VecWriter {
+        pub fn new() -> VecWriter {
+            VecWriter {
+                data: Arc::new(Mutex::new(Cursor::new(Vec::new()))),
+            }
+        }
+
+        pub fn consume(self) -> Vec<u8> {
+            Arc::try_unwrap(self.data)
+                .unwrap()
+                .into_inner()
+                .unwrap()
+                .into_inner()
+        }
+    }
+
+    impl TryClone for VecWriter {
+        fn try_clone(&self) -> std::io::Result<Self> {
+            Ok(self.clone())
+        }
+    }
+
+    impl Seek for VecWriter {
+        fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
+            self.data.lock().unwrap().seek(pos)
+        }
+
+        fn stream_position(&mut self) -> std::io::Result<u64> {
+            self.data.lock().unwrap().stream_position()
+        }
+    }
+
+    impl Write for VecWriter {
+        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+            self.data.lock().unwrap().write(buf)
+        }
+
+        fn flush(&mut self) -> std::io::Result<()> {
+            self.data.lock().unwrap().flush()
+        }
+    }
+
+    #[test]
+    fn test_string_delta_byte_array() {
+        use crate::basic;
+        use crate::schema::types::Type;
+
+        let data = VecWriter::new();
+        let schema = Arc::new(
+            Type::group_type_builder("string_test")
+                .with_fields(&mut vec![Arc::new(
+                    Type::primitive_type_builder("c", basic::Type::BYTE_ARRAY)
+                        .with_converted_type(ConvertedType::UTF8)
+                        .build()
+                        .unwrap(),
+                )])
+                .build()
+                .unwrap(),
+        );
+        // Disable dictionary and use the fallback encoding.
+        let p = Arc::new(
+            WriterProperties::builder()
+                .set_dictionary_enabled(false)
+                .set_encoding(Encoding::DELTA_BYTE_ARRAY)
+                .build(),
+        );
+        // Write a few strings.
+        let mut w = SerializedFileWriter::new(data.clone(), schema, p).unwrap();
+        let mut rg = w.next_row_group().unwrap();
+        let mut c = rg.next_column().unwrap().unwrap();
+        match &mut c {
+            ColumnWriter::ByteArrayColumnWriter(c) => {
+                c.write_batch(
+                    &[ByteArray::from("foo"), ByteArray::from("bar")],
+                    Some(&[0, 1, 0, 0, 1, 0]),
+                    Some(&[0, 0, 0, 0, 0, 0]),
+                )
+                .unwrap();
+            }
+            _ => panic!("unexpected column"),
+        };
+        rg.close_column(c).unwrap();
+        w.close_row_group(rg).unwrap();
+        w.close().unwrap();
+        std::mem::drop(w);
+
+        // Check we can read them back.
+        let r = SerializedFileReader::new(SliceableCursor::new(Arc::new(data.consume())))
+            .unwrap();
+        let mut r = ParquetFileArrowReader::new(Arc::new(r));
+
+        let batch = r
+            .get_record_reader_by_columns([0], 1024)
+            .unwrap()
+            .next()
+            .unwrap()
+            .unwrap();
+        assert_eq!(batch.columns().len(), 1);
+
+        let strings = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(
+            strings.into_iter().collect::<Vec<_>>(),
+            vec![None, Some("foo"), None, None, Some("bar"), None]
+        );
+    }
 }