You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/09/19 11:55:02 UTC

spark git commit: [SPARK-25358][SQL] MutableProjection supports fallback to an interpreted mode

Repository: spark
Updated Branches:
  refs/heads/master 5534a3a58 -> 12b1e91e6


[SPARK-25358][SQL] MutableProjection supports fallback to an interpreted mode

## What changes were proposed in this pull request?
In SPARK-23711, `UnsafeProjection` supports fallback to an interpreted mode. Therefore, this pr fixed code to support the same fallback mode in `MutableProjection` based on `CodeGeneratorWithInterpretedFallback`.

## How was this patch tested?
Added tests in `CodeGeneratorWithInterpretedFallbackSuite`.

Closes #22355 from maropu/SPARK-25358.

Authored-by: Takeshi Yamamuro <ya...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12b1e91e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12b1e91e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12b1e91e

Branch: refs/heads/master
Commit: 12b1e91e6b5135f6ed3e59a49abfc2e5a855263a
Parents: 5534a3a
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Wed Sep 19 19:54:49 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Sep 19 19:54:49 2018 +0800

----------------------------------------------------------------------
 .../InterpretedMutableProjection.scala          | 89 ++++++++++++++++++++
 .../sql/catalyst/expressions/Projection.scala   | 75 ++++++++---------
 .../codegen/GenerateMutableProjection.scala     |  4 +
 .../sql/catalyst/expressions/package.scala      | 18 +---
 ...eGeneratorWithInterpretedFallbackSuite.scala | 38 ++++++++-
 .../CollectionExpressionsSuite.scala            |  8 +-
 .../expressions/ExpressionEvalHelper.scala      | 34 +++++---
 .../expressions/MiscExpressionsSuite.scala      | 10 +--
 .../expressions/ObjectExpressionsSuite.scala    |  8 +-
 .../apache/spark/sql/execution/SparkPlan.scala  |  2 +-
 .../spark/sql/execution/aggregate/udaf.scala    |  2 +-
 11 files changed, 201 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
new file mode 100644
index 0000000..0654108
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ *                    output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(toBoundExprs(expressions, inputSchema))
+
+  private[this] val buffer = new Array[Any](expressions.size)
+
+  override def initialize(partitionIndex: Int): Unit = {
+    expressions.foreach(_.foreach {
+      case n: Nondeterministic => n.initialize(partitionIndex)
+      case _ =>
+    })
+  }
+
+  private[this] val validExprs = expressions.zipWithIndex.filter {
+    case (NoOp, _) => false
+    case _ => true
+  }
+  private[this] var mutableRow: InternalRow = new GenericInternalRow(expressions.size)
+  def currentValue: InternalRow = mutableRow
+
+  override def target(row: InternalRow): MutableProjection = {
+    mutableRow = row
+    this
+  }
+
+  override def apply(input: InternalRow): InternalRow = {
+    var i = 0
+    while (i < validExprs.length) {
+      val (expr, ordinal) = validExprs(i)
+      // Store the result into buffer first, to make the projection atomic (needed by aggregation)
+      buffer(ordinal) = expr.eval(input)
+      i += 1
+    }
+    i = 0
+    while (i < validExprs.length) {
+      val (_, ordinal) = validExprs(i)
+      mutableRow(ordinal) = buffer(ordinal)
+      i += 1
+    }
+    mutableRow
+  }
+}
+
+/**
+ * Helper functions for creating an [[InterpretedMutableProjection]].
+ */
+object InterpretedMutableProjection {
+
+  /**
+   * Returns a [[MutableProjection]] for given sequence of bound Expressions.
+   */
+  def createProjection(exprs: Seq[Expression]): MutableProjection = {
+    // We need to make sure that we do not reuse stateful expressions.
+    val cleanedExpressions = exprs.map(_.transform {
+      case s: Stateful => s.freshCopy()
+    })
+    new InterpretedMutableProjection(cleanedExpressions)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 5f24170..792646c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateSafeProjection, GenerateUnsafeProjection}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -56,47 +56,50 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
 }
 
 /**
- * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
- * expressions.
+ * Converts a [[InternalRow]] to another Row given a sequence of expression that define each
+ * column of the new row. If the schema of the input row is specified, then the given expression
+ * will be bound to that schema.
  *
- * @param expressions a sequence of expressions that determine the value of each column of the
- *                    output row.
+ * In contrast to a normal projection, a MutableProjection reuses the same underlying row object
+ * each time an input row is added.  This significantly reduces the cost of calculating the
+ * projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after
+ * `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call
+ * `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`.
  */
