You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/01 18:18:45 UTC

spark git commit: [SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset

Repository: spark
Updated Branches:
  refs/heads/master 8cdf143f4 -> 8a538c97b


[SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset

## What changes were proposed in this pull request?
Likewise [DataSet.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156) KeyValueGroupedDataset should mark the queryExecution as transient.

As mentioned in the Jira ticket, without transient we saw serialization issues like

```
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
        - object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: ==
```

## How was this patch tested?

Run the query which is specified in the Jira ticket before and after:
```
val a = spark.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey(
{x:(Int,Int)=>x._1}
)
val mappedGroups = grouped.mapGroups((k,x)=>
{(k,1)}
)
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=>
{ val simpley = yyy.value 1 }
)
```

Author: Ergin Seyfe <es...@fb.com>

Closes #15706 from seyfe/keyvaluegrouped_serialization.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a538c97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a538c97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a538c97

Branch: refs/heads/master
Commit: 8a538c97b556f80f67c80519af0ce879557050d5
Parents: 8cdf143
Author: Ergin Seyfe <es...@fb.com>
Authored: Tue Nov 1 11:18:42 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Nov 1 11:18:42 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/repl/ReplSuite.scala    | 17 +++++++++++++++++
 .../apache/spark/sql/KeyValueGroupedDataset.scala  |  2 +-
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8a538c97/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 9262e93..96d2dfc 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite {
     assertDoesNotContain("AssertionError", output)
     assertDoesNotContain("Exception", output)
   }
+
+  test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
+    val resultValue = 12345
+    val output = runInterpreter("local",
+      s"""
+         |val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
+         |val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1))
+         |val broadcasted = sc.broadcast($resultValue)
+         |
+         |// Using broadcast triggers serialization issue in KeyValueGroupedDataset
+         |val dataset = mapGroups.map(_ => broadcasted.value)
+         |dataset.collect()
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8a538c97/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 4cb0313..31ce8eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator
 class KeyValueGroupedDataset[K, V] private[sql](
     kEncoder: Encoder[K],
     vEncoder: Encoder[V],
-    val queryExecution: QueryExecution,
+    @transient val queryExecution: QueryExecution,
     private val dataAttributes: Seq[Attribute],
     private val groupingAttributes: Seq[Attribute]) extends Serializable {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org