You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/03/24 19:16:18 UTC

[impala] 02/02: IMPALA-9183: Convert disjunctive predicates to conjunctive normal form

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1411ca6a00d408956bd63d20995d13c3e6ded1b1
Author: Aman Sinha <am...@cloudera.com>
AuthorDate: Thu Jan 2 15:42:17 2020 -0800

    IMPALA-9183: Convert disjunctive predicates to conjunctive normal form
    
    Added an expression rewrite rule to convert a disjunctive predicate to
    conjunctive normal form (CNF). Converting to CNF enables multi-table
    predicates that were only evaluated by a Join operator to be converted
    into either single-table conjuncts that are eligible for predicate pushdown
    to the scan operator or other multi-table conjuncts that are eligible to
    be pushed to a Join below. This helps improve performance for such queries.
    
    Since converting to CNF expands the number of expressions, we place a
    limit on the maximum number of CNF exprs (each AND is counted as 1 CNF expr)
    that are considered. Once the MAX_CNF_EXPRS limit (default is unlimited) is
    exceeded, whatever expression was supplied to the rule is returned without
    further transformation. A setting of -1 or 0 allows unlimited number of
    CNF exprs to be created upto int32 max. Another option ENABLE_CNF_REWRITES
    enables or disables the entire rewrite. This is False by default until we
    have done more thorough functional testing (tracking JIRA IMPALA-9539).
    
    Examples of rewrites:
     original: (a AND b) OR c
     rewritten: (a OR c) AND (b OR c)
    
     original: (a AND b) OR (c AND d)
     rewritten: (a OR c) AND (a OR d) AND (b OR c) AND (b OR d)
    
     original: NOT(a OR b)
     rewritten: NOT(a) AND NOT(b)
    
    Testing:
     - Added new unit tests with variations of disjunctive predicates
       and verified their Explain plans
     - Manually tested the result correctness on impala shell by running
       these queries with ENABLE_CNF_REWRITES enabled and disabled
     - Added TPC-H q7, q19 and TPC-DS q13 with the CNF rewrite enabled
     - Preliminary performance testing of TPC-DS q13 on a 10TB scale factor
       shows almost 5x improvement:
          Original baseline: 47.5 sec
          With this patch and CNF rewrite enabled: 9.4 sec
    
    Change-Id: I5a03cd7239333aaf375416ef5f2b7608fcd4a072
    Reviewed-on: http://gerrit.cloudera.org:8080/15462
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/query-options-test.cc               |   1 +
 be/src/service/query-options.cc                    |  16 ++
 be/src/service/query-options.h                     |   6 +-
 common/thrift/ImpalaInternalService.thrift         |   6 +
 common/thrift/ImpalaService.thrift                 |   9 +
 .../java/org/apache/impala/analysis/Analyzer.java  |   5 +
 .../apache/impala/rewrite/ConvertToCNFRule.java    | 174 +++++++++++++
 .../impala/analysis/ExprRewriteRulesTest.java      |  20 ++
 .../org/apache/impala/planner/PlannerTest.java     |  10 +
 .../queries/PlannerTest/convert-to-cnf.test        | 289 +++++++++++++++++++++
 .../queries/PlannerTest/tpcds-all.test             | 119 +++++++++
 .../queries/PlannerTest/tpch-all.test              | 178 +++++++++++++
 12 files changed, 831 insertions(+), 2 deletions(-)

diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 0cc30ed..9720dc4 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -247,6 +247,7 @@ TEST(QueryOptions, SetIntOptions) {
       {MAKE_OPTIONDEF(thread_reservation_aggregate_limit), {-1, I32_MAX}},
       {MAKE_OPTIONDEF(statement_expression_limit),
           {MIN_STATEMENT_EXPRESSION_LIMIT, I32_MAX}},
+      {MAKE_OPTIONDEF(max_cnf_exprs),                  {-1, I32_MAX}},
   };
   for (const auto& test_case : case_set) {
     const OptionDef<int32_t>& option_def = test_case.first;
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 7042d61..690c3da 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -531,6 +531,10 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_enable_expr_rewrites(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::ENABLE_CNF_REWRITES: {
+        query_options->__set_enable_cnf_rewrites(IsTrue(value));
+        break;
+      }
       case TImpalaQueryOptions::DECIMAL_V2: {
         query_options->__set_decimal_v2(IsTrue(value));
         break;
@@ -924,6 +928,18 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_preagg_bytes_limit(preagg_bytes_limit);
         break;
       }
+      case TImpalaQueryOptions::MAX_CNF_EXPRS: {
+        StringParser::ParseResult result;
+        const int32_t requested_max_cnf_exprs =
+            StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
+        if (result != StringParser::PARSE_SUCCESS || requested_max_cnf_exprs < -1) {
+          return Status(
+              Substitute("Invalid max cnf exprs : '$0'. "
+                         "Only -1 and non-negative numbers are allowed.", value));
+        }
+        query_options->__set_max_cnf_exprs(requested_max_cnf_exprs);
+        break;
+      }
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 5bbd51a..6413505 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::PREAGG_BYTES_LIMIT + 1);\
+      TImpalaQueryOptions::MAX_CNF_EXPRS + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -114,6 +114,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(strict_mode, STRICT_MODE, TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(scratch_limit, SCRATCH_LIMIT, TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(enable_expr_rewrites, ENABLE_EXPR_REWRITES, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(enable_cnf_rewrites, ENABLE_CNF_REWRITES, TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(decimal_v2, DECIMAL_V2, TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(parquet_dictionary_filtering, PARQUET_DICTIONARY_FILTERING,\
       TQueryOptionLevel::ADVANCED)\
@@ -194,7 +195,8 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(mem_limit_executors, MEM_LIMIT_EXECUTORS, TQueryOptionLevel::DEVELOPMENT)\
   QUERY_OPT_FN(broadcast_bytes_limit, BROADCAST_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
-  QUERY_OPT_FN(preagg_bytes_limit, PREAGG_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)
+  QUERY_OPT_FN(preagg_bytes_limit, PREAGG_BYTES_LIMIT, TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(max_cnf_exprs, MAX_CNF_EXPRS, TQueryOptionLevel::ADVANCED)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index c70e682..02704bf 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -410,6 +410,12 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   99: optional i64 preagg_bytes_limit = -1;
+
+  // See comment in ImpalaService.thrift
+  100: optional bool enable_cnf_rewrites = false;
+
+  // See comment in ImpalaService.thrift
+  101: optional i32 max_cnf_exprs = 0;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 7065715..2e5fa7d 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -507,6 +507,15 @@ enum TImpalaQueryOptions {
   // The max reservation that each grouping class in a preaggregation will use.
   // 0 or -1 means this has no effect.
   PREAGG_BYTES_LIMIT = 98
+
+  // Indicates whether the FE should rewrite disjunctive predicates to conjunctive
+  // normal form (CNF) for optimization purposes. Default is False.
+  ENABLE_CNF_REWRITES = 99
+
+  // The max number of conjunctive normal form (CNF) exprs to create when converting
+  // a disjunctive expression to CNF. Each AND counts as 1 expression. A value of
+  // -1 or 0 means no limit. Default is 0 (unlimited).
+  MAX_CNF_EXPRS = 100
 }
 
 // The summary of a DML statement.
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index be26b3e..f748f92 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -65,6 +65,7 @@ import org.apache.impala.common.Pair;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.planner.PlanNode;
 import org.apache.impala.rewrite.BetweenToCompoundRule;
+import org.apache.impala.rewrite.ConvertToCNFRule;
 import org.apache.impala.rewrite.EqualityDisjunctsToInRule;
 import org.apache.impala.rewrite.ExprRewriteRule;
 import org.apache.impala.rewrite.ExprRewriter;
@@ -481,6 +482,10 @@ public class Analyzer {
         rules.add(FoldConstantsRule.INSTANCE);
         rules.add(NormalizeExprsRule.INSTANCE);
         rules.add(ExtractCommonConjunctRule.INSTANCE);
+        if (queryCtx.getClient_request().getQuery_options().isEnable_cnf_rewrites()) {
+          rules.add(new ConvertToCNFRule(queryCtx.getClient_request().getQuery_options()
+                  .getMax_cnf_exprs(),true));
+        }
         // Relies on FoldConstantsRule and NormalizeExprsRule.
         rules.add(SimplifyConditionalsRule.INSTANCE);
         rules.add(EqualityDisjunctsToInRule.INSTANCE);
diff --git a/fe/src/main/java/org/apache/impala/rewrite/ConvertToCNFRule.java b/fe/src/main/java/org/apache/impala/rewrite/ConvertToCNFRule.java
new file mode 100644
index 0000000..da8f2ac
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/rewrite/ConvertToCNFRule.java
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.rewrite;
+
+import org.apache.impala.analysis.Analyzer;
+import org.apache.impala.analysis.CompoundPredicate;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.analysis.Predicate;
+import org.apache.impala.analysis.TupleId;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This rule converts a predicate to conjunctive normal form (CNF).
+ * Converting to CNF enables multi-table predicates that were only
+ * evaluated by a Join operator to be converted into either single-table
+ * conjuncts that are eligible for predicate pushdown to the scan
+ * operator or other multi-table conjuncts that are eligible to be pushed
+ * to a Join below.
+ *
+ * By default, we do this conversion if the expressions in the predicate
+ * reference 2 or more tables but depending on the forMultiTablesOnly flag,
+ * it can work for single table predicates also.
+ *
+ * Since converting to CNF expands the number of exprs, we place a limit on the
+ * maximum number of CNF exprs (each AND is counted as 1 CNF expr) that are considered;
+ * once this limit is exceeded, whatever expression was supplied is returned without
+ * further transformation. Note that in some systems (e.g Hive) the original OR
+ * predicate is returned when such a limit is reached. However, Impala's rewrite
+ * rule is applied bottom-up in ExprRewriter, so we may not have access to the
+ * top level OR predicate when this limit is reached.
+ *
+ * The rule does one expansion at a time, so for complex predicates it relies on
+ * the ExprRewriter's iterative invocations for expanding in successive steps as shown
+ * in example 2 below.
+ *
+ * Currently, this rule handles the following common pattern:
+ *  1.
+ *  original: (a AND b) OR c
+ *  rewritten: (a OR c) AND (b OR c)
+ *  2.
+ *  if 'c' is another compound predicate, a subsequent application of this
+ *  rule would again convert to CNF:
+ *  original: (a AND b) OR (c AND d)
+ *  first rewrite: (a OR (c AND d)) AND (b OR (c AND d))
+ *  subsequent rewrite: (a OR c) AND (a OR d) AND (b OR c) AND (b OR d)
+ *  3.
+ *  original: NOT(a OR b)
+ *  rewritten: NOT(a) AND NOT(b)  (by De Morgan's theorem)
+ *
+ *  Following predicates are already in CNF, so no conversion is done:
+ *   a OR b where 'a' and 'b' are not CompoundPredicates
+ *   a AND b
+ */
+public class ConvertToCNFRule implements ExprRewriteRule {
+
+  // maximum number of CNF exprs (each AND is counted as 1) allowed
+  private final int maxCnfExprs_;
+  // current number of CNF exprs
+  private int numCnfExprs_ = 0;
+
+  // flag that convert a disjunct to CNF only if the predicate involves
+  // 2 or more tables. By default, we do this only for multi table case
+  // but for unit testing it is useful to disable this
+  private final boolean forMultiTablesOnly_;
+
+  @Override
+  public Expr apply(Expr expr, Analyzer analyzer) {
+    return convertToCNF(expr);
+  }
+
+  private Expr convertToCNF(Expr pred) {
+    if (!(pred instanceof CompoundPredicate)) {
+      return pred;
+    }
+
+    if (maxCnfExprs_ > 0 && numCnfExprs_ >= maxCnfExprs_) {
+      // max allowed CNF exprs has been reached .. in this case we
+      // return the supplied predicate (also see related comments
+      // in the class level comments above)
+      return pred;
+    }
+
+    CompoundPredicate cpred = (CompoundPredicate) pred;
+    if (cpred.getOp() == CompoundPredicate.Operator.AND) {
+      // this is already a conjunct
+      return cpred;
+    } else if (cpred.getOp() == CompoundPredicate.Operator.OR
+            || cpred.getOp() == CompoundPredicate.Operator.NOT) {
+      if (forMultiTablesOnly_) {
+        // check if this predicate references one or more tuples. If only 1 tuple,
+        // we can skip the rewrite since the disjunct can be pushed down as-is
+        List<TupleId> tids = new ArrayList<>();
+        cpred.getIds(tids, null);
+        if (tids.size() <= 1) {
+          return cpred;
+        }
+      }
+      if (cpred.getOp() == CompoundPredicate.Operator.OR) {
+        Expr lhs = cpred.getChild(0);
+        Expr rhs = cpred.getChild(1);
+        if (lhs instanceof CompoundPredicate &&
+            ((CompoundPredicate)lhs).getOp() == CompoundPredicate.Operator.AND) {
+          // predicate: (a AND b) OR c
+          // convert to (a OR c) AND (b OR c)
+          return createPredAndIncrementCount(lhs.getChild(0), rhs,
+                  lhs.getChild(1), rhs);
+        } else if (rhs instanceof CompoundPredicate &&
+            ((CompoundPredicate)rhs).getOp() == CompoundPredicate.Operator.AND) {
+          // predicate: a OR (b AND c)
+          // convert to (a OR b) AND (a or c)
+          return createPredAndIncrementCount(lhs, rhs.getChild(0),
+                  lhs, rhs.getChild(1));
+        }
+      } else if (cpred.getOp() == CompoundPredicate.Operator.NOT) {
+        Expr child = cpred.getChild(0);
+        if (child instanceof CompoundPredicate &&
+            ((CompoundPredicate) child).getOp() == CompoundPredicate.Operator.OR) {
+          // predicate: NOT (a OR b)
+          // convert to: NOT(a) AND NOT(b)
+          Expr lhs = ((CompoundPredicate) child).getChild(0);
+          Expr rhs = ((CompoundPredicate) child).getChild(1);
+          Expr lhs1 = new CompoundPredicate(CompoundPredicate.Operator.NOT, lhs, null);
+          Expr rhs1 = new CompoundPredicate(CompoundPredicate.Operator.NOT, rhs, null);
+          Predicate newPredicate =
+              (CompoundPredicate) CompoundPredicate.createConjunction(lhs1, rhs1);
+          numCnfExprs_++;
+          return newPredicate;
+        }
+      }
+    }
+    return pred;
+  }
+
+  /**
+   * Compose 2 disjunctive predicates using supplied exprs and combine
+   * the disjuncts into a top level conjunct. Increment the CNF exprs count.
+   */
+  private Predicate createPredAndIncrementCount(Expr first_lhs, Expr second_lhs,
+                                                Expr first_rhs, Expr second_rhs) {
+    List<Expr> disjuncts = Arrays.asList(first_lhs, second_lhs);
+    Expr lhs1 = (CompoundPredicate)
+        CompoundPredicate.createDisjunctivePredicate(disjuncts);
+    disjuncts = Arrays.asList(first_rhs, second_rhs);
+    Expr rhs1 = (CompoundPredicate)
+        CompoundPredicate.createDisjunctivePredicate(disjuncts);
+    Predicate newPredicate = (CompoundPredicate)
+        CompoundPredicate.createConjunction(lhs1, rhs1);
+    numCnfExprs_++;
+    return newPredicate;
+  }
+
+  public ConvertToCNFRule(int maxCnfExprs, boolean forMultiTablesOnly) {
+    maxCnfExprs_ = maxCnfExprs;
+    forMultiTablesOnly_ = forMultiTablesOnly;
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
index 0635210..2276779 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java
@@ -32,6 +32,7 @@ import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.QueryFixture;
 import org.apache.impala.common.SqlCastException;
 import org.apache.impala.rewrite.BetweenToCompoundRule;
+import org.apache.impala.rewrite.ConvertToCNFRule;
 import org.apache.impala.rewrite.EqualityDisjunctsToInRule;
 import org.apache.impala.rewrite.ExprRewriteRule;
 import org.apache.impala.rewrite.ExprRewriter;
@@ -804,4 +805,23 @@ public class ExprRewriteRulesTest extends FrontendTestBase {
     // are not simplified
     RewritesOk("nullif(1 + int_col, 1 + int_col)", rules, "NULL");
   }
+
+  @Test
+  public void testConvertToCNFRule() throws ImpalaException {
+    ExprRewriteRule rule = new ConvertToCNFRule(-1, false);
+
+    RewritesOk("(int_col > 10 AND int_col < 20) OR float_col < 5.0", rule,
+            "int_col < 20 OR float_col < 5.0 AND int_col > 10 OR float_col < 5.0");
+    RewritesOk("float_col < 5.0 OR (int_col > 10 AND int_col < 20)", rule,
+            "float_col < 5.0 OR int_col < 20 AND float_col < 5.0 OR int_col > 10");
+    RewritesOk("(int_col > 10 AND float_col < 5.0) OR " +
+            "(int_col < 20 AND float_col > 15.0)", rule,
+            "float_col < 5.0 OR float_col > 15.0 AND " +
+                    "float_col < 5.0 OR int_col < 20 " +
+                    "AND int_col > 10 OR float_col > 15.0 AND int_col > 10 " +
+                    "OR int_col < 20");
+    RewritesOk("NOT(int_col > 10 OR int_col < 20)", rule,
+            "NOT int_col < 20 AND NOT int_col > 10");
+  }
+
 }
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index a19a584..67d8e9e 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1028,4 +1028,14 @@ public class PlannerTest extends PlannerTestBase {
     options.setExplain_level(TExplainLevel.EXTENDED);
     runPlannerTestFile("preagg-bytes-limit", "tpch_parquet", options);
   }
+
+  /**
+   * Check conversion of predicates to conjunctive normal form.
+   */
+  @Test
+  public void testConvertToCNF() {
+    TQueryOptions options = new TQueryOptions();
+    options.setEnable_cnf_rewrites(true);
+    runPlannerTestFile("convert-to-cnf", "tpch_parquet", options);
+  }
 }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/convert-to-cnf.test b/testdata/workloads/functional-planner/queries/PlannerTest/convert-to-cnf.test
new file mode 100644
index 0000000..272a2b4
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/convert-to-cnf.test
@@ -0,0 +1,289 @@
+# test conversion of disjunctive predicate to conjunctive normal form
+# inner join
+select count(*) from lineitem, orders
+ where l_orderkey = o_orderkey
+ and ((l_suppkey > 10 and o_custkey > 20)
+      or (l_suppkey > 30 and o_custkey > 40))
+ and l_partkey > 0;
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  other predicates: l_suppkey > 10 OR o_custkey > 40, o_custkey > 20 OR l_suppkey > 30
+|  runtime filters: RF000 <- o_orderkey
+|  row-size=40B cardinality=57.58K
+|
+|--01:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     predicates: o_custkey > 20 OR o_custkey > 40
+|     row-size=16B cardinality=150.00K
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=193.99MB
+   predicates: l_partkey > 0, l_suppkey > 10 OR l_suppkey > 30
+   runtime filters: RF000 -> l_orderkey
+   row-size=24B cardinality=600.12K
+====
+
+# outer join
+select count(*) from lineitem left outer join orders
+ on l_orderkey = o_orderkey
+ where ((l_suppkey > 10 and o_custkey > 20)
+      or (l_suppkey > 30 and o_custkey > 40))
+ and l_partkey > 0;
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:HASH JOIN [LEFT OUTER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  other predicates: l_suppkey > 10 OR o_custkey > 40, o_custkey > 20 OR l_suppkey > 30, o_custkey > 20 OR o_custkey > 40
+|  row-size=40B cardinality=600.12K
+|
+|--01:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     predicates: o_custkey > 20 OR o_custkey > 40
+|     row-size=16B cardinality=150.00K
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=193.99MB
+   predicates: l_partkey > 0, l_suppkey > 10 OR l_suppkey > 30
+   row-size=24B cardinality=600.12K
+====
+
+# BETWEEN predicate within each side of the OR
+select count(*) from lineitem, orders
+ where l_orderkey = o_orderkey
+ and ((l_suppkey between 10 and 30 and o_custkey > 20)
+      or (l_suppkey between 30 and 50 and o_custkey > 40))
+ and l_partkey > 0;
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  other predicates: l_suppkey <= 30 OR o_custkey > 40, l_suppkey >= 10 OR o_custkey > 40, o_custkey > 20 OR l_suppkey <= 50, o_custkey > 20 OR l_suppkey >= 30
+|  runtime filters: RF000 <- o_orderkey
+|  row-size=40B cardinality=57.58K
+|
+|--01:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     predicates: o_custkey > 20 OR o_custkey > 40
+|     row-size=16B cardinality=150.00K
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=193.99MB
+   predicates: l_partkey > 0, l_suppkey <= 30 OR l_suppkey >= 30 AND l_suppkey <= 50, l_suppkey >= 10 OR l_suppkey >= 30 AND l_suppkey <= 50
+   runtime filters: RF000 -> l_orderkey
+   row-size=24B cardinality=600.12K
+====
+
+# equality predicates on l_suppkey should eventually be converted to IN
+# after CNF rewrite enables another rule to be applied
+# IN predicates on o_custkey should be preserved
+select count(*) from lineitem, orders
+ where l_orderkey = o_orderkey
+ and ((l_suppkey = 10 and o_custkey in (20, 21))
+      or (l_suppkey = 30 and o_custkey in (40, 41))
+      or (l_suppkey = 50 and o_custkey in (60, 61)))
+ and l_partkey > 0;
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  other predicates: l_suppkey IN (10, 30) OR o_custkey IN (60, 61), o_custkey IN (20, 21, 40, 41) OR l_suppkey = 50, l_suppkey = 10 OR o_custkey IN (40, 41) OR l_suppkey = 50, o_custkey IN (20, 21) OR l_suppkey = 30 OR l_suppkey = 50, l_suppkey = 10 OR o_custkey IN (40, 41) OR o_custkey IN (60, 61), o_custkey IN (20, 21) OR l_suppkey = 30 OR o_custkey IN (60, 61)
+|  runtime filters: RF000 <- o_orderkey
+|  row-size=40B cardinality=1
+|
+|--01:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     predicates: o_custkey IN (20, 21, 40, 41, 60, 61)
+|     row-size=16B cardinality=91
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=193.99MB
+   predicates: l_partkey > 0, l_suppkey IN (10, 30, 50)
+   runtime filters: RF000 -> l_orderkey
+   row-size=24B cardinality=586
+====
+
+# NOT predicate
+select count(*) from lineitem, orders
+ where l_orderkey = o_orderkey
+ and not ((l_suppkey > 10 and o_custkey > 20)
+      or (l_suppkey > 30 and o_custkey > 40))
+ and l_partkey > 0;
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: o_orderkey = l_orderkey
+|  other predicates: NOT o_custkey > 20 OR o_custkey > 40 AND o_custkey > 20 OR l_suppkey > 30 AND l_suppkey > 10 OR o_custkey > 40 AND l_suppkey > 10 OR l_suppkey > 30
+|  runtime filters: RF000 <- l_orderkey
+|  row-size=40B cardinality=600.12K
+|
+|--00:SCAN HDFS [tpch_parquet.lineitem]
+|     HDFS partitions=1/1 files=3 size=193.99MB
+|     predicates: l_partkey > 0
+|     row-size=24B cardinality=600.12K
+|
+01:SCAN HDFS [tpch_parquet.orders]
+   HDFS partitions=1/1 files=2 size=54.21MB
+   runtime filters: RF000 -> o_orderkey
+   row-size=16B cardinality=1.50M
+====
+
+# set the max_cnf_exprs limit
+# in this case partial conversion is expected
+select count(*) from lineitem, orders
+ where l_orderkey = o_orderkey
+ and ((l_suppkey between 10 and 50 and o_custkey between 20 and 40)
+      or (l_suppkey between 30 and 90  and o_custkey between 60 and 100))
+ and l_partkey > 0;
+---- QUERYOPTIONS
+MAX_CNF_EXPRS=4
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  other predicates: o_custkey <= 40 OR l_suppkey >= 30 AND l_suppkey <= 90, l_suppkey <= 50 OR (l_suppkey >= 30 AND l_suppkey <= 90 AND o_custkey >= 60 AND o_custkey <= 100), l_suppkey >= 10 OR (l_suppkey >= 30 AND l_suppkey <= 90 AND o_custkey >= 60 AND o_custkey <= 100), o_custkey >= 20 OR (l_suppkey >= 30 AND l_suppkey <= 90 AND o_custkey >= 60 AND o_custkey <= 100)
+|  runtime filters: RF000 <- o_orderkey
+|  row-size=40B cardinality=57.58K
+|
+|--01:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     predicates: o_custkey <= 40 OR o_custkey >= 60 AND o_custkey <= 100
+|     row-size=16B cardinality=150.00K
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=193.99MB
+   predicates: l_partkey > 0
+   runtime filters: RF000 -> l_orderkey
+   row-size=24B cardinality=600.12K
+====
+
+# reset the max_cnf_exprs limit to unlimited (-1)
+# in this case full conversion is expected
+select count(*) from lineitem, orders
+ where l_orderkey = o_orderkey
+ and ((l_suppkey between 10 and 50 and o_custkey between 20 and 40)
+      or (l_suppkey between 30 and 90  and o_custkey between 60 and 100))
+ and l_partkey > 0;
+---- QUERYOPTIONS
+MAX_CNF_EXPRS=-1
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  other predicates: l_suppkey <= 50 OR o_custkey <= 100, l_suppkey <= 50 OR o_custkey >= 60, l_suppkey >= 10 OR o_custkey <= 100, l_suppkey >= 10 OR o_custkey >= 60, o_custkey <= 40 OR l_suppkey <= 90, o_custkey <= 40 OR l_suppkey >= 30, o_custkey >= 20 OR l_suppkey <= 90, o_custkey >= 20 OR l_suppkey >= 30
+|  runtime filters: RF000 <- o_orderkey
+|  row-size=40B cardinality=57.58K
+|
+|--01:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     predicates: o_custkey <= 40 OR o_custkey >= 60 AND o_custkey <= 100, o_custkey >= 20 OR o_custkey >= 60 AND o_custkey <= 100
+|     row-size=16B cardinality=150.00K
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=193.99MB
+   predicates: l_partkey > 0, l_suppkey <= 50 OR l_suppkey >= 30 AND l_suppkey <= 90, l_suppkey >= 10 OR l_suppkey >= 30 AND l_suppkey <= 90
+   runtime filters: RF000 -> l_orderkey
+   row-size=24B cardinality=600.12K
+====
+
+# same as above, but set max_cnf_exprs to 0 (also implies unlimited)
+# in this case full conversion is expected
+select count(*) from lineitem, orders
+ where l_orderkey = o_orderkey
+ and ((l_suppkey between 10 and 50 and o_custkey between 20 and 40)
+      or (l_suppkey between 30 and 90  and o_custkey between 60 and 100))
+ and l_partkey > 0;
+---- QUERYOPTIONS
+MAX_CNF_EXPRS=0
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  other predicates: l_suppkey <= 50 OR o_custkey <= 100, l_suppkey <= 50 OR o_custkey >= 60, l_suppkey >= 10 OR o_custkey <= 100, l_suppkey >= 10 OR o_custkey >= 60, o_custkey <= 40 OR l_suppkey <= 90, o_custkey <= 40 OR l_suppkey >= 30, o_custkey >= 20 OR l_suppkey <= 90, o_custkey >= 20 OR l_suppkey >= 30
+|  runtime filters: RF000 <- o_orderkey
+|  row-size=40B cardinality=57.58K
+|
+|--01:SCAN HDFS [tpch_parquet.orders]
+|     HDFS partitions=1/1 files=2 size=54.21MB
+|     predicates: o_custkey <= 40 OR o_custkey >= 60 AND o_custkey <= 100, o_custkey >= 20 OR o_custkey >= 60 AND o_custkey <= 100
+|     row-size=16B cardinality=150.00K
+|
+00:SCAN HDFS [tpch_parquet.lineitem]
+   HDFS partitions=1/1 files=3 size=193.99MB
+   predicates: l_partkey > 0, l_suppkey <= 50 OR l_suppkey >= 30 AND l_suppkey <= 90, l_suppkey >= 10 OR l_suppkey >= 30 AND l_suppkey <= 90
+   runtime filters: RF000 -> l_orderkey
+   row-size=24B cardinality=600.12K
+====
+
+# disable the rewrite, so no conversion is expected
+select count(*) from lineitem, orders
+ where l_orderkey = o_orderkey
+ and ((l_suppkey between 10 and 50 and o_custkey between 20 and 40)
+      or (l_suppkey between 30 and 90  and o_custkey between 60 and 100))
+ and l_partkey > 0;
+---- QUERYOPTIONS
+ENABLE_CNF_REWRITES=false
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|  row-size=8B cardinality=1
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: o_orderkey = l_orderkey
+|  other predicates: ((l_suppkey >= 10 AND l_suppkey <= 50 AND o_custkey >= 20 AND o_custkey <= 40) OR (l_suppkey >= 30 AND l_suppkey <= 90 AND o_custkey >= 60 AND o_custkey <= 100))
+|  runtime filters: RF000 <- l_orderkey
+|  row-size=40B cardinality=600.12K
+|
+|--00:SCAN HDFS [tpch_parquet.lineitem]
+|     HDFS partitions=1/1 files=3 size=193.99MB
+|     predicates: l_partkey > 0
+|     row-size=24B cardinality=600.12K
+|
+01:SCAN HDFS [tpch_parquet.orders]
+   HDFS partitions=1/1 files=2 size=54.21MB
+   runtime filters: RF000 -> o_orderkey
+   row-size=16B cardinality=1.50M
+====
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
index 99761af..1382e42 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds-all.test
@@ -770,6 +770,125 @@ PLAN-ROOT SINK
    runtime filters: RF002 -> store_sales.ss_store_sk
    row-size=8B cardinality=84.40K
 ====
+# TPCDS-Q13 (with CNF rewrite enabled)
+select avg(ss_quantity)
+       ,avg(ss_ext_sales_price)
+       ,avg(ss_ext_wholesale_cost)
+       ,sum(ss_ext_wholesale_cost)
+ from store_sales
+     ,store
+     ,customer_demographics
+     ,household_demographics
+     ,customer_address
+     ,date_dim
+ where s_store_sk = ss_store_sk
+ and  ss_sold_date_sk = d_date_sk and d_year = 2001
+ and((ss_hdemo_sk=hd_demo_sk
+  and cd_demo_sk = ss_cdemo_sk
+  and cd_marital_status = 'M'
+  and cd_education_status = 'Advanced Degree'
+  and ss_sales_price between 100.00 and 150.00
+  and hd_dep_count = 3
+     )or
+     (ss_hdemo_sk=hd_demo_sk
+  and cd_demo_sk = ss_cdemo_sk
+  and cd_marital_status = 'S'
+  and cd_education_status = 'College'
+  and ss_sales_price between 50.00 and 100.00
+  and hd_dep_count = 1
+     ) or
+     (ss_hdemo_sk=hd_demo_sk
+  and cd_demo_sk = ss_cdemo_sk
+  and cd_marital_status = 'W'
+  and cd_education_status = '2 yr Degree'
+  and ss_sales_price between 150.00 and 200.00
+  and hd_dep_count = 1
+     ))
+ and((ss_addr_sk = ca_address_sk
+  and ca_country = 'United States'
+  and ca_state in ('TX', 'OH', 'TX')
+  and ss_net_profit between 100 and 200
+     ) or
+     (ss_addr_sk = ca_address_sk
+  and ca_country = 'United States'
+  and ca_state in ('OR', 'NM', 'KY')
+  and ss_net_profit between 150 and 300
+     ) or
+     (ss_addr_sk = ca_address_sk
+  and ca_country = 'United States'
+  and ca_state in ('VA', 'TX', 'MS')
+  and ss_net_profit between 50 and 250
+     ))
+---- QUERYOPTIONS
+ENABLE_CNF_REWRITES=true
+---- PLAN
+Max Per-Host Resource Reservation: Memory=45.01MB Threads=7
+Per-Host Resource Estimates: Memory=461MB
+PLAN-ROOT SINK
+|
+11:AGGREGATE [FINALIZE]
+|  output: avg(ss_quantity), avg(ss_ext_sales_price), avg(ss_ext_wholesale_cost), sum(ss_ext_wholesale_cost)
+|  row-size=40B cardinality=1
+|
+10:HASH JOIN [INNER JOIN]
+|  hash predicates: ss_store_sk = s_store_sk
+|  runtime filters: RF000 <- s_store_sk
+|  row-size=142B cardinality=2.55K
+|
+|--01:SCAN HDFS [tpcds.store]
+|     HDFS partitions=1/1 files=1 size=3.08KB
+|     row-size=4B cardinality=12
+|
+09:HASH JOIN [INNER JOIN]
+|  hash predicates: cd_demo_sk = ss_cdemo_sk
+|  other predicates: cd_education_status = 'Advanced Degree' OR cd_marital_status = 'S' AND cd_education_status = 'College' OR hd_dep_count = 1, cd_marital_status = 'M' OR cd_marital_status = 'S' AND cd_education_status = 'College' OR hd_dep_count = 1, hd_dep_count = 3 OR cd_education_status = 'College' OR hd_dep_count = 1, cd_education_status = 'Advanced Degree' OR hd_dep_count = 1 OR hd_dep_count = 1, hd_dep_count IN (3, 1) OR cd_education_status = '2 yr Degree', hd_dep_count = 3 OR cd [...]
+|  runtime filters: RF002 <- ss_cdemo_sk
+|  row-size=138B cardinality=2.55K
+|
+|--08:HASH JOIN [INNER JOIN]
+|  |  hash predicates: ss_hdemo_sk = hd_demo_sk
+|  |  other predicates: hd_dep_count IN (3, 1) OR ss_sales_price <= 200.00, hd_dep_count IN (3, 1) OR ss_sales_price >= 150.00, hd_dep_count = 3 OR ss_sales_price <= 100.00 OR hd_dep_count = 1, hd_dep_count = 3 OR ss_sales_price <= 100.00 OR ss_sales_price <= 200.00, hd_dep_count = 3 OR ss_sales_price <= 100.00 OR ss_sales_price >= 150.00, hd_dep_count = 3 OR ss_sales_price >= 50.00 OR hd_dep_count = 1, hd_dep_count = 3 OR ss_sales_price >= 50.00 OR ss_sales_price <= 200.00, hd_dep_count [...]
+|  |  runtime filters: RF004 <- hd_demo_sk
+|  |  row-size=99B cardinality=3.05K
+|  |
+|  |--03:SCAN HDFS [tpcds.household_demographics]
+|  |     HDFS partitions=1/1 files=1 size=148.10KB
+|  |     predicates: hd_dep_count IN (3, 1, 1)
+|  |     row-size=8B cardinality=2.16K
+|  |
+|  07:HASH JOIN [INNER JOIN]
+|  |  hash predicates: ss_sold_date_sk = d_date_sk
+|  |  runtime filters: RF006 <- d_date_sk
+|  |  row-size=91B cardinality=10.43K
+|  |
+|  |--05:SCAN HDFS [tpcds.date_dim]
+|  |     HDFS partitions=1/1 files=1 size=9.84MB
+|  |     predicates: d_year = 2001
+|  |     row-size=8B cardinality=373
+|  |
+|  06:HASH JOIN [INNER JOIN]
+|  |  hash predicates: ss_addr_sk = ca_address_sk
+|  |  other predicates: ca_state IN ('TX', 'OH', 'TX') OR ss_net_profit <= 300 OR ss_net_profit <= 250, ca_state IN ('TX', 'OH', 'TX') OR ss_net_profit <= 300 OR ss_net_profit >= 50, ca_state IN ('TX', 'OH', 'TX') OR ss_net_profit >= 150 OR ss_net_profit <= 250, ca_state IN ('TX', 'OH', 'TX') OR ss_net_profit >= 150 OR ss_net_profit >= 50, ss_net_profit <= 200 OR ca_state IN ('OR', 'NM', 'KY') OR ss_net_profit <= 250, ss_net_profit <= 200 OR ca_state IN ('OR', 'NM', 'KY') OR ss_net_profi [...]
+|  |  runtime filters: RF008 <- ca_address_sk
+|  |  row-size=83B cardinality=50.98K
+|  |
+|  |--04:SCAN HDFS [tpcds.customer_address]
+|  |     HDFS partitions=1/1 files=1 size=5.25MB
+|  |     predicates: ca_state IN ('TX', 'OH', 'TX', 'OR', 'NM', 'KY', 'VA', 'TX', 'MS'), ca_country = 'United States'
+|  |     row-size=43B cardinality=8.82K
+|  |
+|  00:SCAN HDFS [tpcds.store_sales]
+|     HDFS partitions=1824/1824 files=1824 size=346.60MB
+|     predicates: ss_net_profit <= 200 OR ss_net_profit <= 300 OR ss_net_profit <= 250, ss_net_profit <= 200 OR ss_net_profit <= 300 OR ss_net_profit >= 50, ss_net_profit <= 200 OR ss_net_profit >= 150 OR ss_net_profit <= 250, ss_net_profit <= 200 OR ss_net_profit >= 150 OR ss_net_profit >= 50, ss_net_profit >= 100 OR ss_net_profit <= 300 OR ss_net_profit <= 250, ss_net_profit >= 100 OR ss_net_profit <= 300 OR ss_net_profit >= 50, ss_net_profit >= 100 OR ss_net_profit >= 150 OR ss_net_pr [...]
+|     runtime filters: RF000 -> ss_store_sk, RF004 -> ss_hdemo_sk, RF006 -> ss_sold_date_sk, RF008 -> ss_addr_sk
+|     row-size=40B cardinality=288.04K
+|
+02:SCAN HDFS [tpcds.customer_demographics]
+   HDFS partitions=1/1 files=1 size=76.92MB
+   predicates: cd_marital_status = 'M' OR cd_marital_status = 'S' AND cd_education_status = 'College' OR cd_marital_status = 'W' AND cd_education_status = '2 yr Degree', cd_education_status = 'Advanced Degree' OR cd_marital_status = 'S' AND cd_education_status = 'College' OR cd_marital_status = 'W' AND cd_education_status = '2 yr Degree'
+   runtime filters: RF002 -> cd_demo_sk
+   row-size=39B cardinality=181.75K
+====
 # TPCDS-Q19
 select
   i_brand_id brand_id,
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
index c72982b..c5592f8 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
@@ -1520,6 +1520,119 @@ PLAN-ROOT SINK
    runtime filters: RF006 -> l_suppkey, RF008 -> l_orderkey
    row-size=54B cardinality=600.12K
 ====
+# TPCH-Q7 (with CNF rewrite enabled)
+# Q7 - Volume Shipping Query
+select
+  supp_nation,
+  cust_nation,
+  l_year,
+  sum(volume) as revenue
+from (
+  select
+    n1.n_name as supp_nation,
+    n2.n_name as cust_nation,
+    year(l_shipdate) as l_year,
+    l_extendedprice * (1 - l_discount) as volume
+  from
+    supplier,
+    lineitem,
+    orders,
+    customer,
+    nation n1,
+    nation n2
+  where
+    s_suppkey = l_suppkey
+    and o_orderkey = l_orderkey
+    and c_custkey = o_custkey
+    and s_nationkey = n1.n_nationkey
+    and c_nationkey = n2.n_nationkey
+    and (
+      (n1.n_name = 'FRANCE' and n2.n_name = 'GERMANY')
+      or (n1.n_name = 'GERMANY' and n2.n_name = 'FRANCE')
+    )
+    and l_shipdate between '1995-01-01' and '1996-12-31'
+  ) as shipping
+group by
+  supp_nation,
+  cust_nation,
+  l_year
+order by
+  supp_nation,
+  cust_nation,
+  l_year
+---- QUERYOPTIONS
+ENABLE_CNF_REWRITES=true
+---- PLAN
+Max Per-Host Resource Reservation: Memory=73.70MB Threads=7
+Per-Host Resource Estimates: Memory=648MB
+PLAN-ROOT SINK
+|
+12:SORT
+|  order by: supp_nation ASC, cust_nation ASC, l_year ASC
+|  row-size=58B cardinality=3.69K
+|
+11:AGGREGATE [FINALIZE]
+|  output: sum(l_extendedprice * (1 - l_discount))
+|  group by: n1.n_name, n2.n_name, year(l_shipdate)
+|  row-size=58B cardinality=3.69K
+|
+10:HASH JOIN [INNER JOIN]
+|  hash predicates: c_nationkey = n2.n_nationkey
+|  other predicates: n1.n_name = 'FRANCE' OR n2.n_name = 'FRANCE', n2.n_name = 'GERMANY' OR n1.n_name = 'GERMANY'
+|  runtime filters: RF000 <- n2.n_nationkey
+|  row-size=132B cardinality=3.69K
+|
+|--05:SCAN HDFS [tpch.nation n2]
+|     HDFS partitions=1/1 files=1 size=2.15KB
+|     predicates: n2.n_name IN ('GERMANY', 'FRANCE')
+|     row-size=21B cardinality=2
+|
+09:HASH JOIN [INNER JOIN]
+|  hash predicates: o_custkey = c_custkey
+|  runtime filters: RF002 <- c_custkey
+|  row-size=111B cardinality=46.06K
+|
+|--03:SCAN HDFS [tpch.customer]
+|     HDFS partitions=1/1 files=1 size=23.08MB
+|     runtime filters: RF000 -> c_nationkey
+|     row-size=10B cardinality=150.00K
+|
+08:HASH JOIN [INNER JOIN]
+|  hash predicates: s_nationkey = n1.n_nationkey
+|  runtime filters: RF004 <- n1.n_nationkey
+|  row-size=101B cardinality=46.06K
+|
+|--04:SCAN HDFS [tpch.nation n1]
+|     HDFS partitions=1/1 files=1 size=2.15KB
+|     predicates: n1.n_name IN ('FRANCE', 'GERMANY')
+|     row-size=21B cardinality=2
+|
+07:HASH JOIN [INNER JOIN]
+|  hash predicates: l_suppkey = s_suppkey
+|  runtime filters: RF006 <- s_suppkey
+|  row-size=80B cardinality=575.77K
+|
+|--00:SCAN HDFS [tpch.supplier]
+|     HDFS partitions=1/1 files=1 size=1.33MB
+|     runtime filters: RF004 -> s_nationkey
+|     row-size=10B cardinality=10.00K
+|
+06:HASH JOIN [INNER JOIN]
+|  hash predicates: l_orderkey = o_orderkey
+|  runtime filters: RF008 <- o_orderkey
+|  row-size=70B cardinality=575.77K
+|
+|--02:SCAN HDFS [tpch.orders]
+|     HDFS partitions=1/1 files=1 size=162.56MB
+|     runtime filters: RF002 -> o_custkey
+|     row-size=16B cardinality=1.50M
+|
+01:SCAN HDFS [tpch.lineitem]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   predicates: l_shipdate <= '1996-12-31', l_shipdate >= '1995-01-01'
+   runtime filters: RF006 -> l_suppkey, RF008 -> l_orderkey
+   row-size=54B cardinality=600.12K
+====
 # TPCH-Q8
 # Q8 - National Market Share Query
 select
@@ -4173,6 +4286,71 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> l_partkey
    row-size=72B cardinality=801.95K
 ====
+# TPCH-Q19 (with CNF rewrite enabled)
+# Q19 - Discounted Revenue Query
+select
+  sum(l_extendedprice * (1 - l_discount)) as revenue
+from
+  lineitem,
+  part
+where
+  p_partkey = l_partkey
+  and (
+    (
+      p_brand = 'Brand#12'
+      and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
+      and l_quantity >= 1 and l_quantity <= 11
+      and p_size between 1 and 5
+      and l_shipmode in ('AIR', 'AIR REG')
+      and l_shipinstruct = 'DELIVER IN PERSON'
+    )
+    or
+    (
+      p_brand = 'Brand#23'
+      and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
+      and l_quantity >= 10 and l_quantity <= 20
+      and p_size between 1 and 10
+      and l_shipmode in ('AIR', 'AIR REG')
+      and l_shipinstruct = 'DELIVER IN PERSON'
+    )
+    or
+    (
+      p_brand = 'Brand#34'
+      and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
+      and l_quantity >= 20 and l_quantity <= 30
+      and p_size between 1 and 15
+      and l_shipmode in ('AIR', 'AIR REG')
+      and l_shipinstruct = 'DELIVER IN PERSON'
+    )
+  )
+---- QUERYOPTIONS
+ENABLE_CNF_REWRITES=true
+---- PLAN
+Max Per-Host Resource Reservation: Memory=18.94MB Threads=3
+Per-Host Resource Estimates: Memory=331MB
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: sum(l_extendedprice * (1 - l_discount))
+|  row-size=16B cardinality=1
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: l_partkey = p_partkey
+|  other predicates: l_quantity <= 11 OR l_quantity <= 20 OR p_size <= 15, l_quantity <= 11 OR l_quantity >= 10 OR p_size <= 15, l_quantity <= 11 OR p_size <= 10 OR l_quantity <= 30, l_quantity <= 11 OR p_size <= 10 OR l_quantity >= 20, l_quantity <= 11 OR p_size <= 10 OR p_size <= 15, l_quantity >= 1 OR l_quantity <= 20 OR p_size <= 15, l_quantity >= 1 OR l_quantity >= 10 OR p_size <= 15, l_quantity >= 1 OR p_size <= 10 OR l_quantity <= 30, l_quantity >= 1 OR p_size <= 10 OR l_quantity  [...]
+|  runtime filters: RF000 <- p_partkey
+|  row-size=124B cardinality=1.41K
+|
+|--01:SCAN HDFS [tpch.part]
+|     HDFS partitions=1/1 files=1 size=22.83MB
+|     predicates: p_brand = 'Brand#12' OR p_brand = 'Brand#23' AND p_container IN ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') OR p_brand = 'Brand#34' AND p_container IN ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG'), p_container IN ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG') OR p_brand = 'Brand#23' AND p_container IN ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK') OR p_brand = 'Brand#34' AND p_container IN ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG'), p_size >= 1, p_size <= 5 OR p_size <= 10 OR [...]
+|     row-size=52B cardinality=1.43K
+|
+00:SCAN HDFS [tpch.lineitem]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   predicates: l_shipmode IN ('AIR', 'AIR REG'), l_shipinstruct = 'DELIVER IN PERSON', l_quantity <= 11 OR l_quantity <= 20 OR l_quantity <= 30, l_quantity <= 11 OR l_quantity <= 20 OR l_quantity >= 20, l_quantity <= 11 OR l_quantity >= 10 OR l_quantity <= 30, l_quantity <= 11 OR l_quantity >= 10 OR l_quantity >= 20, l_quantity >= 1 OR l_quantity <= 20 OR l_quantity <= 30, l_quantity >= 1 OR l_quantity <= 20 OR l_quantity >= 20, l_quantity >= 1 OR l_quantity >= 10 OR l_quantity <= 30, l_ [...]
+   runtime filters: RF000 -> l_partkey
+   row-size=72B cardinality=197.63K
+====
 # TPCH-Q20
 # Q20 - Potential Part Promotion Query
 select