You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/11/01 18:18:45 UTC
spark git commit: [SPARK-18189][SQL] Fix serialization issue in
KeyValueGroupedDataset
Repository: spark
Updated Branches:
refs/heads/master 8cdf143f4 -> 8a538c97b
[SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset
## What changes were proposed in this pull request?
Like [Dataset.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156), KeyValueGroupedDataset should mark its queryExecution as transient.
As mentioned in the Jira ticket, without `@transient` on that field we saw serialization errors such as:
```
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
- object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: ==
```
## How was this patch tested?
Ran the query from the Jira ticket before and after the change:
```
val a = spark.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey(
{x:(Int,Int)=>x._1}
)
val mappedGroups = grouped.mapGroups((k,x)=>
{(k,1)}
)
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=>
{ val simpley = yyy.value; 1 }
)
```
Author: Ergin Seyfe <es...@fb.com>
Closes #15706 from seyfe/keyvaluegrouped_serialization.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a538c97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a538c97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a538c97
Branch: refs/heads/master
Commit: 8a538c97b556f80f67c80519af0ce879557050d5
Parents: 8cdf143
Author: Ergin Seyfe <es...@fb.com>
Authored: Tue Nov 1 11:18:42 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Nov 1 11:18:42 2016 -0700
----------------------------------------------------------------------
.../scala/org/apache/spark/repl/ReplSuite.scala | 17 +++++++++++++++++
.../apache/spark/sql/KeyValueGroupedDataset.scala | 2 +-
2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8a538c97/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 9262e93..96d2dfc 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite {
assertDoesNotContain("AssertionError", output)
assertDoesNotContain("Exception", output)
}
+
+ test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
+ val resultValue = 12345
+ val output = runInterpreter("local",
+ s"""
+ |val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
+ |val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1))
+ |val broadcasted = sc.broadcast($resultValue)
+ |
+ |// Using broadcast triggers serialization issue in KeyValueGroupedDataset
+ |val dataset = mapGroups.map(_ => broadcasted.value)
+ |dataset.collect()
+ """.stripMargin)
+ assertDoesNotContain("error:", output)
+ assertDoesNotContain("Exception", output)
+ assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8a538c97/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 4cb0313..31ce8eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator
class KeyValueGroupedDataset[K, V] private[sql](
kEncoder: Encoder[K],
vEncoder: Encoder[V],
- val queryExecution: QueryExecution,
+ @transient val queryExecution: QueryExecution,
private val dataAttributes: Seq[Attribute],
private val groupingAttributes: Seq[Attribute]) extends Serializable {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org