You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/04 18:09:56 UTC

[GitHub] [arrow] jorgecarleitao commented on a change in pull request #9064: ARROW-11074: [Rust][DataFusion] Implement predicate push-down for parquet tables

jorgecarleitao commented on a change in pull request #9064:
URL: https://github.com/apache/arrow/pull/9064#discussion_r551156780



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -209,6 +251,477 @@ impl ParquetExec {
     }
 }
 
+#[derive(Debug, Clone)]
+/// Predicate builder used for generating of predicate functions, used to filter row group metadata
+pub struct PredicateExpressionBuilder {
+    parquet_schema: Schema,
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl PredicateExpressionBuilder {
+    /// Try to create a new instance of PredicateExpressionBuilder
+    pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
+        let predicate_expr =
+            build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?;
+
+        Ok(Self {
+            parquet_schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter row group metadata
+    pub fn build_row_group_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build statistics record batch
+        let predicate_result = build_row_group_record_batch(
+            row_group_metadata,
+            &self.parquet_schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| {
+            // execute predicate expression
+            self.predicate_expr.evaluate(&statistics_batch)
+        })
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            _ => return Box::new(|_r, _i| true),
+        };
+
+        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
+        match predicate_array {
+            // return row group predicate function
+            Some(array) => {
+                let predicate_values =
+                    array.iter().map(|x| x.unwrap_or(false)).collect::<Vec<_>>();
+                Box::new(move |_, i| predicate_values[i])
+            }
+            // predicate result is not a BooleanArray
+            _ => Box::new(|_r, _i| true),

Review comment:
       I would return an error or `panic!` here (or earlier), or otherwise validate that the predicate result is a boolean array.

##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -209,6 +251,477 @@ impl ParquetExec {
     }
 }
 
+#[derive(Debug, Clone)]
+/// Predicate builder used for generating of predicate functions, used to filter row group metadata
+pub struct PredicateExpressionBuilder {
+    parquet_schema: Schema,
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl PredicateExpressionBuilder {
+    /// Try to create a new instance of PredicateExpressionBuilder
+    pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
+        let predicate_expr =
+            build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?;
+
+        Ok(Self {
+            parquet_schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter row group metadata
+    pub fn build_row_group_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build statistics record batch
+        let predicate_result = build_row_group_record_batch(
+            row_group_metadata,
+            &self.parquet_schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| {
+            // execute predicate expression
+            self.predicate_expr.evaluate(&statistics_batch)
+        })
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            _ => return Box::new(|_r, _i| true),
+        };
+
+        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
+        match predicate_array {
+            // return row group predicate function
+            Some(array) => {
+                let predicate_values =
+                    array.iter().map(|x| x.unwrap_or(false)).collect::<Vec<_>>();
+                Box::new(move |_, i| predicate_values[i])
+            }
+            // predicate result is not a BooleanArray
+            _ => Box::new(|_r, _i| true),
+        }
+    }
+}
+
+fn build_row_group_record_batch(
+    row_groups: &[RowGroupMetaData],
+    parquet_schema: &Schema,
+    stat_column_req: &Vec<(String, StatisticsType, Field)>,
+) -> Result<RecordBatch> {
+    let mut fields = Vec::<Field>::new();
+    let mut arrays = Vec::<ArrayRef>::new();
+    for (column_name, statistics_type, stat_field) in stat_column_req {
+        if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) {
+            let statistics = row_groups
+                .iter()
+                .map(|g| g.column(column_index).statistics())
+                .collect::<Vec<_>>();
+            let array = build_statistics_array(
+                &statistics,
+                *statistics_type,
+                stat_field.data_type(),
+            );
+            fields.push(stat_field.clone());
+            arrays.push(array);
+        }
+    }
+    let schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(schema, arrays)
+        .map_err(|err| DataFusionError::Plan(err.to_string()))
+}
+
+struct PhysicalExpressionBuilder<'a> {
+    column_name: String,
+    column_expr: &'a Expr,
+    scalar_expr: &'a Expr,
+    parquet_field: &'a Field,
+    statistics_fields: Vec<Field>,
+    stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    reverse_operator: bool,
+}
+
+impl<'a> PhysicalExpressionBuilder<'a> {
+    fn try_new(
+        left: &'a Expr,
+        right: &'a Expr,
+        parquet_schema: &'a Schema,
+        stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    ) -> Result<Self> {
+        // find column name; input could be a more complicated expression
+        let mut left_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(left, &mut left_columns)?;
+        let mut right_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(right, &mut right_columns)?;
+        let (column_expr, scalar_expr, column_names, reverse_operator) =
+            match (left_columns.len(), right_columns.len()) {
+                (1, 0) => (left, right, left_columns, false),
+                (0, 1) => (right, left, right_columns, true),
+                _ => {
+                    // if more than one column used in expression - not supported
+                    return Err(DataFusionError::Plan(
+                        "Multi-column expressions are not currently supported"
+                            .to_string(),
+                    ));
+                }
+            };
+        let column_name = column_names.iter().next().unwrap().clone();
+        let field = match parquet_schema.column_with_name(&column_name) {
+            Some((_, f)) => f,
+            _ => {
+                // field not found in parquet schema
+                return Err(DataFusionError::Plan(
+                    "Field not found in parquet schema".to_string(),
+                ));
+            }
+        };
+
+        Ok(Self {
+            column_name,
+            column_expr,
+            scalar_expr,
+            parquet_field: field,
+            statistics_fields: Vec::new(),
+            stat_column_req,
+            reverse_operator,
+        })
+    }
+
+    fn correct_operator(&self, op: Operator) -> Operator {
+        if !self.reverse_operator {
+            return op;
+        }
+
+        match op {
+            Operator::Lt => Operator::Gt,
+            Operator::Gt => Operator::Lt,
+            Operator::LtEq => Operator::GtEq,
+            Operator::GtEq => Operator::LtEq,
+            _ => op,
+        }
+    }
+
+    fn column_expr(&self) -> &Expr {
+        self.column_expr
+    }
+
+    fn scalar_expr(&self) -> &Expr {
+        self.scalar_expr
+    }
+
+    fn column_name(&self) -> &String {
+        &self.column_name
+    }
+
+    fn is_stat_column_missing(&self, statistics_type: StatisticsType) -> bool {
+        self.stat_column_req
+            .iter()
+            .filter(|(c, t, _f)| c == &self.column_name && t == &statistics_type)
+            .count()
+            == 0
+    }
+
+    fn add_min_column(&mut self) -> String {
+        let min_field = Field::new(
+            format!("{}_min", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let min_column_name = min_field.name().clone();
+        self.statistics_fields.push(min_field.clone());
+        if self.is_stat_column_missing(StatisticsType::Min) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Min,
+                min_field,
+            ));
+        }
+        min_column_name
+    }
+
+    fn add_max_column(&mut self) -> String {
+        let max_field = Field::new(
+            format!("{}_max", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let max_column_name = max_field.name().clone();
+        self.statistics_fields.push(max_field.clone());
+        if self.is_stat_column_missing(StatisticsType::Max) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Max,
+                max_field,
+            ));
+        }
+        max_column_name
+    }
+
+    fn build(&self, statistics_expr: &Expr) -> Result<Arc<dyn PhysicalExpr>> {
+        let execution_context_state = ExecutionContextState {
+            datasources: HashMap::new(),
+            scalar_functions: HashMap::new(),
+            var_provider: HashMap::new(),
+            aggregate_functions: HashMap::new(),
+            config: ExecutionConfig::new(),
+        };
+        let schema = Schema::new(self.statistics_fields.clone());
+        DefaultPhysicalPlanner::default().create_physical_expr(
+            statistics_expr,
+            &schema,
+            &execution_context_state,
+        )
+    }
+}
+
+/// Translate logical filter expression into parquet statistics physical filter expression
+fn build_predicate_expression(
+    expr: &Expr,
+    parquet_schema: &Schema,
+    stat_column_req: &mut Vec<(String, StatisticsType, Field)>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    // predicate expression can only be a binary expression
+    let (left, op, right) = match expr {
+        Expr::BinaryExpr { left, op, right } => (left, *op, right),
+        _ => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+
+    if op == Operator::And || op == Operator::Or {
+        let left_expr =
+            build_predicate_expression(left, parquet_schema, stat_column_req)?;
+        let right_expr =
+            build_predicate_expression(right, parquet_schema, stat_column_req)?;
+        let stat_fields = stat_column_req
+            .iter()
+            .map(|(_, _, f)| f.clone())
+            .collect::<Vec<_>>();
+        let stat_schema = Schema::new(stat_fields);
+        let result = expressions::binary(left_expr, op.clone(), right_expr, &stat_schema);
+        return result;
+    }
+
+    let mut expr_builder = match PhysicalExpressionBuilder::try_new(
+        left,
+        right,
+        parquet_schema,
+        stat_column_req,
+    ) {
+        Ok(builder) => builder,
+        // allow partial failure in predicate expression generation
+        // this can still produce a useful predicate when multiple conditions are joined using AND
+        Err(_) => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+    let corrected_op = expr_builder.correct_operator(op);
+    let statistics_expr = match corrected_op {
+        Operator::Eq => {
+            let min_column_name = expr_builder.add_min_column();
+            let max_column_name = expr_builder.add_max_column();
+            // column = literal => (min, max) = literal => min <= literal && literal <= max
+            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
+            expr_builder
+                .scalar_expr()
+                .gt_eq(rewrite_column_expr(
+                    expr_builder.column_expr(),
+                    expr_builder.column_name(),
+                    min_column_name.as_str(),
+                )?)
+                .and(expr_builder.scalar_expr().lt_eq(rewrite_column_expr(
+                    expr_builder.column_expr(),
+                    expr_builder.column_name(),
+                    max_column_name.as_str(),
+                )?))
+        }
+        Operator::Gt => {
+            let max_column_name = expr_builder.add_max_column();
+            // column > literal => (min, max) > literal => max > literal
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                max_column_name.as_str(),
+            )?
+            .gt(expr_builder.scalar_expr().clone())
+        }
+        Operator::GtEq => {
+            // column >= literal => (min, max) >= literal => max >= literal
+            let max_column_name = expr_builder.add_max_column();
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                max_column_name.as_str(),
+            )?
+            .gt_eq(expr_builder.scalar_expr().clone())
+        }
+        Operator::Lt => {
+            // column < literal => (min, max) < literal => min < literal
+            let min_column_name = expr_builder.add_min_column();
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                min_column_name.as_str(),
+            )?
+            .lt(expr_builder.scalar_expr().clone())
+        }
+        Operator::LtEq => {
+            // column <= literal => (min, max) <= literal => min <= literal
+            let min_column_name = expr_builder.add_min_column();
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                min_column_name.as_str(),
+            )?
+            .lt_eq(expr_builder.scalar_expr().clone())
+        }
+        // other expressions are not supported
+        _ => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+    expr_builder.build(&statistics_expr)
+}
+
+/// replaces a column with an old name with a new name in an expression
+fn rewrite_column_expr(
+    expr: &Expr,
+    column_old_name: &str,
+    column_new_name: &str,
+) -> Result<Expr> {
+    let expressions = utils::expr_sub_expressions(&expr)?;
+    let expressions = expressions
+        .iter()
+        .map(|e| rewrite_column_expr(e, column_old_name, column_new_name))
+        .collect::<Result<Vec<_>>>()?;
+
+    if let Expr::Column(name) = expr {
+        if name == column_old_name {
+            return Ok(Expr::Column(column_new_name.to_string()));
+        }
+    }
+    utils::rewrite_expression(&expr, &expressions)
+}
+
+#[derive(Debug, Copy, Clone, PartialEq)]
+enum StatisticsType {
+    Min,
+    Max,
+}
+
+fn build_null_array(data_type: &DataType, length: usize) -> ArrayRef {
+    let mut null_bitmap_builder = BooleanBufferBuilder::new(length);
+    null_bitmap_builder.append_n(length, false);
+    let array_data = ArrayData::builder(data_type.clone())
+        .len(length)
+        .null_bit_buffer(null_bitmap_builder.finish())
+        .build();
+    make_array(array_data)
+}
+
+fn build_statistics_array(

Review comment:
       I would have split this into N functions, one per array type (via generics), and written `build_statistics_array` as simply `match data_type { each implementation }`.
   
   This would follow the convention used in other places and reduce the risk of mistakes, particularly when matching data types.

##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -209,6 +251,477 @@ impl ParquetExec {
     }
 }
 
