Posted to issues@spark.apache.org by "NITESH VERMA (JIRA)" <ji...@apache.org> on 2016/02/25 14:10:18 UTC

[jira] [Comment Edited] (SPARK-13488) PairDStreamFunctions.mapWithState fails in case timeout is set java.util.NoSuchElementException: None.get

    [ https://issues.apache.org/jira/browse/SPARK-13488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15167177#comment-15167177 ] 

NITESH VERMA edited comment on SPARK-13488 at 2/25/16 1:10 PM:
---------------------------------------------------------------

Hi Sean, let me try to explain my problem.

I am trying to use the new mapWithState API, with the sample code below taken from this link:
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java

Code snippet:
 JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
        wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));

Here I have added a timeout call at the end, as shown below:
wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD).timeout(new Duration(1000)));
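
For reference, here is how I would guard the mapping function for the timeout case. This is only a sketch based on my reading of the State API (isTimingOut()); it reuses wordsDstream and initialRDD from the example and uses Guava's Optional, as in my setup:

    import com.google.common.base.Optional;
    import org.apache.spark.api.java.function.Function3;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.State;
    import org.apache.spark.streaming.StateSpec;
    import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
    import scala.Tuple2;

    // Sketch only: a mapping function that checks isTimingOut() before touching the state,
    // so a timed-out key never calls state.update().
    Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> guardedFunc =
        new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
          @Override
          public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
            if (state.isTimingOut()) {
              // Key is being removed by the timeout; just emit the last known count.
              return new Tuple2<>(word, state.exists() ? state.get() : 0);
            }
            int sum = one.or(0) + (state.exists() ? state.get() : 0);
            state.update(sum);
            return new Tuple2<>(word, sum);
          }
        };

    JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> guardedStream =
        wordsDstream.mapWithState(
            StateSpec.function(guardedFunc).initialState(initialRDD).timeout(new Duration(1000)));

My understanding is that the guard only matters once the timeout is set, because that is the case where the function can be invoked for a key without a new value.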

Now I am hitting the exceptions attached above. One thing I have changed in the dependencies is to use the public Guava library for Optional instead of Spark's own class: as reported in SPARK-4819, org.apache.spark.api.java.Optional does not seem to be available with 1.6. Could you please help me understand the reason, or what I am missing? I am new to Spark.
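
For clarity, the only source change against the example is the Optional import; a sketch of what I use instead, assuming the Guava class as the stand-in:

    // My assumption for the 1.6 build: fall back to Guava's Optional, since the
    // example's org.apache.spark.api.java.Optional does not resolve against 1.6.0.
    import com.google.common.base.Optional;
    // import used in the master-branch example (not available to me on 1.6):
    // import org.apache.spark.api.java.Optional;
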
My Maven dependencies for Spark:
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>1.6.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>1.6.0</version>
    </dependency>
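
Guava itself comes in transitively through spark-core; if I were to pin it explicitly, it would look roughly like this (version shown only for illustration):

    <!-- Illustrative only: Guava is normally pulled in transitively by spark-core -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>14.0.1</version>
    </dependency>
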
Thanks.






> PairDStreamFunctions.mapWithState fails in case timeout is set java.util.NoSuchElementException: None.get
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-13488
>                 URL: https://issues.apache.org/jira/browse/SPARK-13488
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.6.0
>            Reporter: NITESH VERMA
>
> Using the new Spark mapWithState API, I have encountered an issue when setting a timeout for mapWithState.
> Hi, I am using the mapWithState API with the timeout functionality, and I get the exception below when the timeout interval elapses for idle data.
> I am using the example located at https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
> but with some changes:
>  1. The org.apache.spark.api.java.Optional class is not available in 1.6, so I am using the Guava library for Optional.
>  2. I have used the timeout functionality.
> Below is part of the code:
>            
>     JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
>             new PairFunction<String, String, Integer>() {
>               @Override
>               public Tuple2<String, Integer> call(String s) {
>                 return new Tuple2<>(s, 1);
>               }
>             });
>     
>     
>     // Update the cumulative count function
>     Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
>         new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
>           @Override
>           public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
>             int sum = one.or(0) + (state.exists() ? state.get() : 0);
>             Tuple2<String, Integer> output = new Tuple2<>(word, sum);
>             state.update(sum);
>             return output;
>           }
>         };
>         
>         
>     // DStream of cumulative counts that get updated in every batch
>     JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
>         wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD).timeout(new Duration(1000)));
> When I ran the code above, I got the exception below:
>     16/02/25 11:41:33 ERROR Executor: Exception in task 0.0 in stage 157.0 (TID 22)
>     java.util.NoSuchElementException: None.get
>             at scala.None$.get(Option.scala:313)
>             at scala.None$.get(Option.scala:311)
>             at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:222)
>             at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:221)
>             at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>             at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
>             at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:71)
>             at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:69)
>             at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>             at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>             at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
>             at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>             at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>             at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>             at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>             at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>             at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>             at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>             at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>             at org.apache.spark.scheduler.Task.run(Task.scala:89)
>             at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>             at java.lang.Thread.run(Thread.java:745)
>     16/02/25 11:41:33 WARN TaskSetManager: Lost task 0.0 in stage 157.0 (TID 22, localhost): java.util.NoSuchElementException: None.get
>             at scala.None$.get(Option.scala:313)
>             at scala.None$.get(Option.scala:311)
>             at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:222)
>             at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:221)
>             at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>             at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
>             at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:71)
>             at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:69)
>             at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>             at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>             at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
>             at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>             at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>             at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>             at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>             at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>             at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>             at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>             at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>             at org.apache.spark.scheduler.Task.run(Task.scala:89)
>             at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>             at java.lang.Thread.run(Thread.java:745)
>     
>     16/02/25 11:41:33 ERROR TaskSetManager: Task 0 in stage 157.0 failed 1 times; aborting job
>     16/02/25 11:41:33 ERROR JobScheduler: Error running job streaming job 1456380693000 ms.0
>     org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 157.0 failed 1 times, most recent failure: Lost task 0.0 in stage 157.0 (TID 22, localhost): java.util.NoSuchElementException: None.get
>             at scala.None$.get(Option.scala:313)
>             at scala.None$.get(Option.scala:311)
>             at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:222)
>             at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:221)
>             at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
>             at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
>             at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:71)
>             at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:69)
>             at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>             at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>             at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
>             at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>             at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>             at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>             at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>             at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>             at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>             at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>             at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>             at org.apache.spark.scheduler.Task.run(Task.scala:89)
>             at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>             at java.lang.Thread.run(Thread.java:745)


