Posted to commits@spark.apache.org by sr...@apache.org on 2020/08/22 14:25:40 UTC

[spark] branch master updated: [SPARK-32526][SQL] Pass all test of sql/catalyst module in Scala 2.13

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 25c7d0f  [SPARK-32526][SQL] Pass all test of sql/catalyst module in Scala 2.13
25c7d0f is described below

commit 25c7d0fe6ae20a4c1c42e0cd0b448c08ab03f3fb
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Sat Aug 22 09:24:16 2020 -0500

    [SPARK-32526][SQL] Pass all test of sql/catalyst module in Scala 2.13
    
    ### What changes were proposed in this pull request?
    The purpose of this PR is to resolve [SPARK-32526](https://issues.apache.org/jira/browse/SPARK-32526); all remaining failed test cases are fixed.
    
    The main changes of this PR are as follows:
    
    - Change `ExecutorAllocationManager.scala` so that the core module compiles in Scala 2.13; this was a blocking problem
    
    - Change `Seq[_]` to `scala.collection.Seq[_]` in the code paths pointed to by the failed cases (a short sketch of why this is needed follows this list)
    
    - Add a different expected plan for `Test 4: Star with several branches` of StarJoinCostBasedReorderSuite for Scala 2.13, because the candidate plans:
    
    ```
    Join Inner, (d1_pk#5 = f1_fk1#0)
    :- Join Inner, (f1_fk2#1 = d2_pk#8)
    :  :- Join Inner, (f1_fk3#2 = d3_pk#11)
    ```
    and
    
    ```
    Join Inner, (f1_fk2#1 = d2_pk#8)
    :- Join Inner, (d1_pk#5 = f1_fk1#0)
    :  :- Join Inner, (f1_fk3#2 = d3_pk#11)
    ```
    
    have the same cost `Cost(200,9200)`, but `HashMap` was reimplemented in Scala 2.13 and the different iteration order leads to a different plan being chosen (a small sketch of this tie-breaking follows the list of fixed suites below).
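    
    The `Seq` changes above are needed because Scala 2.13 redefines the unqualified `Seq` alias to mean `scala.collection.immutable.Seq`, so a value that is only a `scala.collection.Seq` (for example a mutable buffer or wrapped array) no longer matches a `Seq[_]` pattern or satisfies a `Seq[T]` signature. A minimal sketch of the difference (not part of this patch; the object and value names are made up):
    
    ```
    import scala.collection.mutable.ArrayBuffer
    
    object SeqAliasSketch {
      // An ArrayBuffer is a scala.collection.Seq on both 2.12 and 2.13, but on 2.13 it is
      // NOT a scala.Seq, because scala.Seq now aliases scala.collection.immutable.Seq.
      val buffer: scala.collection.Seq[Int] = ArrayBuffer(1, 2, 3)
    
      def describe(value: Any): String = value match {
        // Matching on the cross-built supertype works on both versions; an unqualified
        // `case s: Seq[_]` would silently miss the buffer on 2.13.
        case s: scala.collection.Seq[_] => s"a collection.Seq with ${s.size} elements"
        case other => s"not a Seq: $other"
      }
    
      // `.toSeq` returns an immutable Seq on 2.13 (and the same instance on 2.12), which is
      // why several call sites in this patch add `.toSeq` before returning a `Seq[T]`.
      val asScalaSeq: Seq[Int] = buffer.toSeq
    }
    ```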
    
    This PR fixes the following test suites:
    
    - LiteralExpressionSuite (1 FAILED -> PASS)
    - StarJoinCostBasedReorderSuite (1 FAILED -> PASS)
    - ObjectExpressionsSuite (2 FAILED -> PASS)
    - ScalaReflectionSuite (1 FAILED -> PASS)
    - RowEncoderSuite (10 FAILED -> PASS)
    - ExpressionEncoderSuite (ABORTED -> PASS)
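    
    The StarJoinCostBasedReorderSuite change works around the tie-breaking issue described above. A simplified, hypothetical sketch (the case class, plan names, and costs are made up, not the actual `CostBasedJoinReorder` code) of how equal-cost candidates can be resolved differently once `HashMap` iteration order changes:
    
    ```
    import scala.collection.mutable
    
    object TieBreakSketch {
      final case class Candidate(description: String, cost: Int)
    
      // Two candidate join orders with identical cost, keyed by an arbitrary plan id.
      val candidates: mutable.HashMap[String, Candidate] = mutable.HashMap(
        "planA" -> Candidate("Join (d1_pk = f1_fk1) first", 200),
        "planB" -> Candidate("Join (f1_fk2 = d2_pk) first", 200)
      )
    
      // minBy keeps the first minimal element it encounters, so with equal costs the winner
      // depends on the map's iteration order -- which is unspecified and changed when
      // HashMap was reimplemented in Scala 2.13. Hence the suite keeps a per-version
      // expected plan until SPARK-32687 makes the reordering deterministic.
      def best: Candidate = candidates.values.minBy(_.cost)
    }
    ```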
    
    ### Why are the changes needed?
    We need to support a Scala 2.13 build.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    - Scala 2.12: Pass the Jenkins or GitHub Actions checks
    
    - Scala 2.13: Do the following:
    
    ```
    dev/change-scala-version.sh 2.13
    mvn clean install -DskipTests  -pl sql/catalyst -Pscala-2.13 -am
    mvn test -pl sql/catalyst -Pscala-2.13
    ```
    
    **Before**
    ```
    Tests: succeeded 4035, failed 17, canceled 0, ignored 6, pending 0
    *** 1 SUITE ABORTED ***
    *** 15 TESTS FAILED ***
    ```
    
    **After**
    
    ```
    Tests: succeeded 4338, failed 0, canceled 0, ignored 6, pending 0
    All tests passed.
    ```
    
    Closes #29434 from LuciferYang/sql-catalyst-tests.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../main/scala/org/apache/spark/ExecutorAllocationManager.scala   | 2 +-
 sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala        | 2 +-
 .../org/apache/spark/sql/catalyst/CatalystTypeConverters.scala    | 2 +-
 .../scala/org/apache/spark/sql/catalyst/ScalaReflection.scala     | 7 +++----
 .../scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala | 4 +++-
 .../apache/spark/sql/catalyst/expressions/objects/objects.scala   | 8 ++++----
 .../org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala    | 3 ++-
 .../sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala    | 6 ++++--
 8 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 2d8beef..d27ee78 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -595,7 +595,7 @@ private[spark] class ExecutorAllocationManager(
     // reset the newExecutorTotal to the existing number of executors
     if (testing || executorsRemoved.nonEmpty) {
       if (decommissionEnabled) {
-        executorMonitor.executorsDecommissioned(executorsRemoved)
+        executorMonitor.executorsDecommissioned(executorsRemoved.toSeq)
       } else {
         executorMonitor.executorsKilled(executorsRemoved.toSeq)
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index 5b17f1d..475164a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -314,7 +314,7 @@ trait Row extends Serializable {
    *
    * @throws ClassCastException when data type does not match.
    */
-  def getSeq[T](i: Int): Seq[T] = getAs[Seq[T]](i)
+  def getSeq[T](i: Int): Seq[T] = getAs[scala.collection.Seq[T]](i).toSeq
 
   /**
    * Returns the value at position i of array type as `java.util.List`.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 34d2f45..aab944c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -164,7 +164,7 @@ object CatalystTypeConverters {
       scalaValue match {
         case a: Array[_] =>
           new GenericArrayData(a.map(elementConverter.toCatalyst))
-        case s: Seq[_] =>
+        case s: scala.collection.Seq[_] =>
           new GenericArrayData(s.map(elementConverter.toCatalyst).toArray)
         case i: JavaIterable[_] =>
           val iter = i.iterator
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 05de21b..a9c8b0b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -284,7 +284,7 @@ object ScalaReflection extends ScalaReflection {
 
       // We serialize a `Set` to Catalyst array. When we deserialize a Catalyst array
       // to a `Set`, if there are duplicated elements, the elements will be de-duplicated.
-      case t if isSubtype(t, localTypeOf[Seq[_]]) ||
+      case t if isSubtype(t, localTypeOf[scala.collection.Seq[_]]) ||
           isSubtype(t, localTypeOf[scala.collection.Set[_]]) =>
         val TypeRef(_, _, Seq(elementType)) = t
         val Schema(dataType, elementNullable) = schemaFor(elementType)
@@ -448,10 +448,9 @@ object ScalaReflection extends ScalaReflection {
       // Since List[_] also belongs to localTypeOf[Product], we put this case before
       // "case t if definedByConstructorParams(t)" to make sure it will match to the
       // case "localTypeOf[Seq[_]]"
-      case t if isSubtype(t, localTypeOf[Seq[_]]) =>
+      case t if isSubtype(t, localTypeOf[scala.collection.Seq[_]]) =>
         val TypeRef(_, _, Seq(elementType)) = t
         toCatalystArray(inputObject, elementType)
-
       case t if isSubtype(t, localTypeOf[Array[_]]) =>
         val TypeRef(_, _, Seq(elementType)) = t
         toCatalystArray(inputObject, elementType)
@@ -686,7 +685,7 @@ object ScalaReflection extends ScalaReflection {
         val TypeRef(_, _, Seq(elementType)) = t
         val Schema(dataType, nullable) = schemaFor(elementType)
         Schema(ArrayType(dataType, containsNull = nullable), nullable = true)
-      case t if isSubtype(t, localTypeOf[Seq[_]]) =>
+      case t if isSubtype(t, localTypeOf[scala.collection.Seq[_]]) =>
         val TypeRef(_, _, Seq(elementType)) = t
         val Schema(dataType, nullable) = schemaFor(elementType)
         Schema(ArrayType(dataType, containsNull = nullable), nullable = true)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index 765018f..ee63209 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -291,9 +291,11 @@ object RowEncoder {
           MapObjects(deserializerFor(_), input, et),
           "array",
           ObjectType(classOf[Array[_]]), returnNullable = false)
+      // TODO should use `scala.collection.immutable.ArrayDeq.unsafeMake` method to create
+      //  `immutable.Seq` in Scala 2.13 when Scala version compatibility is no longer required.
       StaticInvoke(
         scala.collection.mutable.WrappedArray.getClass,
-        ObjectType(classOf[Seq[_]]),
+        ObjectType(classOf[scala.collection.Seq[_]]),
         "make",
         arrayData :: Nil,
         returnNullable = false)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 7cf2c73..4f6a587 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -736,8 +736,8 @@ case class MapObjects private(
   }
 
   private lazy val convertToSeq: Any => Seq[_] = inputDataType match {
-    case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
-      _.asInstanceOf[Seq[_]]
+    case ObjectType(cls) if classOf[scala.collection.Seq[_]].isAssignableFrom(cls) =>
+      _.asInstanceOf[scala.collection.Seq[_]].toSeq
     case ObjectType(cls) if cls.isArray =>
       _.asInstanceOf[Array[_]].toSeq
     case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
@@ -758,7 +758,7 @@ case class MapObjects private(
     case Some(cls) if classOf[WrappedArray[_]].isAssignableFrom(cls) =>
       // Scala WrappedArray
       inputCollection => WrappedArray.make(executeFuncOnCollection(inputCollection).toArray)
-    case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+    case Some(cls) if classOf[scala.collection.Seq[_]].isAssignableFrom(cls) =>
       // Scala sequence
       executeFuncOnCollection(_).toSeq
     case Some(cls) if classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
@@ -859,7 +859,7 @@ case class MapObjects private(
     // need to take care of Seq and List because they may have O(n) complexity for indexed accessing
     // like `list.get(1)`. Here we use Iterator to traverse Seq and List.
     val (getLength, prepareLoop, getLoopVar) = inputDataType match {
-      case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+      case ObjectType(cls) if classOf[scala.collection.Seq[_]].isAssignableFrom(cls) =>
         val it = ctx.freshName("it")
         (
           s"${genInputData.value}.size()",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
index 5df2af9..3768f7a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
@@ -136,7 +136,8 @@ object ArrayBasedMapData {
     keys.zip(values).toMap
   }
 
-  def toScalaMap(keys: Seq[Any], values: Seq[Any]): Map[Any, Any] = {
+  def toScalaMap(keys: scala.collection.Seq[Any],
+      values: scala.collection.Seq[Any]): Map[Any, Any] = {
     keys.zip(values).toMap
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index 5b8e59a..a7c0bac 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -328,8 +328,10 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
     // level 9: {d1 t5 t3 t6 t2 t4 d3 f1 t1 d2 }
     //
     // Number of generated plans: 46 (vs. 82)
+    // TODO(SPARK-32687): find a way to make optimization result of `CostBasedJoinReorder`
+    //  deterministic even if the input order is different.
     val query =
-      d1.join(t3).join(t4).join(f1).join(d2).join(t5).join(t6).join(d3).join(t1).join(t2)
+      d1.join(t3).join(t4).join(f1).join(d3).join(d2).join(t5).join(t6).join(t1).join(t2)
         .where((nameToAttr("d1_c2") === nameToAttr("t3_c1")) &&
           (nameToAttr("t3_c2") === nameToAttr("t4_c2")) &&
           (nameToAttr("d1_pk") === nameToAttr("f1_fk1")) &&
@@ -350,7 +352,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
           Some(nameToAttr("d3_c2") === nameToAttr("t1_c1")))
         .join(t5.join(t6, Inner, Some(nameToAttr("t5_c2") === nameToAttr("t6_c2"))), Inner,
           Some(nameToAttr("d2_c2") === nameToAttr("t5_c1")))
-        .select(outputsOf(d1, t3, t4, f1, d2, t5, t6, d3, t1, t2): _*)
+        .select(outputsOf(d1, t3, t4, f1, d3, d2, t5, t6, t1, t2): _*)
 
     assertEqualPlans(query, expected)
   }

