You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Yang <te...@gmail.com> on 2016/10/31 07:59:09 UTC
task not serializable in case of groupByKey() + mapGroups + map?
With the following simple code:
val a =
sc.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey({x:(Int,Int)=>x._1})
val mappedGroups = grouped.mapGroups((k,x)=>{(k,1)})
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=>{
val simpley = yyy.value
1
})
I'm seeing this error:
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2053)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.map(RDD.scala:365)
... 56 elided
Caused by: java.io.NotSerializableException:
org.apache.spark.sql.execution.QueryExecution
Serialization stack:
- object not serializable (class:
org.apache.spark.sql.execution.QueryExecution, value: == Parsed Logical
Plan ==
'AppendColumns <function1>, unresolveddeserializer(newInstance(class
scala.Tuple2)), [input[0, int, true] AS value#210]
+- LogicalRDD [_1#201, _2#202]
== Analyzed Logical Plan ==
_1: int, _2: int, value: int
AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int,
true] AS value#210]
+- LogicalRDD [_1#201, _2#202]
== Optimized Logical Plan ==
AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int,
true] AS value#210]
+- LogicalRDD [_1#201, _2#202]
== Physical Plan ==
AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int,
true] AS value#210]
+- Scan ExistingRDD[_1#201,_2#202])
- field (class: org.apache.spark.sql.KeyValueGroupedDataset, name:
queryExecution, type: class org.apache.spark.sql.execution.QueryExecution)
- object (class org.apache.spark.sql.KeyValueGroupedDataset,
org.apache.spark.sql.KeyValueGroupedDataset@71148f10)
- field (class: $iw, name: grouped, type: class
org.apache.spark.sql.KeyValueGroupedDataset)
- object (class $iw, $iw@7b1c13e4)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3e9a0c21)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@218cc682)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2ecedd08)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@79efd402)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@d81976c)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2d5d6e2a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@74dc6a7a)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5e220d85)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1c790a4f)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1d954b06)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@1343c904)
- field (class: $line115.$read, name: $iw, type: class $iw)
- object (class $line115.$read, $line115.$read@42497908)
- field (class: $iw, name: $line115$read, type: class
$line115.$read)
- object (class $iw, $iw@af36da5)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@2fd5b99a)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 65 more