Posted to user@spark.apache.org by bigdata4u <bi...@live.com> on 2014/12/24 06:32:07 UTC

Not Serializable exception when integrating SQL and Spark Streaming

I am trying to use SQL over Spark Streaming using Java, but I am getting a
serialization exception.

public static void main(String args[]) {
    SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
    JavaSparkContext jc = new JavaSparkContext(sparkConf);
    JavaStreamingContext jssc = new JavaStreamingContext(jc, new Duration(2000));
    jssc.addStreamingListener(new WorkCountMonitor());
    int numThreads = Integer.parseInt(args[3]);
    Map<String,Integer> topicMap = new HashMap<String,Integer>();
    String[] topics = args[2].split(",");
    for (String topic : topics) {
        topicMap.put(topic, numThreads);
    }
    JavaPairReceiverInputDStream<String,String> data =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
    data.print();

    JavaDStream<Person> streamData = data.map(new Function<Tuple2<String, String>, Person>() {
        public Person call(Tuple2<String,String> v1) throws Exception {
            String[] stringArray = v1._2.split(",");
            Person person = new Person();
            person.setName(stringArray[0]);
            person.setAge(stringArray[1]);
            return person;
        }
    });

    final JavaSQLContext sqlContext = new JavaSQLContext(jc);
    streamData.foreachRDD(new Function<JavaRDD<Person>, Void>() {
        public Void call(JavaRDD<Person> rdd) {

            JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd, Person.class);
            subscriberSchema.registerAsTable("people");
            System.out.println("all data");
            JavaSchemaRDD names = sqlContext.sql("SELECT name FROM people");
            System.out.println("afterwards");

            List<String> males = new ArrayList<String>();

            males = names.map(new Function<Row,String>() {
                public String call(Row row) {
                    return row.getString(0);
                }
            }).collect();
            System.out.println("before for");
            for (String name : males) {
                System.out.println(name);
            }
            return null;
        }
    });
    jssc.start();
    jssc.awaitTermination();
}

The exception is:

14/12/23 23:49:38 ERROR JobScheduler: Error running job streaming job 1419378578000 ms.1
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.rdd.RDD.map(RDD.scala:271)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:78)
    at org.apache.spark.sql.api.java.JavaSchemaRDD.map(JavaSchemaRDD.scala:42)
    at com.basic.spark.NumberCount$2.call(NumberCount.java:79)
    at com.basic.spark.NumberCount$2.call(NumberCount.java:67)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:274)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:529)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:529)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:171)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1541)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1506)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1175)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 20 more



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Not-Serializable-exception-when-integrating-SQL-and-Spark-Streaming-tp20845.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: Not Serializable exception when integrating SQL and Spark Streaming

Posted by Tarun Garg <bi...@live.com>.
Thanks. I marked the variable as transient and moved ahead; now I am getting an exception while executing the query.
    final static transient SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
    final static transient JavaSparkContext jc = new JavaSparkContext(sparkConf);
    static transient JavaStreamingContext jssc = new JavaStreamingContext(jc, new Duration(2000));
    final static transient JavaSQLContext sqlContext = new JavaSQLContext(jc);

    public static void main(String args[]) {
        // List<String> males = new ArrayList<String>();
        // val ssc = new StreamingContext(...)
        jssc.addStreamingListener(new WorkCountMonitor());
        // JavaDStream<String> data = jssc.textFileStream("/home/tgarg/data/employee.txt");
        int numThreads = Integer.parseInt(args[3]);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }
        JavaPairReceiverInputDStream<String,String> data =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
        // data.window(Duration.apply(1000));
        data.print();

        JavaDStream<String> streamData = data.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String,String> tuple2) throws Exception {
                return tuple2._2();
            }
        });

        streamData.foreachRDD(new Function<JavaRDD<String>,Void>() {
            public Void call(JavaRDD<String> rdd) {
                if (rdd.count() < 1)
                    return null;
                try {
                    rdd.map(new Function<String, Person>() {
                        public Person call(String v1) throws Exception {
                            String[] stringArray = v1.split(",");
                            Person person = new Person();
                            person.setName(stringArray[1]);
                            person.setAge(stringArray[0]);
                            person.setNumber(stringArray[2]);
                            return person;
                        }
                    });

                    for (String txt : rdd.collect())
                        System.out.println(txt);

                    JavaSchemaRDD subscriberSchema = sqlContext.applySchema(rdd, Person.class);
                    subscriberSchema.registerAsTable("people");
                    System.out.println("all data");
                    JavaSchemaRDD names = sqlContext.sql("SELECT name FROM people");
                    System.out.println("afterwards");

                    List<String> males = new ArrayList<String>();
                    males = names.map(new Function<Row,String>() {
                        public String call(Row row) {
                            return row.getString(0);
                        }
                    }).collect();
                    System.out.println("before for");
                    for (String name : males) {
                        System.out.println(name);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return null;
            }
        });
        jssc.start();
        jssc.awaitTermination();
    }
But now I am getting this exception:
java.lang.IllegalArgumentException: object is not an instance of declaring class
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.apply(JavaSQLContext.scala:112)
    at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$1$$anonfun$apply$1$$anonfun$apply$2.apply(JavaSQLContext.scala:111)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$1$$anonfun$apply$1.apply(JavaSQLContext.scala:111)
    at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$1$$anonfun$apply$1.apply(JavaSQLContext.scala:109)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
    at org.apache.spark.rdd.RDD$$anonfun$16.apply(RDD.scala:780)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
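Note that in the snippet above the result of rdd.map(...) is never assigned, so applySchema() is still called on the raw JavaRDD<String> while Person.class describes the schema; the reflective getter invocation inside JavaSQLContext.applySchema would then fail with exactly this "object is not an instance of declaring class" error. A minimal sketch of the likely intent, reusing the Person bean and sqlContext from the posts above (the fix itself is not confirmed anywhere in this thread):

    JavaRDD<Person> people = rdd.map(new Function<String, Person>() {
        public Person call(String line) throws Exception {
            // Build the bean from the comma-separated message, mirroring the mapping above.
            String[] fields = line.split(",");
            Person person = new Person();
            person.setName(fields[1]);
            person.setAge(fields[0]);
            person.setNumber(fields[2]);
            return person;
        }
    });

    // Apply the Person schema to the RDD of Person objects, not to the RDD of Strings.
    JavaSchemaRDD subscriberSchema = sqlContext.applySchema(people, Person.class);
    subscriberSchema.registerAsTable("people");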


Re: Not Serializable exception when integrating SQL and Spark Streaming

Posted by Michael Armbrust <mi...@databricks.com>.
The various Spark contexts generally aren't serializable because you can't
use them on the executors anyway. We made SQLContext serializable just
because it gets pulled into scope more often due to the implicit
conversions it contains. You should try marking the variable that holds
the context with the annotation @transient.
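A minimal sketch of that suggestion in Java, where Scala's @transient annotation corresponds to the transient field modifier. Holding the context in a static field (as the other reply in this thread does) also means the anonymous Function passed to foreachRDD no longer captures it, so it is never serialized with the closure. The class and field names below come from the original post, but the wiring is only illustrative:

    public class NumberCount {

        // Lives on the driver only. A static field is not captured by the anonymous
        // inner classes, so it is not dragged into the serialized closure.
        static transient JavaSQLContext sqlContext;

        public static void main(String[] args) {
            SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
            JavaSparkContext jc = new JavaSparkContext(sparkConf);
            sqlContext = new JavaSQLContext(jc);
            // ... build the JavaStreamingContext and DStream as in the original post;
            // inside foreachRDD refer to NumberCount.sqlContext instead of a captured
            // local variable.
        }
    }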


RE: Not Serializable exception when integrating SQL and Spark Streaming

Posted by Tarun Garg <bi...@live.com>.
Thanks
I debugged this further and below is the cause:
Caused by: java.io.NotSerializableException: org.apache.spark.sql.api.java.JavaSQLContext
    - field (class "com.basic.spark.NumberCount$2", name: "val$sqlContext", type: "class org.apache.spark.sql.api.java.JavaSQLContext")
    - object (class "com.basic.spark.NumberCount$2", com.basic.spark.NumberCount$2@69ddbcc7)
    - field (class "com.basic.spark.NumberCount$2$1", name: "this$0", type: "class com.basic.spark.NumberCount$2")
    - object (class "com.basic.spark.NumberCount$2$1", com.basic.spark.NumberCount$2$1@2524beed)
    - field (class "org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1", name: "fun$1", type: "interface org.apache.spark.api.java.function.Function")
I also tried this: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-access-objects-declared-and-initialized-outside-the-call-method-of-JavaRDD-td17094.html#a17150
Why is there a difference, i.e. SQLContext is serializable but JavaSQLContext is not? Is Spark designed like this?
Thanks

RE: Not Serializable exception when integrating SQL and Spark Streaming

Posted by Tarun Garg <bi...@live.com>.
Thanks for the reply.
I am testing this with a small amount of data, and what is happening is that whenever there is data in the Kafka topic Spark does not throw the exception; otherwise it does.

Thanks
Tarun


Re: Not Serializable exception when integrating SQL and Spark Streaming

Posted by Cheng Lian <li...@gmail.com>.
Generally you can use -Dsun.io.serialization.extendedDebugInfo=true to
enable serialization debugging information when serialization exceptions
are raised.
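A minimal sketch of one way to turn that flag on, assuming it is acceptable to set the system property programmatically before any Java serialization runs in the driver JVM; the more conventional route is to pass -Dsun.io.serialization.extendedDebugInfo=true on the driver's command line, for example through spark-submit's --driver-java-options:

    public static void main(String[] args) {
        // Must run before the first use of Java serialization in this JVM,
        // otherwise the extended debug information is not enabled.
        System.setProperty("sun.io.serialization.extendedDebugInfo", "true");

        SparkConf sparkConf = new SparkConf().setAppName("NumberCount");
        JavaSparkContext jc = new JavaSparkContext(sparkConf);
        // ... rest of the streaming job as in the original post
    }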
