Posted to commits@spark.apache.org by we...@apache.org on 2017/10/30 10:00:49 UTC

spark git commit: [SPARK-22385][SQL] MapObjects should not access list element by index

Repository: spark
Updated Branches:
  refs/heads/master 9f5c77ae3 -> 9f02d7dc5


[SPARK-22385][SQL] MapObjects should not access list element by index

## What changes were proposed in this pull request?

This issue was discovered and investigated by Ohad Raviv and Sean Owen in https://issues.apache.org/jira/browse/SPARK-21657. The input data of `MapObjects` may be a `List`, for which access by index is O(n). When converting the input data to a catalyst array, `MapObjects` fetched each element by index inside the loop, making the full traversal O(n^2) and causing bad performance.

This PR fixes the issue by traversing the elements with an `Iterator` instead of accessing them by index, as the sketch below illustrates.
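
For intuition, here is a minimal standalone sketch (not Spark code; the object name, the `time` helper, and the collection size are illustrative) contrasting indexed traversal of a `List` with iterator traversal:

```scala
// Minimal sketch: scala.collection.immutable.List is a linked list, so
// list(i) walks i nodes. A full indexed traversal is therefore O(n^2),
// while an iterator visits each node exactly once, O(n).
object ListAccessDemo {
  def time[A](label: String)(body: => A): A = {
    val start = System.nanoTime()
    val result = body
    println(f"$label: ${(System.nanoTime() - start) / 1e6}%.1f ms")
    result
  }

  def main(args: Array[String]): Unit = {
    val list = List.tabulate(100000)(identity)
    val n = list.size

    time("indexed access") {   // O(n^2): list(i) is O(i)
      var i = 0; var sum = 0L
      while (i < n) { sum += list(i); i += 1 }
      sum
    }

    time("iterator access") {  // O(n): each next() is O(1)
      val it = list.iterator
      var sum = 0L
      while (it.hasNext) sum += it.next()
      sum
    }
  }
}
```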

## How was this patch tested?

Using the test script from https://issues.apache.org/jira/browse/SPARK-21657:
```
val BASE = 100000000
val N = 100000
val df = sc.parallelize(List(("1234567890", (BASE to (BASE+N)).map(x => (x.toString, (x+1).toString, (x+2).toString, (x+3).toString)).toList ))).toDF("c1", "c_arr")
spark.time(df.queryExecution.toRdd.foreach(_ => ()))
```
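
The script is intended for a `spark-shell` session, where `sc` and `spark` are predefined; `df.queryExecution.toRdd.foreach(_ => ())` forces the rows through the generated conversion code without the cost of collecting results.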

We can see about a 50x speedup.

Author: Wenchen Fan <we...@databricks.com>

Closes #19603 from cloud-fan/map-objects.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f02d7dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f02d7dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f02d7dc

Branch: refs/heads/master
Commit: 9f02d7dc537b73988468b11337dbb14a8602f246
Parents: 9f5c77a
Author: Wenchen Fan <we...@databricks.com>
Authored: Mon Oct 30 11:00:44 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Oct 30 11:00:44 2017 +0100

----------------------------------------------------------------------
 .../catalyst/expressions/objects/objects.scala  | 40 ++++++++++++++++----
 1 file changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9f02d7dc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 9b28a18..6ae3490 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -591,18 +591,43 @@ case class MapObjects private(
       case _ => inputData.dataType
     }
 
-    val (getLength, getLoopVar) = inputDataType match {
+    // `MapObjects` generates a while loop to traverse the elements of the input collection. We
+    // need to take care of Seq and List because they may have O(n) complexity for indexed accessing
+    // like `list.get(1)`. Here we use Iterator to traverse Seq and List.
+    val (getLength, prepareLoop, getLoopVar) = inputDataType match {
       case ObjectType(cls) if classOf[Seq[_]].isAssignableFrom(cls) =>
-        s"${genInputData.value}.size()" -> s"${genInputData.value}.apply($loopIndex)"
+        val it = ctx.freshName("it")
+        (
+          s"${genInputData.value}.size()",
+          s"scala.collection.Iterator $it = ${genInputData.value}.toIterator();",
+          s"$it.next()"
+        )
       case ObjectType(cls) if cls.isArray =>
-        s"${genInputData.value}.length" -> s"${genInputData.value}[$loopIndex]"
+        (
+          s"${genInputData.value}.length",
+          "",
+          s"${genInputData.value}[$loopIndex]"
+        )
       case ObjectType(cls) if classOf[java.util.List[_]].isAssignableFrom(cls) =>
-        s"${genInputData.value}.size()" -> s"${genInputData.value}.get($loopIndex)"
+        val it = ctx.freshName("it")
+        (
+          s"${genInputData.value}.size()",
+          s"java.util.Iterator $it = ${genInputData.value}.iterator();",
+          s"$it.next()"
+        )
       case ArrayType(et, _) =>
-        s"${genInputData.value}.numElements()" -> ctx.getValue(genInputData.value, et, loopIndex)
+        (
+          s"${genInputData.value}.numElements()",
+          "",
+          ctx.getValue(genInputData.value, et, loopIndex)
+        )
       case ObjectType(cls) if cls == classOf[Object] =>
-        s"$seq == null ? $array.length : $seq.size()" ->
-          s"$seq == null ? $array[$loopIndex] : $seq.apply($loopIndex)"
+        val it = ctx.freshName("it")
+        (
+          s"$seq == null ? $array.length : $seq.size()",
+          s"scala.collection.Iterator $it = $seq == null ? null : $seq.toIterator();",
+          s"$it == null ? $array[$loopIndex] : $it.next()"
+        )
     }
 
     // Make a copy of the data if it's unsafe-backed
@@ -676,6 +701,7 @@ case class MapObjects private(
         $initCollection
 
         int $loopIndex = 0;
+        $prepareLoop
         while ($loopIndex < $dataLength) {
           $loopValue = ($elementJavaType) ($getLoopVar);
           $loopNullCheck

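For readers skimming the diff: below is a conceptual Scala rendering of the loop shape the new `(getLength, prepareLoop, getLoopVar)` triple produces for a `Seq` input. The actual generated code is Java source with codegen-fresh variable names; the names and the per-element function here are illustrative stand-ins.

```scala
// Hypothetical stand-in for the generated loop (the real codegen emits Java).
def mapObjectsLoop(input: Seq[String]): Array[Int] = {
  val dataLength = input.size    // getLength
  val it = input.iterator        // prepareLoop: runs once, before the loop
  val convertedArray = new Array[Int](dataLength)
  var loopIndex = 0
  while (loopIndex < dataLength) {
    // getLoopVar: O(1) per element; previously input(loopIndex), O(loopIndex) on List
    val loopValue = it.next()
    convertedArray(loopIndex) = loopValue.length  // stand-in for the per-element expression
    loopIndex += 1
  }
  convertedArray
}
```

Note that `$prepareLoop` is emitted once, right after `int $loopIndex = 0;`, so the iterator is created a single time per input collection rather than per element.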
