Posted to user@spark.apache.org by Yang <te...@gmail.com> on 2016/10/31 07:59:09 UTC

task not serializable with groupByKey() + mapGroups + map?

With the following simple code (running in spark-shell):


    val a = spark.createDataFrame(sc.parallelize(Seq((1, 2), (3, 4)))).as[(Int, Int)]
    val grouped = a.groupByKey { x: (Int, Int) => x._1 }
    val mappedGroups = grouped.mapGroups((k, x) => (k, 1))
    val yyy = sc.broadcast(1)
    val last = mappedGroups.rdd.map { xx =>
      val simpley = yyy.value
      1
    }
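If the closure doesn't reference anything defined at the top level of the shell, I'd expect the same pipeline to serialize, which would point at the capture of yyy's enclosing REPL object rather than at mapGroups itself. An untested variant to isolate that:

    // Touches nothing from the shell scope, so there should be no $outer
    // chain dragging in `grouped` (untested guess):
    val lastNoCapture = mappedGroups.rdd.map(_ => 1)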



With the original snippet, though, I'm seeing this error:
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2053)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:366)
  at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:365)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
  at org.apache.spark.rdd.RDD.map(RDD.scala:365)
  ... 56 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
        - object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: == Parsed Logical Plan ==
'AppendColumns <function1>, unresolveddeserializer(newInstance(class scala.Tuple2)), [input[0, int, true] AS value#210]
+- LogicalRDD [_1#201, _2#202]

== Analyzed Logical Plan ==
_1: int, _2: int, value: int
AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int, true] AS value#210]
+- LogicalRDD [_1#201, _2#202]

== Optimized Logical Plan ==
AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int, true] AS value#210]
+- LogicalRDD [_1#201, _2#202]

== Physical Plan ==
AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, int, true] AS value#210]
+- Scan ExistingRDD[_1#201,_2#202])
        - field (class: org.apache.spark.sql.KeyValueGroupedDataset, name: queryExecution, type: class org.apache.spark.sql.execution.QueryExecution)
        - object (class org.apache.spark.sql.KeyValueGroupedDataset, org.apache.spark.sql.KeyValueGroupedDataset@71148f10)
        - field (class: $iw, name: grouped, type: class org.apache.spark.sql.KeyValueGroupedDataset)
        - object (class $iw, $iw@7b1c13e4)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@3e9a0c21)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@218cc682)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@2ecedd08)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@79efd402)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@d81976c)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@2d5d6e2a)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@74dc6a7a)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@5e220d85)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@1c790a4f)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@1d954b06)
        - field (class: $iw, name: $iw, type: class $iw)
        - object (class $iw, $iw@1343c904)
        - field (class: $line115.$read, name: $iw, type: class $iw)
        - object (class $line115.$read, $line115.$read@42497908)
        - field (class: $iw, name: $line115$read, type: class $line115.$read)
        - object (class $iw, $iw@af36da5)
        - field (class: $iw, name: $outer, type: class $iw)
        - object (class $iw, $iw@2fd5b99a)
        - field (class: $anonfun$1, name: $outer, type: class $iw)
        - object (class $anonfun$1, <function1>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
  ... 65 more
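
If I'm reading the serialization stack right, the failing chain is $anonfun$1 -> $outer ($iw) -> grouped -> queryExecution: because the map closure references yyy, it captures the whole REPL line object, which also holds grouped, and KeyValueGroupedDataset's queryExecution field is not serializable. A workaround sketch I'd try, copying the broadcast into a scope the closure can capture directly instead of through the $iw chain (untested; localYyy is just a scratch name):

    val last = {
      // Block-local copy: the lambda should capture only this val, not the
      // REPL wrapper object that also holds `grouped` (untested guess).
      val localYyy = yyy
      mappedGroups.rdd.map { xx =>
        val simpley = localYyy.value
        1
      }
    }

Is this expected behavior, or something ClosureCleaner should be able to handle here?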