You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/03 13:52:57 UTC

[GitHub] [arrow] alamb commented on a change in pull request #9064: ARROW-11074: [Rust][DataFusion] Implement predicate push-down for parquet tables

alamb commented on a change in pull request #9064:
URL: https://github.com/apache/arrow/pull/9064#discussion_r551006563



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -209,6 +251,479 @@ impl ParquetExec {
     }
 }
 
+#[derive(Debug, Clone)]
+/// Predicate builder used for generating of predicate functions, used to filter row group metadata
+pub struct PredicateExpressionBuilder {

Review comment:
       nit: probably this doesn't need to be a `pub` struct given that it seems to be tied to the parquet scan implementation

##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -209,6 +251,479 @@ impl ParquetExec {
     }
 }
 
+#[derive(Debug, Clone)]
+/// Predicate builder used for generating of predicate functions, used to filter row group metadata
+pub struct PredicateExpressionBuilder {
+    parquet_schema: Schema,
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl PredicateExpressionBuilder {
+    /// Try to create a new instance of PredicateExpressionBuilder
+    pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
+        let predicate_expr =
+            build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?;
+
+        Ok(Self {
+            parquet_schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter row group metadata
+    pub fn build_row_group_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build statistics record batch
+        let predicate_result = build_row_group_record_batch(
+            row_group_metadata,
+            &self.parquet_schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| {
+            // execute predicate expression
+            self.predicate_expr.evaluate(&statistics_batch)
+        })
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            _ => return Box::new(|_r, _i| true),
+        };
+
+        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
+        match predicate_array {
+            // return row group predicate function
+            Some(array) => {
+                let predicate_values =
+                    array.iter().map(|x| x.unwrap_or(false)).collect::<Vec<_>>();
+                Box::new(move |_, i| predicate_values[i])
+            }
+            // predicate result is not a BooleanArray
+            _ => Box::new(|_r, _i| true),
+        }
+    }
+}
+
+fn build_row_group_record_batch(
+    row_groups: &[RowGroupMetaData],
+    parquet_schema: &Schema,
+    stat_column_req: &Vec<(String, StatisticsType, Field)>,
+) -> Result<RecordBatch> {
+    let mut fields = Vec::<Field>::new();
+    let mut arrays = Vec::<ArrayRef>::new();
+    for (column_name, statistics_type, stat_field) in stat_column_req {
+        if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) {
+            let statistics = row_groups
+                .iter()
+                .map(|g| g.column(column_index).statistics())
+                .collect::<Vec<_>>();
+            let array = build_statistics_array(
+                &statistics,
+                statistics_type,
+                stat_field.data_type(),
+            );
+            fields.push(stat_field.clone());
+            arrays.push(array);
+        }
+    }
+    let schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(schema, arrays)
+        .map_err(|err| DataFusionError::Plan(err.to_string()))
+}
+
+struct PhysicalExpressionBuilder<'a> {
+    column_name: String,
+    column_expr: &'a Expr,
+    scalar_expr: &'a Expr,
+    parquet_field: &'a Field,
+    statistics_fields: Vec<Field>,
+    stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    reverse_operator: bool,
+}
+
+impl<'a> PhysicalExpressionBuilder<'a> {
+    fn try_new(
+        left: &'a Expr,
+        right: &'a Expr,
+        parquet_schema: &'a Schema,
+        stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    ) -> Result<Self> {
+        // find column name; input could be a more complicated expression
+        let mut left_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(left, &mut left_columns)?;
+        let mut right_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(right, &mut right_columns)?;
+        let (column_expr, scalar_expr, column_names, reverse_operator) =
+            match (left_columns.len(), right_columns.len()) {
+                (1, 0) => (left, right, left_columns, false),
+                (0, 1) => (right, left, right_columns, true),
+                _ => {
+                    // if more than one column used in expression - not supported
+                    return Err(DataFusionError::Plan(
+                        "Multi-column expressions are not currently supported"
+                            .to_string(),
+                    ));
+                }
+            };
+        let column_name = column_names.iter().next().unwrap().clone();
+        let field = match parquet_schema.column_with_name(&column_name) {
+            Some((_, f)) => f,
+            _ => {
+                // field not found in parquet schema
+                return Err(DataFusionError::Plan(
+                    "Field not found in parquet schema".to_string(),
+                ));
+            }
+        };
+
+        Ok(Self {
+            column_name,
+            column_expr,
+            scalar_expr,
+            parquet_field: field,
+            statistics_fields: Vec::new(),
+            stat_column_req,
+            reverse_operator,
+        })
+    }
+
+    fn correct_operator(&self, op: &Operator) -> Operator {
+        if !self.reverse_operator {
+            return op.clone();
+        }
+
+        match op {
+            Operator::Lt => Operator::Gt,
+            Operator::Gt => Operator::Lt,
+            Operator::LtEq => Operator::GtEq,
+            Operator::GtEq => Operator::LtEq,
+            _ => op.clone(),
+        }
+    }
+
+    fn column_expr(&self) -> &Expr {
+        self.column_expr
+    }
+
+    fn scalar_expr(&self) -> &Expr {
+        self.scalar_expr
+    }
+
+    fn column_name(&self) -> &String {
+        &self.column_name
+    }
+
+    fn is_stat_column_missing(&self, statistics_type: &StatisticsType) -> bool {
+        self.stat_column_req
+            .iter()
+            .filter(|(c, t, _f)| c == &self.column_name && t == statistics_type)
+            .count()
+            == 0
+    }
+
+    fn add_min_column(&mut self) -> String {
+        let min_field = Field::new(
+            format!("{}_min", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let min_column_name = min_field.name().clone();
+        self.statistics_fields.push(min_field.clone());
+        if self.is_stat_column_missing(&StatisticsType::Min) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Min,
+                min_field,
+            ));
+        }
+        min_column_name
+    }
+
+    fn add_max_column(&mut self) -> String {
+        let max_field = Field::new(
+            format!("{}_max", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let max_column_name = max_field.name().clone();
+        self.statistics_fields.push(max_field.clone());
+        if self.is_stat_column_missing(&StatisticsType::Max) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Max,
+                max_field,
+            ));
+        }
+        max_column_name
+    }
+
+    fn build(&self, statistics_expr: &Expr) -> Result<Arc<dyn PhysicalExpr>> {
+        let execution_context_state = ExecutionContextState {
+            datasources: HashMap::new(),
+            scalar_functions: HashMap::new(),
+            var_provider: HashMap::new(),
+            aggregate_functions: HashMap::new(),
+            config: ExecutionConfig::new(),
+        };
+        let schema = Schema::new(self.statistics_fields.clone());
+        DefaultPhysicalPlanner::default().create_physical_expr(
+            statistics_expr,
+            &schema,
+            &execution_context_state,
+        )
+    }
+}
+
+/// Translate logical filter expression into parquet statistics physical filter expression
+fn build_predicate_expression(
+    expr: &Expr,
+    parquet_schema: &Schema,
+    stat_column_req: &mut Vec<(String, StatisticsType, Field)>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    // predicate expression can only be a binary expression
+    let (left, op, right) = match expr {
+        Expr::BinaryExpr { left, op, right } => (left, op, right),
+        _ => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+
+    if op == &Operator::And || op == &Operator::Or {
+        let left_expr =
+            build_predicate_expression(left, parquet_schema, stat_column_req)?;
+        let right_expr =
+            build_predicate_expression(right, parquet_schema, stat_column_req)?;
+        let stat_fields = stat_column_req
+            .iter()
+            .map(|(_, _, f)| f.clone())
+            .collect::<Vec<_>>();
+        let stat_schema = Schema::new(stat_fields);
+        let result = expressions::binary(left_expr, op.clone(), right_expr, &stat_schema);
+        return result;
+    }
+
+    let mut expr_builder = match PhysicalExpressionBuilder::try_new(
+        left,
+        right,
+        parquet_schema,
+        stat_column_req,
+    ) {
+        Ok(builder) => builder,
+        // allow partial failure in predicate expression generation
+        // this can still produce a useful predicate when multiple conditions are joined using AND
+        Err(_) => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+    let corrected_op = expr_builder.correct_operator(op);
+    let statistics_expr = match corrected_op {
+        Operator::Eq => {
+            let min_column_name = expr_builder.add_min_column();
+            let max_column_name = expr_builder.add_max_column();
+            // column = literal => column = (min, max) => min <= literal && literal <= max

Review comment:
       these comments are quite helpful. Thank you

##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -209,6 +251,479 @@ impl ParquetExec {
     }
 }
 
+#[derive(Debug, Clone)]
+/// Predicate builder used for generating of predicate functions, used to filter row group metadata
+pub struct PredicateExpressionBuilder {
+    parquet_schema: Schema,
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl PredicateExpressionBuilder {
+    /// Try to create a new instance of PredicateExpressionBuilder
+    pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
+        let predicate_expr =
+            build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?;
+
+        Ok(Self {
+            parquet_schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter row group metadata
+    pub fn build_row_group_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build statistics record batch
+        let predicate_result = build_row_group_record_batch(
+            row_group_metadata,
+            &self.parquet_schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| {
+            // execute predicate expression
+            self.predicate_expr.evaluate(&statistics_batch)
+        })
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            _ => return Box::new(|_r, _i| true),
+        };
+
+        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
+        match predicate_array {
+            // return row group predicate function
+            Some(array) => {
+                let predicate_values =
+                    array.iter().map(|x| x.unwrap_or(false)).collect::<Vec<_>>();
+                Box::new(move |_, i| predicate_values[i])
+            }
+            // predicate result is not a BooleanArray
+            _ => Box::new(|_r, _i| true),
+        }
+    }
+}
+
+fn build_row_group_record_batch(
+    row_groups: &[RowGroupMetaData],
+    parquet_schema: &Schema,
+    stat_column_req: &Vec<(String, StatisticsType, Field)>,
+) -> Result<RecordBatch> {
+    let mut fields = Vec::<Field>::new();
+    let mut arrays = Vec::<ArrayRef>::new();
+    for (column_name, statistics_type, stat_field) in stat_column_req {
+        if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) {
+            let statistics = row_groups
+                .iter()
+                .map(|g| g.column(column_index).statistics())
+                .collect::<Vec<_>>();
+            let array = build_statistics_array(
+                &statistics,
+                statistics_type,
+                stat_field.data_type(),
+            );
+            fields.push(stat_field.clone());
+            arrays.push(array);
+        }
+    }
+    let schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(schema, arrays)
+        .map_err(|err| DataFusionError::Plan(err.to_string()))
+}
+
+struct PhysicalExpressionBuilder<'a> {
+    column_name: String,
+    column_expr: &'a Expr,
+    scalar_expr: &'a Expr,
+    parquet_field: &'a Field,
+    statistics_fields: Vec<Field>,
+    stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    reverse_operator: bool,
+}
+
+impl<'a> PhysicalExpressionBuilder<'a> {
+    fn try_new(
+        left: &'a Expr,
+        right: &'a Expr,
+        parquet_schema: &'a Schema,
+        stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    ) -> Result<Self> {
+        // find column name; input could be a more complicated expression
+        let mut left_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(left, &mut left_columns)?;
+        let mut right_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(right, &mut right_columns)?;
+        let (column_expr, scalar_expr, column_names, reverse_operator) =
+            match (left_columns.len(), right_columns.len()) {
+                (1, 0) => (left, right, left_columns, false),
+                (0, 1) => (right, left, right_columns, true),
+                _ => {
+                    // if more than one column used in expression - not supported
+                    return Err(DataFusionError::Plan(
+                        "Multi-column expressions are not currently supported"
+                            .to_string(),
+                    ));
+                }
+            };
+        let column_name = column_names.iter().next().unwrap().clone();
+        let field = match parquet_schema.column_with_name(&column_name) {
+            Some((_, f)) => f,
+            _ => {
+                // field not found in parquet schema
+                return Err(DataFusionError::Plan(
+                    "Field not found in parquet schema".to_string(),
+                ));
+            }
+        };
+
+        Ok(Self {
+            column_name,
+            column_expr,
+            scalar_expr,
+            parquet_field: field,
+            statistics_fields: Vec::new(),
+            stat_column_req,
+            reverse_operator,
+        })
+    }
+
+    fn correct_operator(&self, op: &Operator) -> Operator {
+        if !self.reverse_operator {
+            return op.clone();
+        }
+
+        match op {
+            Operator::Lt => Operator::Gt,
+            Operator::Gt => Operator::Lt,
+            Operator::LtEq => Operator::GtEq,
+            Operator::GtEq => Operator::LtEq,
+            _ => op.clone(),
+        }
+    }
+
+    fn column_expr(&self) -> &Expr {
+        self.column_expr
+    }
+
+    fn scalar_expr(&self) -> &Expr {
+        self.scalar_expr
+    }
+
+    fn column_name(&self) -> &String {
+        &self.column_name
+    }
+
+    fn is_stat_column_missing(&self, statistics_type: &StatisticsType) -> bool {
+        self.stat_column_req
+            .iter()
+            .filter(|(c, t, _f)| c == &self.column_name && t == statistics_type)
+            .count()
+            == 0
+    }
+
+    fn add_min_column(&mut self) -> String {
+        let min_field = Field::new(
+            format!("{}_min", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let min_column_name = min_field.name().clone();
+        self.statistics_fields.push(min_field.clone());
+        if self.is_stat_column_missing(&StatisticsType::Min) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Min,
+                min_field,
+            ));
+        }
+        min_column_name
+    }
+
+    fn add_max_column(&mut self) -> String {
+        let max_field = Field::new(
+            format!("{}_max", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let max_column_name = max_field.name().clone();
+        self.statistics_fields.push(max_field.clone());
+        if self.is_stat_column_missing(&StatisticsType::Max) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Max,
+                max_field,
+            ));
+        }
+        max_column_name
+    }
+
+    fn build(&self, statistics_expr: &Expr) -> Result<Arc<dyn PhysicalExpr>> {
+        let execution_context_state = ExecutionContextState {
+            datasources: HashMap::new(),
+            scalar_functions: HashMap::new(),
+            var_provider: HashMap::new(),
+            aggregate_functions: HashMap::new(),
+            config: ExecutionConfig::new(),
+        };
+        let schema = Schema::new(self.statistics_fields.clone());
+        DefaultPhysicalPlanner::default().create_physical_expr(
+            statistics_expr,
+            &schema,
+            &execution_context_state,
+        )
+    }
+}
+
+/// Translate logical filter expression into parquet statistics physical filter expression
+fn build_predicate_expression(
+    expr: &Expr,
+    parquet_schema: &Schema,
+    stat_column_req: &mut Vec<(String, StatisticsType, Field)>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    // predicate expression can only be a binary expression
+    let (left, op, right) = match expr {
+        Expr::BinaryExpr { left, op, right } => (left, op, right),
+        _ => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+
+    if op == &Operator::And || op == &Operator::Or {
+        let left_expr =
+            build_predicate_expression(left, parquet_schema, stat_column_req)?;
+        let right_expr =
+            build_predicate_expression(right, parquet_schema, stat_column_req)?;
+        let stat_fields = stat_column_req
+            .iter()
+            .map(|(_, _, f)| f.clone())
+            .collect::<Vec<_>>();
+        let stat_schema = Schema::new(stat_fields);
+        let result = expressions::binary(left_expr, op.clone(), right_expr, &stat_schema);
+        return result;
+    }
+
+    let mut expr_builder = match PhysicalExpressionBuilder::try_new(
+        left,
+        right,
+        parquet_schema,
+        stat_column_req,
+    ) {
+        Ok(builder) => builder,
+        // allow partial failure in predicate expression generation
+        // this can still produce a useful predicate when multiple conditions are joined using AND
+        Err(_) => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+    let corrected_op = expr_builder.correct_operator(op);
+    let statistics_expr = match corrected_op {
+        Operator::Eq => {
+            let min_column_name = expr_builder.add_min_column();
+            let max_column_name = expr_builder.add_max_column();
+            // column = literal => column = (min, max) => min <= literal && literal <= max
+            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
+            expr_builder
+                .scalar_expr()
+                .gt_eq(rewrite_column_expr(
+                    expr_builder.column_expr(),
+                    expr_builder.column_name(),
+                    min_column_name.as_str(),
+                )?)
+                .and(expr_builder.scalar_expr().lt_eq(rewrite_column_expr(
+                    expr_builder.column_expr(),
+                    expr_builder.column_name(),
+                    max_column_name.as_str(),
+                )?))
+        }
+        Operator::Gt => {
+            let max_column_name = expr_builder.add_max_column();
+            // column > literal => (min, max) > literal => max > literal
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                max_column_name.as_str(),
+            )?
+            .gt(expr_builder.scalar_expr().clone())
+        }
+        Operator::GtEq => {
+            // column >= literal => (min, max) >= literal => max >= literal
+            let max_column_name = expr_builder.add_max_column();
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                max_column_name.as_str(),
+            )?
+            .gt_eq(expr_builder.scalar_expr().clone())
+        }
+        Operator::Lt => {
+            // (input < input) => (predicate)
+            // column < literal => (min, max) < literal => min < literal
+            let min_column_name = expr_builder.add_min_column();
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                min_column_name.as_str(),
+            )?
+            .lt(expr_builder.scalar_expr().clone())
+        }
+        Operator::LtEq => {
+            // (input <= input) => (predicate)
+            // column <= literal => (min, max) <= literal => min <= literal
+            let min_column_name = expr_builder.add_min_column();
+            rewrite_column_expr(
+                expr_builder.column_expr(),
+                expr_builder.column_name(),
+                min_column_name.as_str(),
+            )?
+            .lt_eq(expr_builder.scalar_expr().clone())
+        }
+        // other expressions are not supported
+        _ => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+    expr_builder.build(&statistics_expr)
+}
+
+/// replaces a column with an old name with a new name in an expression
+fn rewrite_column_expr(
+    expr: &Expr,
+    column_old_name: &str,
+    column_new_name: &str,
+) -> Result<Expr> {
+    let expressions = utils::expr_sub_expressions(&expr)?;
+    let expressions = expressions
+        .iter()
+        .map(|e| rewrite_column_expr(e, column_old_name, column_new_name))
+        .collect::<Result<Vec<_>>>()?;
+
+    if let Expr::Column(name) = expr {
+        if name == column_old_name {
+            return Ok(Expr::Column(column_new_name.to_string()));
+        }
+    }
+    utils::rewrite_expression(&expr, &expressions)
+}
+
+#[derive(Debug, Clone, PartialEq)]
+enum StatisticsType {
+    Min,
+    Max,
+}
+
+fn build_null_array(data_type: &DataType, length: usize) -> ArrayRef {

Review comment:
       I wonder if you could use `NullArray` here instead: https://github.com/apache/arrow/blob/master/rust/arrow/src/array/null.rs

##########
File path: rust/parquet/src/file/serialized_reader.rs
##########
@@ -137,6 +137,22 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
             metadata,
         })
     }
