You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/02/24 14:35:42 UTC

spark git commit: [SPARK-13390][SQL][BRANCH-1.6] Fix the issue that Iterator.map().toSeq is not Serializable

Repository: spark
Updated Branches:
  refs/heads/branch-1.6 573a2c97e -> 06f4fce29


[SPARK-13390][SQL][BRANCH-1.6] Fix the issue that Iterator.map().toSeq is not Serializable

## What changes were proposed in this pull request?

`scala.collection.Iterator`'s methods (e.g., map, filter) will return an `AbstractIterator` which is not Serializable. E.g.,
```Scala
scala> val iter = Array(1, 2, 3).iterator.map(_ + 1)
iter: Iterator[Int] = non-empty iterator

scala> println(iter.isInstanceOf[Serializable])
false
```
If we call something like `Iterator.map(...).toSeq`, it will create a `Stream` that contains a non-serializable `AbstractIterator` field and make the `Stream` be non-serializable.

This PR uses `toArray` instead of `toSeq` to fix such issue in `def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame`.

## How was the this patch tested?

Jenkins tests.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #11334 from zsxwing/SPARK-13390.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06f4fce2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06f4fce2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06f4fce2

Branch: refs/heads/branch-1.6
Commit: 06f4fce29227f9763d9f9abff6e7459542dce261
Parents: 573a2c9
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Feb 24 13:35:36 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Feb 24 13:35:36 2016 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala    |  2 +-
 .../org/apache/spark/sql/SQLContextSuite.scala     | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/06f4fce2/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4e26250..47fd7fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -604,7 +604,7 @@ class SQLContext private[sql](
     val className = beanClass.getName
     val beanInfo = Introspector.getBeanInfo(beanClass)
     val rows = SQLContext.beansToRows(data.asScala.iterator, beanInfo, attrSeq)
-    DataFrame(self, LocalRelation(attrSeq, rows.toSeq))
+    DataFrame(self, LocalRelation(attrSeq, rows.toArray))
   }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/06f4fce2/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 1994dac..9bf865d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -65,4 +65,21 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext{
       session2.sql("select myadd(1, 2)").explain()
     }
   }
+
+  test("SPARK-13390: createDataFrame(java.util.List[_],Class[_]) NotSerializableException") {
+    val rows = new java.util.ArrayList[IntJavaBean]()
+    rows.add(new IntJavaBean(1))
+    val sqlContext = SQLContext.getOrCreate(sc)
+    // Without the fix for SPARK-13390, this will throw NotSerializableException
+    sqlContext.createDataFrame(rows, classOf[IntJavaBean]).groupBy("int").count().collect()
+  }
+}
+
+class IntJavaBean(private var i: Int) extends Serializable {
+
+  def getInt(): Int = i
+
+  def setInt(i: Int): Unit = {
+    this.i = i
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org