You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/18 20:59:05 UTC

spark git commit: [SPARK-9171][SQL] add and improve tests for nondeterministic expressions

Repository: spark
Updated Branches:
  refs/heads/master 692378c01 -> 86c50bf72


[SPARK-9171][SQL] add and improve tests for nondeterministic expressions

Author: Wenchen Fan <cl...@outlook.com>

Closes #7496 from cloud-fan/tests and squashes the following commits:

0958f90 [Wenchen Fan] improve test for nondeterministic expressions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86c50bf7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86c50bf7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86c50bf7

Branch: refs/heads/master
Commit: 86c50bf72c41d95107a55c16a6853dcda7f3e143
Parents: 692378c
Author: Wenchen Fan <cl...@outlook.com>
Authored: Sat Jul 18 11:58:53 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Jul 18 11:58:53 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskContext.scala    |   2 +-
 .../expressions/ExpressionEvalHelper.scala      | 108 ++++++++++---------
 .../expressions/MathFunctionsSuite.scala        |  18 +---
 .../sql/catalyst/expressions/RandomSuite.scala  |   6 +-
 .../spark/sql/ColumnExpressionSuite.scala       |   9 +-
 .../expression/NondeterministicSuite.scala      |  32 ++++++
 6 files changed, 103 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 345bb50..e93eb93 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -38,7 +38,7 @@ object TaskContext {
    */
   def getPartitionId(): Int = {
     val tc = taskContext.get()
-    if (tc == null) {
+    if (tc eq null) {
       0
     } else {
       tc.partitionId()

http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index c43486b..7a96044 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -23,7 +23,7 @@ import org.scalatest.Matchers._
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateProjection, GenerateMutableProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
 
@@ -38,7 +38,7 @@ trait ExpressionEvalHelper {
   }
 
   protected def checkEvaluation(
-      expression: Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
+      expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
     val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
     checkEvaluationWithoutCodegen(expression, catalystValue, inputRow)
     checkEvaluationWithGeneratedMutableProjection(expression, catalystValue, inputRow)
@@ -51,12 +51,14 @@ trait ExpressionEvalHelper {
 
   /**
    * Check the equality between result of expression and expected value, it will handle
-   * Array[Byte].
+   * Array[Byte] and Spread[Double].
    */
   protected def checkResult(result: Any, expected: Any): Boolean = {
     (result, expected) match {
       case (result: Array[Byte], expected: Array[Byte]) =>
         java.util.Arrays.equals(result, expected)
+      case (result: Double, expected: Spread[Double]) =>
+        expected.isWithin(result)
       case _ => result == expected
     }
   }
@@ -65,10 +67,29 @@ trait ExpressionEvalHelper {
     expression.eval(inputRow)
   }
 
+  protected def generateProject(
+      generator: => Projection,
+      expression: Expression): Projection = {
+    try {
+      generator
+    } catch {
+      case e: Throwable =>
+        val ctx = new CodeGenContext
+        val evaluated = expression.gen(ctx)
+        fail(
+          s"""
+            |Code generation of $expression failed:
+            |${evaluated.code}
+            |$e
+          """.stripMargin)
+    }
+  }
+
   protected def checkEvaluationWithoutCodegen(
       expression: Expression,
       expected: Any,
       inputRow: InternalRow = EmptyRow): Unit = {
+
     val actual = try evaluate(expression, inputRow) catch {
       case e: Exception => fail(s"Exception evaluating $expression", e)
     }
@@ -85,21 +106,11 @@ trait ExpressionEvalHelper {
       expected: Any,
       inputRow: InternalRow = EmptyRow): Unit = {
 
-    val plan = try {
-      GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)()
-    } catch {
-      case e: Throwable =>
-        val ctx = GenerateProjection.newCodeGenContext()
-        val evaluated = expression.gen(ctx)
-        fail(
-          s"""
-            |Code generation of $expression failed:
-            |${evaluated.code}
-            |$e
-          """.stripMargin)
-    }
+    val plan = generateProject(
+      GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)(),
+      expression)
 
-    val actual = plan(inputRow).apply(0)
+    val actual = plan(inputRow).get(0)
     if (!checkResult(actual, expected)) {
       val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
       fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
@@ -110,24 +121,19 @@ trait ExpressionEvalHelper {
       expression: Expression,
       expected: Any,
       inputRow: InternalRow = EmptyRow): Unit = {
-    val ctx = GenerateProjection.newCodeGenContext()
-    lazy val evaluated = expression.gen(ctx)
 
-    val plan = try {
-      GenerateProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)
-    } catch {
-      case e: Throwable =>
-        fail(
-          s"""
-            |Code generation of $expression failed:
-            |${evaluated.code}
-            |$e
-          """.stripMargin)
-    }
+    val plan = generateProject(
+      GenerateProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
+      expression)
 
     val actual = plan(inputRow)
     val expectedRow = InternalRow(expected)
+
+    // The generated `SpecificRow` reimplements hashCode, so make sure it is consistent with our
+    // interpreted version.
     if (actual.hashCode() != expectedRow.hashCode()) {
+      val ctx = new CodeGenContext
+      val evaluated = expression.gen(ctx)
       fail(
         s"""
           |Mismatched hashCodes for values: $actual, $expectedRow
@@ -136,9 +142,10 @@ trait ExpressionEvalHelper {
           |Code: $evaluated
         """.stripMargin)
     }
+
     if (actual != expectedRow) {
       val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
-      fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+      fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expectedRow$input")
     }
     if (actual.copy() != expectedRow) {
       fail(s"Copy of generated Row is wrong: actual: ${actual.copy()}, expected: $expectedRow")
@@ -149,20 +156,10 @@ trait ExpressionEvalHelper {
       expression: Expression,
       expected: Any,
       inputRow: InternalRow = EmptyRow): Unit = {
-    val ctx = GenerateUnsafeProjection.newCodeGenContext()
-    lazy val evaluated = expression.gen(ctx)
 
-    val plan = try {
-      GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)
-    } catch {
-      case e: Throwable =>
-        fail(
-          s"""
-            |Code generation of $expression failed:
-            |${evaluated.code}
-            |$e
-          """.stripMargin)
-    }
+    val plan = generateProject(
+      GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
+      expression)
 
     val unsafeRow = plan(inputRow)
     // UnsafeRow cannot be compared with GenericInternalRow directly
@@ -170,7 +167,7 @@ trait ExpressionEvalHelper {
     val expectedRow = InternalRow(expected)
     if (actual != expectedRow) {
       val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
-      fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+      fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expectedRow$input")
     }
   }
 
@@ -184,12 +181,23 @@ trait ExpressionEvalHelper {
   }
 
   protected def checkDoubleEvaluation(
-      expression: Expression,
+      expression: => Expression,
       expected: Spread[Double],
       inputRow: InternalRow = EmptyRow): Unit = {
-    val actual = try evaluate(expression, inputRow) catch {
-      case e: Exception => fail(s"Exception evaluating $expression", e)
-    }
-    actual.asInstanceOf[Double] shouldBe expected
+    checkEvaluationWithoutCodegen(expression, expected)
+    checkEvaluationWithGeneratedMutableProjection(expression, expected)
+    checkEvaluationWithOptimization(expression, expected)
+
+    var plan = generateProject(
+      GenerateProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
+      expression)
+    var actual = plan(inputRow).get(0)
+    assert(checkResult(actual, expected))
+
+    plan = generateProject(
+      GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
+      expression)
+    actual = FromUnsafeProjection(expression.dataType :: Nil)(plan(inputRow)).get(0)
+    assert(checkResult(actual, expected))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index df988f5..04acd5b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -143,7 +143,6 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       case e: Exception => fail(s"Exception evaluating $expression", e)
     }
     if (!actual.asInstanceOf[Double].isNaN) {
-      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
       fail(s"Incorrect evaluation (codegen off): $expression, " +
         s"actual: $actual, " +
         s"expected: NaN")
@@ -155,23 +154,12 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     expression: Expression,
     inputRow: InternalRow = EmptyRow): Unit = {
 
-    val plan = try {
-      GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)()
-    } catch {
-      case e: Throwable =>
-        val ctx = GenerateProjection.newCodeGenContext()
-        val evaluated = expression.gen(ctx)
-        fail(
-          s"""
-             |Code generation of $expression failed:
-             |${evaluated.code}
-             |$e
-          """.stripMargin)
-    }
+    val plan = generateProject(
+      GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)(),
+      expression)
 
     val actual = plan(inputRow).apply(0)
     if (!actual.asInstanceOf[Double].isNaN) {
-      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
       fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: NaN")
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala
index 9be2b23..698c81b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala
@@ -21,13 +21,13 @@ import org.scalatest.Matchers._
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.types.{DoubleType, IntegerType}
+import org.apache.spark.sql.types.DoubleType
 
 
 class RandomSuite extends SparkFunSuite with ExpressionEvalHelper {
 
   test("random") {
-    val row = create_row(1.1, 2.0, 3.1, null)
-    checkDoubleEvaluation(Rand(30), (0.7363714192755834 +- 0.001), row)
+    checkDoubleEvaluation(Rand(30), 0.7363714192755834 +- 0.001)
+    checkDoubleEvaluation(Randn(30), 0.5181478766595276 +- 0.001)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 8f15479..6bd5804 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -450,7 +450,7 @@ class ColumnExpressionSuite extends QueryTest {
 
   test("monotonicallyIncreasingId") {
     // Make sure we have 2 partitions, each with 2 records.
-    val df = ctx.sparkContext.parallelize(1 to 2, 2).mapPartitions { iter =>
+    val df = ctx.sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
       Iterator(Tuple1(1), Tuple1(2))
     }.toDF("a")
     checkAnswer(
@@ -460,10 +460,13 @@ class ColumnExpressionSuite extends QueryTest {
   }
 
   test("sparkPartitionId") {
-    val df = ctx.sparkContext.parallelize(1 to 1, 1).map(i => (i, i)).toDF("a", "b")
+    // Make sure we have 2 partitions, each with 2 records.
+    val df = ctx.sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
+      Iterator(Tuple1(1), Tuple1(2))
+    }.toDF("a")
     checkAnswer(
       df.select(sparkPartitionId()),
-      Row(0)
+      Row(0) :: Row(0) :: Row(1) :: Row(1) :: Nil
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
new file mode 100644
index 0000000..99e11fd
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.expression
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions. ExpressionEvalHelper
+import org.apache.spark.sql.execution.expressions.{SparkPartitionID, MonotonicallyIncreasingID}
+
+class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
+  test("MonotonicallyIncreasingID") {
+    checkEvaluation(MonotonicallyIncreasingID(), 0)
+  }
+
+  test("SparkPartitionID") {
+    checkEvaluation(SparkPartitionID, 0)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org