+#[derive(Debug, Clone)]
+/// Predicate builder used for generating of predicate functions, used to filter row group metadata
+pub struct PredicateExpressionBuilder {
+    parquet_schema: Schema,
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl PredicateExpressionBuilder {
+    /// Try to create a new instance of PredicateExpressionBuilder
+    pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
+        let predicate_expr =
+            build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?;
+
+        Ok(Self {
+            parquet_schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter row group metadata
+    pub fn build_row_group_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build statistics record batch
+        let predicate_result = build_row_group_record_batch(
+            row_group_metadata,
+            &self.parquet_schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| {
+            // execute predicate expression
+            self.predicate_expr.evaluate(&statistics_batch)
+        })
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            _ => return Box::new(|_r, _i| true),
+        };
+
+        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
+        match predicate_array {
+            // return row group predicate function
+            Some(array) => {
+                let predicate_values =
+                    array.iter().map(|x| x.unwrap_or(false)).collect::<Vec<_>>();
+                Box::new(move |_, i| predicate_values[i])
+            }
+            // predicate result is not a BooleanArray
+            _ => Box::new(|_r, _i| true),
+        }
+    }
+}
+
+fn build_row_group_record_batch(
+    row_groups: &[RowGroupMetaData],
+    parquet_schema: &Schema,
+    stat_column_req: &Vec<(String, StatisticsType, Field)>,
+) -> Result<RecordBatch> {
+    let mut fields = Vec::<Field>::new();
+    let mut arrays = Vec::<ArrayRef>::new();
+    for (column_name, statistics_type, stat_field) in stat_column_req {
+        if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) {
+            let statistics = row_groups
+                .iter()
+                .map(|g| g.column(column_index).statistics())
+                .collect::<Vec<_>>();
+            let array = build_statistics_array(
+                &statistics,
+                *statistics_type,
+                stat_field.data_type(),
+            );
+            fields.push(stat_field.clone());
+            arrays.push(array);
+        }
+    }
+    let schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(schema, arrays)
+        .map_err(|err| DataFusionError::Plan(err.to_string()))
+}
+
+struct PhysicalExpressionBuilder<'a> {
+    column_name: String,
+    column_expr: &'a Expr,
+    scalar_expr: &'a Expr,
+    parquet_field: &'a Field,
+    statistics_fields: Vec<Field>,
+    stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    reverse_operator: bool,
+}
+
+impl<'a> PhysicalExpressionBuilder<'a> {
+    fn try_new(
+        left: &'a Expr,
+        right: &'a Expr,
+        parquet_schema: &'a Schema,
+        stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    ) -> Result<Self> {
+        // find column name; input could be a more complicated expression
+        let mut left_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(left, &mut left_columns)?;
+        let mut right_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(right, &mut right_columns)?;
+        let (column_expr, scalar_expr, column_names, reverse_operator) =
+            match (left_columns.len(), right_columns.len()) {
+                (1, 0) => (left, right, left_columns, false),
+                (0, 1) => (right, left, right_columns, true),
+                _ => {
+                    // if more than one column used in expression - not supported
+                    return Err(DataFusionError::Plan(
+                        "Multi-column expressions are not currently supported"
+                            .to_string(),
+                    ));
+                }
+            };
+        let column_name = column_names.iter().next().unwrap().clone();
+        let field = match parquet_schema.column_with_name(&column_name) {
+            Some((_, f)) => f,
+            _ => {
+                // field not found in parquet schema
+                return Err(DataFusionError::Plan(
+                    "Field not found in parquet schema".to_string(),
+                ));
+            }
+        };
+
+        Ok(Self {
+            column_name,
+            column_expr,
+            scalar_expr,
+            parquet_field: field,
+            statistics_fields: Vec::new(),
+            stat_column_req,
+            reverse_operator,
+        })
+    }
+
+    fn correct_operator(&self, op: Operator) -> Operator {
+        if !self.reverse_operator {
+            return op;
+        }
+
+        match op {
+            Operator::Lt => Operator::Gt,
+            Operator::Gt => Operator::Lt,
+            Operator::LtEq => Operator::GtEq,
+            Operator::GtEq => Operator::LtEq,
+            _ => op,
+        }
+    }
+
+    fn column_expr(&self) -> &Expr {
+        self.column_expr
+    }
+
+    fn scalar_expr(&self) -> &Expr {
+        self.scalar_expr
+    }
+
+    fn column_name(&self) -> &String {
+        &self.column_name
+    }
+
+    fn is_stat_column_missing(&self, statistics_type: StatisticsType) -> bool {
+        self.stat_column_req
+            .iter()
+            .filter(|(c, t, _f)| c == &self.column_name && t == &statistics_type)
+            .count()
+            == 0
+    }
+
+    fn add_min_column(&mut self) -> String {
+        let min_field = Field::new(
+            format!("{}_min", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let min_column_name = min_field.name().clone();
+        self.statistics_fields.push(min_field.clone());
+        if self.is_stat_column_missing(StatisticsType::Min) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Min,
+                min_field,
+            ));
+        }
+        min_column_name
+    }
+
+    fn add_max_column(&mut self) -> String {
+        let max_field = Field::new(
+            format!("{}_max", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let max_column_name = max_field.name().clone();
+        self.statistics_fields.push(max_field.clone());
+        if self.is_stat_column_missing(StatisticsType::Max) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Max,
+                max_field,
+            ));
+        }
+        max_column_name
+    }
+
+    fn build(&self, statistics_expr: &Expr) -> Result<Arc<dyn PhysicalExpr>> {
+        let execution_context_state = ExecutionContextState {
+            datasources: HashMap::new(),
+            scalar_functions: HashMap::new(),
+            var_provider: HashMap::new(),
+            aggregate_functions: HashMap::new(),
+            config: ExecutionConfig::new(),
+        };
+        let schema = Schema::new(self.statistics_fields.clone());
+        DefaultPhysicalPlanner::default().create_physical_expr(
+            statistics_expr,
+            &schema,
+            &execution_context_state,
+        )
+    }
+}
+
+/// Translate logical filter expression into parquet statistics physical filter expression
+fn build_predicate_expression(
+    expr: &Expr,
+    parquet_schema: &Schema,
+    stat_column_req: &mut Vec<(String, StatisticsType, Field)>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    // predicate expression can only be a binary expression
+    let (left, op, right) = match expr {
+        Expr::BinaryExpr { left, op, right } => (left, *op, right),
+        _ => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+
+    if op == Operator::And || op == Operator::Or {
+        let left_expr =
+            build_predicate_expression(left, parquet_schema, stat_column_req)?;
+        let right_expr =
+            build_predicate_expression(right, parquet_schema, stat_column_req)?;
+        let stat_fields = stat_column_req
+            .iter()
+            .map(|(_, _, f)| f.clone())
+            .collect::<Vec<_>>();
+        let stat_schema = Schema::new(stat_fields);
+        let result = expressions::binary(left_expr, op.clone(), right_expr, &stat_schema);
+        return result;
+    }
+
+    let mut expr_builder = match PhysicalExpressionBuilder::try_new(
+        left,
+        right,
+        parquet_schema,
+        stat_column_req,
+    ) {
+        Ok(builder) => builder,
+        // allow partial failure in predicate expression generation
+        // this can still produce a useful predicate when multiple conditions are joined using AND
+        Err(_) => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+    let corrected_op = expr_builder.correct_operator(op);
+    let statistics_expr = match corrected_op {
+        Operator::Eq => {
+            let min_column_name = expr_builder.add_min_column();
+            let max_column_name = expr_builder.add_max_column();
+            // column = literal => (min, max) = literal => min <= literal && literal <= max
+            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
+            expr_builder
+                .scalar_expr()
+                .gt_eq(rewrite_column_expr(
+                    expr_builder.column_expr(),
+                    expr_builder.column_name(),
+                    min_column_name.as_str(),
+                )?)
+                .and(expr_builder.scalar_expr().lt_eq(rewrite_column_expr(
+                    expr_builder.column_expr(),
+                    expr_builder.column_name(),
+                    max_column_name.as_str(),
+                )?))
+        }
+        Operator::Gt => {
+            let max_column_name = expr_builder.add_max_column();
+            // column > literal => (min, max) > literal => max > literal
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                max_column_name.as_str(),
+            )?
+            .gt(expr_builder.scalar_expr().clone())
+        }
+        Operator::GtEq => {
+            // column >= literal => (min, max) >= literal => max >= literal
+            let max_column_name = expr_builder.add_max_column();
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                max_column_name.as_str(),
+            )?
+            .gt_eq(expr_builder.scalar_expr().clone())
+        }
+        Operator::Lt => {
+            // column < literal => (min, max) < literal => min < literal
+            let min_column_name = expr_builder.add_min_column();
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                min_column_name.as_str(),
+            )?
+            .lt(expr_builder.scalar_expr().clone())
+        }
+        Operator::LtEq => {
+            // column <= literal => (min, max) <= literal => min <= literal
+            let min_column_name = expr_builder.add_min_column();
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                min_column_name.as_str(),
+            )?
+            .lt_eq(expr_builder.scalar_expr().clone())
+        }
+        // other expressions are not supported
+        _ => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+    expr_builder.build(&statistics_expr)
+}
+
+/// replaces a column with an old name with a new name in an expression
+fn rewrite_column_expr(
+    expr: &Expr,
+    column_old_name: &str,
+    column_new_name: &str,
+) -> Result<Expr> {
+    let expressions = utils::expr_sub_expressions(&expr)?;
+    let expressions = expressions
+        .iter()
+        .map(|e| rewrite_column_expr(e, column_old_name, column_new_name))
+        .collect::<Result<Vec<_>>>()?;
+
+    if let Expr::Column(name) = expr {
+        if name == column_old_name {
+            return Ok(Expr::Column(column_new_name.to_string()));
+        }
+    }
+    utils::rewrite_expression(&expr, &expressions)
+}
+
+#[derive(Debug, Copy, Clone, PartialEq)]
+enum StatisticsType {
+    Min,
+    Max,
+}
+
+fn build_null_array(data_type: &DataType, length: usize) -> ArrayRef {
+    let mut null_bitmap_builder = BooleanBufferBuilder::new(length);
+    null_bitmap_builder.append_n(length, false);
+    let array_data = ArrayData::builder(data_type.clone())
+        .len(length)
+        .null_bit_buffer(null_bitmap_builder.finish())
+        .build();
+    make_array(array_data)
+}
+
+fn build_statistics_array(
+    statistics: &[Option<&ParquetStatistics>],
+    statistics_type: StatisticsType,
+    data_type: &DataType,
+) -> ArrayRef {
+    let statistics_count = statistics.len();
+    // this should be handled by the condition below
+    // if statistics_count < 1 {
+    //     return build_null_array(data_type, 0);
+    // }
+
+    let first_group_stats = statistics.iter().find(|s| s.is_some());
+    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
+        // found first row group with statistics defined
+        statistics
+    } else {
+        // no row group has statistics defined
+        return build_null_array(data_type, statistics_count);
+    };
+
+    let (data_size, arrow_type) = match first_group_stats {
+        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
+        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
+        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
+        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
+        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
+            (0, DataType::Utf8)
+        }
+        _ => {
+            // type of statistics not supported
+            return build_null_array(data_type, statistics_count);
+        }
+    };
+
+    let statistics = statistics.iter().map(|s| {
+        s.filter(|s| s.has_min_max_set())
+            .map(|s| match statistics_type {
+                StatisticsType::Min => s.min_bytes(),
+                StatisticsType::Max => s.max_bytes(),
+            })
+    });
+
+    if arrow_type == DataType::Utf8 {
+        let data_size = statistics
+            .clone()
+            .map(|x| x.map(|b| b.len()).unwrap_or(0))
+            .sum();
+        let mut builder =
+            arrow::array::StringBuilder::with_capacity(statistics_count, data_size);
+        let string_statistics =
+            statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok()));
+        for maybe_string in string_statistics {
+            match maybe_string {
+                Some(string_value) => builder.append_value(string_value).unwrap(),
+                None => builder.append_null().unwrap(),
+            };
+        }
+        return Arc::new(builder.finish());
+    }
+
+    let mut data_buffer = MutableBuffer::new(statistics_count * data_size);
+    let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count);
+    let mut null_count = 0;
+    for s in statistics {
+        if let Some(stat_data) = s {
+            bitmap_builder.append(true);
+            data_buffer.extend_from_slice(stat_data);
+        } else {
+            bitmap_builder.append(false);
+            data_buffer.resize(data_buffer.len() + data_size);
+            null_count += 1;
+        }
+    }
+
+    let mut builder = ArrayData::builder(arrow_type)
+        .len(statistics_count)
+        .add_buffer(data_buffer.into());
+    if null_count > 0 {
+        builder = builder.null_bit_buffer(bitmap_builder.finish());
+    }
+    let array_data = builder.build();
+    let statistics_array = make_array(array_data);
+    if statistics_array.data_type() == data_type {
+        return statistics_array;
+    }

