You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/06/10 06:06:44 UTC

[GitHub] [spark] cloud-fan commented on a change in pull request #28733: [SPARK-31705][SQL] Push more possible predicates through Join via CNF conversion

cloud-fan commented on a change in pull request #28733:
URL: https://github.com/apache/spark/pull/28733#discussion_r437879694



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
##########
@@ -198,6 +200,90 @@ trait PredicateHelper {
     case e: Unevaluable => false
     case e => e.children.forall(canEvaluateWithinJoin)
   }
+
+  /**
+   * Convert an expression into conjunctive normal form.
+   * Definition and algorithm: https://en.wikipedia.org/wiki/Conjunctive_normal_form
+   * CNF can explode exponentially in the size of the input expression when converting Or clauses.
+   * Use a configuration MAX_CNF_NODE_COUNT to prevent such cases.
+   *
+   * @param condition the expression to be converted into CNF.
+   * @return If the number of expressions exceeds threshold on converting Or, return Seq.empty.
+   *         If the conversion repeatedly expands nondeterministic expressions, return Seq.empty.
+   *         Otherwise, return the converted result as sequence of disjunctive expressions.
+   */
+  def conjunctiveNormalForm(condition: Expression): Seq[Expression] = {
+    val postOrderNodes = postOrderTraversal(condition)
+    val resultStack = new mutable.Stack[Seq[Expression]]
+    val maxCnfNodeCount = SQLConf.get.maxCnfNodeCount
+    // Bottom up approach to get CNF of sub-expressions: each visited node pushes the CNF of
+    // its own sub-tree, and And/Or nodes first pop the two results they combine.
+    while (postOrderNodes.nonEmpty) {
+      val cnf = postOrderNodes.pop() match {
+        case _: And =>
+          // The top of the stack is taken as the right operand, the entry below it as the left.
+          val right: Seq[Expression] = resultStack.pop()
+          val left: Seq[Expression] = resultStack.pop()
+          // A conjunction of two CNFs is simply the concatenation of their clauses.
+          left ++ right
+        case _: Or =>
+          // For each side, there is no need to expand predicates of the same references.
+          // So here we can aggregate predicates of the same references as one single predicate,
+          // for reducing the size of pushed down predicates and corresponding codegen.
+          val right = aggregateExpressionsOfSameQualifiers(resultStack.pop())
+          val left = aggregateExpressionsOfSameQualifiers(resultStack.pop())
+          // Distributing Or over the clauses yields left.size * right.size new clauses; give
+          // up (signalled by an empty result) when that exceeds `maxCnfNodeCount`.
+          if (left.size * right.size > maxCnfNodeCount) {
+            Seq.empty
+          } else {
+            for {x <- left; y <- right} yield Or(x, y)
+          }
+        case other => other :: Nil
+      }
+      // An empty intermediate result means the conversion was abandoned; propagate it.
+      if (cnf.isEmpty) {
+        return Seq.empty
+      }
+      resultStack.push(cnf)
+    }
+    // The stack holds exactly one entry only once the whole tree has been folded, so the
+    // sanity check belongs after the loop. Inside the loop it would fire on ordinary
+    // intermediate states (e.g. the stack is still empty when the first leaf is processed).
+    if (resultStack.length != 1) {
+      logWarning("The length of CNF conversion result stack is supposed to be 1. There might " +
+        "be something wrong with CNF conversion.")
+    }
+    resultStack.top
+  }
+
+  private def aggregateExpressionsOfSameQualifiers(

Review comment:
       nit: `groupByExprsByQualifier`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org