Posted to commits@arrow.apache.org by tu...@apache.org on 2023/01/09 10:57:58 UTC
[arrow-rs] branch master updated: feat: add `parquet-rewrite` CLI (#3477)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 592d7a360 feat: add `parquet-rewrite` CLI (#3477)
592d7a360 is described below
commit 592d7a3601b1b7876ab5753abde66113f1a9dc23
Author: Marco Neumann <ma...@crepererum.net>
AuthorDate: Mon Jan 9 11:57:52 2023 +0100
feat: add `parquet-rewrite` CLI (#3477)
* feat: add `parquet-rewrite` CLI
Closes #3476.
* refactor: init ArrowWriter early
---
parquet/Cargo.toml                 |   4 +
parquet/src/bin/parquet-rewrite.rs | 293 +++++++++++++++++++++++++++++++++++++
2 files changed, 297 insertions(+)
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 7a76ff64e..2aa744978 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -118,6 +118,10 @@ required-features = ["arrow"]
name = "parquet-read"
required-features = ["cli"]
+[[bin]]
+name = "parquet-rewrite"
+required-features = ["arrow", "cli"]
+
[[bin]]
name = "parquet-schema"
required-features = ["cli"]
diff --git a/parquet/src/bin/parquet-rewrite.rs b/parquet/src/bin/parquet-rewrite.rs
new file mode 100644
index 000000000..cd60225ca
--- /dev/null
+++ b/parquet/src/bin/parquet-rewrite.rs
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Binary file to rewrite parquet files.
+//!
+//! # Install
+//!
+//! `parquet-rewrite` can be installed using `cargo`:
+//! ```
+//! cargo install parquet --features=cli
+//! ```
+//! After this, `parquet-rewrite` should be available:
+//! ```
+//! parquet-rewrite -i XYZ.parquet -o XYZ2.parquet
+//! ```
+//!
+//! The binary can also be built from the source code and run as follows:
+//! ```
+//! cargo run --features=cli --bin parquet-rewrite -- -i XYZ.parquet -o XYZ2.parquet
+//! ```
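+//!
+//! Writer settings can be overridden individually, for example to re-compress
+//! the data with zstd and bump the writer version (flag values follow clap's
+//! kebab-case naming):
+//! ```
+//! parquet-rewrite -i XYZ.parquet -o XYZ2.parquet --compression zstd --writer-version 2.0
+//! ```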
+
+use std::fs::File;
+
+use arrow_array::RecordBatchReader;
+use clap::{builder::PossibleValue, Parser, ValueEnum};
+use parquet::{
+    arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
+    basic::Compression,
+    file::{
+        properties::{EnabledStatistics, WriterProperties, WriterVersion},
+        reader::FileReader,
+        serialized_reader::SerializedFileReader,
+    },
+};
+
+#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
+enum CompressionArgs {
+    /// No compression.
+    None,
+
+    /// Snappy
+    Snappy,
+
+    /// GZip
+    Gzip,
+
+    /// LZO
+    Lzo,
+
+    /// Brotli
+    Brotli,
+
+    /// LZ4
+    Lz4,
+
+    /// Zstd
+    Zstd,
+
+    /// LZ4 Raw
+    Lz4Raw,
+}
+
+impl From<CompressionArgs> for Compression {
+    fn from(value: CompressionArgs) -> Self {
+        match value {
+            CompressionArgs::None => Self::UNCOMPRESSED,
+            CompressionArgs::Snappy => Self::SNAPPY,
+            CompressionArgs::Gzip => Self::GZIP,
+            CompressionArgs::Lzo => Self::LZO,
+            CompressionArgs::Brotli => Self::BROTLI,
+            CompressionArgs::Lz4 => Self::LZ4,
+            CompressionArgs::Zstd => Self::ZSTD,
+            CompressionArgs::Lz4Raw => Self::LZ4_RAW,
+        }
+    }
+}
+
+#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
+enum EnabledStatisticsArgs {
+    /// Compute no statistics
+    None,
+
+    /// Compute chunk-level statistics but not page-level
+    Chunk,
+
+    /// Compute page-level and chunk-level statistics
+    Page,
+}
+
+impl From<EnabledStatisticsArgs> for EnabledStatistics {
+    fn from(value: EnabledStatisticsArgs) -> Self {
+        match value {
+            EnabledStatisticsArgs::None => Self::None,
+            EnabledStatisticsArgs::Chunk => Self::Chunk,
+            EnabledStatisticsArgs::Page => Self::Page,
+        }
+    }
+}
+
+#[derive(Clone, Copy, Debug)]
+enum WriterVersionArgs {
+    Parquet1_0,
+    Parquet2_0,
+}
+
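+// `ValueEnum` is implemented by hand rather than derived because the
+// user-facing values "1.0" and "2.0" are not valid Rust identifiers, so they
+// cannot come from variant names.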
+impl ValueEnum for WriterVersionArgs {
+    fn value_variants<'a>() -> &'a [Self] {
+        &[Self::Parquet1_0, Self::Parquet2_0]
+    }
+
+    fn to_possible_value(&self) -> Option<PossibleValue> {
+        match self {
+            WriterVersionArgs::Parquet1_0 => Some(PossibleValue::new("1.0")),
+            WriterVersionArgs::Parquet2_0 => Some(PossibleValue::new("2.0")),
+        }
+    }
+}
+
+impl From<WriterVersionArgs> for WriterVersion {
+    fn from(value: WriterVersionArgs) -> Self {
+        match value {
+            WriterVersionArgs::Parquet1_0 => Self::PARQUET_1_0,
+            WriterVersionArgs::Parquet2_0 => Self::PARQUET_2_0,
+        }
+    }
+}
+
+#[derive(Debug, Parser)]
+#[clap(author, version, about("Read and write a parquet file with potentially different settings"), long_about = None)]
+struct Args {
+    /// Path to input parquet file.
+    #[clap(short, long)]
+    input: String,
+
+    /// Path to output parquet file.
+    #[clap(short, long)]
+    output: String,
+
+    /// Compression used.
+    #[clap(long, value_enum)]
+    compression: Option<CompressionArgs>,
+
+    /// Sets maximum number of rows in a row group.
+    #[clap(long)]
+    max_row_group_size: Option<usize>,
+
+    /// Sets best effort maximum number of rows in a data page.
+    #[clap(long)]
+    data_page_row_count_limit: Option<usize>,
+
+    /// Sets best effort maximum size of a data page in bytes.
+    #[clap(long)]
+    data_pagesize_limit: Option<usize>,
+
+    /// Sets max statistics size for any column.
+    ///
+    /// Applicable only if statistics are enabled.
+    #[clap(long)]
+    max_statistics_size: Option<usize>,
+
+    /// Sets best effort maximum dictionary page size, in bytes.
+    #[clap(long)]
+    dictionary_pagesize_limit: Option<usize>,
+
+    /// Sets whether bloom filter is enabled for any column.
+    #[clap(long)]
+    bloom_filter_enabled: Option<bool>,
+
+    /// Sets bloom filter false positive probability (fpp) for any column.
+    #[clap(long)]
+    bloom_filter_fpp: Option<f64>,
+
+    /// Sets number of distinct values (ndv) for bloom filter for any column.
+    #[clap(long)]
+    bloom_filter_ndv: Option<u64>,
+
+    /// Sets flag to enable/disable dictionary encoding for any column.
+    #[clap(long)]
+    dictionary_enabled: Option<bool>,
+
+    /// Sets flag to enable/disable statistics for any column.
+    #[clap(long)]
+    statistics_enabled: Option<EnabledStatisticsArgs>,
+
+    /// Sets writer version.
+    #[clap(long)]
+    writer_version: Option<WriterVersionArgs>,
+}
+
+fn main() {
+    let args = Args::parse();
+
+    // read key-value metadata
+    let parquet_reader = SerializedFileReader::new(
+        File::open(&args.input).expect("Unable to open input file"),
+    )
+    .expect("Failed to create reader");
+    let kv_md = parquet_reader
+        .metadata()
+        .file_metadata()
+        .key_value_metadata()
+        .cloned();
+
+    // create actual parquet reader
+    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(
+        File::open(args.input).expect("Unable to open input file"),
+    )
+    .expect("parquet open")
+    .build()
+    .expect("parquet open");
+
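+    // Build writer properties: keep the input's key-value metadata and apply
+    // only the settings explicitly passed on the command line.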
+    let mut writer_properties_builder =
+        WriterProperties::builder().set_key_value_metadata(kv_md);
+    if let Some(value) = args.compression {
+        writer_properties_builder =
+            writer_properties_builder.set_compression(value.into());
+    }
+    if let Some(value) = args.max_row_group_size {
+        writer_properties_builder =
+            writer_properties_builder.set_max_row_group_size(value);
+    }
+    if let Some(value) = args.data_page_row_count_limit {
+        writer_properties_builder =
+            writer_properties_builder.set_data_page_row_count_limit(value);
+    }
+    if let Some(value) = args.data_pagesize_limit {
+        writer_properties_builder =
+            writer_properties_builder.set_data_pagesize_limit(value);
+    }
+    if let Some(value) = args.dictionary_pagesize_limit {
+        writer_properties_builder =
+            writer_properties_builder.set_dictionary_pagesize_limit(value);
+    }
+    if let Some(value) = args.max_statistics_size {
+        writer_properties_builder =
+            writer_properties_builder.set_max_statistics_size(value);
+    }
+    if let Some(value) = args.bloom_filter_enabled {
+        writer_properties_builder =
+            writer_properties_builder.set_bloom_filter_enabled(value);
+
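+        // fpp and ndv only take effect when the bloom filter is enabled.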
+        if value {
+            if let Some(value) = args.bloom_filter_fpp {
+                writer_properties_builder =
+                    writer_properties_builder.set_bloom_filter_fpp(value);
+            }
+            if let Some(value) = args.bloom_filter_ndv {
+                writer_properties_builder =
+                    writer_properties_builder.set_bloom_filter_ndv(value);
+            }
+        }
+    }
+    if let Some(value) = args.dictionary_enabled {
+        writer_properties_builder =
+            writer_properties_builder.set_dictionary_enabled(value);
+    }
+    if let Some(value) = args.statistics_enabled {
+        writer_properties_builder =
+            writer_properties_builder.set_statistics_enabled(value.into());
+    }
+    if let Some(value) = args.writer_version {
+        writer_properties_builder =
+            writer_properties_builder.set_writer_version(value.into());
+    }
+    let writer_properties = writer_properties_builder.build();
+    let mut parquet_writer = ArrowWriter::try_new(
+        File::create(&args.output).expect("Unable to open output file"),
+        parquet_reader.schema(),
+        Some(writer_properties),
+    )
+    .expect("create arrow writer");
+
+    for maybe_batch in parquet_reader {
+        let batch = maybe_batch.expect("reading batch");
+        parquet_writer.write(&batch).expect("writing data");
+    }
+
+    parquet_writer.close().expect("finalizing file");
+}
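
The core of the new binary is a generic "read batches, write batches" loop. For anyone who wants the same rewrite as a library call rather than a CLI, here is a minimal sketch of that pattern, using only APIs the patch itself exercises (`ParquetRecordBatchReaderBuilder`, `WriterProperties`, `ArrowWriter`); the file names are placeholders, and the key-value metadata handling from the patch is left out:

```
use std::fs::File;

use arrow_array::RecordBatchReader;
use parquet::{
    arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter},
    basic::Compression,
    file::properties::WriterProperties,
};

fn main() {
    // Build a record batch reader over the input file.
    let reader = ParquetRecordBatchReaderBuilder::try_new(
        File::open("in.parquet").expect("open input"),
    )
    .expect("create reader builder")
    .build()
    .expect("build reader");

    // Choose new writer settings; here the output is re-compressed with zstd.
    let props = WriterProperties::builder()
        .set_compression(Compression::ZSTD)
        .build();

    // Stream every record batch from the reader into the output file.
    let mut writer = ArrowWriter::try_new(
        File::create("out.parquet").expect("create output"),
        reader.schema(),
        Some(props),
    )
    .expect("create writer");
    for batch in reader {
        writer.write(&batch.expect("read batch")).expect("write batch");
    }
    writer.close().expect("close writer");
}
```

Because `ArrowWriter` re-encodes every batch, any writer property (compression, statistics, bloom filters, row group size) can differ from the input file; that is what makes a rewrite tool out of what is otherwise a plain copy loop.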