You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ya...@apache.org on 2024/01/24 05:47:21 UTC
(arrow-datafusion) branch main updated: feat: Support parquet bloom filter pruning for decimal128 (#8930)
This is an automated email from the ASF dual-hosted git repository.
yangjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 19ca7d2341 feat: Support parquet bloom filter pruning for decimal128 (#8930)
19ca7d2341 is described below
commit 19ca7d2341f8437b2bd40f8fcf29e9cf93327aa8
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Wed Jan 24 13:47:16 2024 +0800
feat: Support parquet bloom filter pruning for decimal128 (#8930)
* [Pruning] Support parquet bloom filter pruning for decimal128
* rebase
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---------
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
.../datasource/physical_plan/parquet/row_groups.rs | 80 ++++++++++++++++++----
datafusion/core/tests/parquet/mod.rs | 28 ++++++++
datafusion/core/tests/parquet/row_group_pruning.rs | 33 +++++++++
3 files changed, 127 insertions(+), 14 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index c519d41aad..fa9523a763 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -19,6 +19,8 @@ use arrow::{array::ArrayRef, datatypes::Schema};
use arrow_array::BooleanArray;
use arrow_schema::FieldRef;
use datafusion_common::{Column, ScalarValue};
+use parquet::basic::Type;
+use parquet::data_type::Decimal;
use parquet::file::metadata::ColumnChunkMetaData;
use parquet::schema::types::SchemaDescriptor;
use parquet::{
@@ -143,7 +145,10 @@ pub(crate) async fn prune_row_groups_by_bloom_filters<
continue;
}
};
- column_sbbf.insert(column_name.to_string(), bf);
+ let physical_type =
+ builder.parquet_schema().column(column_idx).physical_type();
+
+ column_sbbf.insert(column_name.to_string(), (bf, physical_type));
}
let stats = BloomFilterStatistics { column_sbbf };
@@ -169,8 +174,8 @@ pub(crate) async fn prune_row_groups_by_bloom_filters<
/// Implements `PruningStatistics` for Parquet Split Block Bloom Filters (SBBF)
struct BloomFilterStatistics {
- /// Maps column name to the parquet bloom filter
- column_sbbf: HashMap<String, Sbbf>,
+ /// Maps column name to the parquet bloom filter and parquet physical type
+ column_sbbf: HashMap<String, (Sbbf, Type)>,
}
impl PruningStatistics for BloomFilterStatistics {
@@ -200,7 +205,7 @@ impl PruningStatistics for BloomFilterStatistics {
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
- let sbbf = self.column_sbbf.get(column.name.as_str())?;
+ let (sbbf, parquet_type) = self.column_sbbf.get(column.name.as_str())?;
// Bloom filters are probabilistic data structures that can return false
// positives (i.e. it might return true even if the value is not
@@ -209,16 +214,63 @@ impl PruningStatistics for BloomFilterStatistics {
let known_not_present = values
.iter()
- .map(|value| match value {
- ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()),
- ScalarValue::Boolean(Some(v)) => sbbf.check(v),
- ScalarValue::Float64(Some(v)) => sbbf.check(v),
- ScalarValue::Float32(Some(v)) => sbbf.check(v),
- ScalarValue::Int64(Some(v)) => sbbf.check(v),
- ScalarValue::Int32(Some(v)) => sbbf.check(v),
- ScalarValue::Int16(Some(v)) => sbbf.check(v),
- ScalarValue::Int8(Some(v)) => sbbf.check(v),
- _ => true,
+ .map(|value| {
+ match value {
+ ScalarValue::Utf8(Some(v)) => sbbf.check(&v.as_str()),
+ ScalarValue::Boolean(Some(v)) => sbbf.check(v),
+ ScalarValue::Float64(Some(v)) => sbbf.check(v),
+ ScalarValue::Float32(Some(v)) => sbbf.check(v),
+ ScalarValue::Int64(Some(v)) => sbbf.check(v),
+ ScalarValue::Int32(Some(v)) => sbbf.check(v),
+ ScalarValue::Int16(Some(v)) => sbbf.check(v),
+ ScalarValue::Int8(Some(v)) => sbbf.check(v),
+ ScalarValue::Decimal128(Some(v), p, s) => match parquet_type {
+ Type::INT32 => {
+ //https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42
+ // All physical types are little-endian
+ if *p > 9 {
+ //DECIMAL can be used to annotate the following types:
+ //
+ // int32: for 1 <= precision <= 9
+ // int64: for 1 <= precision <= 18
+ return true;
+ }
+ let b = (*v as i32).to_le_bytes();
+ // Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325
+ let decimal = Decimal::Int32 {
+ value: b,
+ precision: *p as i32,
+ scale: *s as i32,
+ };
+ sbbf.check(&decimal)
+ }
+ Type::INT64 => {
+ if *p > 18 {
+ return true;
+ }
+ let b = (*v as i64).to_le_bytes();
+ let decimal = Decimal::Int64 {
+ value: b,
+ precision: *p as i32,
+ scale: *s as i32,
+ };
+ sbbf.check(&decimal)
+ }
+ Type::FIXED_LEN_BYTE_ARRAY => {
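+ // keep consistent with from_bytes_to_i128 (NOTE(review): this hashes 16 big-endian bytes; confirm it matches the column's FIXED_LEN_BYTE_ARRAY length when precision < 38)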
+ let b = v.to_be_bytes().to_vec();
+ // Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325
+ let decimal = Decimal::Bytes {
+ value: b.into(),
+ precision: *p as i32,
+ scale: *s as i32,
+ };
+ sbbf.check(&decimal)
+ }
+ _ => true,
+ },
+ _ => true,
+ }
})
// The row group doesn't contain any of the values if
// all the checks are false
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index 0602b4d4c5..b056db6a0b 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -66,7 +66,10 @@ enum Scenario {
Int32Range,
Float64,
Decimal,
+ DecimalBloomFilterInt32,
+ DecimalBloomFilterInt64,
DecimalLargePrecision,
+ DecimalLargePrecisionBloomFilter,
PeriodsInColumnNames,
}
@@ -549,6 +552,22 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 9, 2),
]
}
+ Scenario::DecimalBloomFilterInt32 => {
+ // decimal record batch
+ vec![
+ make_decimal_batch(vec![100, 200, 300, 400, 500], 6, 2),
+ make_decimal_batch(vec![100, 200, 300, 400, 600], 6, 2),
+ make_decimal_batch(vec![100, 200, 300, 400, 600], 6, 2),
+ ]
+ }
+ Scenario::DecimalBloomFilterInt64 => {
+ // decimal record batch
+ vec![
+ make_decimal_batch(vec![100, 200, 300, 400, 500], 9, 2),
+ make_decimal_batch(vec![100, 200, 300, 400, 600], 9, 2),
+ make_decimal_batch(vec![100, 200, 300, 400, 600], 9, 2),
+ ]
+ }
Scenario::DecimalLargePrecision => {
// decimal record batch with large precision,
// and the data will stored as FIXED_LENGTH_BYTE_ARRAY
@@ -558,6 +577,15 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 38, 2),
]
}
+ Scenario::DecimalLargePrecisionBloomFilter => {
+ // decimal record batch with large precision,
+ // and the data will be stored as FIXED_LEN_BYTE_ARRAY
+ vec![
+ make_decimal_batch(vec![100000, 200000, 300000, 400000, 500000], 38, 5),
+ make_decimal_batch(vec![-100000, 200000, 300000, 400000, 600000], 38, 5),
+ make_decimal_batch(vec![100000, 200000, 300000, 400000, 600000], 38, 5),
+ ]
+ }
Scenario::PeriodsInColumnNames => {
vec![
// all frontend
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs
index c8cac5dd9b..449a311777 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -599,6 +599,39 @@ async fn prune_decimal_in_list() {
.with_expected_rows(6)
.test_row_group_prune()
.await;
+
+ // test data -> r1: {1,2,3,4,5}, r2: {1,2,3,4,6}, r3: {1,2,3,4,6}
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::DecimalBloomFilterInt32)
+ .with_query("SELECT * FROM t where decimal_col in (5)")
+ .with_expected_errors(Some(0))
+ .with_pruned_by_stats(Some(0))
+ .with_pruned_by_bloom_filter(Some(2))
+ .with_expected_rows(1)
+ .test_row_group_prune()
+ .await;
+
+ // test data -> r1: {1,2,3,4,5}, r2: {1,2,3,4,6}, r3: {1,2,3,4,6}
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::DecimalBloomFilterInt64)
+ .with_query("SELECT * FROM t where decimal_col in (5)")
+ .with_expected_errors(Some(0))
+ .with_pruned_by_stats(Some(0))
+ .with_pruned_by_bloom_filter(Some(2))
+ .with_expected_rows(1)
+ .test_row_group_prune()
+ .await;
+
+ // test data -> r1: {1,2,3,4,5}, r2: {-1,2,3,4,6}, r3: {1,2,3,4,6}
+ RowGroupPruningTest::new()
+ .with_scenario(Scenario::DecimalLargePrecisionBloomFilter)
+ .with_query("SELECT * FROM t where decimal_col in (5)")
+ .with_expected_errors(Some(0))
+ .with_pruned_by_stats(Some(0))
+ .with_pruned_by_bloom_filter(Some(2))
+ .with_expected_rows(1)
+ .test_row_group_prune()
+ .await;
}
#[tokio::test]