Posted to commits@spark.apache.org by sr...@apache.org on 2020/08/22 14:25:40 UTC
[spark] branch master updated: [SPARK-32526][SQL] Pass all tests of the sql/catalyst module in Scala 2.13
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 25c7d0f [SPARK-32526][SQL] Pass all tests of the sql/catalyst module in Scala 2.13
25c7d0f is described below
commit 25c7d0fe6ae20a4c1c42e0cd0b448c08ab03f3fb
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Sat Aug 22 09:24:16 2020 -0500
[SPARK-32526][SQL] Pass all tests of the sql/catalyst module in Scala 2.13
### What changes were proposed in this pull request?
The purpose of this PR is to resolve [SPARK-32526](https://issues.apache.org/jira/browse/SPARK-32526); all remaining failed cases are fixed.
The main changes of this PR are as follows:
- Change `ExecutorAllocationManager.scala` so that the core module compiles in Scala 2.13; this was a blocking problem.
- Change `Seq[_]` to `scala.collection.Seq[_]` where the failed cases require it (a sketch below illustrates the difference).
- Add a different expected plan for `Test 4: Star with several branches` of StarJoinCostBasedReorderSuite in Scala 2.13, because the candidate plans:
```
Join Inner, (d1_pk#5 = f1_fk1#0)
:- Join Inner, (f1_fk2#1 = d2_pk#8)
:  :- Join Inner, (f1_fk3#2 = d3_pk#11)
```
and
```
Join Inner, (f1_fk2#1 = d2_pk#8)
:- Join Inner, (d1_pk#5 = f1_fk1#0)
:  :- Join Inner, (f1_fk3#2 = d3_pk#11)
```
have the same cost, `Cost(200,9200)`, but `HashMap` was rewritten in Scala 2.13 and its different iteration order leads to a different chosen plan.
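To illustrate the iteration-order point, a minimal standalone sketch (not from the patch; the map name and contents are made up). `HashMap` enumerates entries in the order of its internal hash-table layout rather than insertion order, and that layout changed with the 2.13 collections rewrite:
```
import scala.collection.mutable

object HashMapOrderDemo extends App {
  // Iteration order reflects the hash table's internal layout, not insertion
  // order; the table implementation was rewritten for Scala 2.13, so this
  // output can differ between 2.12 and 2.13 builds of the same program.
  val costsByPlan = mutable.HashMap("f1" -> 1, "d1" -> 2, "d2" -> 3, "d3" -> 4)
  println(costsByPlan.keys.mkString(", "))
}
```
When two candidate plans tie on cost, whichever the map yields first wins, which is why the suite needs a different expected plan under Scala 2.13.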
This PR fixes the following test suites:
- LiteralExpressionSuite (1 FAILED -> PASS)
- StarJoinCostBasedReorderSuite (1 FAILED -> PASS)
- ObjectExpressionsSuite (2 FAILED -> PASS)
- ScalaReflectionSuite (1 FAILED -> PASS)
- RowEncoderSuite (10 FAILED -> PASS)
- ExpressionEncoderSuite (ABORTED -> PASS)
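As referenced in the change list above, a minimal standalone sketch (not from the patch) of why unqualified `Seq[_]` checks behave differently across versions: in Scala 2.12 the default `Seq` aliases `scala.collection.Seq`, while in Scala 2.13 it aliases `scala.collection.immutable.Seq`, so mutable sequences such as `ArrayBuffer` no longer match it:
```
import scala.collection.mutable.ArrayBuffer

object SeqMatchDemo extends App {
  // ArrayBuffer is a scala.collection.Seq but not an immutable.Seq.
  val values: Any = ArrayBuffer(1, 2, 3)

  values match {
    // Matches in 2.12 (Seq = collection.Seq) but not in 2.13 (Seq = immutable.Seq).
    case _: Seq[_] => println("matched unqualified Seq")
    case _ => println("did not match unqualified Seq")
  }

  values match {
    // Matches on both versions; this is the form the patch switches to.
    case _: scala.collection.Seq[_] => println("matched scala.collection.Seq")
    case _ => println("did not match")
  }
}
```
The same aliasing shift explains the `.toSeq` calls added in the diff: calling `.toSeq` on a `scala.collection.Seq` is effectively a no-op in 2.12, while in 2.13 it materializes an immutable sequence to satisfy the `Seq` return type.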
### Why are the changes needed?
We need to support a Scala 2.13 build.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Scala 2.12: Pass the Jenkins or GitHub Actions build
- Scala 2.13: Do the following:
```
dev/change-scala-version.sh 2.13
mvn clean install -DskipTests -pl sql/catalyst -Pscala-2.13 -am
mvn test -pl sql/catalyst -Pscala-2.13
```
**Before**
```
Tests: succeeded 4035, failed 17, canceled 0, ignored 6, pending 0
*** 1 SUITE ABORTED ***
*** 15 TESTS FAILED ***
```
**After**
```
Tests: succeeded 4338, failed 0, canceled 0, ignored 6, pending 0
All tests passed.
```
Closes #29434 from LuciferYang/sql-catalyst-tests.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: Sean Owen <sr...@gmail.com>
---
.../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +-
sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala | 2 +-
.../org/apache/spark/sql/catalyst/CatalystTypeConverters.scala | 2 +-
.../scala/org/apache/spark/sql/catalyst/ScalaReflection.scala | 7 +++----
.../scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala | 4 +++-
.../apache/spark/sql/catalyst/expressions/objects/objects.scala | 8 ++++----
.../org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala | 3 ++-
.../sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala | 6 ++++--
8 files changed, 19 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 2d8beef..d27ee78 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -595,7 +595,7 @@ private[spark] class ExecutorAllocationManager(
// reset the newExecutorTotal to the existing number of executors
if (testing || executorsRemoved.nonEmpty) {
if (decommissionEnabled) {
- executorMonitor.executorsDecommissioned(executorsRemoved)
+ executorMonitor.executorsDecommissioned(executorsRemoved.toSeq)
} else {
executorMonitor.executorsKilled(executorsRemoved.toSeq)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index 5b17f1d..475164a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -314,7 +314,7 @@ trait Row extends Serializable {
*
* @throws ClassCastException when data type does not match.
*/
- def getSeq[T](i: Int): Seq[T] = getAs[Seq[T]](i)
+ def getSeq[T](i: Int): Seq[T] = getAs[scala.collection.Seq[T]](i).toSeq
/**
* Returns the value at position i of array type as `java.util.List`.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 34d2f45..aab944c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -164,7 +164,7 @@ object CatalystTypeConverters {
scalaValue match {
case a: Array[_] =>
new GenericArrayData(a.map(elementConverter.toCatalyst))
- case s: Seq[_] =>
+ case s: scala.collection.Seq[_] =>
new GenericArrayData(s.map(elementConverter.toCatalyst).toArray)
case i: JavaIterable[_] =>
val iter = i.iterator
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 05de21b..a9c8b0b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -284,7 +284,7 @@ object ScalaReflection extends ScalaReflection {
// We serialize a `Set` to Catalyst array. When we deserialize a Catalyst array
// to a `Set`, if there are duplicated elements, the elements will be de-duplicated.
- case t if isSubtype(t, localTypeOf[Seq[_]]) ||
+ case t if isSubtype(t, localTypeOf[scala.collection.Seq[_]]) ||
isSubtype(t, localTypeOf[scala.collection.Set[_]]) =>
val TypeRef(_, _, Seq(elementType)) = t
val Schema(dataType, elementNullable) = schemaFor(elementType)
@@ -448,10 +448,9 @@ object ScalaReflection extends ScalaReflection {
// Since List[_] also belongs to localTypeOf[Product], we put this case before
// "case t if definedByConstructorParams(t)" to make sure it will match to the
// case "localTypeOf[Seq[_]]"
- case t if isSubtype(t, localTypeOf[Seq[_]]) =>
+ case t if isSubtype(t, localTypeOf[scala.collection.Seq[_]]) =>
val TypeRef(_, _, Seq(elementType)) = t
toCatalystArray(inputObject, elementType)
-
case t if isSubtype(t, localTypeOf[Array[_]]) =>
val TypeRef(_, _, Seq(elementType)) = t
toCatalystArray(inputObject, elementType)
@@ -686,7 +685,7 @@ object ScalaReflection extends ScalaReflection {
val TypeRef(_, _, Seq(elementType)) = t
val Schema(dataType, nullable) = schemaFor(elementType)
Schema(ArrayType(dataType, containsNull = nullable), nullable = true)
- case t if isSubtype(t, localTypeOf[Seq[_]]) =>
+ case t if isSubtype(t, localTypeOf[scala.collection.Seq[_]]) =>
val TypeRef(_, _, Seq(elementType)) = t
val Schema(dataType, nullable) = schemaFor(elementType)
Schema(ArrayType(dataType, containsNull = nullable), nullable = true)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index 765018f..ee63209 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -291,9 +291,11 @@ object RowEncoder {
MapObjects(deserializerFor(_), input, et),
"array",
ObjectType(classOf[Array[_]]), returnNullable = false)
+ // TODO should use the `scala.collection.immutable.ArraySeq.unsafeWrapArray` method to create
+ // `immutable.Seq` in Scala 2.13 when Scala version compatibility is no longer required.
StaticInvoke(
scala.collection.mutable.WrappedArray.getClass,
- ObjectType(classOf[Seq[_]]),
+ ObjectType(classOf[scala.collection.Seq[_]]),
"make",
arrayData :: Nil,
returnNullable = false)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 7cf2c73..4f6a587 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -736,8 +736,8 @@ case class MapObjects private(
}
private lazy val convertToSeq: Any => Seq[_] = inputDataType match {
- case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
- _.asInstanceOf[Seq[_]]
+ case ObjectType(cls) if classOf[scala.collection.Seq[_]].isAssignableFrom(cls) =>
+ _.asInstanceOf[scala.collection.Seq[_]].toSeq
case ObjectType(cls) if cls.isArray =>
_.asInstanceOf[Array[_]].toSeq
case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
@@ -758,7 +758,7 @@ case class MapObjects private(
case Some(cls) if classOf[WrappedArray[_]].isAssignableFrom(cls) =>
// Scala WrappedArray
inputCollection => WrappedArray.make(executeFuncOnCollection(inputCollection).toArray)
- case Some(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+ case Some(cls) if classOf[scala.collection.Seq[_]].isAssignableFrom(cls) =>
// Scala sequence
executeFuncOnCollection(_).toSeq
case Some(cls) if classOf[scala.collection.Set[_]].isAssignableFrom(cls) =>
@@ -859,7 +859,7 @@ case class MapObjects private(
// need to take care of Seq and List because they may have O(n) complexity for indexed accessing
// like `list.get(1)`. Here we use Iterator to traverse Seq and List.
val (getLength, prepareLoop, getLoopVar) = inputDataType match {
- case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
+ case ObjectType(cls) if classOf[scala.collection.Seq[_]].isAssignableFrom(cls) =>
val it = ctx.freshName("it")
(
s"${genInputData.value}.size()",
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
index 5df2af9..3768f7a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala
@@ -136,7 +136,8 @@ object ArrayBasedMapData {
keys.zip(values).toMap
}
- def toScalaMap(keys: Seq[Any], values: Seq[Any]): Map[Any, Any] = {
+ def toScalaMap(keys: scala.collection.Seq[Any],
+ values: scala.collection.Seq[Any]): Map[Any, Any] = {
keys.zip(values).toMap
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index 5b8e59a..a7c0bac 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -328,8 +328,10 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
// level 9: {d1 t5 t3 t6 t2 t4 d3 f1 t1 d2 }
//
// Number of generated plans: 46 (vs. 82)
+ // TODO(SPARK-32687): find a way to make optimization result of `CostBasedJoinReorder`
+ // deterministic even if the input order is different.
val query =
- d1.join(t3).join(t4).join(f1).join(d2).join(t5).join(t6).join(d3).join(t1).join(t2)
+ d1.join(t3).join(t4).join(f1).join(d3).join(d2).join(t5).join(t6).join(t1).join(t2)
.where((nameToAttr("d1_c2") === nameToAttr("t3_c1")) &&
(nameToAttr("t3_c2") === nameToAttr("t4_c2")) &&
(nameToAttr("d1_pk") === nameToAttr("f1_fk1")) &&
@@ -350,7 +352,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
Some(nameToAttr("d3_c2") === nameToAttr("t1_c1")))
.join(t5.join(t6, Inner, Some(nameToAttr("t5_c2") === nameToAttr("t6_c2"))), Inner,
Some(nameToAttr("d2_c2") === nameToAttr("t5_c1")))
- .select(outputsOf(d1, t3, t4, f1, d2, t5, t6, d3, t1, t2): _*)
+ .select(outputsOf(d1, t3, t4, f1, d3, d2, t5, t6, t1, t2): _*)
assertEqualPlans(query, expected)
}