Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/24 20:19:33 UTC

git commit: [SPARK-2603][SQL] Remove unnecessary toMap and toList when converting Java collections to Scala collections in JsonRDD.scala

Repository: spark
Updated Branches:
  refs/heads/master 9fd141477 -> b352ef175


[SPARK-2603][SQL] Remove unnecessary toMap and toList when converting Java collections to Scala collections in JsonRDD.scala

In JsonRDD.scalafy, we use toMap/toList to convert a Java Map/List to a Scala one. These two operations are expensive because they read every element from the Java Map/List and copy it into a newly allocated Scala Map/List. Instead of calling toMap/toList, we can wrap the Java collections with Scala wrappers, which avoids the copy.
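As a minimal sketch of the difference (illustrative only, not code from the patch; assumes Scala 2.10, which Spark targeted at the time), toMap/toList copy every element, while the wrappers from scala.collection.convert.Wrappers delegate to the underlying Java collection in constant time:
```scala
import java.util.{ArrayList => JArrayList, HashMap => JHashMap}

import scala.collection.JavaConversions._ // enables .toMap/.toList on Java collections
import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper}

val javaMap = new JHashMap[String, Int]()
javaMap.put("a", 1)

// Copying: iterates over every entry and builds a new Scala Map (O(n) time and space).
val copiedMap: Map[String, Int] = javaMap.toMap

// Wrapping: O(1); the resulting Scala Map delegates every lookup to javaMap.
val wrappedMap = JMapWrapper(javaMap)

val javaList = new JArrayList[Int]()
javaList.add(1)

val copiedList = javaList.toList          // O(n) copy into a Scala List
val wrappedList = JListWrapper(javaList)  // O(1) wrapper; note it is a Seq, not a List
```
Because JListWrapper yields a Seq rather than a List, the pattern matches in allKeysWithValueTypes are relaxed from List[_] to Seq[_] in the diff below.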

I ran a quick test to measure the performance impact. I used a 2.9GB cached RDD[String] storing one JSON object per record (a Twitter dataset). My simple test program is attached below.
```scala
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

val jsonData = sc.textFile("...")
jsonData.cache.count

val jsonSchemaRDD = sqlContext.jsonRDD(jsonData)
jsonSchemaRDD.registerAsTable("jt")

sqlContext.sql("select count(*) from jt").collect
```
The stages for the schema inference and for the table scan each had 48 tasks, and these tasks were executed sequentially. With the current implementation, scanning the JSON dataset materializes the values of all fields of every record. The inferred schema of the dataset can be accessed at https://gist.github.com/yhuai/05fe8a57c638c6666f8d.

From the results, there was no significant difference in the running time of `jsonRDD`. For the simple aggregation query, the results are attached below.
```
Original:
Run 1: 26.1s
Run 2: 27.03s
Run 3: 27.035s

With this change:
Run 1: 21.086s
Run 2: 21.035s
Run 3: 21.029s
```

JIRA: https://issues.apache.org/jira/browse/SPARK-2603

Author: Yin Huai <hu...@cse.ohio-state.edu>

Closes #1504 from yhuai/removeToMapToList and squashes the following commits:

6831b77 [Yin Huai] Fix failed tests.
09b9bca [Yin Huai] Merge remote-tracking branch 'upstream/master' into removeToMapToList
d1abdb8 [Yin Huai] Remove unnecessary toMap and toList.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b352ef17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b352ef17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b352ef17

Branch: refs/heads/master
Commit: b352ef175c234a2ea86b72c2f40da2ac69658b2e
Parents: 9fd1414
Author: Yin Huai <hu...@cse.ohio-state.edu>
Authored: Thu Jul 24 11:19:19 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Jul 24 11:19:19 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/json/JsonRDD.scala  | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b352ef17/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index df80dfb..b48c70e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.json
 
-import scala.collection.JavaConversions._
+import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
 import scala.math.BigDecimal
 
 import com.fasterxml.jackson.databind.ObjectMapper
@@ -210,12 +210,12 @@ private[sql] object JsonRDD extends Logging {
           case (k, dataType) => (s"$key.$k", dataType)
         } ++ Set((key, StructType(Nil)))
       }
-      case (key: String, array: List[_]) => {
+      case (key: String, array: Seq[_]) => {
         // The value associated with the key is an array.
         typeOfArray(array) match {
           case ArrayType(StructType(Nil)) => {
             // The elements of this arrays are structs.
-            array.asInstanceOf[List[Map[String, Any]]].flatMap {
+            array.asInstanceOf[Seq[Map[String, Any]]].flatMap {
               element => allKeysWithValueTypes(element)
             }.map {
               case (k, dataType) => (s"$key.$k", dataType)
@@ -229,7 +229,7 @@ private[sql] object JsonRDD extends Logging {
   }
 
   /**
-   * Converts a Java Map/List to a Scala Map/List.
+   * Converts a Java Map/List to a Scala Map/Seq.
    * We do not use Jackson's scala module at here because
    * DefaultScalaModule in jackson-module-scala will make
    * the parsing very slow.
@@ -239,9 +239,9 @@ private[sql] object JsonRDD extends Logging {
       // .map(identity) is used as a workaround of non-serializable Map
       // generated by .mapValues.
       // This issue is documented at https://issues.scala-lang.org/browse/SI-7005
-      map.toMap.mapValues(scalafy).map(identity)
+      JMapWrapper(map).mapValues(scalafy).map(identity)
     case list: java.util.List[_] =>
-      list.toList.map(scalafy)
+      JListWrapper(list).map(scalafy)
     case atom => atom
   }