Review comment:
       This is only valid for primitive types. In general, I would recommend using `PrimitiveArray<T>::from_iter`, `BooleanArray::from_iter` and `StringArray::from_iter`. Using `MutableBuffer` at such a high level is error-prone. For example, if we add a filter for boolean types (e.g. eq and neq), this code would not panic, but the resulting array would be invalid (since a boolean array's size is measured in bits, not bytes).

##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -209,6 +251,477 @@ impl ParquetExec {
     }
 }
 
+#[derive(Debug, Clone)]
+/// Predicate builder used for generating of predicate functions, used to filter row group metadata
+pub struct PredicateExpressionBuilder {
+    parquet_schema: Schema,
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl PredicateExpressionBuilder {
+    /// Try to create a new instance of PredicateExpressionBuilder
+    pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
+        let predicate_expr =
+            build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?;
+
+        Ok(Self {
+            parquet_schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter row group metadata
+    pub fn build_row_group_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build statistics record batch
+        let predicate_result = build_row_group_record_batch(
+            row_group_metadata,
+            &self.parquet_schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| {
+            // execute predicate expression
+            self.predicate_expr.evaluate(&statistics_batch)
+        })
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            _ => return Box::new(|_r, _i| true),
+        };
+
+        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
+        match predicate_array {
+            // return row group predicate function
+            Some(array) => {
+                let predicate_values =
+                    array.iter().map(|x| x.unwrap_or(false)).collect::<Vec<_>>();
+                Box::new(move |_, i| predicate_values[i])
+            }
+            // predicate result is not a BooleanArray
+            _ => Box::new(|_r, _i| true),
+        }
+    }
+}
+
+fn build_row_group_record_batch(
+    row_groups: &[RowGroupMetaData],
+    parquet_schema: &Schema,
+    stat_column_req: &Vec<(String, StatisticsType, Field)>,
+) -> Result<RecordBatch> {
+    let mut fields = Vec::<Field>::new();
+    let mut arrays = Vec::<ArrayRef>::new();
+    for (column_name, statistics_type, stat_field) in stat_column_req {
+        if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) {
+            let statistics = row_groups
+                .iter()
+                .map(|g| g.column(column_index).statistics())
+                .collect::<Vec<_>>();
+            let array = build_statistics_array(
+                &statistics,
+                *statistics_type,
+                stat_field.data_type(),
+            );
+            fields.push(stat_field.clone());
+            arrays.push(array);
+        }
+    }
+    let schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(schema, arrays)
+        .map_err(|err| DataFusionError::Plan(err.to_string()))
+}
+
+struct PhysicalExpressionBuilder<'a> {
+    column_name: String,
+    column_expr: &'a Expr,
+    scalar_expr: &'a Expr,
+    parquet_field: &'a Field,
+    statistics_fields: Vec<Field>,
+    stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    reverse_operator: bool,
+}
+
+impl<'a> PhysicalExpressionBuilder<'a> {
+    fn try_new(
+        left: &'a Expr,
+        right: &'a Expr,
+        parquet_schema: &'a Schema,
+        stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    ) -> Result<Self> {
+        // find column name; input could be a more complicated expression
+        let mut left_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(left, &mut left_columns)?;
+        let mut right_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(right, &mut right_columns)?;
+        let (column_expr, scalar_expr, column_names, reverse_operator) =
+            match (left_columns.len(), right_columns.len()) {
+                (1, 0) => (left, right, left_columns, false),
+                (0, 1) => (right, left, right_columns, true),
+                _ => {
+                    // if more than one column used in expression - not supported
+                    return Err(DataFusionError::Plan(
+                        "Multi-column expressions are not currently supported"
+                            .to_string(),
+                    ));
+                }
+            };
+        let column_name = column_names.iter().next().unwrap().clone();
+        let field = match parquet_schema.column_with_name(&column_name) {
+            Some((_, f)) => f,
+            _ => {
+                // field not found in parquet schema
+                return Err(DataFusionError::Plan(
+                    "Field not found in parquet schema".to_string(),
+                ));
+            }
+        };
+
+        Ok(Self {
+            column_name,
+            column_expr,
+            scalar_expr,
+            parquet_field: field,
+            statistics_fields: Vec::new(),
+            stat_column_req,
+            reverse_operator,
+        })
+    }
+
+    fn correct_operator(&self, op: Operator) -> Operator {
+        if !self.reverse_operator {
+            return op;
+        }
+
+        match op {
+            Operator::Lt => Operator::Gt,
+            Operator::Gt => Operator::Lt,
+            Operator::LtEq => Operator::GtEq,
+            Operator::GtEq => Operator::LtEq,
+            _ => op,
+        }
+    }
+
+    fn column_expr(&self) -> &Expr {
+        self.column_expr
+    }
+
+    fn scalar_expr(&self) -> &Expr {
+        self.scalar_expr
+    }
+
+    fn column_name(&self) -> &String {
+        &self.column_name
+    }
+
+    fn is_stat_column_missing(&self, statistics_type: StatisticsType) -> bool {
+        self.stat_column_req
+            .iter()
+            .filter(|(c, t, _f)| c == &self.column_name && t == &statistics_type)
+            .count()
+            == 0
+    }
+
+    fn add_min_column(&mut self) -> String {
+        let min_field = Field::new(
+            format!("{}_min", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let min_column_name = min_field.name().clone();
+        self.statistics_fields.push(min_field.clone());
+        if self.is_stat_column_missing(StatisticsType::Min) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Min,
+                min_field,
+            ));
+        }
+        min_column_name
+    }
+
+    fn add_max_column(&mut self) -> String {
+        let max_field = Field::new(
+            format!("{}_max", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let max_column_name = max_field.name().clone();
+        self.statistics_fields.push(max_field.clone());
+        if self.is_stat_column_missing(StatisticsType::Max) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Max,
+                max_field,
+            ));
+        }
+        max_column_name
+    }
+
+    fn build(&self, statistics_expr: &Expr) -> Result<Arc<dyn PhysicalExpr>> {
+        let execution_context_state = ExecutionContextState {
+            datasources: HashMap::new(),
+            scalar_functions: HashMap::new(),
+            var_provider: HashMap::new(),
+            aggregate_functions: HashMap::new(),
+            config: ExecutionConfig::new(),
+        };
+        let schema = Schema::new(self.statistics_fields.clone());
+        DefaultPhysicalPlanner::default().create_physical_expr(
+            statistics_expr,
+            &schema,
+            &execution_context_state,
+        )
+    }
+}
+
+/// Translate logical filter expression into parquet statistics physical filter expression
+fn build_predicate_expression(
+    expr: &Expr,
+    parquet_schema: &Schema,
+    stat_column_req: &mut Vec<(String, StatisticsType, Field)>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    // predicate expression can only be a binary expression
+    let (left, op, right) = match expr {
+        Expr::BinaryExpr { left, op, right } => (left, *op, right),
+        _ => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+
+    if op == Operator::And || op == Operator::Or {
+        let left_expr =
+            build_predicate_expression(left, parquet_schema, stat_column_req)?;
+        let right_expr =
+            build_predicate_expression(right, parquet_schema, stat_column_req)?;
+        let stat_fields = stat_column_req
+            .iter()
+            .map(|(_, _, f)| f.clone())
+            .collect::<Vec<_>>();
+        let stat_schema = Schema::new(stat_fields);
+        let result = expressions::binary(left_expr, op.clone(), right_expr, &stat_schema);
+        return result;
+    }
+
+    let mut expr_builder = match PhysicalExpressionBuilder::try_new(
+        left,
+        right,
+        parquet_schema,
+        stat_column_req,
+    ) {
+        Ok(builder) => builder,
+        // allow partial failure in predicate expression generation
+        // this can still produce a useful predicate when multiple conditions are joined using AND
+        Err(_) => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+    let corrected_op = expr_builder.correct_operator(op);
+    let statistics_expr = match corrected_op {
+        Operator::Eq => {
+            let min_column_name = expr_builder.add_min_column();
+            let max_column_name = expr_builder.add_max_column();
+            // column = literal => (min, max) = literal => min <= literal && literal <= max
+            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
+            expr_builder
+                .scalar_expr()
+                .gt_eq(rewrite_column_expr(
+                    expr_builder.column_expr(),
+                    expr_builder.column_name(),
+                    min_column_name.as_str(),
+                )?)
+                .and(expr_builder.scalar_expr().lt_eq(rewrite_column_expr(
+                    expr_builder.column_expr(),
+                    expr_builder.column_name(),
+                    max_column_name.as_str(),
+                )?))
+        }
+        Operator::Gt => {
+            let max_column_name = expr_builder.add_max_column();
+            // column > literal => (min, max) > literal => max > literal
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                max_column_name.as_str(),
+            )?
+            .gt(expr_builder.scalar_expr().clone())
+        }
+        Operator::GtEq => {
+            // column >= literal => (min, max) >= literal => max >= literal
+            let max_column_name = expr_builder.add_max_column();
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                max_column_name.as_str(),
+            )?
+            .gt_eq(expr_builder.scalar_expr().clone())
+        }
+        Operator::Lt => {
+            // column < literal => (min, max) < literal => min < literal
+            let min_column_name = expr_builder.add_min_column();
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                min_column_name.as_str(),
+            )?
+            .lt(expr_builder.scalar_expr().clone())
+        }
+        Operator::LtEq => {
+            // column <= literal => (min, max) <= literal => min <= literal
+            let min_column_name = expr_builder.add_min_column();
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                min_column_name.as_str(),
+            )?
+            .lt_eq(expr_builder.scalar_expr().clone())
+        }
+        // other expressions are not supported
+        _ => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+    expr_builder.build(&statistics_expr)
+}
+
+/// replaces a column with an old name with a new name in an expression
+fn rewrite_column_expr(
+    expr: &Expr,
+    column_old_name: &str,
+    column_new_name: &str,
+) -> Result<Expr> {
+    let expressions = utils::expr_sub_expressions(&expr)?;
+    let expressions = expressions
+        .iter()
+        .map(|e| rewrite_column_expr(e, column_old_name, column_new_name))
+        .collect::<Result<Vec<_>>>()?;
+
+    if let Expr::Column(name) = expr {
+        if name == column_old_name {
+            return Ok(Expr::Column(column_new_name.to_string()));
+        }
+    }
+    utils::rewrite_expression(&expr, &expressions)
+}
+
+#[derive(Debug, Copy, Clone, PartialEq)]
+enum StatisticsType {
+    Min,
+    Max,
+}
+
+fn build_null_array(data_type: &DataType, length: usize) -> ArrayRef {
+    let mut null_bitmap_builder = BooleanBufferBuilder::new(length);
+    null_bitmap_builder.append_n(length, false);
+    let array_data = ArrayData::builder(data_type.clone())
+        .len(length)
+        .null_bit_buffer(null_bitmap_builder.finish())
+        .build();
+    make_array(array_data)
+}
+
+fn build_statistics_array(
+    statistics: &[Option<&ParquetStatistics>],
+    statistics_type: StatisticsType,
+    data_type: &DataType,
+) -> ArrayRef {
+    let statistics_count = statistics.len();
+    // this should be handled by the condition below
+    // if statistics_count < 1 {
+    //     return build_null_array(data_type, 0);
+    // }
+
+    let first_group_stats = statistics.iter().find(|s| s.is_some());
+    let first_group_stats = if let Some(Some(statistics)) = first_group_stats {
+        // found first row group with statistics defined
+        statistics
+    } else {
+        // no row group has statistics defined
+        return build_null_array(data_type, statistics_count);
+    };
+
+    let (data_size, arrow_type) = match first_group_stats {
+        ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32),
+        ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64),
+        ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32),
+        ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64),
+        ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => {
+            (0, DataType::Utf8)
+        }
+        _ => {
+            // type of statistics not supported
+            return build_null_array(data_type, statistics_count);
+        }
+    };
+
+    let statistics = statistics.iter().map(|s| {
+        s.filter(|s| s.has_min_max_set())
+            .map(|s| match statistics_type {
+                StatisticsType::Min => s.min_bytes(),
+                StatisticsType::Max => s.max_bytes(),
+            })
+    });
+
+    if arrow_type == DataType::Utf8 {

Review comment:
       I would use a `match` here, as comparing against specific data types this way is a bit brittle.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org