-case class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
-  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
-    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+abstract class MutableProjection extends Projection {
+  def currentValue: InternalRow
 
-  private[this] val buffer = new Array[Any](expressions.size)
+  /** Uses the given row to store the output of the projection. */
+  def target(row: InternalRow): MutableProjection
+}
 
-  override def initialize(partitionIndex: Int): Unit = {
-    expressions.foreach(_.foreach {
-      case n: Nondeterministic => n.initialize(partitionIndex)
-      case _ =>
-    })
+/**
+ * The factory object for `MutableProjection`.
+ */
+object MutableProjection
+    extends CodeGeneratorWithInterpretedFallback[Seq[Expression], MutableProjection] {
+
+  override protected def createCodeGeneratedObject(in: Seq[Expression]): MutableProjection = {
+    GenerateMutableProjection.generate(in, SQLConf.get.subexpressionEliminationEnabled)
   }
 
-  private[this] val exprArray = expressions.toArray
-  private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
-  def currentValue: InternalRow = mutableRow
+  override protected def createInterpretedObject(in: Seq[Expression]): MutableProjection = {
+    InterpretedMutableProjection.createProjection(in)
+  }
 
-  override def target(row: InternalRow): MutableProjection = {
-    mutableRow = row
-    this
+  /**
+   * Returns an MutableProjection for given sequence of bound Expressions.
+   */
+  def create(exprs: Seq[Expression]): MutableProjection = {
+    createObject(exprs)
   }
 
-  override def apply(input: InternalRow): InternalRow = {
-    var i = 0
-    while (i < exprArray.length) {
-      // Store the result into buffer first, to make the projection atomic (needed by aggregation)
-      buffer(i) = exprArray(i).eval(input)
-      i += 1
-    }
-    i = 0
-    while (i < exprArray.length) {
-      mutableRow(i) = buffer(i)
-      i += 1
-    }
-    mutableRow
+  /**
+   * Returns an MutableProjection for given sequence of Expressions, which will be bound to
+   * `inputSchema`.
+   */
+  def create(exprs: Seq[Expression], inputSchema: Seq[Attribute]): MutableProjection = {
+    create(toBoundExprs(exprs, inputSchema))
   }
 }
 
@@ -123,12 +126,6 @@ object UnsafeProjection
     InterpretedUnsafeProjection.createProjection(in)
   }
 
-  protected def toBoundExprs(
-      exprs: Seq[Expression],
-      inputSchema: Seq[Attribute]): Seq[Expression] = {
-    exprs.map(BindReferences.bindReference(_, inputSchema))
-  }
-
   protected def toUnsafeExprs(exprs: Seq[Expression]): Seq[Expression] = {
     exprs.map(_ transform {
       case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)

http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 33d1432..d588e7f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -44,6 +44,10 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
     create(canonicalize(bind(expressions, inputSchema)), useSubexprElimination)
   }
 
+  def generate(expressions: Seq[Expression], useSubexprElimination: Boolean): MutableProjection = {
+    create(canonicalize(expressions), useSubexprElimination)
+  }
+
   protected def create(expressions: Seq[Expression]): MutableProjection = {
     create(expressions, false)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 11dcc3e..0083ee6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -86,24 +86,12 @@ package object expressions  {
   }
 
   /**
-   * Converts a [[InternalRow]] to another Row given a sequence of expression that define each
-   * column of the new row. If the schema of the input row is specified, then the given expression
-   * will be bound to that schema.
-   *
-   * In contrast to a normal projection, a MutableProjection reuses the same underlying row object
-   * each time an input row is added.  This significantly reduces the cost of calculating the
-   * projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after
-   * `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call
-   * `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`.
+   * A helper function to bind given expressions to an input schema.
    */
-  abstract class MutableProjection extends Projection {
-    def currentValue: InternalRow
-
-    /** Uses the given row to store the output of the projection. */
-    def target(row: InternalRow): MutableProjection
+  def toBoundExprs(exprs: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
+    exprs.map(BindReferences.bindReference(_, inputSchema))
   }
 
-
   /**
    * Helper functions for working with `Seq[Attribute]`.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
index 28edd85..6ea3b05 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
@@ -20,13 +20,18 @@ package org.apache.spark.sql.catalyst.expressions
 import java.util.concurrent.ExecutionException
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.catalyst.plans.PlanTestBase
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{IntegerType, StructType}
 
 class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanTestBase {
 
+  val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
+  val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString
+
   object FailedCodegenProjection
       extends CodeGeneratorWithInterpretedFallback[Seq[Expression], UnsafeProjection] {
 
@@ -44,19 +49,30 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT
 
   test("UnsafeProjection with codegen factory mode") {
     val input = Seq(BoundReference(0, IntegerType, nullable = true))
-    val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
     withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
       val obj = UnsafeProjection.createObject(input)
       assert(obj.getClass.getName.contains("GeneratedClass$SpecificUnsafeProjection"))
     }
 
-    val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString
     withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> noCodegen) {
       val obj = UnsafeProjection.createObject(input)
       assert(obj.isInstanceOf[InterpretedUnsafeProjection])
     }
   }
 
+  test("MutableProjection with codegen factory mode") {
+    val input = Seq(BoundReference(0, IntegerType, nullable = true))
+    withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
+      val obj = MutableProjection.createObject(input)
+      assert(obj.getClass.getName.contains("GeneratedClass$SpecificMutableProjection"))
+    }
+
+    withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> noCodegen) {
+      val obj = MutableProjection.createObject(input)
+      assert(obj.isInstanceOf[InterpretedMutableProjection])
+    }
+  }
+
   test("fallback to the interpreter mode") {
     val input = Seq(BoundReference(0, IntegerType, nullable = true))
     val fallback = CodegenObjectFactoryMode.FALLBACK.toString
@@ -69,11 +85,25 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT
   test("codegen failures in the CODEGEN_ONLY mode") {
     val errMsg = intercept[ExecutionException] {
       val input = Seq(BoundReference(0, IntegerType, nullable = true))
-      val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
       withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
         FailedCodegenProjection.createObject(input)
       }
     }.getMessage
     assert(errMsg.contains("failed to compile: org.codehaus.commons.compiler.CompileException:"))
   }
+
+  test("SPARK-25358 Correctly handles NoOp in MutableProjection") {
+    val exprs = Seq(Add(BoundReference(0, IntegerType, nullable = true), Literal.create(1)), NoOp)
+    val input = InternalRow.fromSeq(1 :: 1 :: Nil)
+    val expected = 2 :: null :: Nil
+    withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
+      val proj = MutableProjection.createObject(exprs)
+      assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === expected)
+    }
+
+    withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> noCodegen) {
+      val proj = MutableProjection.createObject(exprs)
+      assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === expected)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
index c7db4ec..2e0adbb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
@@ -1510,16 +1510,16 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
     val seed1 = Some(r.nextLong())
     assert(evaluateWithoutCodegen(Shuffle(ai0, seed1)) ===
       evaluateWithoutCodegen(Shuffle(ai0, seed1)))
-    assert(evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)) ===
-      evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)))
+    assert(evaluateWithMutableProjection(Shuffle(ai0, seed1)) ===
+      evaluateWithMutableProjection(Shuffle(ai0, seed1)))
     assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) ===
       evaluateWithUnsafeProjection(Shuffle(ai0, seed1)))
 
     val seed2 = Some(r.nextLong())
     assert(evaluateWithoutCodegen(Shuffle(ai0, seed1)) !==
       evaluateWithoutCodegen(Shuffle(ai0, seed2)))
-    assert(evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)) !==
-      evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed2)))
+    assert(evaluateWithMutableProjection(Shuffle(ai0, seed1)) !==
+      evaluateWithMutableProjection(Shuffle(ai0, seed2)))
     assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
       evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 6684e5c..b5986aa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -60,7 +60,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
     def expr = prepareEvaluation(expression)
     val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
     checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
-    checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow)
+    checkEvaluationWithMutableProjection(expr, catalystValue, inputRow)
     if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
       checkEvaluationWithUnsafeProjection(expr, catalystValue, inputRow)
     }
@@ -136,7 +136,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
     // Make it as method to obtain fresh expression everytime.
     def expr = prepareEvaluation(expression)
     checkException(evaluateWithoutCodegen(expr, inputRow), "non-codegen mode")
-    checkException(evaluateWithGeneratedMutableProjection(expr, inputRow), "codegen mode")
+    checkException(evaluateWithMutableProjection(expr, inputRow), "codegen mode")
     if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
       checkException(evaluateWithUnsafeProjection(expr, inputRow), "unsafe mode")
     }
@@ -183,22 +183,28 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
     }
   }
 
-  protected def checkEvaluationWithGeneratedMutableProjection(
-      expression: Expression,
+  protected def checkEvaluationWithMutableProjection(
+      expression: => Expression,
       expected: Any,
       inputRow: InternalRow = EmptyRow): Unit = {
-    val actual = evaluateWithGeneratedMutableProjection(expression, inputRow)
-    if (!checkResult(actual, expected, expression.dataType)) {
-      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
-      fail(s"Incorrect evaluation: $expression, actual: $actual, expected: $expected$input")
+    val modes = Seq(CodegenObjectFactoryMode.CODEGEN_ONLY, CodegenObjectFactoryMode.NO_CODEGEN)
+    for (fallbackMode <- modes) {
+      withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> fallbackMode.toString) {
+        val actual = evaluateWithMutableProjection(expression, inputRow)
+        if (!checkResult(actual, expected, expression.dataType)) {
+          val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
+          fail(s"Incorrect evaluation (fallback mode = $fallbackMode): $expression, " +
+            s"actual: $actual, expected: $expected$input")
+        }
+      }
     }
   }
 
-  protected def evaluateWithGeneratedMutableProjection(
-      expression: Expression,
+  protected def evaluateWithMutableProjection(
+      expression: => Expression,
       inputRow: InternalRow = EmptyRow): Any = {
     val plan = generateProject(
-      GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
+      MutableProjection.create(Alias(expression, s"Optimized($expression)")() :: Nil),
       expression)
     plan.initialize(0)
 
@@ -218,7 +224,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
         if (expected == null) {
           if (!unsafeRow.isNullAt(0)) {
             val expectedRow = InternalRow(expected, expected)
-            fail("Incorrect evaluation in unsafe mode: " +
+            fail(s"Incorrect evaluation in unsafe mode (fallback mode = $fallbackMode): " +
               s"$expression, actual: $unsafeRow, expected: $expectedRow$input")
           }
         } else {
@@ -226,7 +232,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
           val expectedRow =
             UnsafeProjection.create(Array(expression.dataType, expression.dataType)).apply(lit)
           if (unsafeRow != expectedRow) {
-            fail("Incorrect evaluation in unsafe mode: " +
+            fail(s"Incorrect evaluation in unsafe mode (fallback mode = $fallbackMode): " +
               s"$expression, actual: $unsafeRow, expected: $expectedRow$input")
           }
         }
@@ -266,7 +272,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa
       expected: Spread[Double],
       inputRow: InternalRow = EmptyRow): Unit = {
     checkEvaluationWithoutCodegen(expression, expected)
-    checkEvaluationWithGeneratedMutableProjection(expression, expected)
+    checkEvaluationWithMutableProjection(expression, expected)
     checkEvaluationWithOptimization(expression, expected)
 
     var plan = generateProject(

http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
index b6c2693..4b2d153 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
@@ -48,15 +48,15 @@ class MiscExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val r = new Random()
     val seed1 = Some(r.nextLong())
     assert(evaluateWithoutCodegen(Uuid(seed1)) === evaluateWithoutCodegen(Uuid(seed1)))
-    assert(evaluateWithGeneratedMutableProjection(Uuid(seed1)) ===
-      evaluateWithGeneratedMutableProjection(Uuid(seed1)))
+    assert(evaluateWithMutableProjection(Uuid(seed1)) ===
+      evaluateWithMutableProjection(Uuid(seed1)))
     assert(evaluateWithUnsafeProjection(Uuid(seed1)) ===
       evaluateWithUnsafeProjection(Uuid(seed1)))
 
     val seed2 = Some(r.nextLong())
     assert(evaluateWithoutCodegen(Uuid(seed1)) !== evaluateWithoutCodegen(Uuid(seed2)))
-    assert(evaluateWithGeneratedMutableProjection(Uuid(seed1)) !==
-      evaluateWithGeneratedMutableProjection(Uuid(seed2)))
+    assert(evaluateWithMutableProjection(Uuid(seed1)) !==
+      evaluateWithMutableProjection(Uuid(seed2)))
     assert(evaluateWithUnsafeProjection(Uuid(seed1)) !==
       evaluateWithUnsafeProjection(Uuid(seed2)))
 
@@ -79,7 +79,7 @@ class MiscExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       val outputEval = errorStream.toString
       errorStream.reset()
       // check with codegen
-      checkEvaluationWithGeneratedMutableProjection(PrintToStderr(inputExpr), 1)
+      checkEvaluationWithMutableProjection(PrintToStderr(inputExpr), 1)
       val outputCodegen = errorStream.toString
       (outputEval, outputCodegen)
     } finally {

http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
index b0af9e0..d145fd0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
@@ -72,7 +72,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val cls = classOf[Tuple2[Boolean, java.lang.Integer]]
     val inputObject = BoundReference(0, ObjectType(cls), nullable = true)
     val invoke = Invoke(inputObject, "_2", IntegerType)
-    checkEvaluationWithGeneratedMutableProjection(invoke, null, inputRow)
+    checkEvaluationWithMutableProjection(invoke, null, inputRow)
   }
 
   test("MapObjects should make copies of unsafe-backed data") {
@@ -233,13 +233,13 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
       Literal.fromObject(new TestBean),
       Map("setNonPrimitive" -> Literal(null)))
     evaluateWithoutCodegen(initializeBean, InternalRow.fromSeq(Seq()))
-    evaluateWithGeneratedMutableProjection(initializeBean, InternalRow.fromSeq(Seq()))
+    evaluateWithMutableProjection(initializeBean, InternalRow.fromSeq(Seq()))
 
     val initializeBean2 = InitializeJavaBean(
       Literal.fromObject(new TestBean),
       Map("setNonPrimitive" -> Literal("string")))
     evaluateWithoutCodegen(initializeBean2, InternalRow.fromSeq(Seq()))
-    evaluateWithGeneratedMutableProjection(initializeBean2, InternalRow.fromSeq(Seq()))
+    evaluateWithMutableProjection(initializeBean2, InternalRow.fromSeq(Seq()))
   }
 
   test("SPARK-23585: UnwrapOption should support interpreted execution") {
@@ -273,7 +273,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     val resolver = ResolveTimeZone(new SQLConf)
     val expr = resolver.resolveTimeZones(serializer.deserialize(serializer.serialize(expression)))
     checkEvaluationWithoutCodegen(expr, expected, inputRow)
-    checkEvaluationWithGeneratedMutableProjection(expr, expected, inputRow)
+    checkEvaluationWithMutableProjection(expr, expected, inputRow)
     if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
       checkEvaluationWithUnsafeProjection(
         expr,

http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 1f97993..ab6031c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -380,7 +380,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
       inputSchema: Seq[Attribute],
       useSubexprElimination: Boolean = false): MutableProjection = {
     log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
-    GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
+    MutableProjection.create(expressions, inputSchema)
   }
 
   private def genInterpretedPredicate(

http://git-wip-us.apache.org/repos/asf/spark/blob/12b1e91e/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 72aa4ad..100486f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -365,7 +365,7 @@ case class ScalaUDAF(
     val inputAttributes = childrenSchema.toAttributes
     log.debug(
       s"Creating MutableProj: $children, inputSchema: $inputAttributes.")
-    GenerateMutableProjection.generate(children, inputAttributes)
+    MutableProjection.create(children, inputAttributes)
   }
 
   private[this] lazy val inputToScalaConverters: Any => Any =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org