Posted to user@spark.apache.org by Ramkumar V <ra...@gmail.com> on 2015/10/29 13:44:10 UTC

Exception while reading from kafka stream

Hi,

I'm trying to read from a Kafka stream and write it to a text file. I'm using
Java with Spark. I don't know why I'm getting the following exception, and
the exception message gives very little detail. Can anyone please help me?

Log Trace :

15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
java.lang.NullPointerException
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
        at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
java.lang.NullPointerException
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
        at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
        at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
        at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)



*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>
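
Note on reading the trace above: the frames place the NullPointerException
inside the key function that maxBy applies within
DStreamGraph.getMaxInputStreamRememberDuration. That would be consistent with
one registered input stream having no remember duration set when the job
generator clears metadata. The following is only a plain-Java model of that
failure mode, not Spark code. InputStreamModel, rememberMillis and
maxByRememberDuration are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the failure mode; none of these types are Spark classes.
final class RememberDurationSketch {

    // Hypothetical stand-in for an input stream whose remember duration
    // is only set once the stream has been initialized.
    static final class InputStreamModel {
        final String name;
        final Long rememberMillis; // null models "never initialized"

        InputStreamModel(String name, Long rememberMillis) {
            this.name = name;
            this.rememberMillis = rememberMillis;
        }
    }

    // Pairwise "max by duration" scan over a non-empty list. Unboxing a
    // null rememberMillis throws NullPointerException, which models the
    // exception surfacing from inside the key function.
    static InputStreamModel maxByRememberDuration(List<InputStreamModel> streams) {
        InputStreamModel best = streams.get(0);
        for (InputStreamModel candidate : streams.subList(1, streams.size())) {
            long bestMillis = best.rememberMillis;
            long candidateMillis = candidate.rememberMillis;
            if (candidateMillis > bestMillis) {
                best = candidate;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<InputStreamModel> streams = Arrays.asList(
                new InputStreamModel("initialized", 30000L),
                new InputStreamModel("uninitialized", null));
        try {
            maxByRememberDuration(streams);
        } catch (NullPointerException e) {
            System.out.println("NPE while comparing remember durations");
        }
    }
}
```

In this model the scan fails as soon as a second stream with no duration is
compared. Whether the same thing happened in the reported job cannot be
confirmed from the trace alone.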

Re: Exception while reading from kafka stream

Posted by Ramkumar V <ra...@gmail.com>.
In general, I need to consume five different types of logs from Kafka in
Spark. I have a different set of topics for each log type. How can I start
five different streams in Spark?

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 4:40 PM, Ramkumar V <ra...@gmail.com> wrote:

> I found that the NPE occurs mainly because I'm using the same
> JavaStreamingContext for another Kafka stream. If I change the name, it
> runs successfully. How can I run multiple JavaStreamingContexts in one
> program? I get the following exception if I run multiple
> JavaStreamingContexts in a single file.
>
> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception:
> java.lang.IllegalStateException: Only one StreamingContext may be started
> in this JVM. Currently running StreamingContext was started
> at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sa...@gmail.com>
> wrote:
>
>> From the code, I think the field "rememberDuration" shouldn't be null. It
>> is verified at start-up, unless something changes its value at runtime and
>> makes it null, but I cannot imagine how that would happen. Maybe you could
>> add some logging around the place where the exception happens, if you can
>> reproduce it.
>>
>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ra...@gmail.com>
>> wrote:
>>
>>> No, this is the only exception, and I'm getting it multiple times in my
>>> log. Also, I was reading some other topics earlier and did not hit this NPE.
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>>
>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sa...@gmail.com>
>>> wrote:
>>>
>>>> I just did a local test with your code and everything seems fine. The
>>>> only difference is that I use the master branch, but I don't think much
>>>> has changed in this part. Did you see any other exceptions or errors
>>>> besides this one? This is probably due to other exceptions that make the
>>>> system unstable.
>>>>
>>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ra...@gmail.com>
>>>> wrote:
>>>>
>>>>> No, I don't have any special settings. Even if I keep only the reading
>>>>> line in my code, it throws the NPE.
>>>>>
>>>>> *Thanks*,
>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>
>>>>>
>>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Do you have any special settings? From your code, I don't think it
>>>>>> would incur an NPE at that place.
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ra...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> spark version - spark 1.4.1
>>>>>>>
>>>>>>> my code snippet:
>>>>>>>
>>>>>>> String brokers = "ip:port,ip:port";
>>>>>>> String topics = "x,y,z";
>>>>>>> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
>>>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>>
>>>>>>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>>>>>>         jssc,
>>>>>>>         String.class,
>>>>>>>         String.class,
>>>>>>>         StringDecoder.class,
>>>>>>>         StringDecoder.class,
>>>>>>>         kafkaParams,
>>>>>>>         TopicsSet
>>>>>>> );
>>>>>>>
>>>>>>> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>>>>>>>     public Void call(JavaPairRDD<String, String> tuple) {
>>>>>>>         JavaRDD<String> rdd = tuple.values();
>>>>>>>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>>>         return null;
>>>>>>>     }
>>>>>>> });
>>>>>>>
>>>>>>>
>>>>>>> *Thanks*,
>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.shao@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> What Spark version are you using? Also, a small code snippet showing
>>>>>>>> how you use Spark Streaming would be greatly helpful.
>>>>>>>>
>>>>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <
>>>>>>>> ramkumar.cs31@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I am able to read and print a few lines. After that I get this
>>>>>>>>> exception. Any idea why?
>>>>>>>>>
>>>>>>>>> *Thanks*,
>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>
>>>>>>>>>

Re: Exception while reading from kafka stream

Posted by Ramkumar V <ra...@gmail.com>.
Thanks a lot, it worked for me. I'm now using a single direct stream that
retrieves data from all the topics.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Mon, Nov 2, 2015 at 8:13 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Combine topicsSet_1 and topicsSet_2 in a single createDirectStream call.
> Then you can use HasOffsetRanges to see what the topic for a given
> partition is.
>
> On Mon, Nov 2, 2015 at 7:26 AM, Ramkumar V <ra...@gmail.com>
> wrote:
>
>> If I try the code snippet below, it throws an exception. How can I avoid
>> this exception? How do I switch processing based on the topic?
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
>> HashSet<String> topicsSet_1 = new HashSet<String>(Arrays.asList(topics.split(",")));
>> HashSet<String> topicsSet_2 = new HashSet<String>(Arrays.asList(topics.split(",")));
>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("metadata.broker.list", brokers);
>>
>> JavaPairInputDStream<String, String> messages_1 = KafkaUtils.createDirectStream(
>>         jssc,
>>         String.class,
>>         String.class,
>>         StringDecoder.class,
>>         StringDecoder.class,
>>         kafkaParams,
>>         topicsSet_1
>> );
>>
>> JavaPairInputDStream<String, String> messages_2 = KafkaUtils.createDirectStream(
>>         jssc,
>>         String.class,
>>         String.class,
>>         StringDecoder.class,
>>         StringDecoder.class,
>>         kafkaParams,
>>         topicsSet_2
>> );
>>
>> * Log Trace* :
>>
>> [ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>> [ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>> [ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>> 15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw exception: java.io.IOException: Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055
>> 15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.io.IOException: Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
>> java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770 remote=somedomain]. 59994 millis timeout left.; Host Details : local host is: "somedomain"; destination host is: "somedomain":8020;
>> java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898 remote=somedomain]. 59998 millis timeout left.; Host Details : local host is: "somedomain"; destination host is: "somedomain;
>> 15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
>> 15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.NullPointerException)
>> 15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.NullPointerException)
>> java.io.IOException: Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482 remote=somedomain]. 59991 millis timeout left.; Host Details : local host is: "somedomain"; destination host is: "somedomain":8020;
>> [ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
>>
>>
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Fri, Oct 30, 2015 at 7:34 PM, Cody Koeninger <co...@koeninger.org>
>> wrote:
>>
>>> Just put them all in one stream and switch processing based on the topic
>>>
>>> On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V <ra...@gmail.com>
>>> wrote:
>>>
>>>> I want to join all those logs in some manner. That's what I'm trying to
>>>> do.
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>
>>>>
>>>> On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao <sa...@gmail.com>
>>>> wrote:
>>>>
>>>>> I don't think Spark Streaming supports multiple streaming contexts in
>>>>> one JVM, so you cannot use it that way. Instead, you could run multiple
>>>>> streaming applications, since you're using YARN.
>>>>>

Re: Exception while reading from kafka stream

Posted by Cody Koeninger <co...@koeninger.org>.
Combine topicsSet_1 and topicsSet_2 in a single createDirectStream call.
Then you can use HasOffsetRanges to see what the topic for a given
partition is.
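
The suggestion above can be sketched in plain Java, without Spark on the
classpath: merge the per-log-type topic sets into the one set that a single
createDirectStream call would receive, then pick the processing branch from
the topic of each partition. RangeModel is a hypothetical stand-in for the
topic and partition fields of an offset range, and the log type names
("access", "error") are made up for illustration. In a real job the ranges
would come from ((HasOffsetRanges) rdd.rdd()).offsetRanges(). The sketch
also assumes that element i of that array describes partition i of the
batch RDD.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch; the Spark calls it stands in for are named in comments.
final class TopicDispatchSketch {

    // Hypothetical stand-in for the topic/partition part of an offset range.
    static final class RangeModel {
        final String topic;
        final int partition;

        RangeModel(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Union of the per-log-type topic sets: the single set that would be
    // passed to one createDirectStream call instead of one call per set.
    static Set<String> mergeTopics(Map<String, Set<String>> topicsByLogType) {
        Set<String> all = new LinkedHashSet<String>();
        for (Set<String> topics : topicsByLogType.values()) {
            all.addAll(topics);
        }
        return all;
    }

    // Reverse index (topic -> log type) used to choose the processing branch.
    static Map<String, String> logTypeByTopic(Map<String, Set<String>> topicsByLogType) {
        Map<String, String> index = new LinkedHashMap<String, String>();
        for (Map.Entry<String, Set<String>> entry : topicsByLogType.entrySet()) {
            for (String topic : entry.getValue()) {
                index.put(topic, entry.getKey());
            }
        }
        return index;
    }

    // Assumption: ranges[i] describes partition i of the batch, so the
    // partition index is enough to recover the topic and its log type.
    static String logTypeForPartition(RangeModel[] ranges, int partitionIndex,
                                      Map<String, String> index) {
        return index.get(ranges[partitionIndex].topic);
    }

    public static void main(String[] args) {
        Map<String, Set<String>> byType = new LinkedHashMap<String, Set<String>>();
        byType.put("access", new LinkedHashSet<String>(Arrays.asList("x", "y")));
        byType.put("error", new LinkedHashSet<String>(Arrays.asList("z")));

        RangeModel[] ranges = { new RangeModel("x", 0), new RangeModel("z", 0) };
        Map<String, String> index = logTypeByTopic(byType);

        System.out.println("topics for the single stream: " + mergeTopics(byType));
        System.out.println("partition 1 is handled as: "
                + logTypeForPartition(ranges, 1, index));
    }
}
```

Keeping the topic-to-log-type index outside the stream definition means a
sixth log type only needs a new map entry, not another stream.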

On Mon, Nov 2, 2015 at 7:26 AM, Ramkumar V <ra...@gmail.com> wrote:

> if i try like below code snippet , it shows exception , how to avoid this
> exception ? how to switch processing based on topic ?
>
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(30));
> HashSet<String> topicsSet_1 = new
> HashSet<String>(Arrays.asList(topics.split(",")));
> HashSet<String> topicsSet_2 = new
> HashSet<String>(Arrays.asList(topics.split(",")));
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", brokers);
> JavaPairInputDStream<String, String> messages_1 =
> KafkaUtils.createDirectStream(
>            jssc,
>            String.class,
>            String.class,
>            StringDecoder.class,
>            StringDecoder.class,
>            kafkaParams,
>            topicsSet_1
>        );
>
> JavaPairInputDStream<String, String> messages_2 =
> KafkaUtils.createDirectStream(
>            jssc,
>            String.class,
>            String.class,
>            StringDecoder.class,
>            StringDecoder.class,
>            kafkaParams,
>             topicsSet_2
>        );
>
> * Log Trace* :
>
> [ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0]
> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
> swallowing exception during message send
> (akka.remote.RemoteTransportExceptionNoStackTrace)
> [ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0]
> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
> swallowing exception during message send
> (akka.remote.RemoteTransportExceptionNoStackTrace)
> [ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0]
> [akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler]
> swallowing exception during message send
> (akka.remote.RemoteTransportExceptionNoStackTrace)
> 15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw
> exception: java.io.IOException: Failed to delete
> somedomain/user/hdfs/spark_output/kyt_req/part-00055
> 15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception: java.io.IOException:
> Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
> java.io.IOException: Failed on local exception:
> java.io.InterruptedIOException: Interruped while waiting for IO on channel
> java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770
> remote=somedomain]. 59994 millis timeout left.; Host Details : local host
> is: "somedomain"; destination host is: "somedomain":8020;
> java.io.IOException: Failed on local exception:
> java.io.InterruptedIOException: Interruped while waiting for IO on channel
> java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898
> remote=somedomain]. 59998 millis timeout left.; Host Details : local host
> is: "somedomain"; destination host is: "somedomain;
> 15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw
> exception: java.lang.NullPointerException
> 15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception:
> java.lang.NullPointerException)
> 15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering
> ApplicationMaster with FAILED (diag message: User class threw exception:
> java.lang.NullPointerException)
> java.io.IOException: Failed on local exception:
> java.io.InterruptedIOException: Interruped while waiting for IO on channel
> java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482
> remote=somedomain]. 59991 millis timeout left.; Host Details : local host
> is: "somedomain"; destination host is: "somedomain":8020;
> [ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0]
> [akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
> swallowing exception during message send
> (akka.remote.RemoteTransportExceptionNoStackTrace)

Re: Exception while reading from kafka stream

Posted by Ramkumar V <ra...@gmail.com>.
If I try the code snippet below, it throws an exception. How can I avoid this
exception? And how do I switch processing based on the topic?

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
HashSet<String> topicsSet_1 = new HashSet<String>(Arrays.asList(topics.split(",")));
HashSet<String> topicsSet_2 = new HashSet<String>(Arrays.asList(topics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);

JavaPairInputDStream<String, String> messages_1 = KafkaUtils.createDirectStream(
           jssc,
           String.class,
           String.class,
           StringDecoder.class,
           StringDecoder.class,
           kafkaParams,
           topicsSet_1
       );

JavaPairInputDStream<String, String> messages_2 = KafkaUtils.createDirectStream(
           jssc,
           String.class,
           String.class,
           StringDecoder.class,
           StringDecoder.class,
           kafkaParams,
           topicsSet_2
       );

*Log Trace*:

[ERROR] [11/02/2015 12:59:08.107] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
[ERROR] [11/02/2015 12:59:08.104] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
[ERROR] [11/02/2015 13:01:13.812] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:41039/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)
15/11/02 12:59:05 ERROR yarn.ApplicationMaster: User class threw exception:
java.io.IOException: Failed to delete
somedomain/user/hdfs/spark_output/kyt_req/part-00055
15/11/02 12:59:05 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception: java.io.IOException:
Failed to delete somedomain/user/hdfs/spark_output/kyt_req/part-00055)
java.io.IOException: Failed on local exception:
java.io.InterruptedIOException: Interruped while waiting for IO on channel
java.nio.channels.SocketChannel[connected local=/10.125.4.200:40770
remote=somedomain]. 59994 millis timeout left.; Host Details : local host
is: "somedomain"; destination host is: "somedomain":8020;
java.io.IOException: Failed on local exception:
java.io.InterruptedIOException: Interruped while waiting for IO on channel
java.nio.channels.SocketChannel[connected local=/10.125.4.200:41898
remote=somedomain]. 59998 millis timeout left.; Host Details : local host
is: "somedomain"; destination host is: "somedomain;
15/11/02 13:01:11 ERROR yarn.ApplicationMaster: User class threw exception:
java.lang.NullPointerException
15/11/02 13:01:11 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception:
java.lang.NullPointerException)
15/11/02 13:01:13 INFO yarn.ApplicationMaster: Unregistering
ApplicationMaster with FAILED (diag message: User class threw exception:
java.lang.NullPointerException)
java.io.IOException: Failed on local exception:
java.io.InterruptedIOException: Interruped while waiting for IO on channel
java.nio.channels.SocketChannel[connected local=/10.125.4.224:40482
remote=somedomain]. 59991 millis timeout left.; Host Details : local host
is: "somedomain"; destination host is: "somedomain":8020;
[ERROR] [11/02/2015 12:59:08.102] [Executor task launch worker-0]
[akka.tcp://sparkDriver@10.125.4.200:34251/user/CoarseGrainedScheduler]
swallowing exception during message send
(akka.remote.RemoteTransportExceptionNoStackTrace)



*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 7:34 PM, Cody Koeninger <co...@koeninger.org> wrote:

> Just put them all in one stream and switch processing based on the topic

Re: Exception while reading from kafka stream

Posted by Cody Koeninger <co...@koeninger.org>.
Just put them all in one stream and switch processing based on the topic
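
As a rough illustration of "switch processing based on the topic", here is a
minimal, Spark-free sketch of the dispatch step only. TopicRouter, on and
dispatch are hypothetical names made up for this example; they are not part of
Spark or Kafka. It assumes each record already arrives as a (topic, message)
pair; how the topic is obtained for each record depends on the Kafka
integration in use and is not shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper (not part of Spark or Kafka): routes each message from
// one merged stream to the handler registered for its topic.
final class TopicRouter {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private final Consumer<String> fallback;

    public TopicRouter(Consumer<String> fallback) {
        this.fallback = fallback;
    }

    // Register the processing step for one topic; returns this for chaining.
    public TopicRouter on(String topic, Consumer<String> handler) {
        handlers.put(topic, handler);
        return this;
    }

    // Switch processing based on the topic; unknown topics go to the fallback.
    public void dispatch(String topic, String message) {
        handlers.getOrDefault(topic, fallback).accept(message);
    }
}
```

With something like this, a single stream subscribed to x, y and z could call
router.dispatch(topic, value) for every record, with one handler registered
per topic via on("x", ...), on("y", ...) and so on, instead of creating a
separate stream or context per topic.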

On Fri, Oct 30, 2015 at 6:29 AM, Ramkumar V <ra...@gmail.com> wrote:

> I want to join all those logs in some manner. That's what I'm trying to do.

Re: Exception while reading from kafka stream

Posted by Ramkumar V <ra...@gmail.com>.
I want to join all of those logs in some way; that is what I'm trying to do.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 4:57 PM, Saisai Shao <sa...@gmail.com> wrote:

> I don't think Spark Streaming supports multiple streaming context in one
> jvm, you cannot use in such way. Instead you could run multiple streaming
> applications, since you're using Yarn.
>
> On Friday, October 30, 2015, Ramkumar V <ra...@gmail.com> wrote:
>
>> I found NPE is mainly because of im using the same JavaStreamingContext
>> for some other kafka stream. if i change the name , its running
>> successfully. how to run multiple JavaStreamingContext in a program ?  I'm
>> getting following exception if i run multiple JavaStreamingContext in
>> single file.
>>
>> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
>> exitCode: 15, (reason: User class threw exception:
>> java.lang.IllegalStateException: Only one StreamingContext may be started
>> in this JVM. Currently running StreamingContext was started
>> at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>>
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sa...@gmail.com>
>> wrote:
>>
>>> From the code, I think this field "rememberDuration" shouldn't be null,
>>> it will be verified at the start, unless some place changes it's value in
>>> the runtime that makes it null, but I cannot image how this happened. Maybe
>>> you could add some logs around the place where exception happens if you
>>> could reproduce it.
>>>
>>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ra...@gmail.com>
>>> wrote:
>>>
>>>> No. this is the only exception that im getting multiple times in my
>>>> log. Also i was reading some other topics earlier but im not faced this NPE.
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>
>>>>
>>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sa...@gmail.com>
>>>> wrote:
>>>>
>>>>> I just did a local test with your code, seems everything is fine, the
>>>>> only difference is that I use the master branch, but I don't think it
>>>>> changes a lot in this part. Do you met any other exceptions or errors
>>>>> beside this one? Probably this is due to other exceptions that makes this
>>>>> system unstable.
>>>>>
>>>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ra...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> No, i dont have any special settings. if i keep only reading line in
>>>>>> my code, it's throwing NPE.
>>>>>>
>>>>>> *Thanks*,
>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sa...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Do you have any special settings, from your code, I don't think it
>>>>>>> will incur NPE at that place.
>>>>>>>
>>>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.cs31@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> spark version - spark 1.4.1
>>>>>>>>
>>>>>>>> my code snippet:
>>>>>>>>
>>>>>>>> String brokers = "ip:port,ip:port";
>>>>>>>> String topics = "x,y,z";
>>>>>>>> HashSet<String> TopicsSet = new
>>>>>>>> HashSet<String>(Arrays.asList(topics.split(",")));
>>>>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>>>
>>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>>> KafkaUtils.createDirectStream(
>>>>>>>>            jssc,
>>>>>>>>            String.class,
>>>>>>>>            String.class,
>>>>>>>>            StringDecoder.class,
>>>>>>>>            StringDecoder.class,
>>>>>>>>            kafkaParams,
>>>>>>>>             TopicsSet
>>>>>>>>        );
>>>>>>>>
>>>>>>>> messages.foreachRDD(new Function<JavaPairRDD<String , String>,Void>
>>>>>>>> () {
>>>>>>>>             public Void call(JavaPairRDD<String , String> tuple) {
>>>>>>>>                 JavaRDD<String>rdd = tuple.values();
>>>>>>>>
>>>>>>>> rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>>>>                 return null;
>>>>>>>>             }
>>>>>>>>        });
>>>>>>>>
>>>>>>>>
>>>>>>>> *Thanks*,
>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <
>>>>>>>> sai.sai.shao@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> What Spark version are you using, also a small code snippet of how
>>>>>>>>> you use Spark Streaming would be greatly helpful.
>>>>>>>>>
>>>>>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <
>>>>>>>>> ramkumar.cs31@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I can able to read and print few lines. Afterthat i'm getting
>>>>>>>>>> this exception. Any idea for this ?
>>>>>>>>>>
>>>>>>>>>> *Thanks*,
>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <
>>>>>>>>>> ramkumar.cs31@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'm trying to read from kafka stream and printing it textfile.
>>>>>>>>>>> I'm using java over spark. I dont know why i'm getting the following
>>>>>>>>>>> exception. Also exception message is very abstract.  can anyone please help
>>>>>>>>>>> me ?
>>>>>>>>>>>
>>>>>>>>>>> Log Trace :
>>>>>>>>>>>
>>>>>>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job
>>>>>>>>>>> generator
>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org
>>>>>>>>>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
>>>>>>>>>>> exception: java.lang.NullPointerException
>>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>>         at
>>>>>>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org
>>>>>>>>>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>>         at
>>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *Thanks*,
>>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Re: Exception while reading from kafka stream

Posted by Saisai Shao <sa...@gmail.com>.
I don't think Spark Streaming supports multiple streaming contexts in one
JVM, so you cannot use it that way. Instead, you could run multiple streaming
applications, since you're using YARN.

On Friday, October 30, 2015, Ramkumar V <ra...@gmail.com> wrote:

> I found NPE is mainly because of im using the same JavaStreamingContext
> for some other kafka stream. if i change the name , its running
> successfully. how to run multiple JavaStreamingContext in a program ?  I'm
> getting following exception if i run multiple JavaStreamingContext in
> single file.
>
> 15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
> exitCode: 15, (reason: User class threw exception:
> java.lang.IllegalStateException: Only one StreamingContext may be started
> in this JVM. Currently running StreamingContext was started
> at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
>
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sai.sai.shao@gmail.com> wrote:
>
>> From the code, I think this field "rememberDuration" shouldn't be null,
>> it will be verified at the start, unless some place changes it's value in
>> the runtime that makes it null, but I cannot image how this happened. Maybe
>> you could add some logs around the place where exception happens if you
>> could reproduce it.
>>
>> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ramkumar.cs31@gmail.com> wrote:
>>
>>> No. this is the only exception that im getting multiple times in my log.
>>> Also i was reading some other topics earlier but im not faced this NPE.
>>>
>>> *Thanks*,
>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>
>>>
>>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sai.sai.shao@gmail.com> wrote:
>>>
>>>> I just did a local test with your code, seems everything is fine, the
>>>> only difference is that I use the master branch, but I don't think it
>>>> changes a lot in this part. Do you met any other exceptions or errors
>>>> beside this one? Probably this is due to other exceptions that makes this
>>>> system unstable.
>>>>
>>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ramkumar.cs31@gmail.com> wrote:
>>>>
>>>>> No, i dont have any special settings. if i keep only reading line in
>>>>> my code, it's throwing NPE.
>>>>>
>>>>> *Thanks*,
>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>
>>>>>
>>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sai.sai.shao@gmail.com> wrote:
>>>>>
>>>>>> Do you have any special settings, from your code, I don't think it
>>>>>> will incur NPE at that place.
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ramkumar.cs31@gmail.com> wrote:
>>>>>>
>>>>>>> spark version - spark 1.4.1
>>>>>>>
>>>>>>> my code snippet:
>>>>>>>
>>>>>>> String brokers = "ip:port,ip:port";
>>>>>>> String topics = "x,y,z";
>>>>>>> HashSet<String> TopicsSet = new
>>>>>>> HashSet<String>(Arrays.asList(topics.split(",")));
>>>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>>
>>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>>> KafkaUtils.createDirectStream(
>>>>>>>            jssc,
>>>>>>>            String.class,
>>>>>>>            String.class,
>>>>>>>            StringDecoder.class,
>>>>>>>            StringDecoder.class,
>>>>>>>            kafkaParams,
>>>>>>>             TopicsSet
>>>>>>>        );
>>>>>>>
>>>>>>> messages.foreachRDD(new Function<JavaPairRDD<String , String>,Void>
>>>>>>> () {
>>>>>>>             public Void call(JavaPairRDD<String , String> tuple) {
>>>>>>>                 JavaRDD<String>rdd = tuple.values();
>>>>>>>
>>>>>>> rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>>>                 return null;
>>>>>>>             }
>>>>>>>        });
>>>>>>>
>>>>>>>
>>>>>>> *Thanks*,
>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sai.sai.shao@gmail.com> wrote:
>>>>>>>
>>>>>>>> What Spark version are you using, also a small code snippet of how
>>>>>>>> you use Spark Streaming would be greatly helpful.
>>>>>>>>
>>>>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <
>>>>>>>> ramkumar.cs31@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I can able to read and print few lines. Afterthat i'm getting this
>>>>>>>>> exception. Any idea for this ?
>>>>>>>>>
>>>>>>>>> *Thanks*,
>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <
>>>>>>>>> ramkumar.cs31@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm trying to read from kafka stream and printing it textfile.
>>>>>>>>>> I'm using java over spark. I dont know why i'm getting the following
>>>>>>>>>> exception. Also exception message is very abstract.  can anyone please help
>>>>>>>>>> me ?
>>>>>>>>>>
>>>>>>>>>> Log Trace :
>>>>>>>>>>
>>>>>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job
>>>>>>>>>> generator
>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org
>>>>>>>>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
>>>>>>>>>> exception: java.lang.NullPointerException
>>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>>         at
>>>>>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org
>>>>>>>>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>>         at
>>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Thanks*,
>>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Exception while reading from kafka stream

Posted by Ramkumar V <ra...@gmail.com>.
I found that the NPE occurs mainly because I'm using the same
JavaStreamingContext for another Kafka stream. If I change the name, it runs
successfully. How can I run multiple JavaStreamingContexts in one program? I
get the following exception if I run multiple JavaStreamingContexts in a
single file.

15/10/30 11:04:29 INFO yarn.ApplicationMaster: Final app status: FAILED,
exitCode: 15, (reason: User class threw exception:
java.lang.IllegalStateException: Only one StreamingContext may be started
in this JVM. Currently running StreamingContext was started
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:622)
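
The IllegalStateException above is what a "one started context per JVM" guard
looks like from the outside. The sketch below is a toy model in plain Java, not
Spark's code: ToyStreamingContext, ToyContextDemo and their methods are
hypothetical names made up for illustration, and only the wording of the error
message is taken from the log.

```java
import java.util.concurrent.atomic.AtomicReference;

// Toy stand-in for a streaming context. NOT Spark's implementation.
final class ToyStreamingContext {
    // Shared slot that holds whichever context is currently started.
    private static final AtomicReference<ToyStreamingContext> ACTIVE =
            new AtomicReference<>();

    private final String name;

    ToyStreamingContext(String name) {
        this.name = name;
    }

    // Claims the shared slot; fails if another context already holds it.
    void start() {
        if (!ACTIVE.compareAndSet(null, this)) {
            ToyStreamingContext running = ACTIVE.get();
            throw new IllegalStateException(
                    "Only one StreamingContext may be started in this JVM. Currently running: "
                            + (running == null ? "unknown" : running.name));
        }
    }

    // Releases the slot, but only if this context is the one holding it.
    void stop() {
        ACTIVE.compareAndSet(this, null);
    }
}

final class ToyContextDemo {
    // Returns true when a second start is rejected while the first context runs.
    static boolean secondStartFails() {
        ToyStreamingContext first = new ToyStreamingContext("first");
        ToyStreamingContext second = new ToyStreamingContext("second");
        first.start();
        try {
            second.start();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        } finally {
            first.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println("second start rejected: " + secondStartFails());
    }
}
```

In this model a second context can only start after the first one has been
stopped, which is why defining several streams on one context, or running
separate applications, are the usual ways around the error.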


*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 3:25 PM, Saisai Shao <sa...@gmail.com> wrote:

> From the code, I think this field "rememberDuration" shouldn't be null,
> it will be verified at the start, unless some place changes it's value in
> the runtime that makes it null, but I cannot image how this happened. Maybe
> you could add some logs around the place where exception happens if you
> could reproduce it.
>
> On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ra...@gmail.com>
> wrote:
>
>> No. this is the only exception that im getting multiple times in my log.
>> Also i was reading some other topics earlier but im not faced this NPE.
>>
>> *Thanks*,
>> <https://in.linkedin.com/in/ramkumarcs31>
>>
>>
>> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sa...@gmail.com>
>> wrote:
>>
>>> I just did a local test with your code, seems everything is fine, the
>>> only difference is that I use the master branch, but I don't think it
>>> changes a lot in this part. Do you met any other exceptions or errors
>>> beside this one? Probably this is due to other exceptions that makes this
>>> system unstable.
>>>
>>> On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ra...@gmail.com>
>>> wrote:
>>>
>>>> No, i dont have any special settings. if i keep only reading line in my
>>>> code, it's throwing NPE.
>>>>
>>>> *Thanks*,
>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>
>>>>
>>>> On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Do you have any special settings, from your code, I don't think it
>>>>> will incur NPE at that place.
>>>>>
>>>>> On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ra...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> spark version - spark 1.4.1
>>>>>>
>>>>>> my code snippet:
>>>>>>
>>>>>> String brokers = "ip:port,ip:port";
>>>>>> String topics = "x,y,z";
>>>>>> HashSet<String> TopicsSet = new
>>>>>> HashSet<String>(Arrays.asList(topics.split(",")));
>>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>>> kafkaParams.put("metadata.broker.list", brokers);
>>>>>>
>>>>>> JavaPairInputDStream<String, String> messages =
>>>>>> KafkaUtils.createDirectStream(
>>>>>>            jssc,
>>>>>>            String.class,
>>>>>>            String.class,
>>>>>>            StringDecoder.class,
>>>>>>            StringDecoder.class,
>>>>>>            kafkaParams,
>>>>>>             TopicsSet
>>>>>>        );
>>>>>>
>>>>>> messages.foreachRDD(new Function<JavaPairRDD<String , String>,Void>
>>>>>> () {
>>>>>>             public Void call(JavaPairRDD<String , String> tuple) {
>>>>>>                 JavaRDD<String>rdd = tuple.values();
>>>>>>
>>>>>> rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>>>>>>                 return null;
>>>>>>             }
>>>>>>        });
>>>>>>
>>>>>>
>>>>>> *Thanks*,
>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>
>>>>>>
>>>>>> On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sa...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> What Spark version are you using, also a small code snippet of how
>>>>>>> you use Spark Streaming would be greatly helpful.
>>>>>>>
>>>>>>> On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ramkumar.cs31@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> I can able to read and print few lines. Afterthat i'm getting this
>>>>>>>> exception. Any idea for this ?
>>>>>>>>
>>>>>>>> *Thanks*,
>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <
>>>>>>>> ramkumar.cs31@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm trying to read from kafka stream and printing it textfile. I'm
>>>>>>>>> using java over spark. I dont know why i'm getting the following exception.
>>>>>>>>> Also exception message is very abstract.  can anyone please help me ?
>>>>>>>>>
>>>>>>>>> Log Trace :
>>>>>>>>>
>>>>>>>>> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job
>>>>>>>>> generator
>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>         at
>>>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>         at
>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>         at
>>>>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>         at
>>>>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>         at
>>>>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>         at
>>>>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org
>>>>>>>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw
>>>>>>>>> exception: java.lang.NullPointerException
>>>>>>>>> java.lang.NullPointerException
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>>>>>>>>>         at
>>>>>>>>> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>>>>>>>>>         at
>>>>>>>>> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>>>>>>>>>         at
>>>>>>>>> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>>>>>>>>>         at
>>>>>>>>> scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>>>>>>>>         at
>>>>>>>>> scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>>>>>>>>>         at
>>>>>>>>> scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>>>>>>>>>         at org.apache.spark.streaming.scheduler.JobGenerator.org
>>>>>>>>> $apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>>>>>>>>>         at
>>>>>>>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Thanks*,
>>>>>>>>> <https://in.linkedin.com/in/ramkumarcs31>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Exception while reading from kafka stream

Posted by Saisai Shao <sa...@gmail.com>.
From the code, I think the field "rememberDuration" shouldn't be null; it is
verified at startup, unless something changes its value at runtime and makes
it null, but I cannot imagine how that would happen. Maybe you could add some
logging around the place where the exception happens, if you can reproduce
it.
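
A sketch of the kind of check such logging could perform. The trace passes
through maxBy inside getMaxInputStreamRememberDuration, which is consistent
with one input stream having a null remember duration when the maximum is
computed. The plain-Java code below is a toy model of that situation, not
Spark's code: ToyInputStream, rememberMillis and RememberDurationCheck are
hypothetical names. It names the streams with no duration set instead of
failing with a bare NullPointerException inside the reduction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy stand-in for an input stream; a null rememberMillis models a stream
// whose remember duration was never initialized. NOT Spark's class.
final class ToyInputStream {
    final String name;
    final Long rememberMillis;

    ToyInputStream(String name, Long rememberMillis) {
        this.name = name;
        this.rememberMillis = rememberMillis;
    }
}

final class RememberDurationCheck {
    // Names of the streams whose remember duration is null.
    static List<String> uninitialized(List<ToyInputStream> streams) {
        List<String> missing = new ArrayList<>();
        for (ToyInputStream s : streams) {
            if (s.rememberMillis == null) {
                missing.add(s.name);
            }
        }
        return missing;
    }

    // Largest remember duration; reports the offending streams by name
    // rather than letting a null surface as an NPE during the comparison.
    static long maxRememberMillis(List<ToyInputStream> streams) {
        if (streams.isEmpty()) {
            throw new IllegalArgumentException("no input streams registered");
        }
        List<String> missing = uninitialized(streams);
        if (!missing.isEmpty()) {
            throw new IllegalStateException(
                    "rememberDuration is null for input streams: " + missing);
        }
        long max = Long.MIN_VALUE;
        for (ToyInputStream s : streams) {
            max = Math.max(max, s.rememberMillis);
        }
        return max;
    }

    public static void main(String[] args) {
        List<ToyInputStream> streams = Arrays.asList(
                new ToyInputStream("kafka-a", 2000L),
                new ToyInputStream("kafka-b", null));
        System.out.println("uninitialized: " + uninitialized(streams));
    }
}
```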

On Fri, Oct 30, 2015 at 5:31 PM, Ramkumar V <ra...@gmail.com> wrote:

> No. this is the only exception that im getting multiple times in my log.
> Also i was reading some other topics earlier but im not faced this NPE.
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>
> On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sa...@gmail.com>
> wrote:
>
>> I just did a local test with your code, seems everything is fine, the
>> only difference is that I use the master branch, but I don't think it
>> changes a lot in this part. Do you met any other exceptions or errors
>> beside this one? Probably this is due to other exceptions that makes this
>> system unstable.

Re: Exception while reading from kafka stream

Posted by Ramkumar V <ra...@gmail.com>.
No, this is the only exception I'm getting; it appears multiple times in my log.
I was also reading from some other topics earlier and did not hit this NPE then.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 2:50 PM, Saisai Shao <sa...@gmail.com> wrote:

> I just did a local test with your code and everything seems fine. The only
> difference is that I use the master branch, but I don't think this part has
> changed much. Did you see any other exceptions or errors besides this one?
> This is probably caused by other exceptions that make the system unstable.

Re: Exception while reading from kafka stream

Posted by Saisai Shao <sa...@gmail.com>.
I just did a local test with your code and everything seems fine. The only
difference is that I use the master branch, but I don't think this part has
changed much. Did you see any other exceptions or errors besides this one?
This is probably caused by other exceptions that make the system unstable.
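
One way to check for other exceptions is to tally the distinct exception class names in the driver log. The following is only a rough sketch in plain Java, assuming the log lines have already been read into memory; `ExceptionTally` and `tally` are hypothetical names and not part of Spark.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Spark API): counts exception class names in log lines.
final class ExceptionTally {
    // Fully qualified class names ending in "Exception" or "Error",
    // for example java.lang.NullPointerException.
    private static final Pattern EXCEPTION_NAME = Pattern.compile(
            "\\b((?:[a-zA-Z_][\\w$]*\\.)+[A-Z][\\w$]*(?:Exception|Error))\\b");

    private ExceptionTally() {}

    /** Returns exception class name -> number of occurrences, in first-seen order. */
    static Map<String, Integer> tally(Iterable<String> logLines) {
        Map<String, Integer> counts = new LinkedHashMap<String, Integer>();
        for (String line : logLines) {
            // Skip stack frames ("at ..."); only header lines name the thrown exception.
            if (line.trim().startsWith("at ")) {
                continue;
            }
            Matcher m = EXCEPTION_NAME.matcher(line);
            while (m.find()) {
                String name = m.group(1);
                Integer old = counts.get(name);
                counts.put(name, old == null ? 1 : old + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two lines taken from the log trace in this thread.
        Map<String, Integer> counts = tally(Arrays.asList(
                "15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator",
                "java.lang.NullPointerException"));
        System.out.println(counts);
    }
}
```

If the tally shows only java.lang.NullPointerException, that supports the observation that no other exception is involved.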

On Fri, Oct 30, 2015 at 5:13 PM, Ramkumar V <ra...@gmail.com> wrote:

> No, I don't have any special settings. Even if I keep only the reading line
> in my code, it still throws the NPE.

Re: Exception while reading from kafka stream

Posted by Ramkumar V <ra...@gmail.com>.
No, I don't have any special settings. Even if I keep only the reading line
in my code, it still throws the NPE.

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 2:14 PM, Saisai Shao <sa...@gmail.com> wrote:

> Do you have any special settings? From your code, I don't think it should
> throw an NPE at that place.

Re: Exception while reading from kafka stream

Posted by Saisai Shao <sa...@gmail.com>.
Do you have any special settings? From your code, I don't think it should
throw an NPE at that place.

On Fri, Oct 30, 2015 at 4:32 PM, Ramkumar V <ra...@gmail.com> wrote:

> Spark version: 1.4.1
>
> My code snippet:
>
> String brokers = "ip:port,ip:port";
> String topics = "x,y,z";
> HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
> HashMap<String, String> kafkaParams = new HashMap<String, String>();
> kafkaParams.put("metadata.broker.list", brokers);
>
> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>         jssc,
>         String.class,
>         String.class,
>         StringDecoder.class,
>         StringDecoder.class,
>         kafkaParams,
>         TopicsSet
> );
>
> messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
>     public Void call(JavaPairRDD<String, String> tuple) {
>         JavaRDD<String> rdd = tuple.values();
>         rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
>         return null;
>     }
> });

Re: Exception while reading from kafka stream

Posted by Ramkumar V <ra...@gmail.com>.
Spark version: 1.4.1

My code snippet:

String brokers = "ip:port,ip:port";
String topics = "x,y,z";
HashSet<String> TopicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokers);

JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        TopicsSet
);

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    public Void call(JavaPairRDD<String, String> tuple) {
        JavaRDD<String> rdd = tuple.values();
        rdd.saveAsTextFile("hdfs://myuser:8020/user/hdfs/output");
        return null;
    }
});
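
A side note on the snippet, separate from the NPE and not a diagnosis of it: every batch writes to the same directory, so files from later batches may overwrite or collide with files from earlier ones. One common approach is to derive a distinct directory per batch. The sketch below is a minimal illustration in plain Java; `BatchOutputPath` and `forBatch` are hypothetical names, not Spark APIs, and the batch time is assumed to be supplied by the caller.

```java
// Hypothetical helper (not a Spark API): builds one output directory per batch.
final class BatchOutputPath {
    private BatchOutputPath() {}

    /** Appends "batch-<batchTimeMs>" to the base directory. */
    static String forBatch(String baseDir, long batchTimeMs) {
        if (baseDir == null || baseDir.isEmpty()) {
            throw new IllegalArgumentException("baseDir must not be empty");
        }
        // Drop trailing slashes so the result has exactly one separator before "batch-".
        String trimmed = baseDir;
        while (trimmed.length() > 1 && trimmed.endsWith("/")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1);
        }
        return trimmed + "/batch-" + batchTimeMs;
    }

    public static void main(String[] args) {
        // The base path is the one from the snippet; the timestamp is an arbitrary example value.
        System.out.println(forBatch("hdfs://myuser:8020/user/hdfs/output", 1446120900000L));
    }
}
```

Inside the foreachRDD callback, the result could be passed to saveAsTextFile in place of the fixed path. Where the batch time comes from is left open here; the two-argument form of foreachRDD, if your Spark version provides it, is one possible source.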


*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Fri, Oct 30, 2015 at 1:57 PM, Saisai Shao <sa...@gmail.com> wrote:

> What Spark version are you using? Also, a small code snippet showing how you
> use Spark Streaming would be very helpful.

Re: Exception while reading from kafka stream

Posted by Saisai Shao <sa...@gmail.com>.
What Spark version are you using? Also, a small code snippet showing how you
use Spark Streaming would be very helpful.

On Fri, Oct 30, 2015 at 3:57 PM, Ramkumar V <ra...@gmail.com> wrote:

> I am able to read and print a few lines. After that, I get this
> exception. Any idea what causes it?
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>

Re: Exception while reading from kafka stream

Posted by Ramkumar V <ra...@gmail.com>.
I am able to read and print a few lines. After that, I get this
exception. Any idea what causes it?

*Thanks*,
<https://in.linkedin.com/in/ramkumarcs31>


On Thu, Oct 29, 2015 at 6:14 PM, Ramkumar V <ra...@gmail.com> wrote:

> Hi,
>
> I'm trying to read from a Kafka stream and write it to a text file. I'm using
> Java with Spark. I don't know why I'm getting the following exception.
> Also, the exception message is very vague. Can anyone please help me?
>
> Log Trace :
>
> 15/10/29 12:15:09 ERROR scheduler.JobScheduler: Error in job generator
> java.lang.NullPointerException
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>         at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>         at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>         at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>         at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>         at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>         at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 15/10/29 12:15:09 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NullPointerException
> java.lang.NullPointerException
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:172)
>         at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>         at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>         at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.maxBy(TraversableOnce.scala:225)
>         at scala.collection.AbstractTraversable.maxBy(Traversable.scala:105)
>         at org.apache.spark.streaming.DStreamGraph.getMaxInputStreamRememberDuration(DStreamGraph.scala:172)
>         at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:267)
>         at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:178)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:83)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:82)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>
>
> *Thanks*,
> <https://in.linkedin.com/in/ramkumarcs31>
>
>