You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by anoldbrain <an...@gmail.com> on 2014/02/26 03:52:08 UTC

NullPointerException from 'Count' on DStream

Dear all,

I encountered NullPointerException running a simple program like below:


> val sparkconf = new SparkConf()
>     .setMaster(master)
>     .setAppName("myapp")
>     // and other setups
> 
> val ssc = new StreamingContext(sparkconf, Seconds(30))
> val flume = new FlumeInputDStream(ssc, flume_sink_ip, flume_sink_port,
> StorageLevel.MEMORY_AND_DISK_SER_2)
> 
> val messages = flume.map(x => {
>     val charset = Charset.forName("UTF-8")
>     val decoder = charset.newDecoder()
>     val msg = decoder.decode(x.event.getBody()).toString()
>     msg
> })
> 
> // "messages.count" does not throw NullPointerException
> messages.count.foreachRDD(rdd => {
>     ()
> })
> messages.print()

If calling "messages.count" or 'messages.count.map', no exception is thrown.
If using 'messages.count.foreachRDD', NullPointerException is thrown.
Console output snippets:

> ....
> 14/02/26 10:36:51 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
> Registering block manager node-005:36924 with 294.4 MB RAM
> 14/02/26 10:37:00 ERROR scheduler.JobScheduler: Error generating jobs for
> time 1393382220000 ms
> java.lang.NullPointerException
> 	at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$3.apply(DStream.scala:487)
> 	at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$3.apply(DStream.scala:487)
> 	at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1.apply(DStream.scala:536)
> 	at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1.apply(DStream.scala:536)
> 	at
> org.apache.spark.streaming.dstream.DStream$$anonfun$4.apply(DStream.scala:547)
> 	at
> org.apache.spark.streaming.dstream.DStream$$anonfun$4.apply(DStream.scala:545)
> 	at
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
> 	at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
> 	at
> org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
> 	at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
> 	at
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> 	at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
> 	at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> 	at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
> 	at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
> 	at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> 	at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> 	at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> 	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> 	at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115)
> 	at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:161)
> 	at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:161)
> 	at scala.util.Try$.apply(Try.scala:161)
> 	at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:161)
> 	at
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:105)
> 	at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:70)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> 	at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [error] (run-main-0) java.lang.NullPointerException
> java.lang.NullPointerException
> 	at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$3.apply(DStream.scala:487)
> 	at
> org.apache.spark.streaming.dstream.DStream$$anonfun$count$3.apply(DStream.scala:487)
> 	at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1.apply(DStream.scala:536)
> 	at
> org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1.apply(DStream.scala:536)
> 	at
> org.apache.spark.streaming.dstream.DStream$$anonfun$4.apply(DStream.scala:547)
> 	at
> org.apache.spark.streaming.dstream.DStream$$anonfun$4.apply(DStream.scala:545)
> 	at
> org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
> 	at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
> 	at
> org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
> 	at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
> 	at
> org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
> 	at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
> 	at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> 	at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
> 	at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
> 	at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> 	at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> 	at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> 	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> 	at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115)
> 	at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:161)
> 	at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:161)
> 	at scala.util.Try$.apply(Try.scala:161)
> 	at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:161)
> 	at
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:105)
> 	at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:70)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> 	at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [trace] Stack trace suppressed: run last *:runMain for the full output.

What is wrong with doing 'foreachRDD' on a 'Count'-ed DStream?

Thank you.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-from-Count-on-DStream-tp2066.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: NullPointerException from '.count.foreachRDD'

Posted by anoldbrain <an...@gmail.com>.
Thank you for the reply. I implemented my InputDStream to return None when
there's no data. After changing it to return empty RDD, the exception is
gone.

I am curious as to why all other processings worked correctly with my old
incorrect implementation, with or without data? My actual codes, without the
count() part, did glom then foreachRDD.

I have StatsReportListener registered and when there was no data, there was
no StatsReportListener output. And I thought this was Spark's smart logic to
avoid launching workers when there was no data. Wouldn't have thought it was
actually an indication that I had my InputDStream implementation wrong. On
the other hand, why use return type "Option" if None should not be used at
all?

Thanks for help solving my problem.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-from-count-foreachRDD-tp2066p12476.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: NullPointerException from '.count.foreachRDD'

Posted by "Shao, Saisai" <sa...@intel.com>.
Hi,

I don't think there's a NPE issue when using DStream/count() even there is no data feed into Spark Streaming. I tested using Kafka in my local settings, both are OK with and without data consumed.

Actually you can see the details in ReceiverInputDStream, even there is no data in this batch duration, it will generate an empty BlockRDD, so map() and transformation() in count() operator will never meet NPE. I think the problem may lies on your customized InputDStream, you should make sure to generate an empty RDD even when there is no data feed in.

Thanks
Jerry

-----Original Message-----
From: anoldbrain [mailto:anoldbrain@gmail.com] 
Sent: Wednesday, August 20, 2014 4:13 PM
To: user@spark.incubator.apache.org
Subject: Re: NullPointerException from '.count.foreachRDD'

Looking at the source codes of DStream.scala


>   /**
>    * Return a new DStream in which each RDD has a single element 
> generated by counting each RDD
>    * of this DStream.
>    */
>   def count(): DStream[Long] = {
>     this.map(_ => (null, 1L))
>         .transform(_.union(context.sparkContext.makeRDD(Seq((null, 
> 0L)),
> 1)))
>         .reduceByKey(_ + _)
>         .map(_._2)
>   }

transform is the line throwing the NullPointerException. Can anyone give some hints as what would cause "_" to be null (it is indeed null)? This only happens when there is no data to process.

When there's data, no NullPointerException is thrown, and all the processing/counting proceeds correctly.

I am using my custom InputDStream. Could it be that this is the source of the problem?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-from-count-foreachRDD-tp2066p12461.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org For additional commands, e-mail: user-help@spark.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: NullPointerException from '.count.foreachRDD'

Posted by anoldbrain <an...@gmail.com>.
Looking at the source codes of DStream.scala


>   /**
>    * Return a new DStream in which each RDD has a single element generated
> by counting each RDD
>    * of this DStream.
>    */
>   def count(): DStream[Long] = {
>     this.map(_ => (null, 1L))
>         .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)),
> 1)))
>         .reduceByKey(_ + _)
>         .map(_._2)
>   }

transform is the line throwing the NullPointerException. Can anyone give
some hints as what would cause "_" to be null (it is indeed null)? This only
happens when there is no data to process.

When there's data, no NullPointerException is thrown, and all the
processing/counting proceeds correctly.

I am using my custom InputDStream. Could it be that this is the source of
the problem?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-from-count-foreachRDD-tp2066p12461.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org