You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/07/18 20:59:05 UTC
spark git commit: [SPARK-9171][SQL] add and improve tests for nondeterministic expressions
Repository: spark
Updated Branches:
refs/heads/master 692378c01 -> 86c50bf72
[SPARK-9171][SQL] add and improve tests for nondeterministic expressions
Author: Wenchen Fan <cl...@outlook.com>
Closes #7496 from cloud-fan/tests and squashes the following commits:
0958f90 [Wenchen Fan] improve test for nondeterministic expressions
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86c50bf7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86c50bf7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86c50bf7
Branch: refs/heads/master
Commit: 86c50bf72c41d95107a55c16a6853dcda7f3e143
Parents: 692378c
Author: Wenchen Fan <cl...@outlook.com>
Authored: Sat Jul 18 11:58:53 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Jul 18 11:58:53 2015 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/TaskContext.scala | 2 +-
.../expressions/ExpressionEvalHelper.scala | 108 ++++++++++---------
.../expressions/MathFunctionsSuite.scala | 18 +---
.../sql/catalyst/expressions/RandomSuite.scala | 6 +-
.../spark/sql/ColumnExpressionSuite.scala | 9 +-
.../expression/NondeterministicSuite.scala | 32 ++++++
6 files changed, 103 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 345bb50..e93eb93 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -38,7 +38,7 @@ object TaskContext {
*/
def getPartitionId(): Int = {
val tc = taskContext.get()
- if (tc == null) {
+ if (tc eq null) {
0
} else {
tc.partitionId()
http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index c43486b..7a96044 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -23,7 +23,7 @@ import org.scalatest.Matchers._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateUnsafeProjection, GenerateProjection, GenerateMutableProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
@@ -38,7 +38,7 @@ trait ExpressionEvalHelper {
}
protected def checkEvaluation(
- expression: Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
+ expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
checkEvaluationWithoutCodegen(expression, catalystValue, inputRow)
checkEvaluationWithGeneratedMutableProjection(expression, catalystValue, inputRow)
@@ -51,12 +51,14 @@ trait ExpressionEvalHelper {
/**
* Check the equality between result of expression and expected value, it will handle
- * Array[Byte].
+ * Array[Byte] and Spread[Double].
*/
protected def checkResult(result: Any, expected: Any): Boolean = {
(result, expected) match {
case (result: Array[Byte], expected: Array[Byte]) =>
java.util.Arrays.equals(result, expected)
+ case (result: Double, expected: Spread[Double]) =>
+ expected.isWithin(result)
case _ => result == expected
}
}
@@ -65,10 +67,29 @@ trait ExpressionEvalHelper {
expression.eval(inputRow)
}
+ protected def generateProject(
+ generator: => Projection,
+ expression: Expression): Projection = {
+ try {
+ generator
+ } catch {
+ case e: Throwable =>
+ val ctx = new CodeGenContext
+ val evaluated = expression.gen(ctx)
+ fail(
+ s"""
+ |Code generation of $expression failed:
+ |${evaluated.code}
+ |$e
+ """.stripMargin)
+ }
+ }
+
protected def checkEvaluationWithoutCodegen(
expression: Expression,
expected: Any,
inputRow: InternalRow = EmptyRow): Unit = {
+
val actual = try evaluate(expression, inputRow) catch {
case e: Exception => fail(s"Exception evaluating $expression", e)
}
@@ -85,21 +106,11 @@ trait ExpressionEvalHelper {
expected: Any,
inputRow: InternalRow = EmptyRow): Unit = {
- val plan = try {
- GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)()
- } catch {
- case e: Throwable =>
- val ctx = GenerateProjection.newCodeGenContext()
- val evaluated = expression.gen(ctx)
- fail(
- s"""
- |Code generation of $expression failed:
- |${evaluated.code}
- |$e
- """.stripMargin)
- }
+ val plan = generateProject(
+ GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)(),
+ expression)
- val actual = plan(inputRow).apply(0)
+ val actual = plan(inputRow).get(0)
if (!checkResult(actual, expected)) {
val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
@@ -110,24 +121,19 @@ trait ExpressionEvalHelper {
expression: Expression,
expected: Any,
inputRow: InternalRow = EmptyRow): Unit = {
- val ctx = GenerateProjection.newCodeGenContext()
- lazy val evaluated = expression.gen(ctx)
- val plan = try {
- GenerateProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)
- } catch {
- case e: Throwable =>
- fail(
- s"""
- |Code generation of $expression failed:
- |${evaluated.code}
- |$e
- """.stripMargin)
- }
+ val plan = generateProject(
+ GenerateProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
+ expression)
val actual = plan(inputRow)
val expectedRow = InternalRow(expected)
+
+ // We reimplement hashCode in generated `SpecificRow`, make sure it's consistent with our
+ // interpreted version.
if (actual.hashCode() != expectedRow.hashCode()) {
+ val ctx = new CodeGenContext
+ val evaluated = expression.gen(ctx)
fail(
s"""
|Mismatched hashCodes for values: $actual, $expectedRow
@@ -136,9 +142,10 @@ trait ExpressionEvalHelper {
|Code: $evaluated
""".stripMargin)
}
+
if (actual != expectedRow) {
val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
- fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+ fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expectedRow$input")
}
if (actual.copy() != expectedRow) {
fail(s"Copy of generated Row is wrong: actual: ${actual.copy()}, expected: $expectedRow")
@@ -149,20 +156,10 @@ trait ExpressionEvalHelper {
expression: Expression,
expected: Any,
inputRow: InternalRow = EmptyRow): Unit = {
- val ctx = GenerateUnsafeProjection.newCodeGenContext()
- lazy val evaluated = expression.gen(ctx)
- val plan = try {
- GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)
- } catch {
- case e: Throwable =>
- fail(
- s"""
- |Code generation of $expression failed:
- |${evaluated.code}
- |$e
- """.stripMargin)
- }
+ val plan = generateProject(
+ GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
+ expression)
val unsafeRow = plan(inputRow)
// UnsafeRow cannot be compared with GenericInternalRow directly
@@ -170,7 +167,7 @@ trait ExpressionEvalHelper {
val expectedRow = InternalRow(expected)
if (actual != expectedRow) {
val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
- fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+ fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expectedRow$input")
}
}
@@ -184,12 +181,23 @@ trait ExpressionEvalHelper {
}
protected def checkDoubleEvaluation(
- expression: Expression,
+ expression: => Expression,
expected: Spread[Double],
inputRow: InternalRow = EmptyRow): Unit = {
- val actual = try evaluate(expression, inputRow) catch {
- case e: Exception => fail(s"Exception evaluating $expression", e)
- }
- actual.asInstanceOf[Double] shouldBe expected
+ checkEvaluationWithoutCodegen(expression, expected)
+ checkEvaluationWithGeneratedMutableProjection(expression, expected)
+ checkEvaluationWithOptimization(expression, expected)
+
+ var plan = generateProject(
+ GenerateProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
+ expression)
+ var actual = plan(inputRow).get(0)
+ assert(checkResult(actual, expected))
+
+ plan = generateProject(
+ GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
+ expression)
+ actual = FromUnsafeProjection(expression.dataType :: Nil)(plan(inputRow)).get(0)
+ assert(checkResult(actual, expected))
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
index df988f5..04acd5b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MathFunctionsSuite.scala
@@ -143,7 +143,6 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
case e: Exception => fail(s"Exception evaluating $expression", e)
}
if (!actual.asInstanceOf[Double].isNaN) {
- val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
fail(s"Incorrect evaluation (codegen off): $expression, " +
s"actual: $actual, " +
s"expected: NaN")
@@ -155,23 +154,12 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
expression: Expression,
inputRow: InternalRow = EmptyRow): Unit = {
- val plan = try {
- GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)()
- } catch {
- case e: Throwable =>
- val ctx = GenerateProjection.newCodeGenContext()
- val evaluated = expression.gen(ctx)
- fail(
- s"""
- |Code generation of $expression failed:
- |${evaluated.code}
- |$e
- """.stripMargin)
- }
+ val plan = generateProject(
+ GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)(),
+ expression)
val actual = plan(inputRow).apply(0)
if (!actual.asInstanceOf[Double].isNaN) {
- val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: NaN")
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala
index 9be2b23..698c81b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/RandomSuite.scala
@@ -21,13 +21,13 @@ import org.scalatest.Matchers._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.types.{DoubleType, IntegerType}
+import org.apache.spark.sql.types.DoubleType
class RandomSuite extends SparkFunSuite with ExpressionEvalHelper {
test("random") {
- val row = create_row(1.1, 2.0, 3.1, null)
- checkDoubleEvaluation(Rand(30), (0.7363714192755834 +- 0.001), row)
+ checkDoubleEvaluation(Rand(30), 0.7363714192755834 +- 0.001)
+ checkDoubleEvaluation(Randn(30), 0.5181478766595276 +- 0.001)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 8f15479..6bd5804 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -450,7 +450,7 @@ class ColumnExpressionSuite extends QueryTest {
test("monotonicallyIncreasingId") {
// Make sure we have 2 partitions, each with 2 records.
- val df = ctx.sparkContext.parallelize(1 to 2, 2).mapPartitions { iter =>
+ val df = ctx.sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
Iterator(Tuple1(1), Tuple1(2))
}.toDF("a")
checkAnswer(
@@ -460,10 +460,13 @@ class ColumnExpressionSuite extends QueryTest {
}
test("sparkPartitionId") {
- val df = ctx.sparkContext.parallelize(1 to 1, 1).map(i => (i, i)).toDF("a", "b")
+ // Make sure we have 2 partitions, each with 2 records.
+ val df = ctx.sparkContext.parallelize(Seq[Int](), 2).mapPartitions { _ =>
+ Iterator(Tuple1(1), Tuple1(2))
+ }.toDF("a")
checkAnswer(
df.select(sparkPartitionId()),
- Row(0)
+ Row(0) :: Row(0) :: Row(1) :: Row(1) :: Nil
)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/86c50bf7/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
new file mode 100644
index 0000000..99e11fd
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/expression/NondeterministicSuite.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.expression
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions. ExpressionEvalHelper
+import org.apache.spark.sql.execution.expressions.{SparkPartitionID, MonotonicallyIncreasingID}
+
+class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper {
+ test("MonotonicallyIncreasingID") {
+ checkEvaluation(MonotonicallyIncreasingID(), 0)
+ }
+
+ test("SparkPartitionID") {
+ checkEvaluation(SparkPartitionID, 0)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org