+
+    pub fn filter_row_groups(

Review comment:
       this is a fancy way to filter out the row groups -- it is probably worth adding documentation here.
   
   I don't know if there are assumptions in the parquet reader code that the row group metadata matches what was read from the file or not
   
   I suggest you consider filtering the row groups at the DataFusion (aka skip them in the datafusion physical operator) level rather than in the parquet reader level and avoid that potential problem completely. 

##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -209,6 +251,479 @@ impl ParquetExec {
     }
 }
 
+#[derive(Debug, Clone)]
+/// Predicate builder used for generating of predicate functions, used to filter row group metadata
+pub struct PredicateExpressionBuilder {
+    parquet_schema: Schema,
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl PredicateExpressionBuilder {
+    /// Try to create a new instance of PredicateExpressionBuilder
+    pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
+        let predicate_expr =
+            build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?;
+
+        Ok(Self {
+            parquet_schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter row group metadata
+    pub fn build_row_group_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build statistics record batch
+        let predicate_result = build_row_group_record_batch(
+            row_group_metadata,
+            &self.parquet_schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| {
+            // execute predicate expression
+            self.predicate_expr.evaluate(&statistics_batch)
+        })
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            _ => return Box::new(|_r, _i| true),
+        };
+
+        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
+        match predicate_array {
+            // return row group predicate function
+            Some(array) => {
+                let predicate_values =
+                    array.iter().map(|x| x.unwrap_or(false)).collect::<Vec<_>>();
+                Box::new(move |_, i| predicate_values[i])
+            }
+            // predicate result is not a BooleanArray
+            _ => Box::new(|_r, _i| true),
+        }
+    }
+}
+
+fn build_row_group_record_batch(
+    row_groups: &[RowGroupMetaData],
+    parquet_schema: &Schema,
+    stat_column_req: &Vec<(String, StatisticsType, Field)>,
+) -> Result<RecordBatch> {
+    let mut fields = Vec::<Field>::new();
+    let mut arrays = Vec::<ArrayRef>::new();
+    for (column_name, statistics_type, stat_field) in stat_column_req {
+        if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) {
+            let statistics = row_groups
+                .iter()
+                .map(|g| g.column(column_index).statistics())
+                .collect::<Vec<_>>();
+            let array = build_statistics_array(
+                &statistics,
+                statistics_type,
+                stat_field.data_type(),
+            );
+            fields.push(stat_field.clone());
+            arrays.push(array);
+        }
+    }
+    let schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(schema, arrays)
+        .map_err(|err| DataFusionError::Plan(err.to_string()))
+}
+
+struct PhysicalExpressionBuilder<'a> {
+    column_name: String,
+    column_expr: &'a Expr,
+    scalar_expr: &'a Expr,
+    parquet_field: &'a Field,
+    statistics_fields: Vec<Field>,
+    stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    reverse_operator: bool,
+}
+
+impl<'a> PhysicalExpressionBuilder<'a> {
+    fn try_new(
+        left: &'a Expr,
+        right: &'a Expr,
+        parquet_schema: &'a Schema,
+        stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    ) -> Result<Self> {
+        // find column name; input could be a more complicated expression
+        let mut left_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(left, &mut left_columns)?;
+        let mut right_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(right, &mut right_columns)?;
+        let (column_expr, scalar_expr, column_names, reverse_operator) =
+            match (left_columns.len(), right_columns.len()) {
+                (1, 0) => (left, right, left_columns, false),
+                (0, 1) => (right, left, right_columns, true),
+                _ => {
+                    // if more than one column used in expression - not supported
+                    return Err(DataFusionError::Plan(
+                        "Multi-column expressions are not currently supported"
+                            .to_string(),
+                    ));
+                }
+            };
+        let column_name = column_names.iter().next().unwrap().clone();
+        let field = match parquet_schema.column_with_name(&column_name) {
+            Some((_, f)) => f,
+            _ => {
+                // field not found in parquet schema
+                return Err(DataFusionError::Plan(
+                    "Field not found in parquet schema".to_string(),
+                ));
+            }
+        };
+
+        Ok(Self {
+            column_name,
+            column_expr,
+            scalar_expr,
+            parquet_field: field,
+            statistics_fields: Vec::new(),
+            stat_column_req,
+            reverse_operator,
+        })
+    }
+
+    fn correct_operator(&self, op: &Operator) -> Operator {
+        if !self.reverse_operator {
+            return op.clone();
+        }
+
+        match op {
+            Operator::Lt => Operator::Gt,
+            Operator::Gt => Operator::Lt,
+            Operator::LtEq => Operator::GtEq,
+            Operator::GtEq => Operator::LtEq,
+            _ => op.clone(),
+        }
+    }
+
+    fn column_expr(&self) -> &Expr {
+        self.column_expr
+    }
+
+    fn scalar_expr(&self) -> &Expr {
+        self.scalar_expr
+    }
+
+    fn column_name(&self) -> &String {
+        &self.column_name
+    }
+
+    fn is_stat_column_missing(&self, statistics_type: &StatisticsType) -> bool {
+        self.stat_column_req
+            .iter()
+            .filter(|(c, t, _f)| c == &self.column_name && t == statistics_type)
+            .count()
+            == 0
+    }
+
+    fn add_min_column(&mut self) -> String {
+        let min_field = Field::new(
+            format!("{}_min", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let min_column_name = min_field.name().clone();
+        self.statistics_fields.push(min_field.clone());
+        if self.is_stat_column_missing(&StatisticsType::Min) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Min,
+                min_field,
+            ));
+        }
+        min_column_name
+    }
+
+    fn add_max_column(&mut self) -> String {
+        let max_field = Field::new(
+            format!("{}_max", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let max_column_name = max_field.name().clone();
+        self.statistics_fields.push(max_field.clone());
+        if self.is_stat_column_missing(&StatisticsType::Max) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Max,
+                max_field,
+            ));
+        }
+        max_column_name
+    }
+
+    fn build(&self, statistics_expr: &Expr) -> Result<Arc<dyn PhysicalExpr>> {
+        let execution_context_state = ExecutionContextState {
+            datasources: HashMap::new(),
+            scalar_functions: HashMap::new(),
+            var_provider: HashMap::new(),
+            aggregate_functions: HashMap::new(),
+            config: ExecutionConfig::new(),
+        };
+        let schema = Schema::new(self.statistics_fields.clone());
+        DefaultPhysicalPlanner::default().create_physical_expr(
+            statistics_expr,
+            &schema,
+            &execution_context_state,
+        )
+    }
+}
+
+/// Translate logical filter expression into parquet statistics physical filter expression
+fn build_predicate_expression(

Review comment:
       I suggest copying the (nicely written) summary of your algorithm from this PR's description somewhere into this file
   
   It is probably good to mention the assumptions of this predicate expression -- which I think is that it will return `true` if a rowgroup *may* contain rows that match the predicate, and will return `false` if and only if all rows in the row group can *not* match the predicate.
   
   The idea of creating arrays of `(col1_min, col1_max, col2_min, col2_max ...)` is clever (and could likely be applied to sources other than parquet files). 

##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -209,6 +251,479 @@ impl ParquetExec {
     }
 }
 
