You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/06/12 06:43:44 UTC

[GitHub] [spark] wangyum commented on a change in pull request #28805: [SPARK-28169][SQL] Convert scan predicate condition to CNF

wangyum commented on a change in pull request #28805:
URL: https://github.com/apache/spark/pull/28805#discussion_r439236301



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
##########
@@ -253,6 +253,58 @@ trait PredicateHelper extends Logging {
     expressions.groupBy(_.references.map(_.qualifier)).map(_._2.reduceLeft(And)).toSeq
   }
 
+  /**
+   * Converts an expression into conjunctive normal form for partition pruning.
+   * Definition and algorithm: https://en.wikipedia.org/wiki/Conjunctive_normal_form
+   * CNF can explode exponentially in the size of the input expression when converting [[Or]]
+   * clauses. Use a configuration [[SQLConf.MAX_CNF_NODE_COUNT]] to prevent such cases.
+   *
+   * @param condition the expression to convert into CNF.
+   * @return the CNF result as a sequence of disjunctive expressions. `Seq.empty` is returned if
+   *         expanding an `Or` exceeds the threshold or the result stack ends up inconsistent.
+   */
+  def conjunctiveNormalFormForPartitionPruning(condition: Expression): Seq[Expression] = {
+    val postOrderNodes = postOrderTraversal(condition)
+    val resultStack = new mutable.Stack[Seq[Expression]]
+    val maxCnfNodeCount = SQLConf.get.maxCnfNodeCount
+    // Bottom-up: each And/Or combines the CNFs of its two operands popped from `resultStack`.
+    while (postOrderNodes.nonEmpty) {
+      val cnf = postOrderNodes.pop() match {
+        case _: And =>
+          val right = resultStack.pop()
+          val left = resultStack.pop()
+          left ++ right
+        case _: Or =>
+          // For each side, there is no need to expand predicates with the same references.
+          // So here we aggregate predicates with identical `references` into a single [[And]],
+          // reducing the size of pushed down predicates and the corresponding codegen.
+          val right = groupExpressionsByReference(resultStack.pop())
+          val left = groupExpressionsByReference(resultStack.pop())
+          // Bail out past `maxCnfNodeCount` clauses; NOTE(review): Int product may overflow.
+          if (left.size * right.size > maxCnfNodeCount) {
+            logInfo(s"As the result size exceeds the threshold $maxCnfNodeCount. " +
+              "The CNF conversion is skipped and returning Seq.empty now. To avoid this, you can " +
+              s"raise the limit ${SQLConf.MAX_CNF_NODE_COUNT.key}.")
+            return Seq.empty
+          } else {
+            for { x <- left; y <- right } yield Or(x, y)
+          }
+        case other => other :: Nil // any node other than And/Or is kept whole as one clause
+      }
+      resultStack.push(cnf)
+    }
+    if (resultStack.length != 1) { // the whole condition should reduce to exactly one CNF
+      logWarning("The length of CNF conversion result stack is supposed to be 1. There might " +
+        "be something wrong with CNF conversion.")
+      return Seq.empty
+    }
+    resultStack.top
+  }
+
+  private def groupExpressionsByReference(expressions: Seq[Expression]): Seq[Expression] = {
+    expressions.groupBy(_.references).map(_._2.reduceLeft(And)).toSeq // one And per reference set
+  }
+

Review comment:
       There is too much duplicate code here. Maybe you can organize the code like this:
   ```scala
     /**
      * CNF conversion that, before expanding each [[Or]], merges the conjuncts whose referenced
      * attributes share the same set of qualifiers into one predicate joined with [[And]].
      */
     def conjunctiveNormalFormAndGroupExpsByQualifier(condition: Expression): Seq[Expression] = {
       val groupByQualifier: Seq[Expression] => Seq[Expression] = exprs =>
         exprs.groupBy(_.references.map(_.qualifier)).map(_._2.reduceLeft(And)).toSeq
       conjunctiveNormalForm(condition, groupExpsFunc = groupByQualifier)
     }
   
     /**
      * CNF conversion that, before expanding each [[Or]], merges the conjuncts referencing
      * exactly the same attribute set into one predicate joined with [[And]].
      */
     def conjunctiveNormalFormAndGroupExpsByReference(condition: Expression): Seq[Expression] = {
       val groupByReference: Seq[Expression] => Seq[Expression] = exprs =>
         exprs.groupBy(_.references).map(_._2.reduceLeft(And)).toSeq
       conjunctiveNormalForm(condition, groupExpsFunc = groupByReference)
     }
   
     /**
      * Converts `condition` into conjunctive normal form (CNF) and returns its conjuncts.
      * Definition and algorithm: https://en.wikipedia.org/wiki/Conjunctive_normal_form
      *
      * CNF can grow exponentially when [[Or]] is distributed over [[And]], so every [[Or]]
      * expansion is bounded by [[SQLConf.MAX_CNF_NODE_COUNT]].
      *
      * @param condition     the expression to convert into CNF.
      * @param groupExpsFunc applied to the conjuncts of each [[Or]] operand before expansion, so
      *                      callers can merge related conjuncts and shrink the cross product.
      *                      The default (`_.toSeq`) leaves the conjuncts unchanged.
      * @return the CNF as a sequence of disjunctive clauses, or `Seq.empty` if expanding an [[Or]]
      *         would exceed the threshold or the result stack ends up inconsistent.
      */
     def conjunctiveNormalForm(
         condition: Expression,
         groupExpsFunc: Seq[Expression] => Seq[Expression] = _.toSeq): Seq[Expression] = {
       val postOrderNodes = postOrderTraversal(condition)
       val resultStack = new mutable.Stack[Seq[Expression]]
       val maxCnfNodeCount = SQLConf.get.maxCnfNodeCount
       // Bottom-up: each And/Or combines the CNFs of its two operands popped from `resultStack`.
       while (postOrderNodes.nonEmpty) {
         val cnf = postOrderNodes.pop() match {
           case _: And =>
             val right = resultStack.pop()
             val left = resultStack.pop()
             left ++ right
           case _: Or =>
             // Let the caller merge related conjuncts on each side first (e.g. by shared
             // references), reducing the size of pushed-down predicates and generated code.
             val right = groupExpsFunc(resultStack.pop())
             val left = groupExpsFunc(resultStack.pop())
             // (l1 AND l2) OR (r1 AND r2) expands to left.size * right.size clauses. Multiply as
             // Long: the And case is unbounded, so two large operands could overflow Int and
             // produce a negative product that slips past the threshold.
             if (left.size.toLong * right.size > maxCnfNodeCount) {
               logInfo(s"As the result size exceeds the threshold $maxCnfNodeCount. " +
                 "The CNF conversion is skipped and returning Seq.empty now. To avoid this, you can " +
                 s"raise the limit ${SQLConf.MAX_CNF_NODE_COUNT.key}.")
               return Seq.empty
             } else {
               for { x <- left; y <- right } yield Or(x, y)
             }
           // Any node other than And/Or is kept whole as a single clause.
           case other => other :: Nil
         }
         resultStack.push(cnf)
       }
       // The whole condition should have reduced to exactly one CNF on the stack.
       if (resultStack.length != 1) {
         logWarning("The length of CNF conversion result stack is supposed to be 1. There might " +
           "be something wrong with CNF conversion.")
         return Seq.empty
       }
       resultStack.top
     }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org