Posted to issues@spark.apache.org by "NITESH VERMA (JIRA)" <ji...@apache.org> on 2016/02/25 14:10:18 UTC
[jira] [Comment Edited] (SPARK-13488)
PairDStreamFunctions.mapWithState fails in case timeout is set
java.util.NoSuchElementException: None.get
[ https://issues.apache.org/jira/browse/SPARK-13488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15167177#comment-15167177 ]
NITESH VERMA edited comment on SPARK-13488 at 2/25/16 1:10 PM:
---------------------------------------------------------------
Hi Sean, let me try to explain my problem.
I am trying the new mapWithState API, using the sample code from
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
Code snippet:
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
    wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
Here I have added a timeout call at the end, as below:
wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD).timeout(new Duration(1000)));
Now I am hitting the exceptions attached above. One thing I changed in the dependencies is to use the public Guava Optional class instead of Spark's, as discussed in SPARK-4819; it seems org.apache.spark.api.java.Optional is not available in 1.6. Could you please point out the reason, or whatever understanding I am missing? I am new to Spark.
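The sum logic in the mapping function is Optional-agnostic, for what it's worth. The sketch below uses java.util.Optional purely as a stand-in (my assumption; the real code uses Guava's or Spark's class) to show that an absent incoming value, which is what a timeout invocation passes, is already handled by the or/orElse default:

```java
import java.util.Optional;

public class CountUpdate {
    // Mirrors the body of mappingFunc: add the incoming value (absent on
    // a timeout invocation) to whatever count the state already holds.
    static int updatedCount(Optional<Integer> one, Optional<Integer> state) {
        return one.orElse(0) + state.orElse(0);
    }

    public static void main(String[] args) {
        System.out.println(updatedCount(Optional.of(1), Optional.of(4)));   // prints 5
        System.out.println(updatedCount(Optional.empty(), Optional.of(4))); // prints 4
    }
}
```

So the None.get does not come from the per-key arithmetic itself; the absent-value case reduces to the existing count.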
My Maven dependencies for Spark:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>1.6.0</version>
</dependency>
Thanks.
> PairDStreamFunctions.mapWithState fails in case timeout is set java.util.NoSuchElementException: None.get
> ---------------------------------------------------------------------------------------------------------
>
> Key: SPARK-13488
> URL: https://issues.apache.org/jira/browse/SPARK-13488
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.6.0
> Reporter: NITESH VERMA
>
> Using the new Spark mapWithState API, I've encountered an issue when setting a timeout for mapWithState.
> I am using the mapWithState API with the timeout functionality, and I get the exception below when the timeout interval elapses for idle data.
> I am using the example located here: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
> with some changes:
> 1. The org.apache.spark.api.java.Optional class is not available in 1.6, so I am using the Guava library for Optional.
> 2. I have used the timeout functionality.
> Below is part of the code:
>
> JavaPairDStream<String, Integer> wordsDstream = words.mapToPair(
>     new PairFunction<String, String, Integer>() {
>       @Override
>       public Tuple2<String, Integer> call(String s) {
>         return new Tuple2<>(s, 1);
>       }
>     });
>
> // Update the cumulative count function
> Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
>     new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
>       @Override
>       public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
>         int sum = one.or(0) + (state.exists() ? state.get() : 0);
>         Tuple2<String, Integer> output = new Tuple2<>(word, sum);
>         state.update(sum);
>         return output;
>       }
>     };
>
> // DStream made of cumulative counts that get updated in every batch
> JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
>     wordsDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD).timeout(new Duration(1000)));
> When I run the code above, I get the exception below:
> 16/02/25 11:41:33 ERROR Executor: Exception in task 0.0 in stage 157.0 (TID 22)
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:313)
> at scala.None$.get(Option.scala:311)
> at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:222)
> at org.apache.spark.streaming.StateSpec$$anonfun$3.apply(StateSpec.scala:221)
> at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
> at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:71)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$2.apply(MapWithStateRDD.scala:69)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
> at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 16/02/25 11:41:33 WARN TaskSetManager: Lost task 0.0 in stage 157.0 (TID 22, localhost): java.util.NoSuchElementException: None.get
>
> 16/02/25 11:41:33 ERROR TaskSetManager: Task 0 in stage 157.0 failed 1 times; aborting job
> 16/02/25 11:41:33 ERROR JobScheduler: Error running job streaming job 1456380693000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 157.0 failed 1 times, most recent failure: Lost task 0.0 in stage 157.0 (TID 22, localhost): java.util.NoSuchElementException: None.get
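The trace enters through StateSpec's internal function wrapper, and only once the timeout interval elapses for an idle key, i.e. on the invocation where the mapping function receives no new value and the key's state is about to be removed. The sketch below is plain Java with a hypothetical State stand-in (this is not Spark's actual org.apache.spark.streaming.State; the field names and the refusal to update during timeout are assumptions made for illustration) showing the two invocation modes a timeout-aware mapping function has to handle:

```java
import java.util.Optional;

public class TimeoutSketch {
    // Hypothetical stand-in for a per-key state handle.
    static class State {
        Integer value;      // null means "no state yet"
        boolean timingOut;  // true only on a timeout invocation

        State(Integer value, boolean timingOut) {
            this.value = value;
            this.timingOut = timingOut;
        }

        boolean exists() { return value != null; }
        int get() { return value; }

        void update(int v) {
            // Model a framework that rejects writes to an expiring key.
            if (timingOut) throw new IllegalStateException("cannot update a timing-out state");
            value = v;
        }
    }

    // Timeout-aware version of the word-count mapping function: it only
    // writes the new sum back when the key is not being expired.
    static int call(String word, Optional<Integer> one, State state) {
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        if (!state.timingOut) {
            state.update(sum);
        }
        return sum;
    }

    public static void main(String[] args) {
        // Normal batch: new value present, state gets updated.
        System.out.println(call("hello", Optional.of(1), new State(4, false)));   // prints 5
        // Timeout: no new value, state about to be removed, no update attempted.
        System.out.println(call("hello", Optional.empty(), new State(4, true)));  // prints 4
    }
}
```

In the real API, the mapping function can distinguish the two modes via the state handle (State exposes an isTimingOut() query) and skip the update on the timeout path.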
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org