+#[derive(Debug, Clone)]
+/// Predicate builder used for generating of predicate functions, used to filter row group metadata
+pub struct PredicateExpressionBuilder {
+    parquet_schema: Schema,
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl PredicateExpressionBuilder {
+    /// Try to create a new instance of PredicateExpressionBuilder
+    pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
+        let predicate_expr =
+            build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?;
+
+        Ok(Self {
+            parquet_schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter row group metadata
+    pub fn build_row_group_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build statistics record batch
+        let predicate_result = build_row_group_record_batch(
+            row_group_metadata,
+            &self.parquet_schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| {
+            // execute predicate expression
+            self.predicate_expr.evaluate(&statistics_batch)
+        })
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            _ => return Box::new(|_r, _i| true),
+        };
+
+        let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>();
+        match predicate_array {
+            // return row group predicate function
+            Some(array) => {
+                let predicate_values =
+                    array.iter().map(|x| x.unwrap_or(false)).collect::<Vec<_>>();
+                Box::new(move |_, i| predicate_values[i])
+            }
+            // predicate result is not a BooleanArray
+            _ => Box::new(|_r, _i| true),
+        }
+    }
+}
+
+fn build_row_group_record_batch(
+    row_groups: &[RowGroupMetaData],
+    parquet_schema: &Schema,
+    stat_column_req: &Vec<(String, StatisticsType, Field)>,
+) -> Result<RecordBatch> {
+    let mut fields = Vec::<Field>::new();
+    let mut arrays = Vec::<ArrayRef>::new();
+    for (column_name, statistics_type, stat_field) in stat_column_req {
+        if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) {
+            let statistics = row_groups
+                .iter()
+                .map(|g| g.column(column_index).statistics())
+                .collect::<Vec<_>>();
+            let array = build_statistics_array(
+                &statistics,
+                statistics_type,
+                stat_field.data_type(),
+            );
+            fields.push(stat_field.clone());
+            arrays.push(array);
+        }
+    }
+    let schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(schema, arrays)
+        .map_err(|err| DataFusionError::Plan(err.to_string()))
+}
+
+struct PhysicalExpressionBuilder<'a> {
+    column_name: String,
+    column_expr: &'a Expr,
+    scalar_expr: &'a Expr,
+    parquet_field: &'a Field,
+    statistics_fields: Vec<Field>,
+    stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    reverse_operator: bool,
+}
+
+impl<'a> PhysicalExpressionBuilder<'a> {
+    fn try_new(
+        left: &'a Expr,
+        right: &'a Expr,
+        parquet_schema: &'a Schema,
+        stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    ) -> Result<Self> {
+        // find column name; input could be a more complicated expression
+        let mut left_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(left, &mut left_columns)?;
+        let mut right_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(right, &mut right_columns)?;
+        let (column_expr, scalar_expr, column_names, reverse_operator) =
+            match (left_columns.len(), right_columns.len()) {
+                (1, 0) => (left, right, left_columns, false),
+                (0, 1) => (right, left, right_columns, true),
+                _ => {
+                    // if more than one column used in expression - not supported
+                    return Err(DataFusionError::Plan(
+                        "Multi-column expressions are not currently supported"
+                            .to_string(),
+                    ));
+                }
+            };
+        let column_name = column_names.iter().next().unwrap().clone();
+        let field = match parquet_schema.column_with_name(&column_name) {
+            Some((_, f)) => f,
+            _ => {
+                // field not found in parquet schema
+                return Err(DataFusionError::Plan(
+                    "Field not found in parquet schema".to_string(),
+                ));
+            }
+        };
+
+        Ok(Self {
+            column_name,
+            column_expr,
+            scalar_expr,
+            parquet_field: field,
+            statistics_fields: Vec::new(),
+            stat_column_req,
+            reverse_operator,
+        })
+    }
+
+    fn correct_operator(&self, op: &Operator) -> Operator {
+        if !self.reverse_operator {
+            return op.clone();
+        }
+
+        match op {
+            Operator::Lt => Operator::Gt,
+            Operator::Gt => Operator::Lt,
+            Operator::LtEq => Operator::GtEq,
+            Operator::GtEq => Operator::LtEq,
+            _ => op.clone(),
+        }
+    }
+
+    fn column_expr(&self) -> &Expr {
+        self.column_expr
+    }
+
+    fn scalar_expr(&self) -> &Expr {
+        self.scalar_expr
+    }
+
+    fn column_name(&self) -> &String {
+        &self.column_name
+    }
+
+    fn is_stat_column_missing(&self, statistics_type: &StatisticsType) -> bool {
+        self.stat_column_req
+            .iter()
+            .filter(|(c, t, _f)| c == &self.column_name && t == statistics_type)
+            .count()
+            == 0
+    }
+
+    fn add_min_column(&mut self) -> String {
+        let min_field = Field::new(
+            format!("{}_min", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let min_column_name = min_field.name().clone();
+        self.statistics_fields.push(min_field.clone());
+        if self.is_stat_column_missing(&StatisticsType::Min) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Min,
+                min_field,
+            ));
+        }
+        min_column_name
+    }
+
+    fn add_max_column(&mut self) -> String {
+        let max_field = Field::new(
+            format!("{}_max", self.column_name).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let max_column_name = max_field.name().clone();
+        self.statistics_fields.push(max_field.clone());
+        if self.is_stat_column_missing(&StatisticsType::Max) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.to_string(),
+                StatisticsType::Max,
+                max_field,
+            ));
+        }
+        max_column_name
+    }
+
+    fn build(&self, statistics_expr: &Expr) -> Result<Arc<dyn PhysicalExpr>> {
+        let execution_context_state = ExecutionContextState {
+            datasources: HashMap::new(),
+            scalar_functions: HashMap::new(),
+            var_provider: HashMap::new(),
+            aggregate_functions: HashMap::new(),
+            config: ExecutionConfig::new(),
+        };
+        let schema = Schema::new(self.statistics_fields.clone());
+        DefaultPhysicalPlanner::default().create_physical_expr(
+            statistics_expr,
+            &schema,
+            &execution_context_state,
+        )
+    }
+}
+
+/// Translate logical filter expression into parquet statistics physical filter expression
+fn build_predicate_expression(
+    expr: &Expr,
+    parquet_schema: &Schema,
+    stat_column_req: &mut Vec<(String, StatisticsType, Field)>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    // predicate expression can only be a binary expression
+    let (left, op, right) = match expr {
+        Expr::BinaryExpr { left, op, right } => (left, op, right),
+        _ => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+
+    if op == &Operator::And || op == &Operator::Or {
+        let left_expr =
+            build_predicate_expression(left, parquet_schema, stat_column_req)?;
+        let right_expr =
+            build_predicate_expression(right, parquet_schema, stat_column_req)?;
+        let stat_fields = stat_column_req
+            .iter()
+            .map(|(_, _, f)| f.clone())
+            .collect::<Vec<_>>();
+        let stat_schema = Schema::new(stat_fields);
+        let result = expressions::binary(left_expr, op.clone(), right_expr, &stat_schema);
+        return result;
+    }
+
+    let mut expr_builder = match PhysicalExpressionBuilder::try_new(
+        left,
+        right,
+        parquet_schema,
+        stat_column_req,
+    ) {
+        Ok(builder) => builder,
+        // allow partial failure in predicate expression generation
+        // this can still produce a useful predicate when multiple conditions are joined using AND
+        Err(_) => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+    let corrected_op = expr_builder.correct_operator(op);
+    let statistics_expr = match corrected_op {
+        Operator::Eq => {
+            let min_column_name = expr_builder.add_min_column();
+            let max_column_name = expr_builder.add_max_column();
+            // column = literal => column = (min, max) => min <= literal && literal <= max
+            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
+            expr_builder
+                .scalar_expr()
+                .gt_eq(rewrite_column_expr(

Review comment:
       stylistically, you might be able to hoist out the repeated calls to 
   
   ```
   rewrite_column_expr(
                       expr_builder.column_expr(),
                       expr_builder.column_name(),
                       max_column_name.as_str()
   ```
   
   and 
   
   ```
   rewrite_column_expr(
                       expr_builder.column_expr(),
                       expr_builder.column_name(),
                       min_column_name.as_str()
   ```
   
    by evaluating them once before the `match` expression:
   
   ```
   let min_col_expr = rewrite_column_expr(
                       expr_builder.column_expr(),
                       expr_builder.column_name(),
                       min_column_name.as_str());
   
   let max_col_expr = rewrite_column_expr(
                       expr_builder.column_expr(),
                       expr_builder.column_name(),
                       max_column_name.as_str())
   ```
   
   But the way you have it works well too




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org