Posted to commits@spark.apache.org by ue...@apache.org on 2018/07/27 14:03:03 UTC

spark git commit: [SPARK-23928][SQL] Add shuffle collection function.

Repository: spark
Updated Branches:
  refs/heads/master 21fcac164 -> ef6c8395c


[SPARK-23928][SQL] Add shuffle collection function.

## What changes were proposed in this pull request?

This PR adds a new collection function: shuffle. It generates a random permutation of the given array. The implementation uses the "inside-out" version of the Fisher-Yates algorithm.
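
For reference, a minimal self-contained Scala sketch of the inside-out Fisher-Yates shuffle (illustrative only; the commit itself shuffles an index array via the new RandomIndicesGenerator and then maps it over the source array, as shown in the diff below):

```scala
import scala.reflect.ClassTag
import scala.util.Random

// Inside-out Fisher-Yates: builds the shuffled copy in a single pass.
def shuffled[T: ClassTag](xs: Array[T], random: Random): Array[T] = {
  val out = new Array[T](xs.length)
  var i = 0
  while (i < xs.length) {
    val j = random.nextInt(i + 1) // j uniform in [0, i]
    if (j != i) out(i) = out(j)   // vacate slot j by moving its element to slot i
    out(j) = xs(i)                // place the i-th input element at slot j
    i += 1
  }
  out
}
```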

## How was this patch tested?

New tests are added to CollectionExpressionsSuite.scala and DataFrameFunctionsSuite.scala.

Author: Takuya UESHIN <ue...@databricks.com>
Author: pkuwm <ih...@gmail.com>

Closes #21802 from ueshin/issues/SPARK-23928/shuffle.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef6c8395
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef6c8395
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef6c8395

Branch: refs/heads/master
Commit: ef6c8395c483954857547cbd75f84a6814317d13
Parents: 21fcac1
Author: pkuwm <ih...@gmail.com>
Authored: Fri Jul 27 23:02:48 2018 +0900
Committer: Takuya UESHIN <ue...@databricks.com>
Committed: Fri Jul 27 23:02:48 2018 +0900

----------------------------------------------------------------------
 python/pyspark/sql/functions.py                 |  17 +++
 .../spark/sql/catalyst/analysis/Analyzer.scala  |   7 +-
 .../catalyst/analysis/FunctionRegistry.scala    |   1 +
 .../expressions/collectionOperations.scala      | 106 +++++++++++++++++++
 .../catalyst/util/RandomIndicesGenerator.scala  |  45 ++++++++
 .../CollectionExpressionsSuite.scala            |  69 ++++++++++++
 .../scala/org/apache/spark/sql/functions.scala  |  10 ++
 .../spark/sql/DataFrameFunctionsSuite.scala     |  65 ++++++++++++
 8 files changed, 317 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index f2e6633..0a88e48 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2382,6 +2382,23 @@ def array_sort(col):
     return Column(sc._jvm.functions.array_sort(_to_java_column(col)))
 
 
+@since(2.4)
+def shuffle(col):
+    """
+    Collection function: Generates a random permutation of the given array.
+
+    .. note:: The function is non-deterministic.
+
+    :param col: name of column or expression
+
+    >>> df = spark.createDataFrame([([1, 20, 3, 5],), ([1, 20, None, 3],)], ['data'])
+    >>> df.select(shuffle(df.data).alias('s')).collect()  # doctest: +SKIP
+    [Row(s=[3, 1, 5, 20]), Row(s=[20, None, 3, 1])]
+    """
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.functions.shuffle(_to_java_column(col)))
+
+
 @since(1.5)
 @ignore_unicode_prefix
 def reverse(col):

http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8e8f8e3..d18509f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -181,7 +181,7 @@ class Analyzer(
       TimeWindowing ::
       ResolveInlineTables(conf) ::
       ResolveTimeZone(conf) ::
-      ResolvedUuidExpressions ::
+      ResolveRandomSeed ::
       TypeCoercion.typeCoercionRules(conf) ++
       extendedResolutionRules : _*),
     Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
@@ -2100,15 +2100,16 @@ class Analyzer(
   }
 
   /**
-   * Set the seed for random number generation in Uuid expressions.
+   * Set the seed for random number generation.
    */
-  object ResolvedUuidExpressions extends Rule[LogicalPlan] {
+  object ResolveRandomSeed extends Rule[LogicalPlan] {
     private lazy val random = new Random()
 
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
       case p if p.resolved => p
       case p => p transformExpressionsUp {
         case Uuid(None) => Uuid(Some(random.nextLong()))
+        case Shuffle(child, None) => Shuffle(child, Some(random.nextLong()))
       }
     }
   }
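
The renamed rule generalizes the Uuid-only seeding so that any seed-carrying expression gets its seed fixed once, at analysis time. A rough Scala sketch of the effect (toy hypothetical types, not the real Analyzer API):

```scala
import scala.util.Random

// An unseeded expression is rewritten exactly once during analysis, so every
// evaluation of the analyzed plan shares the same seed.
sealed trait Expr
case class ShuffleExpr(child: String, randomSeed: Option[Long]) extends Expr

def resolveSeed(e: Expr, random: Random): Expr = e match {
  case ShuffleExpr(c, None) => ShuffleExpr(c, Some(random.nextLong()))
  case resolved             => resolved
}
```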

http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index d696ce9..adc4837 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -429,6 +429,7 @@ object FunctionRegistry {
     expression[Size]("cardinality"),
     expression[ArraysZip]("arrays_zip"),
     expression[SortArray]("sort_array"),
+    expression[Shuffle]("shuffle"),
     expression[ArrayMin]("array_min"),
     expression[ArrayMax]("array_max"),
     expression[Reverse]("reverse"),

http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index b3d04bf..b1d91ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -1204,6 +1204,112 @@ case class ArraySort(child: Expression) extends UnaryExpression with ArraySortLi
 }
 
 /**
+ * Returns a random permutation of the given array.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(array) - Returns a random permutation of the given array.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_(array(1, 20, 3, 5));
+       [3, 1, 5, 20]
+      > SELECT _FUNC_(array(1, 20, null, 3));
+       [20, null, 3, 1]
+  """,
+  note = "The function is non-deterministic.",
+  since = "2.4.0")
+case class Shuffle(child: Expression, randomSeed: Option[Long] = None)
+  extends UnaryExpression with ExpectsInputTypes with Stateful {
+
+  def this(child: Expression) = this(child, None)
+
+  override lazy val resolved: Boolean =
+    childrenResolved && checkInputDataTypes().isSuccess && randomSeed.isDefined
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType)
+
+  override def dataType: DataType = child.dataType
+
+  @transient lazy val elementType: DataType = dataType.asInstanceOf[ArrayType].elementType
+
+  @transient private[this] var random: RandomIndicesGenerator = _
+
+  override protected def initializeInternal(partitionIndex: Int): Unit = {
+    random = RandomIndicesGenerator(randomSeed.get + partitionIndex)
+  }
+
+  override protected def evalInternal(input: InternalRow): Any = {
+    val value = child.eval(input)
+    if (value == null) {
+      null
+    } else {
+      val source = value.asInstanceOf[ArrayData]
+      val numElements = source.numElements()
+      val indices = random.getNextIndices(numElements)
+      new GenericArrayData(indices.map(source.get(_, elementType)))
+    }
+  }
+
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    nullSafeCodeGen(ctx, ev, c => shuffleArrayCodeGen(ctx, ev, c))
+  }
+
+  private def shuffleArrayCodeGen(ctx: CodegenContext, ev: ExprCode, childName: String): String = {
+    val randomClass = classOf[RandomIndicesGenerator].getName
+
+    val rand = ctx.addMutableState(randomClass, "rand", forceInline = true)
+    ctx.addPartitionInitializationStatement(
+      s"$rand = new $randomClass(${randomSeed.get}L + partitionIndex);")
+
+    val isPrimitiveType = CodeGenerator.isPrimitiveType(elementType)
+
+    val numElements = ctx.freshName("numElements")
+    val arrayData = ctx.freshName("arrayData")
+
+    val initialization = if (isPrimitiveType) {
+      ctx.createUnsafeArray(arrayData, numElements, elementType, s" $prettyName failed.")
+    } else {
+      val arrayDataClass = classOf[GenericArrayData].getName()
+      s"$arrayDataClass $arrayData = new $arrayDataClass(new Object[$numElements]);"
+    }
+
+    val indices = ctx.freshName("indices")
+    val i = ctx.freshName("i")
+
+    val getValue = CodeGenerator.getValue(childName, elementType, s"$indices[$i]")
+
+    val setFunc = if (isPrimitiveType) {
+      s"set${CodeGenerator.primitiveTypeName(elementType)}"
+    } else {
+      "update"
+    }
+
+    val assignment = if (isPrimitiveType && dataType.asInstanceOf[ArrayType].containsNull) {
+      s"""
+         |if ($childName.isNullAt($indices[$i])) {
+         |  $arrayData.setNullAt($i);
+         |} else {
+         |  $arrayData.$setFunc($i, $getValue);
+         |}
+       """.stripMargin
+    } else {
+      s"$arrayData.$setFunc($i, $getValue);"
+    }
+
+    s"""
+       |int $numElements = $childName.numElements();
+       |int[] $indices = $rand.getNextIndices($numElements);
+       |$initialization
+       |for (int $i = 0; $i < $numElements; $i++) {
+       |  $assignment
+       |}
+       |${ev.value} = $arrayData;
+     """.stripMargin
+  }
+
+  override def freshCopy(): Shuffle = Shuffle(child, randomSeed)
+}
+
+/**
  * Returns a reversed string or an array with reverse order of elements.
  */
 @ExpressionDescription(

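Both the interpreted path (evalInternal) and the generated code above seed the per-partition generator with randomSeed + partitionIndex. A small sketch of the idea (assumes commons-math3 on the classpath, as in the catalyst module):

```scala
import org.apache.commons.math3.random.MersenneTwister

// Each partition derives its own stream from seed + partitionIndex, so
// partitions shuffle independently while a retried task replays the same stream.
val seed = 42L
def partitionRng(partitionIndex: Int) = new MersenneTwister(seed + partitionIndex)

assert(partitionRng(0).nextInt(100) == partitionRng(0).nextInt(100)) // reproducible per partition
```
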
http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RandomIndicesGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RandomIndicesGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RandomIndicesGenerator.scala
new file mode 100644
index 0000000..ae05128
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RandomIndicesGenerator.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.commons.math3.random.MersenneTwister
+
+/**
+ * This class generates a random permutation of indices of the given length.
+ *
+ * The implementation uses the "inside-out" version of the Fisher-Yates algorithm.
+ * Reference:
+ *   https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle#The_%22inside-out%22_algorithm
+ */
+case class RandomIndicesGenerator(randomSeed: Long) {
+  private val random = new MersenneTwister(randomSeed)
+
+  def getNextIndices(length: Int): Array[Int] = {
+    val indices = new Array[Int](length)
+    var i = 0
+    while (i < length) {
+      val j = random.nextInt(i + 1)
+      if (j != i) {
+        indices(i) = indices(j)
+      }
+      indices(j) = i
+      i += 1
+    }
+    indices
+  }
+}
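
A quick property check of the generator (illustrative; assumes the catalyst classes are on the classpath):

```scala
import org.apache.spark.sql.catalyst.util.RandomIndicesGenerator

val idx = RandomIndicesGenerator(42L).getNextIndices(10)
assert(idx.sorted.sameElements(0 until 10))                              // always a permutation of 0..9
assert(RandomIndicesGenerator(42L).getNextIndices(10).sameElements(idx)) // equal seeds replay the same result
```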

http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
index c7f0da7..5c57285 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.expressions
 import java.sql.{Date, Timestamp}
 import java.util.TimeZone
 
+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
@@ -1434,4 +1436,71 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
     assert(ArrayUnion(a20, a21).dataType.asInstanceOf[ArrayType].containsNull === false)
     assert(ArrayUnion(a20, a22).dataType.asInstanceOf[ArrayType].containsNull === true)
   }
+
+  test("Shuffle") {
+    // Primitive-type elements
+    val ai0 = Literal.create(Seq(1, 2, 3, 4, 5), ArrayType(IntegerType, containsNull = false))
+    val ai1 = Literal.create(Seq(1, 2, 3), ArrayType(IntegerType, containsNull = false))
+    val ai2 = Literal.create(Seq(null, 1, null, 3), ArrayType(IntegerType, containsNull = true))
+    val ai3 = Literal.create(Seq(2, null, 4, null), ArrayType(IntegerType, containsNull = true))
+    val ai4 = Literal.create(Seq(null, null, null), ArrayType(IntegerType, containsNull = true))
+    val ai5 = Literal.create(Seq(1), ArrayType(IntegerType, containsNull = false))
+    val ai6 = Literal.create(Seq.empty, ArrayType(IntegerType, containsNull = false))
+    val ai7 = Literal.create(null, ArrayType(IntegerType, containsNull = true))
+
+    checkEvaluation(Shuffle(ai0, Some(0)), Seq(4, 1, 2, 3, 5))
+    checkEvaluation(Shuffle(ai1, Some(0)), Seq(3, 1, 2))
+    checkEvaluation(Shuffle(ai2, Some(0)), Seq(3, null, 1, null))
+    checkEvaluation(Shuffle(ai3, Some(0)), Seq(null, 2, null, 4))
+    checkEvaluation(Shuffle(ai4, Some(0)), Seq(null, null, null))
+    checkEvaluation(Shuffle(ai5, Some(0)), Seq(1))
+    checkEvaluation(Shuffle(ai6, Some(0)), Seq.empty)
+    checkEvaluation(Shuffle(ai7, Some(0)), null)
+
+    // Non-primitive-type elements
+    val as0 = Literal.create(Seq("a", "b", "c", "d"), ArrayType(StringType, containsNull = false))
+    val as1 = Literal.create(Seq("a", "b", "c"), ArrayType(StringType, containsNull = false))
+    val as2 = Literal.create(Seq(null, "a", null, "c"), ArrayType(StringType, containsNull = true))
+    val as3 = Literal.create(Seq("b", null, "d", null), ArrayType(StringType, containsNull = true))
+    val as4 = Literal.create(Seq(null, null, null), ArrayType(StringType, containsNull = true))
+    val as5 = Literal.create(Seq("a"), ArrayType(StringType, containsNull = false))
+    val as6 = Literal.create(Seq.empty, ArrayType(StringType, containsNull = false))
+    val as7 = Literal.create(null, ArrayType(StringType, containsNull = true))
+    val aa = Literal.create(
+      Seq(Seq("a", "b"), Seq("c", "d"), Seq("e")),
+      ArrayType(ArrayType(StringType)))
+
+    checkEvaluation(Shuffle(as0, Some(0)), Seq("d", "a", "b", "c"))
+    checkEvaluation(Shuffle(as1, Some(0)), Seq("c", "a", "b"))
+    checkEvaluation(Shuffle(as2, Some(0)), Seq("c", null, "a", null))
+    checkEvaluation(Shuffle(as3, Some(0)), Seq(null, "b", null, "d"))
+    checkEvaluation(Shuffle(as4, Some(0)), Seq(null, null, null))
+    checkEvaluation(Shuffle(as5, Some(0)), Seq("a"))
+    checkEvaluation(Shuffle(as6, Some(0)), Seq.empty)
+    checkEvaluation(Shuffle(as7, Some(0)), null)
+    checkEvaluation(Shuffle(aa, Some(0)), Seq(Seq("e"), Seq("a", "b"), Seq("c", "d")))
+
+    val r = new Random()
+    val seed1 = Some(r.nextLong())
+    assert(evaluateWithoutCodegen(Shuffle(ai0, seed1)) ===
+      evaluateWithoutCodegen(Shuffle(ai0, seed1)))
+    assert(evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)) ===
+      evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)))
+    assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) ===
+      evaluateWithUnsafeProjection(Shuffle(ai0, seed1)))
+
+    val seed2 = Some(r.nextLong())
+    assert(evaluateWithoutCodegen(Shuffle(ai0, seed1)) !==
+      evaluateWithoutCodegen(Shuffle(ai0, seed2)))
+    assert(evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)) !==
+      evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed2)))
+    assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
+      evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))
+
+    val shuffle = Shuffle(ai0, seed1)
+    assert(shuffle.fastEquals(shuffle))
+    assert(!shuffle.fastEquals(Shuffle(ai0, seed1)))
+    assert(!shuffle.fastEquals(shuffle.freshCopy()))
+    assert(!shuffle.fastEquals(Shuffle(ai0, seed2)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index de1d422..bcd0c94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3546,6 +3546,16 @@ object functions {
   def array_max(e: Column): Column = withExpr { ArrayMax(e.expr) }
 
   /**
+   * Returns a random permutation of the given array.
+   *
+   * @note The function is non-deterministic.
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def shuffle(e: Column): Column = withExpr { Shuffle(e.expr) }
+
+  /**
    * Returns a reversed string or an array with reverse order of elements.
    * @group collection_funcs
    * @since 1.5.0

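For completeness, an illustrative end-to-end use of the new Scala API (a sketch; the session setup and sample data are assumptions, not part of the commit):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.shuffle

val spark = SparkSession.builder().master("local[*]").appName("shuffle-demo").getOrCreate()
import spark.implicits._

val df = Seq(Seq(1, 20, 3, 5), Seq(1, 20, 3)).toDF("data")
df.select(shuffle($"data").as("s")).show() // e.g. [3, 1, 5, 20]; ordering is random
```
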
http://git-wip-us.apache.org/repos/asf/spark/blob/ef6c8395/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 5a7bd45..299c96f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -1513,6 +1513,71 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
     )
   }
 
+  // Shuffle expressions should produce the same results on retries within the same DataFrame.
+  private def checkShuffleResult(df: DataFrame): Unit = {
+    checkAnswer(df, df.collect())
+  }
+
+  test("shuffle function - array for primitive type not containing null") {
+    val idfNotContainsNull = Seq(
+      Seq(1, 9, 8, 7),
+      Seq(5, 8, 9, 7, 2),
+      Seq.empty,
+      null
+    ).toDF("i")
+
+    def testArrayOfPrimitiveTypeNotContainsNull(): Unit = {
+      checkShuffleResult(idfNotContainsNull.select(shuffle('i)))
+      checkShuffleResult(idfNotContainsNull.selectExpr("shuffle(i)"))
+    }
+
+    // Test with a local relation; the Project will be evaluated without codegen
+    testArrayOfPrimitiveTypeNotContainsNull()
+    // Test with a cached relation; the Project will be evaluated with codegen
+    idfNotContainsNull.cache()
+    testArrayOfPrimitiveTypeNotContainsNull()
+  }
+
+  test("shuffle function - array for primitive type containing null") {
+    val idfContainsNull = Seq[Seq[Integer]](
+      Seq(1, 9, 8, null, 7),
+      Seq(null, 5, 8, 9, 7, 2),
+      Seq.empty,
+      null
+    ).toDF("i")
+
+    def testArrayOfPrimitiveTypeContainsNull(): Unit = {
+      checkShuffleResult(idfContainsNull.select(shuffle('i)))
+      checkShuffleResult(idfContainsNull.selectExpr("shuffle(i)"))
+    }
+
+    // Test with a local relation; the Project will be evaluated without codegen
+    testArrayOfPrimitiveTypeContainsNull()
+    // Test with a cached relation; the Project will be evaluated with codegen
+    idfContainsNull.cache()
+    testArrayOfPrimitiveTypeContainsNull()
+  }
+
+  test("shuffle function - array for non-primitive type") {
+    val sdf = Seq(
+      Seq("c", "a", "b"),
+      Seq("b", null, "c", null),
+      Seq.empty,
+      null
+    ).toDF("s")
+
+    def testNonPrimitiveType(): Unit = {
+      checkShuffleResult(sdf.select(shuffle('s)))
+      checkShuffleResult(sdf.selectExpr("shuffle(s)"))
+    }
+
+    // Test with a local relation; the Project will be evaluated without codegen
+    testNonPrimitiveType()
+    // Test with a cached relation; the Project will be evaluated with codegen
+    sdf.cache()
+    testNonPrimitiveType()
+  }
+
   private def assertValuesDoNotChangeAfterCoalesceOrUnion(v: Column): Unit = {
     import DataFrameFunctionsSuite.CodegenFallbackExpr
     for ((codegenFallback, wholeStage) <- Seq((true, false), (false, false), (false, true))) {

