Posted to user@spark.apache.org by "Yuval.Itzchakov" <yu...@gmail.com> on 2016/02/04 15:56:53 UTC

PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

Hi,
I've been playing with the experimental PairDStreamFunctions.mapWithState
feature and I seem to have stumbled across a bug. I was wondering if
anyone else has been seeing this behavior.

I've opened an issue in the Spark JIRA. I simply want to pass this along
in case anyone else is experiencing such a failure, or in case someone has
insight into whether this is actually a bug:  SPARK-13195
<https://issues.apache.org/jira/browse/SPARK-13195>

Using the new Spark mapWithState API, I've encountered a bug when setting a
timeout for mapWithState without any explicit state handling.

h1. Steps to reproduce:

1. Create a method which conforms to the StateSpec signature, and make sure
it does not update any state using *state.update*. A simple "pass through"
method will do; it may even be empty.
2. Create a StateSpec object with the method from step 1, and explicitly set
a timeout using the *StateSpec.timeout* method.
3. Create a DStream pipeline that uses mapWithState with the given
StateSpec.
4. Run the code using spark-submit. You'll see that the job ends up throwing
the following exception:

{code}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage 136.0 (TID 176, ****): java.util.NoSuchElementException: State is not set
	at org.apache.spark.streaming.StateImpl.get(State.scala:150)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
{code}

h1. Sample code to reproduce the issue:

{code:Title=MainObject}
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by yuvali on 04/02/2016.
  */
object Program {

  def main(args: Array[String]): Unit = {

    val sc = new SparkConf().setAppName("mapWithState bug reproduce")
    val sparkContext = new SparkContext(sc)

    val ssc = new StreamingContext(sparkContext, Seconds(4))
    val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))

    // Create a stream that generates 10 lines per second
    val stream = ssc.receiverStream(new DummySource(10))

    // Split the lines into words, and create a paired (key-value) dstream
    val wordStream = stream.flatMap {
      _.split(" ")
    }.map(word => (word, 1))

    // This represents the emitted stream from the trackStateFunc. Since we
    // emit every input record with the updated value, this stream will
    // contain the same # of records as the input dstream.
    val wordCountStateStream = wordStream.mapWithState(stateSpec)
    wordCountStateStream.print()

    // To make sure data is not deleted by the time we query it interactively
    ssc.remember(Minutes(1))

    // Don't forget to set checkpoint directory
    ssc.checkpoint("")
    ssc.start()
    ssc.awaitTermination()
  }

  // Reads the state but deliberately never calls state.update
  def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
                     state: State[Long]): Option[(String, Long)] = {
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
    val output = (key, sum)
    Some(output)
  }
}
{code}

{code:Title=DummySource}

/**
  * Created by yuvali on 04/02/2016.
  */

import org.apache.spark.storage.StorageLevel
import scala.util.Random
import org.apache.spark.streaming.receiver._

class DummySource(ratePerSec: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Start the thread that generates the dummy data
    new Thread("Dummy Source") {
      override def run(): Unit = { receive() }
    }.start()
  }

  def onStop(): Unit = {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Generate dummy lines at the configured rate until the receiver is stopped */
  private def receive(): Unit = {
    while (!isStopped()) {
      store("I am a dummy source " + Random.nextInt(10))
      Thread.sleep((1000.toDouble / ratePerSec).toInt)
    }
  }
}
{code}

The issue resides in *MapWithStateRDDRecord.updateRecordWithData*, starting
at line 55, in the following code block:

{code}
dataIterator.foreach { case (key, value) =>
      wrappedState.wrap(newStateMap.get(key))
      val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
      if (wrappedState.isRemoved) {
        newStateMap.remove(key)
      } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) /* <--- problem is here */ {
        newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
      }
      mappedData ++= returned
}
{code}

If the stream has a timeout set but the state was never set, the "else-if"
branch is still taken because the timeout is defined. "wrappedState.get()"
is then called on a "wrappedState" that is empty, and it throws.

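The failing branch can be modelled without Spark. The sketch below is a
simplified stand-in, not Spark's implementation: ModelState, processRecord,
passThrough and counting are hypothetical names that mimic only the parts of
the snippet quoted above. It assumes the state wrapper throws
NoSuchElementException from get() when nothing is set, as the stack trace
suggests.

```scala
import scala.collection.mutable
import scala.util.Try

object TimeoutBugModel {

  // Hypothetical stand-in for the wrapped state; only the members used below.
  final class ModelState[S](initial: Option[S]) {
    private var value: Option[S] = initial
    private var updated = false

    def isUpdated: Boolean = updated
    def getOption: Option[S] = value
    def get(): S = value.getOrElse(throw new NoSuchElementException("State is not set"))
    def update(newState: S): Unit = { value = Some(newState); updated = true }
  }

  // Same condition as the quoted snippet: the put runs whenever a timeout is defined.
  def processRecord(
      stateMap: mutable.Map[String, Long],
      key: String,
      timeoutDefined: Boolean)(mappingFunction: ModelState[Long] => Unit): Unit = {
    val state = new ModelState[Long](stateMap.get(key))
    mappingFunction(state)
    if (state.isUpdated || timeoutDefined) {
      stateMap.put(key, state.get()) // throws if the state was never set
    }
  }

  // "Pass through" mapping function: never touches the state.
  val passThrough: ModelState[Long] => Unit = _ => ()

  // Mapping function that always updates the state.
  val counting: ModelState[Long] => Unit = s => s.update(s.getOption.getOrElse(0L) + 1L)

  def main(args: Array[String]): Unit = {
    def run(timeoutDefined: Boolean, f: ModelState[Long] => Unit): Try[Unit] =
      Try(processRecord(mutable.Map.empty[String, Long], "dummy", timeoutDefined)(f))

    println(s"pass through, timeout set:   ${run(true, passThrough)}")
    println(s"pass through, timeout unset: ${run(false, passThrough)}")
    println(s"counting, timeout set:       ${run(true, counting)}")
  }
}
```

In this model only the first case fails: a mapping function that never
updates the state, combined with a defined timeout. A mapping function that
calls update on every record does not hit the failing get() call.
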
If it is mandatory to update state for each entry of *mapWithState*, then
this code should throw a better exception than "NoSuchElementException",
which doesn't really say anything to the developer.
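
As an illustration of what a more helpful failure could look like, here is a
minimal sketch in plain Scala. The helper name requireState, the exception
type and the wording of the message are all assumptions made for this
example; none of them come from Spark.

```scala
import scala.util.Try

object ClearerStateError {

  // Hypothetical helper: unwrap the state, or fail with a message that
  // names the key and the likely cause.
  def requireState[S](key: String, state: Option[S]): S =
    state.getOrElse(
      throw new IllegalStateException(
        s"No state set for key '$key': a timeout is configured, " +
          "but the mapping function did not call state.update"))

  def main(args: Array[String]): Unit = {
    println(requireState("dummy", Some(3L)))
    println(Try(requireState[Long]("dummy", None)).failed.map(_.getMessage))
  }
}
```

A message of this kind points at the mapping function rather than at the
internals of the state wrapper.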

I haven't provided a fix myself because I'm not familiar with the Spark
implementation, but it seems there needs to be either an extra check that
the state is set or, as previously stated, a better exception message.
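
One possible shape for the extra check, sketched in plain Scala rather than
against Spark's source. The names shouldStore and storeIfNeeded and the
(state, timestamp) map layout are hypothetical, and the condition is an
assumption for illustration, not a confirmed patch: store the record when
the state was updated, or when it already exists and a timeout is defined,
so that its timestamp is refreshed.

```scala
import scala.collection.mutable

object GuardedPut {

  // Store when the state was updated, or when it already exists and a
  // timeout is defined (so the key's last-seen time gets refreshed).
  def shouldStore(isUpdated: Boolean, exists: Boolean, timeoutDefined: Boolean): Boolean =
    isUpdated || (exists && timeoutDefined)

  // Hypothetical state map: key -> (state, last update time in milliseconds).
  def storeIfNeeded(
      stateMap: mutable.Map[String, (Long, Long)],
      key: String,
      state: Option[Long],
      isUpdated: Boolean,
      timeoutDefined: Boolean,
      batchTimeMs: Long): Unit =
    if (shouldStore(isUpdated, state.isDefined, timeoutDefined)) {
      // foreach never calls get on an empty state, so nothing can throw here
      state.foreach(s => stateMap.put(key, (s, batchTimeMs)))
    }
}
```

With this guard, a key whose state was never set is simply skipped when a
timeout is defined, instead of failing the whole task.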





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/PairDStreamFunctions-mapWithState-fails-in-case-timeout-is-set-without-updating-State-S-tp26147.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

Posted by Sachin Aggarwal <di...@gmail.com>.
I am sorry for spam, I replied in wrong thread sleepy head :-(

On Fri, Feb 5, 2016 at 1:15 AM, Sachin Aggarwal <di...@gmail.com>
wrote:

>
> http://coenraets.org/blog/2011/11/set-up-an-amazon-ec2-instance-with-tomcat-and-mysql-5-minutes-tutorial/
>
> The default Tomcat server uses port 8080. You need to open that port on
> your instance to make sure your Tomcat server is available on the Web (you
> could also change the default port). In the AWS Management Console, select
> Security Groups (left navigation bar), select the quick-start group, the
> Inbound tab and add port 8080. Make sure you click “Add Rule” and then
> “Apply Rule Changes”.
>
> On Fri, Feb 5, 2016 at 1:14 AM, Sachin Aggarwal <
> different.sachin@gmail.com> wrote:
>
>> i think we need to add port
>> http://serverfault.com/questions/317903/aws-ec2-open-port-8080
>>
>>
>> do u remember doing anything like this earlier for aws 1
>>
>> On Fri, Feb 5, 2016 at 1:07 AM, Yuval Itzchakov <yu...@gmail.com>
>> wrote:
>>
>>> Awesome. Thanks for the super fast reply.
>>>
>>>
>>> On Thu, Feb 4, 2016, 21:16 Tathagata Das <ta...@gmail.com>
>>> wrote:
>>>
>>>> Shixiong has already opened the PR -
>>>> https://github.com/apache/spark/pull/11081
>>>>
>>>> On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov <yu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Let me know if you do need a pull request for this, I can make that
>>>>> happen (given someone does a vast PR to make sure I'm understanding this
>>>>> problem right).
>>>>>
>>>>> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
>>>>> shixiong@databricks.com> wrote:
>>>>>
>>>>>> Thanks for reporting it. I will take a look.
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Yuval Itzchakov.
>>>>>
>>>>
>>>>
>>
>>
>> --
>>
>> Thanks & Regards
>>
>> Sachin Aggarwal
>> 7760502772
>>
>
>
>
> --
>
> Thanks & Regards
>
> Sachin Aggarwal
> 7760502772
>



-- 

Thanks & Regards

Sachin Aggarwal
7760502772

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

Posted by Sachin Aggarwal <di...@gmail.com>.
http://coenraets.org/blog/2011/11/set-up-an-amazon-ec2-instance-with-tomcat-and-mysql-5-minutes-tutorial/

The default Tomcat server uses port 8080. You need to open that port on
your instance to make sure your Tomcat server is available on the Web (you
could also change the default port). In the AWS Management Console, select
Security Groups (left navigation bar), select the quick-start group, the
Inbound tab and add port 8080. Make sure you click “Add Rule” and then
“Apply Rule Changes”.




-- 

Thanks & Regards

Sachin Aggarwal
7760502772

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

Posted by Sachin Aggarwal <di...@gmail.com>.
i think we need to add port
http://serverfault.com/questions/317903/aws-ec2-open-port-8080


do u remember doing anything like this earlier for aws 1

>>>>> h1. Sample code to reproduce the issue:
>>>>>
>>>>> {code:Title=MainObject}
>>>>> import org.apache.spark.streaming._
>>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>> /**
>>>>>   * Created by yuvali on 04/02/2016.
>>>>>   */
>>>>> object Program {
>>>>>
>>>>>   def main(args: Array[String]): Unit = {
>>>>>
>>>>>     val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>>>>>     val sparkContext = new SparkContext(sc)
>>>>>
>>>>>     val ssc = new StreamingContext(sparkContext, Seconds(4))
>>>>>     val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))
>>>>>
>>>>>     // Create a stream that generates 10 lines per second
>>>>>     val stream = ssc.receiverStream(new DummySource(10))
>>>>>
>>>>>     // Split the lines into words, and create a paired (key-value) dstream
>>>>>     val wordStream = stream.flatMap {
>>>>>       _.split(" ")
>>>>>     }.map(word => (word, 1))
>>>>>
>>>>>     // This represents the emitted stream from the trackStateFunc. Since we
>>>>>     // emit every input record with the updated value, this stream will
>>>>>     // contain the same # of records as the input dstream.
>>>>>     val wordCountStateStream = wordStream.mapWithState(stateSpec)
>>>>>     wordCountStateStream.print()
>>>>>
>>>>>     // To make sure data is not deleted by the time we query it interactively
>>>>>     ssc.remember(Minutes(1))
>>>>>
>>>>>     // Don't forget to set checkpoint directory
>>>>>     ssc.checkpoint("")
>>>>>     ssc.start()
>>>>>     ssc.awaitTermination()
>>>>>   }
>>>>>
>>>>>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
>>>>> state: State[Long]): Option[(String, Long)] = {
>>>>>     val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>>>>>     val output = (key, sum)
>>>>>     Some(output)
>>>>>   }
>>>>> }
>>>>> {code}
>>>>>
>>>>> {code:Title=DummySource}
>>>>>
>>>>> /**
>>>>>   * Created by yuvali on 04/02/2016.
>>>>>   */
>>>>>
>>>>> import org.apache.spark.storage.StorageLevel
>>>>> import scala.util.Random
>>>>> import org.apache.spark.streaming.receiver._
>>>>>
>>>>> class DummySource(ratePerSec: Int) extends
>>>>> Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
>>>>>
>>>>>   def onStart() {
>>>>>     // Start the thread that generates the dummy data
>>>>>     new Thread("Dummy Source") {
>>>>>       override def run() { receive() }
>>>>>     }.start()
>>>>>   }
>>>>>
>>>>>   def onStop() {
>>>>>     // There is nothing much to do as the thread calling receive()
>>>>>     // is designed to stop by itself once isStopped() returns true
>>>>>   }
>>>>>
>>>>>   /** Generate dummy lines until the receiver is stopped */
>>>>>   private def receive() {
>>>>>     while(!isStopped()) {
>>>>>       store("I am a dummy source " + Random.nextInt(10))
>>>>>       Thread.sleep((1000.toDouble / ratePerSec).toInt)
>>>>>     }
>>>>>   }
>>>>> }
>>>>> {code}
>>>>>
>>>>> The issue resides in
>>>>> *MapWithStateRDDRecord.updateRecordWithData*, starting at line 55, in
>>>>> the following code block:
>>>>>
>>>>> {code}
>>>>> dataIterator.foreach { case (key, value) =>
>>>>>       wrappedState.wrap(newStateMap.get(key))
>>>>>       val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
>>>>>       if (wrappedState.isRemoved) {
>>>>>         newStateMap.remove(key)
>>>>>       } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) /* <--- problem is here */ {
>>>>>         newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>>>>>       }
>>>>>       mappedData ++= returned
>>>>> }
>>>>> {code}
>>>>>
>>>>> If the stream has a timeout set but the state was never set, the
>>>>> "else-if" branch is still taken because the timeout is defined, and
>>>>> wrappedState.get() is then called on an empty "wrappedState".
>>>>>
>>>>> If it is mandatory to update state for each entry of *mapWithState*,
>>>>> then this code should throw a better exception than
>>>>> "NoSuchElementException", which doesn't really say anything to the
>>>>> developer.
>>>>>
>>>>> I haven't provided a fix myself because I'm not familiar with the Spark
>>>>> implementation, but it seems to me there needs to be either an extra
>>>>> check that the state is set or, as previously stated, a better
>>>>> exception message.
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>
>>


-- 

Thanks & Regards

Sachin Aggarwal
7760502772

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

Posted by Yuval Itzchakov <yu...@gmail.com>.
Awesome. Thanks for the super fast reply.

On Thu, Feb 4, 2016, 21:16 Tathagata Das <ta...@gmail.com>
wrote:

> Shixiong has already opened the PR -
> https://github.com/apache/spark/pull/11081
>
> On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov <yu...@gmail.com>
> wrote:
>
>> Let me know if you need a pull request for this; I can make that
>> happen (provided someone does a fast PR review to make sure I
>> understand the problem correctly).
>>
>> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
>> shixiong@databricks.com> wrote:
>>
>>> Thanks for reporting it. I will take a look.
>>>
>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>
>

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

Posted by Tathagata Das <ta...@gmail.com>.
Shixiong has already opened the PR -
https://github.com/apache/spark/pull/11081

On Thu, Feb 4, 2016 at 11:11 AM, Yuval Itzchakov <yu...@gmail.com> wrote:

> Let me know if you need a pull request for this; I can make that happen
> (provided someone does a fast PR review to make sure I understand the
> problem correctly).
>
> On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <
> shixiong@databricks.com> wrote:
>
>> Thanks for reporting it. I will take a look.
>>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

Posted by Yuval Itzchakov <yu...@gmail.com>.
Let me know if you need a pull request for this; I can make that happen
(provided someone does a fast PR review to make sure I understand the
problem correctly).
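
For reference, the "extra check" from the report could look roughly like the
sketch below. It is a simplified, self-contained model and not Spark's actual
code: WrappedState and shouldPut are hypothetical stand-ins made up for
illustration, and the real change may well differ. The idea is only to avoid
calling get() on a state that was never set when a timeout is configured.

```scala
// Hypothetical, simplified stand-in for Spark's internal state wrapper.
// It models only the pieces needed to show the guard; it is NOT StateImpl.
final class WrappedState[S] {
  private var value: Option[S] = None
  private var updated = false

  // Load the state currently stored for a key (None for a new key).
  def wrap(existing: Option[S]): Unit = { value = existing; updated = false }
  def exists: Boolean = value.isDefined
  def isUpdated: Boolean = updated
  def update(s: S): Unit = { value = Some(s); updated = true }
  def get(): S =
    value.getOrElse(throw new NoSuchElementException("State is not set"))
}

object GuardSketch {
  // Decide whether the state map entry for this key should be (re)written.
  // The timeout alone is no longer enough: the state must also exist.
  def shouldPut[S](state: WrappedState[S], timeoutDefined: Boolean): Boolean =
    state.isUpdated || (state.exists && timeoutDefined)

  def main(args: Array[String]): Unit = {
    val s = new WrappedState[Long]

    s.wrap(None) // new key; the mapping function never calls update
    println(shouldPut(s, timeoutDefined = true)) // false, so get() is never reached

    s.wrap(Some(3L)) // existing key, untouched in this batch
    println(shouldPut(s, timeoutDefined = true)) // true, entry timestamp is refreshed
  }
}
```

With a guard like this, a pass-through mapping function combined with a timeout
would simply skip the put for keys that have no state, instead of failing with
NoSuchElementException.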

On Thu, Feb 4, 2016 at 8:21 PM, Shixiong(Ryan) Zhu <sh...@databricks.com>
wrote:

> Thanks for reporting it. I will take a look.
>


-- 
Best Regards,
Yuval Itzchakov.

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Thanks for reporting it. I will take a look.

On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov <yu...@gmail.com> wrote:

> Hi,
> I've been playing with the experimental PairDStreamFunctions.mapWithState
> feature and I seem to have stumbled across a bug, and was wondering if
> anyone else has been seeing this behavior.
>
> I've opened an issue in the Spark JIRA; I simply want to pass this along
> in case anyone else is experiencing such a failure, or in case someone has
> insight into whether this is actually a bug:  SPARK-13195
> <https://issues.apache.org/jira/browse/SPARK-13195>
>
> Using the new Spark mapWithState API, I've encountered a bug when setting a
> timeout for mapWithState without any explicit state handling.
>
> h1. Steps to reproduce:
>
> 1. Create a method that conforms to the StateSpec function signature, and
> make sure it does not update any state using *state.update*. A simple "pass
> through" method is enough; it may even be empty.
> 2. Create a StateSpec object with the method from step 1, and explicitly set
> a timeout using the *StateSpec.timeout* method.
> 3. Create a DStream pipeline that uses mapWithState with the given
> StateSpec.
> 4. Run the code using spark-submit. The job fails with the following
> exception:
>
> {code}
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 136.0 failed 4 times, most recent failure: Lost task 0.3 in stage 136.0 (TID 176, ****): java.util.NoSuchElementException: State is not set
>         at org.apache.spark.streaming.StateImpl.get(State.scala:150)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:61)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
>         at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
>         at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>         at org.apache.spark.scheduler.Task.run(Task.scala:89)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
>
> h1. Sample code to reproduce the issue:
>
> {code:Title=MainObject}
> import org.apache.spark.streaming._
> import org.apache.spark.{SparkConf, SparkContext}
> /**
>   * Created by yuvali on 04/02/2016.
>   */
> object Program {
>
>   def main(args: Array[String]): Unit = {
>
>     val sc = new SparkConf().setAppName("mapWithState bug reproduce")
>     val sparkContext = new SparkContext(sc)
>
>     val ssc = new StreamingContext(sparkContext, Seconds(4))
>     val stateSpec = StateSpec.function(trackStateFunc _).timeout(Seconds(60))
>
>     // Create a stream that generates 10 lines per second
>     val stream = ssc.receiverStream(new DummySource(10))
>
>     // Split the lines into words, and create a paired (key-value) dstream
>     val wordStream = stream.flatMap {
>       _.split(" ")
>     }.map(word => (word, 1))
>
>     // This represents the emitted stream from the trackStateFunc. Since we
>     // emit every input record with the updated value, this stream will
>     // contain the same # of records as the input dstream.
>     val wordCountStateStream = wordStream.mapWithState(stateSpec)
>     wordCountStateStream.print()
>
>     // To make sure data is not deleted by the time we query it interactively
>     ssc.remember(Minutes(1))
>
>     // Don't forget to set checkpoint directory
>     ssc.checkpoint("")
>     ssc.start()
>     ssc.awaitTermination()
>   }
>
>   def trackStateFunc(batchTime: Time, key: String, value: Option[Int],
> state: State[Long]): Option[(String, Long)] = {
>     val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
>     val output = (key, sum)
>     Some(output)
>   }
> }
> {code}
>
> {code:Title=DummySource}
>
> /**
>   * Created by yuvali on 04/02/2016.
>   */
>
> import org.apache.spark.storage.StorageLevel
> import scala.util.Random
> import org.apache.spark.streaming.receiver._
>
> class DummySource(ratePerSec: Int) extends
> Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
>
>   def onStart() {
>     // Start the thread that receives data over a connection
>     new Thread("Dummy Source") {
>       override def run() { receive() }
>     }.start()
>   }
>
>   def onStop() {
>     // There is nothing much to do as the thread calling receive()
>     // is designed to stop by itself once isStopped() returns true
>   }
>
>   /** Generate dummy lines at the configured rate until the receiver is stopped */
>   private def receive() {
>     while(!isStopped()) {
>       store("I am a dummy source " + Random.nextInt(10))
>       Thread.sleep((1000.toDouble / ratePerSec).toInt)
>     }
>   }
> }
> {code}
>
> The issue resides in *MapWithStateRDDRecord.updateRecordWithData*, starting
> at line 55, in the following code block:
>
> {code}
> dataIterator.foreach { case (key, value) =>
>   wrappedState.wrap(newStateMap.get(key))
>   val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
>   if (wrappedState.isRemoved) {
>     newStateMap.remove(key)
>   } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) { /* <--- problem is here */
>     newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
>   }
>   mappedData ++= returned
> }
> {code}
>
> If the stream has a timeout set but the state was never set, the "else-if"
> branch is still taken because the timeout is defined, and
> "wrappedState.get()" then throws because "wrappedState" is empty.
>
> If it is mandatory to update state for each entry of *mapWithState*, then
> this code should throw a better exception than "NoSuchElementException",
> which doesn't really say anything to the developer.
>
> I haven't provided a fix myself because I'm not familiar with the Spark
> implementation, but it seems there needs to be either an extra check that
> the state is set or, as previously stated, a better exception message.
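
The "extra check" suggested in the quoted report can be sketched in isolation. What follows is a minimal, self-contained model and not Spark's actual code: SketchState, GuardedUpdateSketch and updateRecords are hypothetical names made up for illustration, and the real mappingFunction also takes a batch time, which is omitted here. The only point is the shape of the condition: the timestamp is refreshed for the timeout only when a state actually exists.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's internal state wrapper (not the real StateImpl).
final class SketchState[S] {
  private var value: Option[S] = None
  private var updated = false
  private var removed = false

  def wrap(existing: Option[S]): Unit = { value = existing; updated = false; removed = false }
  def exists: Boolean = value.isDefined
  def getOption: Option[S] = value
  def get(): S = value.getOrElse(throw new NoSuchElementException("State is not set"))
  def update(s: S): Unit = { value = Some(s); updated = true }
  def remove(): Unit = { value = None; removed = true }
  def isUpdated: Boolean = updated
  def isRemoved: Boolean = removed
}

object GuardedUpdateSketch {
  // Same shape as the quoted loop, with one extra `exists` check (an assumption,
  // not necessarily the fix that Spark itself adopts).
  def updateRecords[K, V, S, E](
      data: Iterator[(K, V)],
      stateMap: mutable.Map[K, (S, Long)],
      timeoutDefined: Boolean,
      batchTimeMs: Long)(
      mappingFunction: (K, Option[V], SketchState[S]) => Option[E]): Seq[E] = {
    val wrappedState = new SketchState[S]
    val mappedData = mutable.ArrayBuffer.empty[E]
    data.foreach { case (key, value) =>
      wrappedState.wrap(stateMap.get(key).map(_._1))
      val returned = mappingFunction(key, Some(value), wrappedState)
      if (wrappedState.isRemoved) {
        stateMap.remove(key)
      } else if (wrappedState.isUpdated || (wrappedState.exists && timeoutDefined)) {
        // Reached only when there is a state to store, so get() cannot throw here.
        stateMap.put(key, (wrappedState.get(), batchTimeMs))
      }
      mappedData ++= returned
    }
    mappedData.toSeq
  }

  def main(args: Array[String]): Unit = {
    val states = mutable.Map.empty[String, (Long, Long)]
    // Pass-through function that never calls state.update, as in the report.
    val out = updateRecords[String, Int, Long, (String, Long)](
      Iterator("a" -> 1, "b" -> 2), states, timeoutDefined = true, batchTimeMs = 4000L) {
      (key, value, state) =>
        Some((key, value.getOrElse(0).toLong + state.getOption.getOrElse(0L)))
    }
    println(out)
    println(states.isEmpty) // no state was ever set, so nothing is stored
  }
}
```

With the original condition (isUpdated || timeoutDefined), the same pass-through function would reach get() on an empty state and fail with "State is not set". On the user side, calling state.update inside the mapping function for every record sidesteps the exception, since the report only triggers it when the state is never updated.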