You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/11/24 11:33:45 UTC

[arrow-rs] branch master updated: Add read parquet examples (#3170)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b9e3fee2 Add read parquet examples (#3170)
4b9e3fee2 is described below

commit 4b9e3fee2878401b141c17a7ac3767cc3fa6c06f
Author: xudong.w <wx...@gmail.com>
AuthorDate: Thu Nov 24 19:33:39 2022 +0800

    Add read parquet examples (#3170)
    
    * Add read parquet examples
    
    * address comments
    
    * add real row filter
    
    * Update parquet/examples/async_read_parquet.rs
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
    
    Co-authored-by: Raphael Taylor-Davies <17...@users.noreply.github.com>
---
 parquet/Cargo.toml                     | 10 ++++++
 parquet/examples/async_read_parquet.rs | 66 ++++++++++++++++++++++++++++++++++
 parquet/examples/read_parquet.rs       | 43 ++++++++++++++++++++++
 3 files changed, 119 insertions(+)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 7a150c949..73c778c4a 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -92,6 +92,16 @@ experimental = []
 # Enable async APIs
 async = ["futures", "tokio"]
 
+[[example]]
+name = "read_parquet"
+required-features = ["arrow"]
+path = "./examples/read_parquet.rs"
+
+[[example]]
+name = "async_read_parquet"
+required-features = ["arrow", "async"]
+path = "./examples/async_read_parquet.rs"
+
 [[test]]
 name = "arrow_writer_layout"
 required-features = ["arrow"]
diff --git a/parquet/examples/async_read_parquet.rs b/parquet/examples/async_read_parquet.rs
new file mode 100644
index 000000000..9b4b6d4ff
--- /dev/null
+++ b/parquet/examples/async_read_parquet.rs
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::util::pretty::print_batches;
+use futures::TryStreamExt;
+use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::errors::Result;
+use std::time::SystemTime;
+use tokio::fs::File;
+
+#[tokio::main(flavor = "current_thread")]
+async fn main() -> Result<()> {
+    // Open the existing `alltypes_plain.parquet` file from the parquet-testing data.
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{}/alltypes_plain.parquet", testdata);
+    let file = File::open(path).await?;
+
+    // Create an async parquet reader builder with batch_size: the maximum number of
+    // rows decoded into each `RecordBatch` (defaults to 1024).
+    let mut builder = ParquetRecordBatchStreamBuilder::new(file)
+        .await?
+        .with_batch_size(8192);
+
+    // Set projection mask to read only root columns 0, 1 and 2.
+    let file_metadata = builder.metadata().file_metadata().clone();
+    let mask = ProjectionMask::roots(file_metadata.schema_descr(), [0, 1, 2]);
+    builder = builder.with_projection(mask);
+
+    // Highlight: set `RowFilter`, it'll push down filter predicates to skip IO and decode.
+    // Here only rows whose root column 0 equals 1 are kept. For more specific usage, see
+    // https://github.com/apache/arrow-datafusion/blob/master/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs.
+    let filter = ArrowPredicateFn::new(
+        ProjectionMask::roots(file_metadata.schema_descr(), [0]),
+        |record_batch| arrow::compute::eq_dyn_scalar(record_batch.column(0), 1),
+    );
+    let row_filter = RowFilter::new(vec![Box::new(filter)]);
+    builder = builder.with_row_filter(row_filter);
+
+    // Build an async parquet reader stream.
+    let stream = builder.build()?;
+
+    let start = SystemTime::now();
+
+    let result = stream.try_collect::<Vec<_>>().await?;
+
+    println!("took: {} ms", start.elapsed().unwrap().as_millis());
+
+    print_batches(&result)?;
+
+    Ok(())
+}
diff --git a/parquet/examples/read_parquet.rs b/parquet/examples/read_parquet.rs
new file mode 100644
index 000000000..3d6d70aee
--- /dev/null
+++ b/parquet/examples/read_parquet.rs
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::util::pretty::print_batches;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use parquet::errors::Result;
+use std::fs::File;
+
+fn main() -> Result<()> {
+    // Open the existing `alltypes_plain.parquet` file from the parquet-testing data.
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{}/alltypes_plain.parquet", testdata);
+    let file = File::open(path)?;
+
+    // Create a sync parquet reader with batch_size: the maximum number of rows
+    // decoded into each `RecordBatch` (defaults to 1024).
+    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
+        .with_batch_size(8192)
+        .build()?;
+
+    // The reader yields `Result<RecordBatch, ArrowError>`; collecting into a `Result`
+    // stops at the first error, which `?` converts into a `ParquetError`.
+    let batches = parquet_reader.collect::<std::result::Result<Vec<_>, _>>()?;
+
+    // Pretty-print the collected batches.
+    print_batches(&batches)?;
+
+    Ok(())
+}