You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/20 18:42:21 UTC
spark git commit: [SPARK-9186][SQL] make deterministic describing the
tree rather than the expression
Repository: spark
Updated Branches:
refs/heads/master a15ecd057 -> 04db58ae3
[SPARK-9186][SQL] make deterministic describing the tree rather than the expression
Author: Wenchen Fan <cl...@outlook.com>
Closes #7525 from cloud-fan/deterministic and squashes the following commits:
4189bfa [Wenchen Fan] make deterministic describing the tree rather than the expression
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04db58ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04db58ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04db58ae
Branch: refs/heads/master
Commit: 04db58ae30d2f73af45b7e6813f97be62dc92095
Parents: a15ecd0
Author: Wenchen Fan <cl...@outlook.com>
Authored: Mon Jul 20 09:42:18 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Jul 20 09:42:18 2015 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/expressions/Expression.scala | 12 +++++++++++-
.../apache/spark/sql/catalyst/expressions/random.scala | 10 +++++-----
.../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++--
.../expressions/MonotonicallyIncreasingID.scala | 6 ++----
.../sql/execution/expressions/SparkPartitionID.scala | 6 ++----
5 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/04db58ae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index d0a1aa9..da599b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -65,10 +65,12 @@ abstract class Expression extends TreeNode[Expression] {
* Note that this means that an expression should be considered as non-deterministic if:
* - if it relies on some mutable internal state, or
* - if it relies on some implicit input that is not part of the children expression list.
+ * - if it has a non-deterministic child or children.
*
* An example would be `SparkPartitionID` that relies on the partition id returned by TaskContext.
+ * By default, leaf expressions are deterministic because Nil.forall(_.deterministic) returns true.
*/
- def deterministic: Boolean = true
+ def deterministic: Boolean = children.forall(_.deterministic)
def nullable: Boolean
@@ -183,6 +185,14 @@ trait Unevaluable { self: Expression =>
throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
}
+/**
+ * An expression that is nondeterministic.
+ */
+trait Nondeterministic { self: Expression =>
+
+ override def deterministic: Boolean = false
+}
+
/**
* A leaf expression, i.e. one without any child expressions.
http://git-wip-us.apache.org/repos/asf/spark/blob/04db58ae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
index 822898e..aef24a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
@@ -32,7 +32,9 @@ import org.apache.spark.util.random.XORShiftRandom
*
* Since this expression is stateful, it cannot be a case object.
*/
-abstract class RDG(seed: Long) extends LeafExpression with Serializable {
+abstract class RDG extends LeafExpression with Nondeterministic {
+
+ protected def seed: Long
/**
* Record ID within each partition. By being transient, the Random Number Generator is
@@ -40,15 +42,13 @@ abstract class RDG(seed: Long) extends LeafExpression with Serializable {
*/
@transient protected lazy val rng = new XORShiftRandom(seed + TaskContext.getPartitionId)
- override def deterministic: Boolean = false
-
override def nullable: Boolean = false
override def dataType: DataType = DoubleType
}
/** Generate a random column with i.i.d. uniformly distributed values in [0, 1). */
-case class Rand(seed: Long) extends RDG(seed) {
+case class Rand(seed: Long) extends RDG {
override def eval(input: InternalRow): Double = rng.nextDouble()
def this() = this(Utils.random.nextLong())
@@ -71,7 +71,7 @@ case class Rand(seed: Long) extends RDG(seed) {
}
/** Generate a random column with i.i.d. gaussian random distribution. */
-case class Randn(seed: Long) extends RDG(seed) {
+case class Randn(seed: Long) extends RDG {
override def eval(input: InternalRow): Double = rng.nextGaussian()
def this() = this(Utils.random.nextLong())
http://git-wip-us.apache.org/repos/asf/spark/blob/04db58ae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0f28a0d..fafdae0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -216,9 +216,9 @@ object ProjectCollapsing extends Rule[LogicalPlan] {
// We only collapse these two Projects if their overlapped expressions are all
// deterministic.
- val hasNondeterministic = projectList1.flatMap(_.collect {
+ val hasNondeterministic = projectList1.exists(_.collect {
case a: Attribute if aliasMap.contains(a) => aliasMap(a).child
- }).exists(_.find(!_.deterministic).isDefined)
+ }.exists(!_.deterministic))
if (hasNondeterministic) {
p
http://git-wip-us.apache.org/repos/asf/spark/blob/04db58ae/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
index 4d8ed08..2645eb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/MonotonicallyIncreasingID.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.expressions
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.LeafExpression
+import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
import org.apache.spark.sql.types.{LongType, DataType}
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types.{LongType, DataType}
*
* Since this expression is stateful, it cannot be a case object.
*/
-private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
+private[sql] case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {
/**
* Record ID within each partition. By being transient, count's value is reset to 0 every time
@@ -43,8 +43,6 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
@transient private lazy val partitionMask = TaskContext.getPartitionId().toLong << 33
- override def deterministic: Boolean = false
-
override def nullable: Boolean = false
override def dataType: DataType = LongType
http://git-wip-us.apache.org/repos/asf/spark/blob/04db58ae/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
index 43ffc9c..53ddd47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/expressions/SparkPartitionID.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.expressions
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.LeafExpression
+import org.apache.spark.sql.catalyst.expressions.{Nondeterministic, LeafExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
import org.apache.spark.sql.types.{IntegerType, DataType}
@@ -27,9 +27,7 @@ import org.apache.spark.sql.types.{IntegerType, DataType}
/**
* Expression that returns the current partition id of the Spark task.
*/
-private[sql] case object SparkPartitionID extends LeafExpression {
-
- override def deterministic: Boolean = false
+private[sql] case object SparkPartitionID extends LeafExpression with Nondeterministic {
override def nullable: Boolean = false
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org