You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2018/03/26 18:45:26 UTC

spark git commit: [SPARK-23599][SQL][BACKPORT-2.3] Use RandomUUIDGenerator in Uuid expression

Repository: spark
Updated Branches:
  refs/heads/branch-2.3 328dea6f8 -> 1c39dfaef


[SPARK-23599][SQL][BACKPORT-2.3] Use RandomUUIDGenerator in Uuid expression

## What changes were proposed in this pull request?

As stated in the Jira ticket, there are problems with the current `Uuid` expression, which uses `java.util.UUID.randomUUID` for UUID generation.

This patch uses the newly added `RandomUUIDGenerator` for UUID generation, so that `Uuid` is deterministic between retries.

This backports SPARK-23599 to Spark 2.3.

## How was this patch tested?

Added tests.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #20903 from viirya/SPARK-23599-2.3.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c39dfae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c39dfae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c39dfae

Branch: refs/heads/branch-2.3
Commit: 1c39dfaef09538ad63e4d5d6d9a343c9bfe9f8d3
Parents: 328dea6
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Mon Mar 26 20:45:20 2018 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Mon Mar 26 20:45:20 2018 +0200

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 16 +++++
 .../spark/sql/catalyst/expressions/misc.scala   | 27 ++++++--
 .../analysis/ResolvedUuidExpressionsSuite.scala | 73 ++++++++++++++++++++
 .../expressions/ExpressionEvalHelper.scala      |  1 +
 .../expressions/MiscExpressionsSuite.scala      | 23 +++++-
 .../org/apache/spark/sql/DataFrameSuite.scala   |  6 ++
 6 files changed, 139 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1c39dfae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 251099f..9cc928c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
@@ -177,6 +178,7 @@ class Analyzer(
       TimeWindowing ::
       ResolveInlineTables(conf) ::
       ResolveTimeZone(conf) ::
+      ResolvedUuidExpressions ::
       TypeCoercion.typeCoercionRules(conf) ++
       extendedResolutionRules : _*),
     Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
