Posted to dev@spark.apache.org by Rohith Parameshwara <rp...@couponsinc.com> on 2015/10/28 04:41:23 UTC

Task not serializable exception

I am getting a "Task not serializable" exception when running spark-submit in standalone mode. I am using Spark Streaming, which reads its stream from Kafka queues, but the job fails before it can apply the mapping operations to the RDDs from the stream. The code where the serialization exception occurs is as follows.
I have a separate class to manage the contexts, which has the respective getters and setters:
public class Contexts {
    public RedisContext rc = null;
    public SparkContext sc = null;
    public Gson serializer = new Gson();
    public SparkConf sparkConf = null;        // new SparkConf().setAppName("SparkStreamEventProcessingEngine");
    public JavaStreamingContext jssc = null;  // new JavaStreamingContext(sparkConf, new Duration(2000));
    public Producer<String, String> kafkaProducer = null;
    public Tuple2<String, Object> hostTup = null;

    // ... getters and setters for each field ...
}
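(Side note: as far as I can tell, SparkContext, JavaStreamingContext and the Kafka producer are not serializable by design, so if an instance of this class is ever captured by a closure I would expect it to fail. If the class itself ever needed to be serializable, my guess is the non-serializable fields would have to be marked transient, roughly like the sketch below, but I am not sure that is the right approach:)

import java.io.Serializable;

import org.apache.spark.SparkContext;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Sketch only, not what I currently have: transient fields are skipped by
// Java serialization, so the non-serializable contexts would not be dragged
// along with the object.
public class Contexts implements Serializable {
    private static final long serialVersionUID = 1L;

    public transient SparkContext sc = null;
    public transient JavaStreamingContext jssc = null;
    // ... remaining fields, getters and setters as above ...
}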



The class with the main processing logic of the Spark streaming job is as follows:
public final class SparkStreamEventProcessingEngine {
    public Contexts contexts = new Contexts();

    public SparkStreamEventProcessingEngine() {
    }

    public static void main(String[] args) {
        SparkStreamEventProcessingEngine temp = new SparkStreamEventProcessingEngine();
        temp.tempfunc();
    }

    private void tempfunc() {
        System.out.println(contexts.getJssc().toString() + "\n"
                + contexts.getRc().toString() + "\n"
                + contexts.getSc().toString() + "\n");
        createRewardProducer();

        Properties props = new Properties();
        try {
            props.load(SparkStreamEventProcessingEngine.class.getResourceAsStream("/application.properties"));
        } catch (IOException e) {
            System.out.println("Error loading application.properties file");
            return;
        }

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(props.getProperty("kafa.inbound.queue"), 1);
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(contexts.getJssc(),
                        props.getProperty("kafka.zookeeper.quorum"),
                        props.getProperty("kafka.consumer.group"),
                        topicMap);

        // The exception occurs at this line..
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            // private static final long serialVersionUID = 1L;

            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
            public Void call(JavaRDD<String> rdd) throws Exception {
                rdd.foreach(new VoidFunction<String>() {
                    public void call(String stringData) throws Exception {
                        Gson serializer = new Gson();
                        OfferRedeemed event = serializer.fromJson(stringData, OfferRedeemed.class);
                        System.out.println("Incoming Event:" + event.toString());
                        processTactic(event, "51367");
                        processTactic(event, "53740");
                    }
                });
                return null;
            }
        });

        contexts.getJssc().start();
        contexts.getJssc().awaitTermination();
    }

    private void processTactic(OfferRedeemed event, String tacticId) {
        System.out.println(contexts.getRc().toString() + "hi4");

        TacticDefinition tactic = readTacticDefinition(tacticId);
        boolean conditionMet = false;
        if (tactic != null) {
            System.out.println("Evaluating event of type :" + event.getEventType() + " for Tactic : " + tactic.toString());
            // ...
        }
    }

... and so on, for the respective functionalities.

The exception thrown is as follows:


Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:528)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.SparkContext.withScope(SparkContext.scala:681)
        at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258)
        at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:527)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:157)
        at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.map(JavaDStreamLike.scala:43)
        at com.coupons.stream.processing.SparkStreamEventProcessingEngine.tempfunc(SparkStreamEventProcessingEngine.java:366)
        at com.coupons.stream.processing.SparkStreamEventProcessingEngine.main(SparkStreamEventProcessingEngine.java:346)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.coupons.stream.processing.SparkStreamEventProcessingEngine
Serialization stack:
        - object not serializable (class: com.coupons.stream.processing.SparkStreamEventProcessingEngine, value: com.coupons.stream.processing.SparkStreamEventProcessingEngine@6a48a7f3)
        - field (class: com.coupons.stream.processing.SparkStreamEventProcessingEngine$1, name: this$0, type: class com.coupons.stream.processing.SparkStreamEventProcessingEngine)
        - object (class com.coupons.stream.processing.SparkStreamEventProcessingEngine$1, com.coupons.stream.processing.SparkStreamEventProcessingEngine$1@1c6c6f24)
        - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
        - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
        ... 23 more
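If I read the serialization stack right, the object that actually fails is SparkStreamEventProcessingEngine itself: the anonymous class SparkStreamEventProcessingEngine$1 (the Function passed to map()) carries a hidden field this$0 pointing at the outer instance, and serializing the function drags the whole engine object in. My (possibly imprecise) mental model of what javac generates:

import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

// Illustrative only: an anonymous inner class created in instance context is
// compiled with a hidden reference to the enclosing object, so serializing
// `f` also tries to serialize Outer, even though call() never uses it.
public class Outer {  // not Serializable, like SparkStreamEventProcessingEngine

    public Function<Tuple2<String, String>, String> f =
            new Function<Tuple2<String, String>, String>() {
                // javac effectively adds: final Outer this$0 = Outer.this;
                public String call(Tuple2<String, String> t) {
                    return t._2();
                }
            };
}

Also, even if the map function were made static, I suspect the VoidFunction inside foreachRDD would still be a problem, since it calls processTactic() and therefore genuinely needs the outer instance; I guess that logic would have to move into a serializable helper as well, but I am not sure.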




Any kind of help on this topic is appreciated.