You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/09/19 11:55:02 UTC
spark git commit: [SPARK-25358][SQL] MutableProjection supports fallback to an interpreted mode
Repository: spark
Updated Branches:
refs/heads/master 5534a3a58 -> 12b1e91e6
[SPARK-25358][SQL] MutableProjection supports fallback to an interpreted mode
## What changes were proposed in this pull request?
SPARK-23711 added support for falling back to an interpreted mode in `UnsafeProjection`. This PR adds the same fallback mode to `MutableProjection`, based on `CodeGeneratorWithInterpretedFallback`.
## How was this patch tested?
Added tests in `CodeGeneratorWithInterpretedFallbackSuite`.
Closes #22355 from maropu/SPARK-25358.
Authored-by: Takeshi Yamamuro <ya...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12b1e91e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12b1e91e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12b1e91e
Branch: refs/heads/master
Commit: 12b1e91e6b5135f6ed3e59a49abfc2e5a855263a
Parents: 5534a3a
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Wed Sep 19 19:54:49 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Sep 19 19:54:49 2018 +0800
----------------------------------------------------------------------
.../InterpretedMutableProjection.scala | 89 ++++++++++++++++++++
.../sql/catalyst/expressions/Projection.scala | 75 ++++++++---------
.../codegen/GenerateMutableProjection.scala | 4 +
.../sql/catalyst/expressions/package.scala | 18 +---
...eGeneratorWithInterpretedFallbackSuite.scala | 38 ++++++++-
.../CollectionExpressionsSuite.scala | 8 +-
.../expressions/ExpressionEvalHelper.scala | 34 +++++---
.../expressions/MiscExpressionsSuite.scala | 10 +--
.../expressions/ObjectExpressionsSuite.scala | 8 +-
.../apache/spark/sql/execution/SparkPlan.scala | 2 +-
.../spark/sql/execution/aggregate/udaf.scala | 2 +-
11 files changed, 201 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
new file mode 100644
index 0000000..0654108
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ * output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
+ def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+ this(toBoundExprs(expressions, inputSchema))
+
+ private[this] val buffer = new Array[Any](expressions.size)
+
+ override def initialize(partitionIndex: Int): Unit = {
+ expressions.foreach(_.foreach {
+ case n: Nondeterministic => n.initialize(partitionIndex)
+ case _ =>
+ })
+ }
+
+ private[this] val validExprs = expressions.zipWithIndex.filter {
+ case (NoOp, _) => false
+ case _ => true
+ }
+ private[this] var mutableRow: InternalRow = new GenericInternalRow(expressions.size)
+ def currentValue: InternalRow = mutableRow
+
+ override def target(row: InternalRow): MutableProjection = {
+ mutableRow = row
+ this
+ }
+
+ override def apply(input: InternalRow): InternalRow = {
+ var i = 0
+ while (i < validExprs.length) {
+ val (expr, ordinal) = validExprs(i)
+ // Store the result into buffer first, to make the projection atomic (needed by aggregation)
+ buffer(ordinal) = expr.eval(input)
+ i += 1
+ }
+ i = 0
+ while (i < validExprs.length) {
+ val (_, ordinal) = validExprs(i)
+ mutableRow(ordinal) = buffer(ordinal)
+ i += 1
+ }
+ mutableRow
+ }
+}
+
+/**
+ * Helper functions for creating an [[InterpretedMutableProjection]].
+ */
+object InterpretedMutableProjection {
+
+ /**
+ * Returns a [[MutableProjection]] for given sequence of bound Expressions.
+ */
+ def createProjection(exprs: Seq[Expression]): MutableProjection = {
+ // We need to make sure that we do not reuse stateful expressions.
+ val cleanedExpressions = exprs.map(_.transform {
+ case s: Stateful => s.freshCopy()
+ })
+ new InterpretedMutableProjection(cleanedExpressions)
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 5f24170..792646c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateSafeProjection, GenerateUnsafeProjection}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType}
@@ -56,47 +56,50 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
}
/**
- * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
- * expressions.
+ * Converts a [[InternalRow]] to another Row given a sequence of expression that define each
+ * column of the new row. If the schema of the input row is specified, then the given expression
+ * will be bound to that schema.
*
- * @param expressions a sequence of expressions that determine the value of each column of the
- * output row.
+ * In contrast to a normal projection, a MutableProjection reuses the same underlying row object
+ * each time an input row is added. This significantly reduces the cost of calculating the
+ * projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after
+ * `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call
+ * `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`.
*/
-case class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
- def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
- this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+abstract class MutableProjection extends Projection {
+ def currentValue: InternalRow
- private[this] val buffer = new Array[Any](expressions.size)
+ /** Uses the given row to store the output of the projection. */
+ def target(row: InternalRow): MutableProjection
+}
- override def initialize(partitionIndex: Int): Unit = {
- expressions.foreach(_.foreach {
- case n: Nondeterministic => n.initialize(partitionIndex)
- case _ =>
- })
+/**
+ * The factory object for `MutableProjection`.
+ */
+object MutableProjection
+ extends CodeGeneratorWithInterpretedFallback[Seq[Expression], MutableProjection] {
+
+ override protected def createCodeGeneratedObject(in: Seq[Expression]): MutableProjection = {
+ GenerateMutableProjection.generate(in, SQLConf.get.subexpressionEliminationEnabled)
}
- private[this] val exprArray = expressions.toArray
- private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
- def currentValue: InternalRow = mutableRow
+ override protected def createInterpretedObject(in: Seq[Expression]): MutableProjection = {
+ InterpretedMutableProjection.createProjection(in)
+ }
- override def target(row: InternalRow): MutableProjection = {
- mutableRow = row
- this
+ /**
+ * Returns an MutableProjection for given sequence of bound Expressions.
+ */
+ def create(exprs: Seq[Expression]): MutableProjection = {
+ createObject(exprs)
}
- override def apply(input: InternalRow): InternalRow = {
- var i = 0
- while (i < exprArray.length) {
- // Store the result into buffer first, to make the projection atomic (needed by aggregation)
- buffer(i) = exprArray(i).eval(input)
- i += 1
- }
- i = 0
- while (i < exprArray.length) {
- mutableRow(i) = buffer(i)
- i += 1
- }
- mutableRow
+ /**
+ * Returns an MutableProjection for given sequence of Expressions, which will be bound to
+ * `inputSchema`.
+ */
+ def create(exprs: Seq[Expression], inputSchema: Seq[Attribute]): MutableProjection = {
+ create(toBoundExprs(exprs, inputSchema))
}
}
@@ -123,12 +126,6 @@ object UnsafeProjection
InterpretedUnsafeProjection.createProjection(in)
}
- protected def toBoundExprs(
- exprs: Seq[Expression],
- inputSchema: Seq[Attribute]): Seq[Expression] = {
- exprs.map(BindReferences.bindReference(_, inputSchema))
- }
-
protected def toUnsafeExprs(exprs: Seq[Expression]): Seq[Expression] = {
exprs.map(_ transform {
case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 33d1432..d588e7f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -44,6 +44,10 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
create(canonicalize(bind(expressions, inputSchema)), useSubexprElimination)
}
+ def generate(expressions: Seq[Expression], useSubexprElimination: Boolean): MutableProjection = {
+ create(canonicalize(expressions), useSubexprElimination)
+ }
+
protected def create(expressions: Seq[Expression]): MutableProjection = {
create(expressions, false)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 11dcc3e..0083ee6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -86,24 +86,12 @@ package object expressions {
}
/**
- * Converts a [[InternalRow]] to another Row given a sequence of expression that define each
- * column of the new row. If the schema of the input row is specified, then the given expression
- * will be bound to that schema.
- *
- * In contrast to a normal projection, a MutableProjection reuses the same underlying row object
- * each time an input row is added. This significantly reduces the cost of calculating the
- * projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after
- * `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call
- * `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`.
+ * A helper function to bind given expressions to an input schema.
*/
- abstract class MutableProjection extends Projection {
- def currentValue: InternalRow
-
- /** Uses the given row to store the output of the projection. */
- def target(row: InternalRow): MutableProjection
+ def toBoundExprs(exprs: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
+ exprs.map(BindReferences.bindReference(_, inputSchema))
}
-
/**
* Helper functions for working with `Seq[Attribute]`.
*/
http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
index 28edd85..6ea3b05 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
@@ -20,13 +20,18 @@ package org.apache.spark.sql.catalyst.expressions
import java.util.concurrent.ExecutionException
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
import org.apache.spark.sql.catalyst.plans.PlanTestBase
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{IntegerType, StructType}
class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanTestBase {
+ val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
+ val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString
+
object FailedCodegenProjection
extends CodeGeneratorWithInterpretedFallback[Seq[Expression], UnsafeProjection] {
@@ -44,19 +49,30 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT
test("UnsafeProjection with codegen factory mode") {
val input = Seq(BoundReference(0, IntegerType, nullable = true))
- val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
val obj = UnsafeProjection.createObject(input)
assert(obj.getClass.getName.contains("GeneratedClass$SpecificUnsafeProjection"))
}
- val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> noCodegen) {
val obj = UnsafeProjection.createObject(input)
assert(obj.isInstanceOf[InterpretedUnsafeProjection])
}
}
+ test("MutableProjection with codegen factory mode") {
+ val input = Seq(BoundReference(0, IntegerType, nullable = true))
+ withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
+ val obj = MutableProjection.createObject(input)
+ assert(obj.getClass.getName.contains("GeneratedClass$SpecificMutableProjection"))
+ }
+
+ withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> noCodegen) {
+ val obj = MutableProjection.createObject(input)
+ assert(obj.isInstanceOf[InterpretedMutableProjection])
+ }
+ }
+
test("fallback to the interpreter mode") {
val input = Seq(BoundReference(0, IntegerType, nullable = true))
val fallback = CodegenObjectFactoryMode.FALLBACK.toString
@@ -69,11 +85,25 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT
test("codegen failures in the CODEGEN_ONLY mode") {
val errMsg = intercept[ExecutionException] {
val input = Seq(BoundReference(0, IntegerType, nullable = true))
- val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
FailedCodegenProjection.createObject(input)
}
}.getMessage
assert(errMsg.contains("failed to compile: org.codehaus.commons.compiler.CompileException:"))
}
+
+ test("SPARK-25358 Correctly handles NoOp in MutableProjection") {
+ val exprs = Seq(Add(BoundReference(0, IntegerType, nullable = true), Literal.create(1)), NoOp)
+ val input = InternalRow.fromSeq(1 :: 1 :: Nil)
+ val expected = 2 :: null :: Nil
+ withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
+ val proj = MutableProjection.createObject(exprs)
+ assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === expected)
+ }
+
+ withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> noCodegen) {
+ val proj = MutableProjection.createObject(exprs)
+ assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === expected)
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
index c7db4ec..2e0adbb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
@@ -1510,16 +1510,16 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
val seed1 = Some(r.nextLong())
assert(evaluateWithoutCodegen(Shuffle(ai0, seed1)) ===
evaluateWithoutCodegen(Shuffle(ai0, seed1)))
- assert(evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)) ===
- evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)))
+ assert(evaluateWithMutableProjection(Shuffle(ai0, seed1)) ===
+ evaluateWithMutableProjection(Shuffle(ai0, seed1)))
assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) ===
evaluateWithUnsafeProjection(Shuffle(ai0, seed1)))
val seed2 = Some(r.nextLong())
assert(evaluateWithoutCodegen(Shuffle(ai0, seed1)) !==
evaluateWithoutCodegen(Shuffle(ai0, seed2)))
- assert(evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)) !==
- evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed2)))
+ assert(evaluateWithMutableProjection(Shuffle(ai0, seed1)) !==
+ evaluateWithMutableProjection(Shuffle(ai0, seed2)))
assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))
http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 6684e5c..b5986aa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -60,7 +60,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
def expr = prepareEvaluation(expression)
val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
- checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow)
+ checkEvaluationWithMutableProjection(expr, catalystValue, inputRow)
if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
checkEvaluationWithUnsafeProjection(expr, catalystValue, inputRow)
}
@@ -136,7 +136,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
// Make it as method to obtain fresh expression everytime.
def expr = prepareEvaluation(expression)
checkException(evaluateWithoutCodegen(expr, inputRow), "non-codegen mode")
- checkException(evaluateWithGeneratedMutableProjection(expr, inputRow), "codegen mode")
+ checkException(evaluateWithMutableProjection(expr, inputRow), "codegen mode")
if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
checkException(evaluateWithUnsafeProjection(expr, inputRow), "unsafe mode")
}
@@ -183,22 +183,28 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
}
}
- protected def checkEvaluationWithGeneratedMutableProjection(
- expression: Expression,
+ protected def checkEvaluationWithMutableProjection(
+ expression: => Expression,
expected: Any,
inputRow: InternalRow = EmptyRow): Unit = {
- val actual = evaluateWithGeneratedMutableProjection(expression, inputRow)
- if (!checkResult(actual, expected, expression.dataType)) {
- val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
- fail(s"Incorrect evaluation: $expression, actual: $actual, expected: $expected$input")
+ val modes = Seq(CodegenObjectFactoryMode.CODEGEN_ONLY, CodegenObjectFactoryMode.NO_CODEGEN)
+ for (fallbackMode <- modes) {
+ withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> fallbackMode.toString) {
+ val actual = evaluateWithMutableProjection(expression, inputRow)
+ if (!checkResult(actual, expected, expression.dataType)) {
+ val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
+ fail(s"Incorrect evaluation (fallback mode = $fallbackMode): $expression, " +
+ s"actual: $actual, expected: $expected$input")
+ }
+ }
}
}
- protected def evaluateWithGeneratedMutableProjection(
- expression: Expression,
+ protected def evaluateWithMutableProjection(
+ expression: => Expression,
inputRow: InternalRow = EmptyRow): Any = {
val plan = generateProject(
- GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
+ MutableProjection.create(Alias(expression, s"Optimized($expression)")() :: Nil),
expression)
plan.initialize(0)
@@ -218,7 +224,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
if (expected == null) {
if (!unsafeRow.isNullAt(0)) {
val expectedRow = InternalRow(expected, expected)
- fail("Incorrect evaluation in unsafe mode: " +
+ fail(s"Incorrect evaluation in unsafe mode (fallback mode = $fallbackMode): " +
s"$expression, actual: $unsafeRow, expected: $expectedRow$input")
}
} else {
@@ -226,7 +232,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
val expectedRow =
UnsafeProjection.create(Array(expression.dataType, expression.dataType)).apply(lit)
if (unsafeRow != expectedRow) {
- fail("Incorrect evaluation in unsafe mode: " +
+ fail(s"Incorrect evaluation in unsafe mode (fallback mode = $fallbackMode): " +
s"$expression, actual: $unsafeRow, expected: $expectedRow$input")
}
}
@@ -266,7 +272,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
expected: Spread[Double],
inputRow: InternalRow = EmptyRow): Unit = {
checkEvaluationWithoutCodegen(expression, expected)
- checkEvaluationWithGeneratedMutableProjection(expression, expected)
+ checkEvaluationWithMutableProjection(expression, expected)
checkEvaluationWithOptimization(expression, expected)
var plan = generateProject(
http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
index b6c2693..4b2d153 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
@@ -48,15 +48,15 @@ class MiscExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val r = new Random()
val seed1 = Some(r.nextLong())
assert(evaluateWithoutCodegen(Uuid(seed1)) === evaluateWithoutCodegen(Uuid(seed1)))
- assert(evaluateWithGeneratedMutableProjection(Uuid(seed1)) ===
- evaluateWithGeneratedMutableProjection(Uuid(seed1)))
+ assert(evaluateWithMutableProjection(Uuid(seed1)) ===
+ evaluateWithMutableProjection(Uuid(seed1)))
assert(evaluateWithUnsafeProjection(Uuid(seed1)) ===
evaluateWithUnsafeProjection(Uuid(seed1)))
val seed2 = Some(r.nextLong())
assert(evaluateWithoutCodegen(Uuid(seed1)) !== evaluateWithoutCodegen(Uuid(seed2)))
- assert(evaluateWithGeneratedMutableProjection(Uuid(seed1)) !==
- evaluateWithGeneratedMutableProjection(Uuid(seed2)))
+ assert(evaluateWithMutableProjection(Uuid(seed1)) !==
+ evaluateWithMutableProjection(Uuid(seed2)))
assert(evaluateWithUnsafeProjection(Uuid(seed1)) !==
evaluateWithUnsafeProjection(Uuid(seed2)))
@@ -79,7 +79,7 @@ class MiscExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val outputEval = errorStream.toString
errorStream.reset()
// check with codegen
- checkEvaluationWithGeneratedMutableProjection(PrintToStderr(inputExpr), 1)
+ checkEvaluationWithMutableProjection(PrintToStderr(inputExpr), 1)
val outputCodegen = errorStream.toString
(outputEval, outputCodegen)
} finally {
http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index b0af9e0..d145fd0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -72,7 +72,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val cls = classOf[Tuple2[Boolean, java.lang.Integer]]
val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
val invoke = Invoke(inputObject, "_2", IntegerType)
- checkEvaluationWithGeneratedMutableProjection(invoke, null, inputRow)
+ checkEvaluationWithMutableProjection(invoke, null, inputRow)
}
test("MapObjects should make copies of unsafe-backed data") {
@@ -233,13 +233,13 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
Literal.fromObject(new TestBean),
Map("setNonPrimitive" -> Literal(null)))
evaluateWithoutCodegen(initializeBean, InternalRow.fromSeq(Seq()))
- evaluateWithGeneratedMutableProjection(initializeBean, InternalRow.fromSeq(Seq()))
+ evaluateWithMutableProjection(initializeBean, InternalRow.fromSeq(Seq()))
val initializeBean2 = InitializeJavaBean(
Literal.fromObject(new TestBean),
Map("setNonPrimitive" -> Literal("string")))
evaluateWithoutCodegen(initializeBean2, InternalRow.fromSeq(Seq()))
- evaluateWithGeneratedMutableProjection(initializeBean2, InternalRow.fromSeq(Seq()))
+ evaluateWithMutableProjection(initializeBean2, InternalRow.fromSeq(Seq()))
}
test("SPARK-23585: UnwrapOption should support interpreted execution") {
@@ -273,7 +273,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val resolver = ResolveTimeZone(new SQLConf)
val expr = resolver.resolveTimeZones(serializer.deserialize(serializer.serialize(expression)))
checkEvaluationWithoutCodegen(expr, expected, inputRow)
- checkEvaluationWithGeneratedMutableProjection(expr, expected, inputRow)
+ checkEvaluationWithMutableProjection(expr, expected, inputRow)
if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
checkEvaluationWithUnsafeProjection(
expr,
http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 1f97993..ab6031c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -380,7 +380,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
inputSchema: Seq[Attribute],
useSubexprElimination: Boolean = false): MutableProjection = {
log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
- GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
+ MutableProjection.create(expressions, inputSchema)
}
private def genInterpretedPredicate(
http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 72aa4ad..100486f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -365,7 +365,7 @@ case class ScalaUDAF(
val inputAttributes = childrenSchema.toAttributes
log.debug(
s"Creating MutableProj: $children, inputSchema: $inputAttributes.")
- GenerateMutableProjection.generate(children, inputAttributes)
+ MutableProjection.create(children, inputAttributes)
}
private[this] lazy val inputToScalaConverters: Any => Any =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org