@@ -1995,6 +1997,20 @@ class Analyzer(
   }
 
   /**
+   * Set the seed for random number generation in Uuid expressions.
+   */
+  object ResolvedUuidExpressions extends Rule[LogicalPlan] {
+    private lazy val random = new Random()
+
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
+      case p if p.resolved => p
+      case p => p transformExpressionsUp {
+        case Uuid(None) => Uuid(Some(random.nextLong()))
+      }
+    }
+  }
+
+  /**
    * Correctly handle null primitive inputs for UDF by adding extra [[If]] expression to do the
    * null check.  When user defines a UDF with primitive parameters, there is no way to tell if the
    * primitive parameter is null or not, so here we assume the primitive input is null-propagatable

http://git-wip-us.apache.org/repos/asf/spark/blob/1c39dfae/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 4b9006a..cdbe611 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -21,6 +21,7 @@ import java.util.UUID
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.util.RandomUUIDGenerator
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -117,18 +118,34 @@ case class CurrentDatabase() extends LeafExpression with Unevaluable {
        46707d92-02f4-4817-8116-a4c3b23e6266
   """)
 // scalastyle:on line.size.limit
-case class Uuid() extends LeafExpression {
+case class Uuid(randomSeed: Option[Long] = None) extends LeafExpression with Nondeterministic {
 
-  override lazy val deterministic: Boolean = false
+  def this() = this(None)
+
+  override lazy val resolved: Boolean = randomSeed.isDefined
 
   override def nullable: Boolean = false
 
   override def dataType: DataType = StringType
 
-  override def eval(input: InternalRow): Any = UTF8String.fromString(UUID.randomUUID().toString)
+  @transient private[this] var randomGenerator: RandomUUIDGenerator = _
+
+
+  override protected def initializeInternal(partitionIndex: Int): Unit =
+    randomGenerator = RandomUUIDGenerator(randomSeed.get + partitionIndex)
+
+  override protected def evalInternal(input: InternalRow): Any =
+    randomGenerator.getNextUUIDUTF8String()
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-    ev.copy(code = s"final UTF8String ${ev.value} = " +
-      s"UTF8String.fromString(java.util.UUID.randomUUID().toString());", isNull = "false")
+    val randomGen = ctx.freshName("randomGen")
+    ctx.addMutableState("org.apache.spark.sql.catalyst.util.RandomUUIDGenerator", randomGen,
+      forceInline = true,
+      useFreshName = false)
+    ctx.addPartitionInitializationStatement(s"$randomGen = " +
+      "new org.apache.spark.sql.catalyst.util.RandomUUIDGenerator(" +
+      s"${randomSeed.get}L + partitionIndex);")
+    ev.copy(code = s"final UTF8String ${ev.value} = $randomGen.getNextUUIDUTF8String();",
+      isNull = "false")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1c39dfae/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala
new file mode 100644
index 0000000..fe57c19
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolvedUuidExpressionsSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+
+/**
+ * Test suite for resolving Uuid expressions.
+ */
+class ResolvedUuidExpressionsSuite extends AnalysisTest {
+
+  private lazy val a = 'a.int
+  private lazy val r = LocalRelation(a)
+  private lazy val uuid1 = Uuid().as('_uuid1)
+  private lazy val uuid2 = Uuid().as('_uuid2)
+  private lazy val uuid3 = Uuid().as('_uuid3)
+  private lazy val uuid1Ref = uuid1.toAttribute
+
+  private val analyzer = getAnalyzer(caseSensitive = true)
+
+  private def getUuidExpressions(plan: LogicalPlan): Seq[Uuid] = {
+    plan.flatMap {
+      case p =>
+        p.expressions.flatMap(_.collect {
+          case u: Uuid => u
+        })
+    }
+  }
+
+  test("analyzed plan sets random seed for Uuid expression") {
+    val plan = r.select(a, uuid1)
+    val resolvedPlan = analyzer.executeAndCheck(plan)
+    getUuidExpressions(resolvedPlan).foreach { u =>
+      assert(u.resolved)
+      assert(u.randomSeed.isDefined)
+    }
+  }
+
+  test("Uuid expressions should have different random seeds") {
+    val plan = r.select(a, uuid1).groupBy(uuid1Ref)(uuid2, uuid3)
+    val resolvedPlan = analyzer.executeAndCheck(plan)
+    assert(getUuidExpressions(resolvedPlan).map(_.randomSeed.get).distinct.length == 3)
+  }
+
+  test("Different analyzed plans should have different random seeds in Uuids") {
+    val plan = r.select(a, uuid1).groupBy(uuid1Ref)(uuid2, uuid3)
+    val resolvedPlan1 = analyzer.executeAndCheck(plan)
+    val resolvedPlan2 = analyzer.executeAndCheck(plan)
+    val uuids1 = getUuidExpressions(resolvedPlan1)
+    val uuids2 = getUuidExpressions(resolvedPlan2)
+    assert(uuids1.distinct.length == 3)
+    assert(uuids2.distinct.length == 3)
+    assert(uuids1.intersect(uuids2).length == 0)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1c39dfae/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index b4c8eab..fad26f0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -162,6 +162,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
         Alias(expression, s"Optimized($expression)1")() ::
           Alias(expression, s"Optimized($expression)2")() :: Nil),
       expression)
+    plan.initialize(0)
 
     val unsafeRow = plan(inputRow)
     val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"

http://git-wip-us.apache.org/repos/asf/spark/blob/1c39dfae/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
index facc863..593bcd0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 class MiscExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
 
@@ -40,7 +43,23 @@ class MiscExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
   }
 
   test("uuid") {
-    checkEvaluation(Length(Uuid()), 36)
-    assert(evaluate(Uuid()) !== evaluate(Uuid()))
+    def assertIncorrectEval(f: () => Unit): Unit = {
+      intercept[Exception] {
+        f()
+      }.getMessage().contains("Incorrect evaluation")
+    }
+
+    checkEvaluation(Length(Uuid(Some(0))), 36)
+    val r = new Random()
+    val seed1 = Some(r.nextLong())
+    val uuid1 = evaluate(Uuid(seed1)).asInstanceOf[UTF8String]
+    checkEvaluation(Uuid(seed1), uuid1.toString)
+
+    val seed2 = Some(r.nextLong())
+    val uuid2 = evaluate(Uuid(seed2)).asInstanceOf[UTF8String]
+    assertIncorrectEval(() => checkEvaluationWithoutCodegen(Uuid(seed1), uuid2))
+    assertIncorrectEval(() => checkEvaluationWithGeneratedMutableProjection(Uuid(seed1), uuid2))
+    assertIncorrectEval(() => checkEvalutionWithUnsafeProjection(Uuid(seed1), uuid2))
+    assertIncorrectEval(() => checkEvaluationWithOptimization(Uuid(seed1), uuid2))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1c39dfae/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 8b66f77..f7b3393 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.Matchers._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Uuid
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Union}
 import org.apache.spark.sql.execution.{FilterExec, QueryExecution, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
@@ -2264,4 +2265,9 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     checkAnswer(df, Row(0, 10) :: Nil)
     assert(df.queryExecution.executedPlan.isInstanceOf[WholeStageCodegenExec])
   }
+
+  test("Uuid expressions should produce same results at retries in the same DataFrame") {
+    val df = spark.range(1).select($"id", new Column(Uuid()))
+    checkAnswer(df, df.collect())
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org