You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/01/02 15:49:29 UTC

[arrow-rs] branch master updated: Add parquet-index binary (#3405)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 6139d8984 Add parquet-index binary (#3405)
6139d8984 is described below

commit 6139d8984ca702fa744aaf3b0b2193dd85468cf7
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Jan 2 15:49:24 2023 +0000

    Add parquet-index binary (#3405)
    
    * Add parquet-index binary
    
    * Improve error message for missing index
---
 parquet/Cargo.toml               |   4 +
 parquet/src/bin/parquet-index.rs | 173 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 177 insertions(+)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 6ee83a2c4..7a76ff64e 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -138,6 +138,10 @@ required-features = ["cli"]
 name = "parquet-layout"
 required-features = ["cli"]
 
+[[bin]]
+name = "parquet-index"
+required-features = ["cli"]
+
 [[bench]]
 name = "arrow_writer"
 required-features = ["arrow"]
diff --git a/parquet/src/bin/parquet-index.rs b/parquet/src/bin/parquet-index.rs
new file mode 100644
index 000000000..6622783e6
--- /dev/null
+++ b/parquet/src/bin/parquet-index.rs
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary that prints the [page index] of a parquet file
+//!
+//! # Install
+//!
+//! `parquet-index` can be installed using `cargo`:
+//! ```
+//! cargo install parquet --features=cli
+//! ```
+//! After this `parquet-index` should be available:
+//! ```
+//! parquet-index XYZ.parquet COLUMN_NAME
+//! ```
+//!
+//! The binary can also be built from the source code and run as follows:
+//! ```
+//! cargo run --features=cli --bin parquet-index XYZ.parquet COLUMN_NAME
+//! ```
+//!
+//! [page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+
+use clap::Parser;
+use parquet::errors::{ParquetError, Result};
+use parquet::file::page_index::index::{Index, PageIndex};
+use parquet::file::reader::{FileReader, SerializedFileReader};
+use parquet::file::serialized_reader::ReadOptionsBuilder;
+use parquet::format::PageLocation;
+use std::fs::File;
+
+// Command-line arguments of the `parquet-index` binary, parsed by clap's
+// derive macro.
+//
+// Plain `//` comments are used here on purpose: clap's derive turns `///` doc
+// comments into help/about text, which would alter the CLI output.
+#[derive(Debug, Parser)]
+#[clap(author, version, about("Prints the page index of a parquet file"), long_about = None)]
+struct Args {
+    // Positional argument: path of the parquet file that `run` opens.
+    #[clap(help("Path to a parquet file"))]
+    file: String,
+
+    // Positional argument: name of the column whose page index is printed;
+    // `run` compares it against the column names in the file schema.
+    #[clap(help("Column name to print"))]
+    column: String,
+}
+
+impl Args {
+    /// Opens the parquet file, locates the requested column and prints its
+    /// page index for every row group.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file cannot be opened or read, if no column
+    /// named `self.column` exists in the schema, if the file metadata carries
+    /// no column index or no offset index, or if either index lacks an entry
+    /// for the selected column in some row group.
+    fn run(&self) -> Result<()> {
+        let file = File::open(&self.file)?;
+        // Enable page index reading on the reader options
+        let options = ReadOptionsBuilder::new().with_page_index().build();
+        let reader = SerializedFileReader::new_with_options(file, options)?;
+
+        // Position of the first schema column whose name matches the argument
+        let schema = reader.metadata().file_metadata().schema_descr();
+        let column_idx = schema
+            .columns()
+            .iter()
+            .position(|x| x.name() == self.column.as_str())
+            .ok_or_else(|| {
+                ParquetError::General(format!("Failed to find column {}", self.column))
+            })?;
+
+        // Column index data for all row groups and columns
+        let column_index = reader
+            .metadata()
+            .page_indexes()
+            .ok_or_else(|| ParquetError::General("Column index not found".to_string()))?;
+
+        // Offset index data for all row groups and columns
+        let offset_index = reader
+            .metadata()
+            .offset_indexes()
+            .ok_or_else(|| ParquetError::General("Offset index not found".to_string()))?;
+
+        // Iterate through each row group
+        for (row_group_idx, ((column_indices, offset_indices), row_group)) in column_index
+            .iter()
+            .zip(offset_index)
+            .zip(reader.metadata().row_groups())
+            .enumerate()
+        {
+            println!("Row Group: {}", row_group_idx);
+            let offset_index = offset_indices.get(column_idx).ok_or_else(|| {
+                ParquetError::General(format!(
+                    "No offset index for row group {} column chunk {}",
+                    row_group_idx, column_idx
+                ))
+            })?;
+
+            // Checked lookup, mirroring the offset index above: a missing
+            // entry is reported as an error instead of panicking on an
+            // out-of-bounds index
+            let column_index = column_indices.get(column_idx).ok_or_else(|| {
+                ParquetError::General(format!(
+                    "No column index for row group {} column chunk {}",
+                    row_group_idx, column_idx
+                ))
+            })?;
+
+            let row_counts = compute_row_counts(offset_index, row_group.num_rows());
+            match column_index {
+                Index::NONE => println!("NO INDEX"),
+                Index::BOOLEAN(v) => print_index(&v.indexes, offset_index, &row_counts)?,
+                Index::INT32(v) => print_index(&v.indexes, offset_index, &row_counts)?,
+                Index::INT64(v) => print_index(&v.indexes, offset_index, &row_counts)?,
+                Index::INT96(v) => print_index(&v.indexes, offset_index, &row_counts)?,
+                Index::FLOAT(v) => print_index(&v.indexes, offset_index, &row_counts)?,
+                Index::DOUBLE(v) => print_index(&v.indexes, offset_index, &row_counts)?,
+                Index::BYTE_ARRAY(_) => println!("BYTE_ARRAY not supported"),
+                Index::FIXED_LEN_BYTE_ARRAY(_) => {
+                    println!("FIXED_LEN_BYTE_ARRAY not supported")
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+/// Computes the number of rows in each page within a column chunk
+///
+/// `offset_index` holds the page locations of the column chunk and `rows` is
+/// the total number of rows in its row group. A page's row count is the
+/// distance from its `first_row_index` to that of the following page; the
+/// final page extends to the end of the row group.
+///
+/// Returns one count per page, or an empty vector if there are no pages.
+fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec<i64> {
+    let last = match offset_index.last() {
+        Some(page) => page,
+        None => return vec![],
+    };
+
+    let mut out = Vec::with_capacity(offset_index.len());
+    out.extend(
+        offset_index
+            .windows(2)
+            .map(|pair| pair[1].first_row_index - pair[0].first_row_index),
+    );
+    // The last page has no successor: it covers the remaining rows of the row
+    // group, not all of them, so subtract the row at which it starts
+    out.push(rows - last.first_row_index);
+    out
+}
+
+/// Prints one line per page of a single column chunk
+///
+/// Each line shows the page number, its file offset and compressed size from
+/// `offset_index`, its row count from `row_counts`, and the minimum and
+/// maximum values from `column_index` (`NONE` when a bound is absent).
+///
+/// # Errors
+///
+/// Returns an error if `column_index` and `offset_index` differ in length.
+fn print_index<T: std::fmt::Display>(
+    column_index: &[PageIndex<T>],
+    offset_index: &[PageLocation],
+    row_counts: &[i64],
+) -> Result<()> {
+    let (num_column, num_offset) = (column_index.len(), offset_index.len());
+    if num_column != num_offset {
+        let message = format!(
+            "Index length mismatch, got {} and {}",
+            num_column, num_offset
+        );
+        return Err(ParquetError::General(message));
+    }
+
+    // Zipping stops at the shortest input, so no more than `row_counts.len()`
+    // pages are printed
+    let pages = column_index
+        .iter()
+        .zip(offset_index.iter().zip(row_counts));
+
+    for (page_no, (stats, (location, rows))) in pages.enumerate() {
+        print!(
+            "Page {:>5} at offset {:#010x} with length {:>10} and row count {:>10}",
+            page_no, location.offset, location.compressed_page_size, rows
+        );
+
+        if let Some(lower) = &stats.min {
+            print!(", min {:>10}", lower);
+        } else {
+            print!(", min {:>10}", "NONE");
+        }
+
+        if let Some(upper) = &stats.max {
+            print!(", max {:>10}", upper);
+        } else {
+            print!(", max {:>10}", "NONE");
+        }
+
+        // Terminate the line for this page
+        println!();
+    }
+
+    Ok(())
+}
+
+/// Entry point: parses the command-line arguments and runs the tool,
+/// returning any error from `Args::run` to the caller
+fn main() -> Result<()> {
+    let args = Args::parse();
+    args.run()
+}