You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by dh...@apache.org on 2021/05/29 17:27:13 UTC

[arrow-datafusion] 01/02: Add tokomak optimizer

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch tokomak_optimizer
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git

commit e637bd1ab2b115a37620cea4e94faaf7b947533c
Author: Daniel Heres <da...@gmail.com>
AuthorDate: Sat May 29 19:24:51 2021 +0200

    Add tokomak optimizer
---
 datafusion/Cargo.toml                      |   1 +
 datafusion/src/execution/context.rs        |   3 +-
 datafusion/src/optimizer/mod.rs            |   1 +
 datafusion/src/optimizer/simplification.rs | 467 +++++++++++++++++++++++++++++
 4 files changed, 471 insertions(+), 1 deletion(-)

diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 0668ec0..7bc25b4 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -53,6 +53,7 @@ paste = "^1.0"
 num_cpus = "1.13.0"
 chrono = "0.4"
 async-trait = "0.1.41"
+egg = "0.6.0"
 futures = "0.3"
 pin-project-lite= "^0.2.0"
 tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index cfd3b71..8a005ad 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -22,7 +22,7 @@ use crate::{
         information_schema::CatalogWithInformationSchema,
     },
     optimizer::{
-        eliminate_limit::EliminateLimit, hash_build_probe_order::HashBuildProbeOrder,
+        eliminate_limit::EliminateLimit, hash_build_probe_order::HashBuildProbeOrder,simplification::Tokomak
     },
     physical_optimizer::optimizer::PhysicalOptimizerRule,
 };
