You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/03 12:17:00 UTC

[GitHub] [arrow] Dandandan commented on a change in pull request #9064: ARROW-11074: [Rust][DataFusion] Implement predicate push-down for parquet tables

Dandandan commented on a change in pull request #9064:
URL: https://github.com/apache/arrow/pull/9064#discussion_r550998137



##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -209,6 +251,479 @@ impl ParquetExec {
     }
 }
 
+/// Builds predicate functions that decide, from parquet row group metadata alone,
+/// whether a row group may contain rows matching a filter expression.
+#[derive(Debug, Clone)]
+pub struct PredicateExpressionBuilder {
+    /// Schema of the parquet file the predicate is evaluated against.
+    parquet_schema: Schema,
+    /// Physical expression evaluated over the per-row-group statistics batch.
+    predicate_expr: Arc<dyn PhysicalExpr>,
+    /// Statistics columns needed by `predicate_expr`, as
+    /// (source column name, statistic kind, field in the statistics batch).
+    stat_column_req: Vec<(String, StatisticsType, Field)>,
+}
+
+impl PredicateExpressionBuilder {
+    /// Try to create a new instance of `PredicateExpressionBuilder`.
+    ///
+    /// The logical `expr` is translated once into a physical expression over
+    /// parquet statistics columns; the statistics columns it needs are recorded
+    /// so they can be materialized for any set of row groups later.
+    ///
+    /// # Errors
+    /// Returns an error if `expr` cannot be translated into a statistics predicate.
+    pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> {
+        // build predicate expression once
+        let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new();
+        let predicate_expr =
+            build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?;
+
+        Ok(Self {
+            parquet_schema,
+            predicate_expr,
+            stat_column_req,
+        })
+    }
+
+    /// Generate a predicate function used to filter row group metadata.
+    ///
+    /// The returned closure is called with a row group and its index into
+    /// `row_group_metadata`. It returns `false` only when the statistics
+    /// evaluated the predicate to `false` for that row group. In every case
+    /// where pruning is not possible it returns `true`, so row groups are never
+    /// skipped incorrectly. Those cases are:
+    /// - the statistics batch could not be built or evaluated;
+    /// - the result is not a boolean array;
+    /// - the result for the row group is null;
+    /// - the index is outside `row_group_metadata`.
+    pub fn build_row_group_predicate(
+        &self,
+        row_group_metadata: &[RowGroupMetaData],
+    ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> {
+        // build one statistics row per row group and evaluate the predicate over it
+        let predicate_result = build_row_group_record_batch(
+            row_group_metadata,
+            &self.parquet_schema,
+            &self.stat_column_req,
+        )
+        .and_then(|statistics_batch| self.predicate_expr.evaluate(&statistics_batch))
+        .and_then(|v| match v {
+            ColumnarValue::Array(array) => Ok(array),
+            ColumnarValue::Scalar(_) => Err(DataFusionError::Plan(
+                "predicate expression didn't return an array".to_string(),
+            )),
+        });
+
+        // best effort: if the statistics cannot be evaluated, keep every row group
+        let predicate_array = match predicate_result {
+            Ok(array) => array,
+            Err(_) => return Box::new(|_r, _i| true),
+        };
+
+        match predicate_array.as_any().downcast_ref::<BooleanArray>() {
+            // return row group predicate function
+            Some(array) => {
+                // A null result means the predicate could not be decided from the
+                // statistics, so the row group might contain matches and must be kept.
+                let predicate_values =
+                    array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>();
+                // Out-of-range indices cannot be pruned safely either.
+                Box::new(move |_, i| predicate_values.get(i).copied().unwrap_or(true))
+            }
+            // predicate result is not a BooleanArray; pruning is not possible
+            None => Box::new(|_r, _i| true),
+        }
+    }
+}
+
+/// Build a record batch with one row per entry of `row_groups` and one column
+/// per requested statistic in `stat_column_req`.
+///
+/// Requested columns that are not present in `parquet_schema` are skipped.
+///
+/// # Errors
+/// Returns a plan error if the collected columns cannot form a valid record batch.
+fn build_row_group_record_batch(
+    row_groups: &[RowGroupMetaData],
+    parquet_schema: &Schema,
+    stat_column_req: &[(String, StatisticsType, Field)],
+) -> Result<RecordBatch> {
+    let mut fields = Vec::<Field>::with_capacity(stat_column_req.len());
+    let mut arrays = Vec::<ArrayRef>::with_capacity(stat_column_req.len());
+    for (column_name, statistics_type, stat_field) in stat_column_req {
+        if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) {
+            // NOTE(review): `column_index` is the position of the top-level field in
+            // `parquet_schema`, while `RowGroupMetaData::column` presumably indexes
+            // parquet leaf columns; these only agree for flat (non-nested) schemas.
+            // Confirm.
+            let statistics = row_groups
+                .iter()
+                .map(|g| g.column(column_index).statistics())
+                .collect::<Vec<_>>();
+            let array = build_statistics_array(
+                &statistics,
+                statistics_type,
+                stat_field.data_type(),
+            );
+            fields.push(stat_field.clone());
+            arrays.push(array);
+        }
+    }
+    let schema = Arc::new(Schema::new(fields));
+    RecordBatch::try_new(schema, arrays)
+        .map_err(|err| DataFusionError::Plan(err.to_string()))
+}
+
+/// Helper that rewrites a single `column <op> scalar` comparison into an
+/// expression over statistics columns of that column.
+struct PhysicalExpressionBuilder<'a> {
+    /// Name of the one column referenced by the comparison.
+    column_name: String,
+    /// Side of the comparison that references the column.
+    column_expr: &'a Expr,
+    /// Side of the comparison that references no column.
+    scalar_expr: &'a Expr,
+    /// Field of `column_name` in the parquet schema.
+    parquet_field: &'a Field,
+    /// Fields of the statistics columns registered by this builder; used as the
+    /// planning schema for the statistics expression.
+    statistics_fields: Vec<Field>,
+    /// Statistics columns required by the predicate, shared with the caller.
+    stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    /// `true` when the column was on the right-hand side of the comparison, so
+    /// ordering operators must be mirrored.
+    reverse_operator: bool,
+}
+
+impl<'a> PhysicalExpressionBuilder<'a> {
+    /// Create a builder for the comparison between `left` and `right`.
+    ///
+    /// Exactly one side must reference exactly one column and the other side
+    /// must reference none. When the column is on the right-hand side,
+    /// `reverse_operator` is set so [`Self::correct_operator`] mirrors the operator.
+    ///
+    /// # Errors
+    /// Returns a plan error in two cases:
+    /// - the comparison does not reference exactly one column on exactly one side;
+    /// - the column is not part of `parquet_schema`.
+    fn try_new(
+        left: &'a Expr,
+        right: &'a Expr,
+        parquet_schema: &'a Schema,
+        stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>,
+    ) -> Result<Self> {
+        // find column name; input could be a more complicated expression
+        let mut left_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(left, &mut left_columns)?;
+        let mut right_columns = HashSet::<String>::new();
+        utils::expr_to_column_names(right, &mut right_columns)?;
+        let (column_expr, scalar_expr, column_names, reverse_operator) =
+            match (left_columns.len(), right_columns.len()) {
+                (1, 0) => (left, right, left_columns, false),
+                (0, 1) => (right, left, right_columns, true),
+                _ => {
+                    // zero columns, or columns on both sides - not supported
+                    return Err(DataFusionError::Plan(
+                        "Multi-column expressions are not currently supported"
+                            .to_string(),
+                    ));
+                }
+            };
+        let column_name = column_names
+            .into_iter()
+            .next()
+            .expect("the match above guarantees exactly one column name");
+        let field = match parquet_schema.column_with_name(&column_name) {
+            Some((_, f)) => f,
+            None => {
+                // field not found in parquet schema
+                return Err(DataFusionError::Plan(
+                    "Field not found in parquet schema".to_string(),
+                ));
+            }
+        };
+
+        Ok(Self {
+            column_name,
+            column_expr,
+            scalar_expr,
+            parquet_field: field,
+            statistics_fields: Vec::new(),
+            stat_column_req,
+            reverse_operator,
+        })
+    }
+
+    /// Return `op` adjusted so the comparison reads as `column <op> scalar`.
+    ///
+    /// When the column was on the right-hand side, the ordering operators
+    /// (`<`, `>`, `<=`, `>=`) are mirrored; all other operators are returned unchanged.
+    fn correct_operator(&self, op: &Operator) -> Operator {
+        if !self.reverse_operator {
+            return op.clone();
+        }
+
+        match op {
+            Operator::Lt => Operator::Gt,
+            Operator::Gt => Operator::Lt,
+            Operator::LtEq => Operator::GtEq,
+            Operator::GtEq => Operator::LtEq,
+            _ => op.clone(),
+        }
+    }
+
+    /// Side of the comparison that references the column.
+    fn column_expr(&self) -> &Expr {
+        self.column_expr
+    }
+
+    /// Side of the comparison that references no column.
+    fn scalar_expr(&self) -> &Expr {
+        self.scalar_expr
+    }
+
+    /// Name of the column referenced by the comparison.
+    fn column_name(&self) -> &String {
+        &self.column_name
+    }
+
+    /// Returns `true` if `stat_column_req` has no entry yet for this column and
+    /// `statistics_type`.
+    fn is_stat_column_missing(&self, statistics_type: &StatisticsType) -> bool {
+        !self
+            .stat_column_req
+            .iter()
+            .any(|(c, t, _f)| c == &self.column_name && t == statistics_type)
+    }
+
+    /// Register the minimum-value statistics column (`{column}_min`) and return its name.
+    fn add_min_column(&mut self) -> String {
+        self.add_statistics_column(StatisticsType::Min, "min")
+    }
+
+    /// Register the maximum-value statistics column (`{column}_max`) and return its name.
+    fn add_max_column(&mut self) -> String {
+        self.add_statistics_column(StatisticsType::Max, "max")
+    }
+
+    /// Register the `statistics_type` column named `{column}_{suffix}` and return its name.
+    ///
+    /// The field is always appended to `statistics_fields` (the schema used by
+    /// [`Self::build`]). It is only appended to `stat_column_req` if no entry for
+    /// this column and statistics type exists yet.
+    fn add_statistics_column(
+        &mut self,
+        statistics_type: StatisticsType,
+        suffix: &str,
+    ) -> String {
+        // NOTE(review): the field copies the source column's nullability, but a
+        // statistic may presumably be absent for a row group even when the column is
+        // non-nullable. Confirm this cannot put nulls into a non-nullable field.
+        let stat_field = Field::new(
+            format!("{}_{}", self.column_name, suffix).as_str(),
+            self.parquet_field.data_type().clone(),
+            self.parquet_field.is_nullable(),
+        );
+        let stat_column_name = stat_field.name().clone();
+        self.statistics_fields.push(stat_field.clone());
+        if self.is_stat_column_missing(&statistics_type) {
+            // only add statistics column if not previously added
+            self.stat_column_req.push((
+                self.column_name.clone(),
+                statistics_type,
+                stat_field,
+            ));
+        }
+        stat_column_name
+    }
+
+    /// Plan `statistics_expr` into a physical expression.
+    ///
+    /// Planning uses a freshly created execution context with empty
+    /// registries, and a schema made of the statistics fields registered so far.
+    ///
+    /// # Errors
+    /// Propagates any error from `DefaultPhysicalPlanner::create_physical_expr`.
+    fn build(&self, statistics_expr: &Expr) -> Result<Arc<dyn PhysicalExpr>> {
+        let execution_context_state = ExecutionContextState {
+            datasources: HashMap::new(),
+            scalar_functions: HashMap::new(),
+            var_provider: HashMap::new(),
+            aggregate_functions: HashMap::new(),
+            config: ExecutionConfig::new(),
+        };
+        let schema = Schema::new(self.statistics_fields.clone());
+        DefaultPhysicalPlanner::default().create_physical_expr(
+            statistics_expr,
+            &schema,
+            &execution_context_state,
+        )
+    }
+}
+
+/// Translate logical filter expression into parquet statistics physical filter expression
+fn build_predicate_expression(
+    expr: &Expr,
+    parquet_schema: &Schema,
+    stat_column_req: &mut Vec<(String, StatisticsType, Field)>,
+) -> Result<Arc<dyn PhysicalExpr>> {
+    // predicate expression can only be a binary expression
+    let (left, op, right) = match expr {
+        Expr::BinaryExpr { left, op, right } => (left, op, right),
+        _ => {
+            return Ok(expressions::lit(ScalarValue::Boolean(Some(true))));
+        }
+    };
+
+    if op == &Operator::And || op == &Operator::Or {

Review comment:
       I think we should derive `Copy` for the `Operator` enum so we can dereference `op` (e.g. `*op == Operator::And`) instead of comparing references.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org