Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/08/16 16:56:12 UTC

[GitHub] [arrow-rs] thinkharderdev commented on a diff in pull request #2464: Push ChunkReader into SerializedPageReader (#2463)

thinkharderdev commented on code in PR #2464:
URL: https://github.com/apache/arrow-rs/pull/2464#discussion_r947027655


##########
parquet/src/file/serialized_reader.rs:
##########
@@ -471,234 +480,232 @@ pub(crate) fn decode_page(
     Ok(result)
 }
 
-enum SerializedPages<T: Read> {
-    /// Read entire chunk
-    Chunk { buf: T },
-    /// Read pages individually, allowing pages to be skipped.
+enum SerializedPageReaderState {
+    Values {
+        /// The current byte offset in the reader
+        offset: usize,
+
+        /// The length of the chunk in bytes
+        remaining_bytes: usize,
+    },
     Pages {
-        offset_index: Vec<PageLocation>,
-        seen_num_data_pages: usize,
-        has_dictionary_page_to_read: bool,
-        page_bufs: VecDeque<T>,
+        /// Remaining page locations
+        page_locations: VecDeque<PageLocation>,
+        /// Remaining dictionary location if any
+        dictionary_page: Option<PageLocation>,
+        /// The total number of rows in this column chunk
+        total_rows: usize,
     },
 }
 
 /// A serialized implementation for Parquet [`PageReader`].
-pub struct SerializedPageReader<T: Read> {
-    // The file source buffer which references exactly the bytes for the column chunk
-    // to be read by this page reader.
-    buf: SerializedPages<T>,
+pub struct SerializedPageReader<R: ChunkReader> {
+    /// The chunk reader
+    reader: Arc<R>,
 
-    // The compression codec for this column chunk. Only set for non-PLAIN codec.
+    /// The compression codec for this column chunk. Only set for non-PLAIN codec.
     decompressor: Option<Box<dyn Codec>>,
 
-    // The number of values we have seen so far.
-    seen_num_values: i64,
-
-    // The number of total values in this column chunk.
-    total_num_values: i64,
-
-    // Column chunk type.
+    /// Column chunk type.
     physical_type: Type,
+
+    state: SerializedPageReaderState,
 }
 
-impl<T: Read> SerializedPageReader<T> {
-    /// Creates a new serialized page reader from file source.
+impl<R: ChunkReader> SerializedPageReader<R> {
+    /// Creates a new serialized page reader from a chunk reader and metadata
     pub fn new(
-        buf: T,
-        total_num_values: i64,
-        compression: Compression,
-        physical_type: Type,
+        reader: Arc<R>,
+        meta: &ColumnChunkMetaData,
+        total_rows: usize,
+        page_locations: Option<Vec<PageLocation>>,
     ) -> Result<Self> {
-        let decompressor = create_codec(compression)?;
-        let result = Self {
-            buf: SerializedPages::Chunk { buf },
-            total_num_values,
-            seen_num_values: 0,
-            decompressor,
-            physical_type,
-        };
-        Ok(result)
-    }
+        let decompressor = create_codec(meta.compression())?;
+        let (start, len) = meta.byte_range();
+
+        let state = match page_locations {
+            Some(locations) => {
+                let dictionary_page = match locations.first() {
+                    Some(dict_offset) if dict_offset.offset as u64 != start => {
+                        Some(PageLocation {
+                            offset: start as i64,
+                            compressed_page_size: (dict_offset.offset as u64 - start)
+                                as i32,
+                            first_row_index: 0,
+                        })
+                    }
+                    _ => None,
+                };
 
-    /// Creates a new serialized page reader from file source.
-    pub fn new_with_page_offsets(
-        total_num_values: i64,
-        compression: Compression,
-        physical_type: Type,
-        offset_index: Vec<PageLocation>,
-        has_dictionary_page_to_read: bool,
-        page_bufs: VecDeque<T>,
-    ) -> Result<Self> {
-        let decompressor = create_codec(compression)?;
-        let result = Self {
-            buf: SerializedPages::Pages {
-                offset_index,
-                seen_num_data_pages: 0,
-                has_dictionary_page_to_read,
-                page_bufs,
+                SerializedPageReaderState::Pages {
+                    page_locations: locations.into(),
+                    dictionary_page,
+                    total_rows,
+                }
+            }
+            None => SerializedPageReaderState::Values {
+                offset: start as usize,
+                remaining_bytes: len as usize,
             },
-            total_num_values,
-            seen_num_values: 0,
-            decompressor,
-            physical_type,
         };
-        Ok(result)
+
+        Ok(Self {
+            reader,
+            decompressor,
+            state,
+            physical_type: meta.column_type(),
+        })
     }
 }
 
-impl<T: Read + Send> Iterator for SerializedPageReader<T> {
+impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
     type Item = Result<Page>;
 
     fn next(&mut self) -> Option<Self::Item> {
         self.get_next_page().transpose()
     }
 }
 
-impl<T: Read + Send> PageReader for SerializedPageReader<T> {
+impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
-        let mut cursor;
-        let mut dictionary_cursor;
-        while self.seen_num_values < self.total_num_values {
-            match &mut self.buf {
-                SerializedPages::Chunk { buf } => {
-                    cursor = buf;
-                }
-                SerializedPages::Pages {
-                    offset_index,
-                    seen_num_data_pages,
-                    has_dictionary_page_to_read,
-                    page_bufs,
+        loop {
+            let page = match &mut self.state {
+                SerializedPageReaderState::Values {
+                    offset,
+                    remaining_bytes: remaining,
+                    ..
                 } => {
-                    if offset_index.len() <= *seen_num_data_pages {
+                    if *remaining == 0 {
                         return Ok(None);
-                    } else if *seen_num_data_pages == 0 && *has_dictionary_page_to_read {
-                        dictionary_cursor = page_bufs.pop_front().unwrap();
-                        cursor = &mut dictionary_cursor;
-                    } else {
-                        cursor = page_bufs.get_mut(*seen_num_data_pages).unwrap();
                     }
-                }
-            }
 
-            let page_header = read_page_header(cursor)?;
+                    let mut read = self.reader.get_read(*offset as u64, *remaining)?;
 
-            let to_read = page_header.compressed_page_size as usize;
-            let mut buffer = Vec::with_capacity(to_read);
-            let read = cursor.take(to_read as u64).read_to_end(&mut buffer)?;
+                    let (header_len, header) = read_page_header_len(&mut read)?;
+                    let data_len = header.compressed_page_size as usize;
+                    *offset += header_len + data_len;
+                    *remaining -= header_len + data_len;
 
-            if read != to_read {
-                return Err(eof_err!(
-                    "Expected to read {} bytes of page, read only {}",
-                    to_read,
-                    read
-                ));
-            }
+                    if header.type_ == PageType::IndexPage {
+                        continue;
+                    }
+
+                    let mut buffer = Vec::with_capacity(data_len);
+                    let read = read.take(data_len as u64).read_to_end(&mut buffer)?;
 
-            let buffer = ByteBufferPtr::new(buffer);
-            let result = match page_header.type_ {
-                PageType::DataPage | PageType::DataPageV2 => {
-                    let decoded = decode_page(
-                        page_header,
-                        buffer,
+                    if read != data_len {
+                        return Err(eof_err!(
+                            "Expected to read {} bytes of page, read only {}",
+                            data_len,
+                            read
+                        ));
+                    }
+
+                    decode_page(
+                        header,
+                        ByteBufferPtr::new(buffer),
                         self.physical_type,
                         self.decompressor.as_mut(),
-                    )?;
-                    self.seen_num_values += decoded.num_values() as i64;
-                    if let SerializedPages::Pages {
-                        seen_num_data_pages,
-                        ..
-                    } = &mut self.buf
-                    {
-                        *seen_num_data_pages += 1;
-                    }
-                    decoded
+                    )?
                 }
-                PageType::DictionaryPage => {
-                    if let SerializedPages::Pages {
-                        has_dictionary_page_to_read,
-                        ..
-                    } = &mut self.buf
+                SerializedPageReaderState::Pages {
+                    page_locations,
+                    dictionary_page,
+                    ..
+                } => {
+                    let front = match dictionary_page
+                        .take()
+                        .or_else(|| page_locations.pop_front())
                     {
-                        *has_dictionary_page_to_read = false;
+                        Some(front) => front,
+                        None => return Ok(None),
+                    };
+
+                    let page_len = front.compressed_page_size as usize;
+
+                    // TODO: Add ChunkReader get_bytes to potentially avoid copy

Review Comment:
   You mean `InMemoryColumnChunkReader`?
   
   Is the idea that we make `InMemoryColumnChunk` impl `ChunkReader`?
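
For illustration, a minimal sketch of that idea, assuming a hypothetical `InMemoryColumnChunk` that holds a column chunk's bytes together with the absolute file offset they came from. The `ChunkReader`/`Length` traits and the two-argument `get_read` signature are used as they appear in the diff above; the `bytes` crate supplies the zero-copy buffer type.

```rust
use bytes::{Buf, Bytes};
use parquet::errors::{ParquetError, Result};
use parquet::file::reader::{ChunkReader, Length};

/// Hypothetical in-memory column chunk: the raw bytes of a single column
/// chunk plus the absolute file offset at which those bytes start.
struct InMemoryColumnChunk {
    offset: u64,
    data: Bytes,
}

impl Length for InMemoryColumnChunk {
    fn len(&self) -> u64 {
        self.data.len() as u64
    }
}

impl ChunkReader for InMemoryColumnChunk {
    type T = bytes::buf::Reader<Bytes>;

    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
        // SerializedPageReader passes absolute file offsets (derived from
        // ColumnChunkMetaData::byte_range), so translate them into offsets
        // relative to the start of this chunk before slicing.
        let start = start.checked_sub(self.offset).ok_or_else(|| {
            ParquetError::General("read starts before column chunk".into())
        })? as usize;
        let end = start + length;
        if end > self.data.len() {
            return Err(ParquetError::EOF("read past end of column chunk".into()));
        }
        // Bytes::slice is zero-copy; a `get_bytes` method on ChunkReader, as
        // the TODO above suggests, could hand this slice back directly and
        // skip the later read_to_end copy into a Vec.
        Ok(self.data.slice(start..end).reader())
    }
}
```

Such a chunk could then be fed straight to the constructor introduced in this PR, e.g. `SerializedPageReader::new(Arc::new(chunk), column_meta, total_rows, page_locations)`.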