@@ -648,6 +648,7 @@ impl ExecutionConfig {
             concurrency: num_cpus::get(),
             batch_size: 8192,
             optimizers: vec![
+                Arc::new(Tokomak::new()),
                 Arc::new(ConstantFolding::new()),
                 Arc::new(EliminateLimit::new()),
                 Arc::new(ProjectionPushDown::new()),
diff --git a/datafusion/src/optimizer/mod.rs b/datafusion/src/optimizer/mod.rs
index 2fb8a3d..018f5ef 100644
--- a/datafusion/src/optimizer/mod.rs
+++ b/datafusion/src/optimizer/mod.rs
@@ -25,4 +25,5 @@ pub mod hash_build_probe_order;
 pub mod limit_push_down;
 pub mod optimizer;
 pub mod projection_push_down;
+pub mod simplification;
 pub mod utils;
diff --git a/datafusion/src/optimizer/simplification.rs b/datafusion/src/optimizer/simplification.rs
new file mode 100644
index 0000000..51c8a5d
--- /dev/null
+++ b/datafusion/src/optimizer/simplification.rs
@@ -0,0 +1,467 @@
+use std::vec;
+
+use crate::{
+    logical_plan::LogicalPlan, optimizer::optimizer::OptimizerRule, scalar::ScalarValue,
+};
+use crate::{logical_plan::Operator, optimizer::utils};
+
+use crate::error::Result as DFResult;
+use crate::logical_plan::Expr;
+use crate::execution::context::ExecutionProps;
+use egg::{rewrite as rw, *};
+
+/// Optimizer rule that simplifies the expressions of a logical plan using
+/// equality saturation from the `egg` crate.
+///
+/// Every expression that can be translated to a [`TokomakExpr`] is saturated
+/// with the rewrites returned by [`rules`] and replaced by the smallest
+/// equivalent expression found; all other expressions are left untouched.
+#[derive(Debug, Default)]
+pub struct Tokomak {}
+
+impl Tokomak {
+    /// Creates a new `Tokomak` optimizer rule.
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+/// E-graph over [`TokomakExpr`] nodes with no analysis data attached.
+///
+/// The language parameter must be the expression language (`TokomakExpr`),
+/// not the optimizer rule struct.
+pub type EGraph = egg::EGraph<TokomakExpr, ()>;
+
+/// Returns the rewrite rules used to simplify a [`TokomakExpr`].
+///
+/// Each rule is written as `name; pattern => replacement`, where both sides
+/// are s-expressions over the operators declared in [`TokomakExpr`] and
+/// `?x`-style names are pattern variables.
+///
+/// NOTE(review): `mul-0` and `cancel-sub` rewrite to the Int64 literal `0`
+/// whatever the operand is; if the operand can be NULL or is not an Int64
+/// the result may differ from the original expression - confirm this is
+/// acceptable before relying on these rules.
+pub fn rules() -> Vec<Rewrite<TokomakExpr, ()>> {
+    vec![
+        // Commutativity.
+        rw!("commute-add"; "(+ ?x ?y)" => "(+ ?y ?x)"),
+        rw!("commute-mul"; "(* ?x ?y)" => "(* ?y ?x)"),
+        rw!("commute-and"; "(and ?x ?y)" => "(and ?y ?x)"),
+        rw!("commute-or"; "(or ?x ?y)" => "(or ?y ?x)"),
+        rw!("commute-eq"; "(= ?x ?y)" => "(= ?y ?x)"),
+        rw!("commute-neq"; "(<> ?x ?y)" => "(<> ?y ?x)"),
+        // Converse comparisons: flipping the operator requires swapping the operands.
+        rw!("converse-gt"; "(> ?x ?y)"=> "(< ?y ?x)"),
+        rw!("converse-gte"; "(>= ?x ?y)"=> "(<= ?y ?x)"),
+        rw!("converse-lt"; "(< ?x ?y)"=> "(> ?y ?x)"),
+        rw!("converse-lte"; "(<= ?x ?y)"=> "(>= ?y ?x)"),
+        // Arithmetic identities.
+        rw!("add-0"; "(+ ?x 0)" => "?x"),
+        rw!("add-assoc"; "(+ (+ ?a ?b) ?c)" => "(+ ?a (+ ?b ?c))"),
+        rw!("minus-0"; "(- ?x 0)" => "?x"),
+        rw!("mul-0"; "(* ?x 0)" => "0"),
+        rw!("mul-1"; "(* ?x 1)" => "?x"),
+        rw!("div-1"; "(/ ?x 1)" => "?x"),
+        // Boolean algebra.
+        rw!("dist-and-or"; "(or (and ?a ?b) (and ?a ?c))" => "(and ?a (or ?b ?c))"),
+        rw!("dist-or-and"; "(and (or ?a ?b) (or ?a ?c))" => "(or ?a (and ?b ?c))"),
+        rw!("not-not"; "(not (not ?x))" => "?x"),
+        rw!("or-same"; "(or ?x ?x)" => "?x"),
+        rw!("and-same"; "(and ?x ?x)" => "?x"),
+        rw!("cancel-sub"; "(- ?a ?a)" => "0"),
+        rw!("and-true"; "(and true ?x)"=> "?x"),
+        rw!("0-minus"; "(- 0 ?x)"=> "(negative ?x)"),
+        rw!("and-false"; "(and false ?x)"=> "false"),
+        rw!("or-false"; "(or false ?x)"=> "?x"),
+        rw!("or-true"; "(or true ?x)"=> "true"),
+    ]
+}
+
+// Declares the expression language that the rewrite rules in `rules()` operate on.
+// The quoted string in front of a variant is the operator name used in the
+// s-expression patterns (e.g. "(+ ?x ?y)"); the payload holds the ids of the
+// node's children.
+define_language! {
+    /// Supported expressions in Tokomak
+    pub enum TokomakExpr {
+        // Binary arithmetic, boolean and comparison operators: [left, right].
+        "+" = Plus([Id; 2]),
+        "-" = Minus([Id; 2]),
+        "*" = Multiply([Id; 2]),
+        "/" = Divide([Id; 2]),
+        "%" = Modulus([Id; 2]),
+        "not" = Not(Id),
+        "or" = Or([Id; 2]),
+        "and" = And([Id; 2]),
+        "=" = Eq([Id; 2]),
+        "<>" = NotEq([Id; 2]),
+        "<" = Lt([Id; 2]),
+        "<=" = LtEq([Id; 2]),
+        ">" = Gt([Id; 2]),
+        ">=" = GtEq([Id; 2]),
+
+        // Unary predicates and negation.
+        "is_not_null" = IsNotNull(Id),
+        "is_null" = IsNull(Id),
+        "negative" = Negative(Id),
+        // Children are [expr, low, high], the order `to_exprs` reads them in.
+        "between" = Between([Id; 3]),
+        // NOTE(review): presumably the negated (NOT BETWEEN) form - confirm.
+        "between_inverted" = BetweenInverted([Id; 3]),
+        "like" = Like([Id; 2]),
+        "not_like" = NotLike([Id; 2]),
+        // Leaf nodes: literals and column references.
+        // NOTE(review): when parsing from text egg presumably tries these in
+        // declaration order, so `Utf8(String)` would capture any token that is
+        // not a bool or an i64 before `LargeUtf8`/`Column` are tried - confirm.
+        Bool(bool),
+        Int64(i64),
+        Utf8(String),
+        LargeUtf8(String),
+        Column(Symbol),
+    }
+}
+
+/// Translates a DataFusion [`Expr`] into [`TokomakExpr`] nodes appended to `rec_expr`.
+///
+/// Sub-expressions are added before the node that refers to them, so the node
+/// created for `expr` itself is always added last.
+///
+/// Returns the id of the node added for `expr`, or `None` as soon as `expr` or
+/// one of its sub-expressions has no `TokomakExpr` counterpart. Nodes added
+/// before the unsupported part was reached are left in `rec_expr`.
+pub fn to_tokomak_expr(rec_expr: &mut RecExpr<TokomakExpr>, expr: Expr) -> Option<Id> {
+    // Build the node for `expr` first; it is appended in a single place below.
+    let node = match expr {
+        Expr::BinaryExpr { left, op, right } => {
+            // Array elements are evaluated left to right: left operand first.
+            let operands = [
+                to_tokomak_expr(rec_expr, *left)?,
+                to_tokomak_expr(rec_expr, *right)?,
+            ];
+            match op {
+                Operator::Eq => TokomakExpr::Eq(operands),
+                Operator::NotEq => TokomakExpr::NotEq(operands),
+                Operator::Lt => TokomakExpr::Lt(operands),
+                Operator::LtEq => TokomakExpr::LtEq(operands),
+                Operator::Gt => TokomakExpr::Gt(operands),
+                Operator::GtEq => TokomakExpr::GtEq(operands),
+                Operator::Plus => TokomakExpr::Plus(operands),
+                Operator::Minus => TokomakExpr::Minus(operands),
+                Operator::Multiply => TokomakExpr::Multiply(operands),
+                Operator::Divide => TokomakExpr::Divide(operands),
+                Operator::Modulus => TokomakExpr::Modulus(operands),
+                Operator::And => TokomakExpr::And(operands),
+                Operator::Or => TokomakExpr::Or(operands),
+                Operator::Like => TokomakExpr::Like(operands),
+                Operator::NotLike => TokomakExpr::NotLike(operands),
+            }
+        }
+        Expr::Column(name) => TokomakExpr::Column(Symbol::from(name)),
+        Expr::Literal(ScalarValue::Int64(Some(value))) => TokomakExpr::Int64(value),
+        Expr::Literal(ScalarValue::Utf8(Some(value))) => TokomakExpr::Utf8(value),
+        Expr::Literal(ScalarValue::LargeUtf8(Some(value))) => TokomakExpr::LargeUtf8(value),
+        Expr::Literal(ScalarValue::Boolean(Some(value))) => TokomakExpr::Bool(value),
+        Expr::Not(inner) => TokomakExpr::Not(to_tokomak_expr(rec_expr, *inner)?),
+        Expr::IsNull(inner) => TokomakExpr::IsNull(to_tokomak_expr(rec_expr, *inner)?),
+        Expr::IsNotNull(inner) => TokomakExpr::IsNotNull(to_tokomak_expr(rec_expr, *inner)?),
+
+        // not yet supported
+        _ => return None,
+    };
+
+    Some(rec_expr.add(node))
+}
+
+/// Converts the node stored at `id` in `rec_expr`, together with all of its
+/// children, back into a DataFusion [`Expr`].
+///
+/// `Between` is rebuilt as a non-negated `Expr::Between` and
+/// `BetweenInverted` as a negated one.
+fn to_exprs(rec_expr: &RecExpr<TokomakExpr>, id: Id) -> Expr {
+    let refs = rec_expr.as_ref();
+    let index: usize = id.into();
+    match refs[index] {
+        // Binary operators.
+        TokomakExpr::Plus(ids) => build_binary_expr(rec_expr, ids, Operator::Plus),
+        TokomakExpr::Minus(ids) => build_binary_expr(rec_expr, ids, Operator::Minus),
+        TokomakExpr::Multiply(ids) => build_binary_expr(rec_expr, ids, Operator::Multiply),
+        TokomakExpr::Divide(ids) => build_binary_expr(rec_expr, ids, Operator::Divide),
+        TokomakExpr::Modulus(ids) => build_binary_expr(rec_expr, ids, Operator::Modulus),
+        TokomakExpr::Or(ids) => build_binary_expr(rec_expr, ids, Operator::Or),
+        TokomakExpr::And(ids) => build_binary_expr(rec_expr, ids, Operator::And),
+        TokomakExpr::Eq(ids) => build_binary_expr(rec_expr, ids, Operator::Eq),
+        TokomakExpr::NotEq(ids) => build_binary_expr(rec_expr, ids, Operator::NotEq),
+        TokomakExpr::Lt(ids) => build_binary_expr(rec_expr, ids, Operator::Lt),
+        TokomakExpr::LtEq(ids) => build_binary_expr(rec_expr, ids, Operator::LtEq),
+        TokomakExpr::Gt(ids) => build_binary_expr(rec_expr, ids, Operator::Gt),
+        TokomakExpr::GtEq(ids) => build_binary_expr(rec_expr, ids, Operator::GtEq),
+        TokomakExpr::Like(ids) => build_binary_expr(rec_expr, ids, Operator::Like),
+        TokomakExpr::NotLike(ids) => build_binary_expr(rec_expr, ids, Operator::NotLike),
+
+        // Unary operators.
+        TokomakExpr::Not(id) => Expr::Not(Box::new(to_exprs(rec_expr, id))),
+        TokomakExpr::IsNotNull(id) => Expr::IsNotNull(Box::new(to_exprs(rec_expr, id))),
+        TokomakExpr::IsNull(id) => Expr::IsNull(Box::new(to_exprs(rec_expr, id))),
+        TokomakExpr::Negative(id) => Expr::Negative(Box::new(to_exprs(rec_expr, id))),
+
+        // Range predicates; the inverted form must carry `negated: true`,
+        // otherwise it would be indistinguishable from a plain BETWEEN.
+        TokomakExpr::Between(ids) => build_between_expr(rec_expr, ids, false),
+        TokomakExpr::BetweenInverted(ids) => build_between_expr(rec_expr, ids, true),
+
+        // Leaves.
+        TokomakExpr::Int64(i) => Expr::Literal(ScalarValue::Int64(Some(i))),
+        TokomakExpr::Utf8(ref i) => Expr::Literal(ScalarValue::Utf8(Some(i.clone()))),
+        TokomakExpr::LargeUtf8(ref i) => Expr::Literal(ScalarValue::LargeUtf8(Some(i.clone()))),
+        TokomakExpr::Column(col) => Expr::Column(col.to_string()),
+        TokomakExpr::Bool(b) => Expr::Literal(ScalarValue::Boolean(Some(b))),
+    }
+}
+
+/// Builds `left op right` from the two child ids of a binary node.
+fn build_binary_expr(rec_expr: &RecExpr<TokomakExpr>, ids: [Id; 2], op: Operator) -> Expr {
+    let [left, right] = ids;
+    Expr::BinaryExpr {
+        left: Box::new(to_exprs(rec_expr, left)),
+        op,
+        right: Box::new(to_exprs(rec_expr, right)),
+    }
+}
+
+/// Builds an `Expr::Between` from the `[expr, low, high]` child ids of a between node.
+fn build_between_expr(rec_expr: &RecExpr<TokomakExpr>, ids: [Id; 3], negated: bool) -> Expr {
+    let [expr, low, high] = ids;
+    Expr::Between {
+        expr: Box::new(to_exprs(rec_expr, expr)),
+        negated,
+        low: Box::new(to_exprs(rec_expr, low)),
+        high: Box::new(to_exprs(rec_expr, high)),
+    }
+}
+
+impl OptimizerRule for Tokomak {
+    /// Optimizes the inputs of `plan` recursively, then simplifies each of the
+    /// plan's own expressions independently.
+    ///
+    /// An expression that cannot be translated by `to_tokomak_expr` is kept
+    /// unchanged. Returns the plan rebuilt from the new expressions and
+    /// inputs, or the first error raised while optimizing an input or
+    /// rebuilding the plan.
+    fn optimize(&self, plan: &LogicalPlan, props: &ExecutionProps) -> DFResult<LogicalPlan> {
+        let inputs = plan.inputs();
+        let new_inputs: Vec<LogicalPlan> = inputs
+            .iter()
+            .map(|plan| self.optimize(plan, props))
+            .collect::<DFResult<Vec<_>>>()?;
+
+        // The rewrite set is the same for every expression, so build it once
+        // instead of once per expression.
+        let rewrites = rules();
+
+        // optimize all expressions individually (for now)
+        let mut exprs = vec![];
+        for expr in plan.expressions().iter() {
+            let mut rec_expr = RecExpr::default();
+            if to_tokomak_expr(&mut rec_expr, expr.clone()).is_none() {
+                // Not (fully) representable as a TokomakExpr: keep it as is.
+                exprs.push(expr.clone());
+                continue;
+            }
+
+            let runner = Runner::<TokomakExpr, (), ()>::default()
+                .with_expr(&rec_expr)
+                .run(&rewrites);
+
+            // Pick the equivalent expression with the fewest nodes.
+            let mut extractor = Extractor::new(&runner.egraph, AstSize);
+            let (_, best_expr) = extractor.find_best(runner.roots[0]);
+            // Children precede their parent in a RecExpr, so the root is the last node.
+            let start = best_expr.as_ref().len() - 1;
+            exprs.push(to_exprs(&best_expr, start.into()));
+        }
+
+        utils::from_plan(plan, &exprs, &new_inputs)
+    }
+
+    /// Name under which this rule is reported.
+    fn name(&self) -> &str {
+        "tokomak"
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use super::*;
+    use crate::prelude::{CsvReadOptions, ExecutionConfig, ExecutionContext};
+    use egg::Runner;
+
+    /// Parses `input` as a `TokomakExpr` s-expression, saturates it with
+    /// `rules()` and returns the smallest equivalent expression as text.
+    fn simplify(input: &str) -> String {
+        let expr: RecExpr<TokomakExpr> = input.parse().unwrap();
+        let runner = Runner::<TokomakExpr, (), ()>::default()
+            .with_expr(&expr)
+            .run(&rules());
+
+        let mut extractor = Extractor::new(&runner.egraph, AstSize);
+        let (_best_cost, best_expr) = extractor.find_best(runner.roots[0]);
+
+        format!("{}", best_expr)
+    }
+
+    /// Plans `sql` against the `example` CSV table in a context that has the
+    /// tokomak rule registered and returns the indented plan as text.
+    fn optimized_plan(sql: &str) -> String {
+        // register custom tokomak optimizer
+        let config = ExecutionConfig::new().add_optimizer_rule(Arc::new(Tokomak::new()));
+        let mut ctx = ExecutionContext::with_config(config);
+
+        ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())
+            .unwrap();
+
+        // create a plan to run a SQL query
+        let lp = ctx.sql(sql).unwrap().to_logical_plan();
+
+        format!("{}", lp.display_indent())
+    }
+
+    #[test]
+    fn test_add_0() {
+        assert_eq!(simplify("(+ 0 (x))"), "x")
+    }
+
+    #[test]
+    fn test_dist_and_or() {
+        assert_eq!(
+            simplify("(or (or (and (= 1 2) foo) (and (= 1 2) bar)) (and (= 1 2) boo))"),
+            "(and (= 1 2) (or boo (or foo bar)))"
+        )
+    }
+
+    #[tokio::test]
+    async fn custom_optimizer() {
+        // verify that optimization took place
+        assert_eq!(
+            optimized_plan("SELECT price*0-price from example"),
+            "Projection: (- #price)\
+            \n  TableScan: example projection=Some([0])"
+        )
+    }
+
+    #[tokio::test]
+    async fn custom_optimizer_filter() {
+        // verify that optimization took place
+        assert_eq!(
+            optimized_plan(
+                "SELECT price from example WHERE (price=1 AND price=2) OR (price=1 AND price=3)"
+            ),
+            "Filter: #price Eq Int64(1) And #price Eq Int64(2) Or #price Eq Int64(3)\
+            \n  TableScan: example projection=Some([0])"
+        )
+    }
+}