You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/10/30 10:00:49 UTC
spark git commit: [SPARK-22385][SQL] MapObjects should not access
list element by index
Repository: spark
Updated Branches:
refs/heads/master 9f5c77ae3 -> 9f02d7dc5
[SPARK-22385][SQL] MapObjects should not access list element by index
## What changes were proposed in this pull request?
This issue was discovered and investigated by Ohad Raviv and Sean Owen in https://issues.apache.org/jira/browse/SPARK-21657. The input data of `MapObjects` may be a `List`, which has O(n) complexity for access by index. When converting input data to a catalyst array, `MapObjects` gets each element by index in every loop iteration, which results in bad performance.
This PR fixes this issue by accessing elements via Iterator.
## How was this patch tested?
using the test script in https://issues.apache.org/jira/browse/SPARK-21657
```
val BASE = 100000000
val N = 100000
val df = sc.parallelize(List(("1234567890", (BASE to (BASE+N)).map(x => (x.toString, (x+1).toString, (x+2).toString, (x+3).toString)).toList ))).toDF("c1", "c_arr")
spark.time(df.queryExecution.toRdd.foreach(_ => ()))
```
We can see a 50x speed-up.
Author: Wenchen Fan <we...@databricks.com>
Closes #19603 from cloud-fan/map-objects.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f02d7dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f02d7dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f02d7dc
Branch: refs/heads/master
Commit: 9f02d7dc537b73988468b11337dbb14a8602f246
Parents: 9f5c77a
Author: Wenchen Fan <we...@databricks.com>
Authored: Mon Oct 30 11:00:44 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Oct 30 11:00:44 2017 +0100
----------------------------------------------------------------------
.../catalyst/expressions/objects/objects.scala | 40 ++++++++++++++++----
1 file changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9f02d7dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 9b28a18..6ae3490 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -591,18 +591,43 @@ case class MapObjects private(
case _ => inputData.dataType
}
- val (getLength, getLoopVar) = inputDataType match {
+ // `MapObjects` generates a while loop to traverse the elements of the input collection. We
+ // need to take care of Seq and List because they may have O(n) complexity for indexed accessing
+ // like `list.get(1)`. Here we use Iterator to traverse Seq and List.
+ val (getLength, prepareLoop, getLoopVar) = inputDataType match {
case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
- s"${genInputData.value}.size()" -> s"${genInputData.value}.apply($loopIndex)"
+ val it = ctx.freshName("it")
+ (
+ s"${genInputData.value}.size()",
+ s"scala.collection.Iterator $it = ${genInputData.value}.toIterator();",
+ s"$it.next()"
+ )
case ObjectType(cls) if cls.isArray =>
- s"${genInputData.value}.length" -> s"${genInputData.value}[$loopIndex]"
+ (
+ s"${genInputData.value}.length",
+ "",
+ s"${genInputData.value}[$loopIndex]"
+ )
case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
- s"${genInputData.value}.size()" -> s"${genInputData.value}.get($loopIndex)"
+ val it = ctx.freshName("it")
+ (
+ s"${genInputData.value}.size()",
+ s"java.util.Iterator $it = ${genInputData.value}.iterator();",
+ s"$it.next()"
+ )
case ArrayType(et, _) =>
- s"${genInputData.value}.numElements()" -> ctx.getValue(genInputData.value, et, loopIndex)
+ (
+ s"${genInputData.value}.numElements()",
+ "",
+ ctx.getValue(genInputData.value, et, loopIndex)
+ )
case ObjectType(cls) if cls == classOf[Object] =>
- s"$seq == null ? $array.length : $seq.size()" ->
- s"$seq == null ? $array[$loopIndex] : $seq.apply($loopIndex)"
+ val it = ctx.freshName("it")
+ (
+ s"$seq == null ? $array.length : $seq.size()",
+ s"scala.collection.Iterator $it = $seq == null ? null : $seq.toIterator();",
+ s"$it == null ? $array[$loopIndex] : $it.next()"
+ )
}
// Make a copy of the data if it's unsafe-backed
@@ -676,6 +701,7 @@ case class MapObjects private(
$initCollection
int $loopIndex = 0;
+ $prepareLoop
while ($loopIndex < $dataLength) {
$loopValue = ($elementJavaType) ($getLoopVar);
$loopNullCheck
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org