You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/06/01 11:30:01 UTC

[arrow-rs] branch master updated: feat: add read parquet by custom rowgroup examples (#4332)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new deb38964b feat: add read parquet by custom rowgroup examples (#4332)
deb38964b is described below

commit deb38964be1440ae2f3477900663bb29c171817f
Author: sundyli <54...@qq.com>
AuthorDate: Thu Jun 1 04:29:54 2023 -0700

    feat: add read parquet by custom rowgroup examples (#4332)
    
    * feat: add parquet read by custom rowgroup example
    
    * feat: add parquet read by custom rowgroup example
    
    * address comments
    
    * address comments
    
    * address comments
---
 .github/workflows/parquet.yml          |   6 ++
 parquet/Cargo.toml                     |   5 +
 parquet/examples/read_with_rowgroup.rs | 185 +++++++++++++++++++++++++++++++++
 3 files changed, 196 insertions(+)

diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml
index ee5813f56..55599b776 100644
--- a/.github/workflows/parquet.yml
+++ b/.github/workflows/parquet.yml
@@ -60,6 +60,12 @@ jobs:
         run: cargo test -p parquet
       - name: Test --all-features
         run: cargo test -p parquet --all-features
+      - name: Run examples
+        run: |
+          # Test parquet examples
+          cargo run -p parquet --example read_parquet
+          cargo run -p parquet --example async_read_parquet --features="async"
+          cargo run -p parquet --example read_with_rowgroup --features="async"
 
   # test compilation
   linux-features:
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index cc48424a6..adcbe82a7 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -111,6 +111,11 @@ name = "async_read_parquet"
 required-features = ["arrow", "async"]
 path = "./examples/async_read_parquet.rs"
 
+[[example]]
+name = "read_with_rowgroup"
+required-features = ["arrow", "async"]
+path = "./examples/read_with_rowgroup.rs"
+
 [[test]]
 name = "arrow_writer_layout"
 required-features = ["arrow"]
diff --git a/parquet/examples/read_with_rowgroup.rs b/parquet/examples/read_with_rowgroup.rs
new file mode 100644
index 000000000..b2d113d50
--- /dev/null
+++ b/parquet/examples/read_with_rowgroup.rs
@@ -0,0 +1,185 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::util::pretty::print_batches;
+use bytes::{Buf, Bytes};
+use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, RowGroups, RowSelection};
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::{parquet_to_arrow_field_levels, ProjectionMask};
+use parquet::column::page::{PageIterator, PageReader};
+use parquet::errors::{ParquetError, Result};
+use parquet::file::metadata::RowGroupMetaData;
+use parquet::file::reader::{ChunkReader, Length};
+use parquet::file::serialized_reader::SerializedPageReader;
+use std::sync::Arc;
+use tokio::fs::File;
+
+#[tokio::main(flavor = "current_thread")]
+async fn main() -> Result<()> {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{testdata}/alltypes_plain.parquet");
+    let mut file = File::open(&path).await.unwrap();
+
+    // The metadata could be cached elsewhere; this example only shows how to read it.
+    let metadata = file.get_metadata().await?;
+
+    for rg in metadata.row_groups() {
+        let mut rowgroup = InMemoryRowGroup::create(rg.clone(), ProjectionMask::all());
+        rowgroup.async_fetch_data(&mut file, None).await?;
+        let reader = rowgroup.build_reader(1024, None)?;
+
+        for batch in reader {
+            let batch = batch?;
+            print_batches(&[batch])?;
+        }
+    }
+
+    Ok(())
+}
+
+/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
+struct ColumnChunkIterator {
+    reader: Option<Result<Box<dyn PageReader>>>,
+}
+
+impl Iterator for ColumnChunkIterator {
+    type Item = Result<Box<dyn PageReader>>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.reader.take()
+    }
+}
+
+impl PageIterator for ColumnChunkIterator {}
+
+/// An in-memory column chunk
+#[derive(Clone)]
+pub struct ColumnChunkData {
+    offset: usize,
+    data: Bytes,
+}
+
+impl ColumnChunkData {
+    fn get(&self, start: u64) -> Result<Bytes> {
+        let start = start as usize - self.offset;
+        Ok(self.data.slice(start..))
+    }
+}
+
+impl Length for ColumnChunkData {
+    fn len(&self) -> u64 {
+        self.data.len() as u64
+    }
+}
+
+impl ChunkReader for ColumnChunkData {
+    type T = bytes::buf::Reader<Bytes>;
+
+    fn get_read(&self, start: u64) -> Result<Self::T> {
+        Ok(self.get(start)?.reader())
+    }
+
+    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
+        Ok(self.get(start)?.slice(..length))
+    }
+}
+
+#[derive(Clone)]
+pub struct InMemoryRowGroup {
+    pub metadata: RowGroupMetaData,
+    mask: ProjectionMask,
+    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
+}
+
+impl RowGroups for InMemoryRowGroup {
+    fn num_rows(&self) -> usize {
+        self.metadata.num_rows() as usize
+    }
+
+    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
+        match &self.column_chunks[i] {
+            None => Err(ParquetError::General(format!(
+                "Invalid column index {i}, column was not fetched"
+            ))),
+            Some(data) => {
+                let page_reader: Box<dyn PageReader> =
+                    Box::new(SerializedPageReader::new(
+                        data.clone(),
+                        self.metadata.column(i),
+                        self.num_rows(),
+                        None,
+                    )?);
+
+                Ok(Box::new(ColumnChunkIterator {
+                    reader: Some(Ok(page_reader)),
+                }))
+            }
+        }
+    }
+}
+
+impl InMemoryRowGroup {
+    pub fn create(metadata: RowGroupMetaData, mask: ProjectionMask) -> Self {
+        let column_chunks = metadata.columns().iter().map(|_| None).collect::<Vec<_>>();
+
+        Self {
+            metadata,
+            mask,
+            column_chunks,
+        }
+    }
+
+    pub fn build_reader(
+        &self,
+        batch_size: usize,
+        selection: Option<RowSelection>,
+    ) -> Result<ParquetRecordBatchReader> {
+        let levels = parquet_to_arrow_field_levels(
+            &self.metadata.schema_descr_ptr(),
+            self.mask.clone(),
+            None,
+        )?;
+
+        ParquetRecordBatchReader::try_new_with_row_groups(
+            &levels, self, batch_size, selection,
+        )
+    }
+
+    /// Asynchronously fetches the bytes of every column chunk selected by the projection mask.
+    pub async fn async_fetch_data<R: AsyncFileReader>(
+        &mut self,
+        reader: &mut R,
+        _selection: Option<&RowSelection>,
+    ) -> Result<()> {
+        let mut vs = std::mem::take(&mut self.column_chunks);
+        for (leaf_idx, meta) in self.metadata.columns().iter().enumerate() {
+            if self.mask.leaf_included(leaf_idx) {
+                let (start, len) = meta.byte_range();
+                let data = reader
+                    .get_bytes(start as usize..(start + len) as usize)
+                    .await?;
+
+                vs[leaf_idx] = Some(Arc::new(ColumnChunkData {
+                    offset: start as usize,
+                    data,
+                }));
+            }
+        }
+        self.column_chunks = std::mem::take(&mut vs);
+        Ok(())
+    }
+}