You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/20 18:42:21 UTC

spark git commit: [SPARK-9186][SQL] make deterministic describing the tree rather than the expression

Repository: spark
Updated Branches:
  refs/heads/master a15ecd057 -> 04db58ae3


[SPARK-9186][SQL] make `deterministic` describe the whole expression tree rather than the single expression

Author: Wenchen Fan <cl...@outlook.com>

Closes #7525 from cloud-fan/deterministic and squashes the following commits:

4189bfa [Wenchen Fan] make deterministic describing the tree rather than the expression


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04db58ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04db58ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04db58ae

Branch: refs/heads/master
Commit: 04db58ae30d2f73af45b7e6813f97be62dc92095
Parents: a15ecd0
Author: Wenchen Fan <cl...@outlook.com>
Authored: Mon Jul 20 09:42:18 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Jul 20 09:42:18 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/expressions/Expression.scala     | 12 +++++++++++-
 .../apache/spark/sql/catalyst/expressions/random.scala  | 10 +++++-----
 .../apache/spark/sql/catalyst/optimizer/Optimizer.scala |  4 ++--
 .../expressions/MonotonicallyIncreasingID.scala         |  6 ++----
 .../sql/execution/expressions/SparkPartitionID.scala    |  6 ++----
 5 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/04db58ae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index d0a1aa9..da599b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -65,10 +65,12 @@ abstract class Expression extends TreeNode[Expression] {
    * Note that this means that an expression should be considered as non-deterministic if:
    * - if it relies on some mutable internal state, or
    * - if it relies on some implicit input that is not part of the children expression list.
+   * - if it has a non-deterministic child or children.
    *
    * An example would be `SparkPartitionID` that relies on the partition id returned by TaskContext.
+   * By default, leaf expressions are deterministic as Nil.forall(_.deterministic) returns true.
    */
-  def deterministic: Boolean = true
+  def deterministic: Boolean = children.forall(_.deterministic)
 
   def nullable: Boolean
 
@@ -183,6 +185,14 @@ trait Unevaluable { self: Expression =>
     throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
 }
 
+/**
+ * An expression that is nondeterministic.
+ */
+trait Nondeterministic { self: Expression =>
+
+  override def deterministic: Boolean = false
+}
+
 
 /**
  * A leaf expression, i.e. one without any child expressions.

http://git-wip-us.apache.org/repos/asf/spark/blob/04db58ae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
index 822898e..aef24a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
@@ -32,7 +32,9 @@ import org.apache.spark.util.random.XORShiftRandom
  *
  * Since this expression is stateful, it cannot be a case object.
  */
-abstract class RDG(seed: Long) extends LeafExpression with Serializable {
+abstract class RDG extends LeafExpression with Nondeterministic {
+
+  protected def seed: Long
 
   /**
    * Record ID within each partition. By being transient, the Random Number Generator is
@@ -40,15 +42,13 @@ abstract class RDG(seed: Long) extends LeafExpression with Serializable {
    */
   @transient protected lazy val rng = new XORShiftRandom(seed + TaskContext.getPartitionId)
 
-  override def deterministic: Boolean = false
-
   override def nullable: Boolean = false
 
   override def dataType: DataType = DoubleType
 }
 
 /** Generate a random column with i.i.d. uniformly distributed values in [0, 1). */
-case class Rand(seed: Long) extends RDG(seed) {
+case class Rand(seed: Long) extends RDG {
   override def eval(input: InternalRow): Double = rng.nextDouble()
 
   def this() = this(Utils.random.nextLong())
@@ -71,7 +71,7 @@ case class Rand(seed: Long) extends RDG(seed) {
 }
 
 /** Generate a random column with i.i.d. gaussian random distribution. */
-case class Randn(seed: Long) extends RDG(seed) {
+case class Randn(seed: Long) extends RDG {
   override def eval(input: InternalRow): Double = rng.nextGaussian()
 
   def this() = this(Utils.random.nextLong())

http://git-wip-us.apache.org/repos/asf/spark/blob/04db58ae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0f28a0d..fafdae0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -216,9 +216,9 @@ object ProjectCollapsing extends Rule[LogicalPlan] {
 
       // We only collapse these two Projects if their overlapped expressions are all
       // deterministic.
-      val hasNondeterministic = projectList1.flatMap(_.collect {
+      val hasNondeterministic = projectList1.exists(_.collect {
         case a: Attribute if aliasMap.contains(a) => aliasMap(a).child
-      }).exists(_.find(!_.deterministic).isDefined)
+      }.exists(!_.deterministic))
 
       if (hasNondeterministic) {
         p

http://git-wip-us.apache.org/repos/asf/spark/blob/04db58ae/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
index 4d8ed08..2645eb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.expressions
 
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.LeafExpression
+import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
 import org.apache.spark.sql.types.{LongType, DataType}
 
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types.{LongType, DataType}
  *
  * Since this expression is stateful, it cannot be a case object.
  */
-private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
+private[sql] case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {
 
   /**
    * Record ID within each partition. By being transient, count's value is reset to 0 every time
@@ -43,8 +43,6 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
 
   @transient private lazy val partitionMask = TaskContext.getPartitionId().toLong << 33
 
-  override def deterministic: Boolean = false
-
   override def nullable: Boolean = false
 
   override def dataType: DataType = LongType

http://git-wip-us.apache.org/repos/asf/spark/blob/04db58ae/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
index 43ffc9c..53ddd47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.expressions
 
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.LeafExpression
+import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
 import org.apache.spark.sql.types.{IntegerType, DataType}
 
@@ -27,9 +27,7 @@ import org.apache.spark.sql.types.{IntegerType, DataType}
 /**
  * Expression that returns the current partition id of the Spark task.
  */
-private[sql] case object SparkPartitionID extends LeafExpression {
-
-  override def deterministic: Boolean = false
+private[sql] case object SparkPartitionID extends LeafExpression with Nondeterministic {
 
   override def nullable: Boolean = false
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org