Posted to user@spark.apache.org by Andrey Yegorov <an...@gmail.com> on 2016/01/25 22:12:16 UTC

mapWithState and context start when checkpoint exists

Hi,

I am new to Spark (and Scala) and hope someone can help me with an issue I
got stuck on while experimenting and learning.

mapWithState from Spark 1.6 seems to be a great fit for the task I want to
implement, but unfortunately I am getting the error "RDD transformations and
actions can only be invoked by the driver, not inside of other
transformations" on job restart when a checkpoint already exists. The job
starts and works fine if the checkpoint directory is empty (which kind of
defeats the point of having a checkpoint).

I can reproduce it with ~65 lines of test code; see below.
Is there something I am doing wrong?

code:
----

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext, _}

object TestJob {
  def stateFunc(id: String,
                txt: Option[Iterable[String]],
                state: State[String]) : (String, Long) = {
    if (txt.nonEmpty) {
      // append this batch's values to the stored string (txt is non-empty here)
      val aggregatedString = state.getOption().getOrElse("") + txt.get.mkString
      state.update(aggregatedString)
      (id, aggregatedString.length)
    } else { // happens when state is timing out? any other cases?
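      // (With a timeout configured, a None value appears to be passed only when
      //  the key's state is timing out; state.isTimingOut() is true in that case.)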
      (id, 0)
    }
  }

  def createContext(checkpointDirectory: String): StreamingContext = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("test")

    val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
    ssc.checkpoint(checkpointDirectory)

    val input = Seq("1", "21", "321", "41", "42", "543", "67")
    val inputRdd = ssc.sparkContext.parallelize(input)
    val testStream = new ConstantInputDStream(ssc, inputRdd)

    val streamWithIds = testStream.map(x => (x.substring(0,1), x))
    val batched = streamWithIds.groupByKey()

    val stateSpec = StateSpec.function(stateFunc _)
      .numPartitions(3)
      .timeout(Minutes(3 * 60 * 24)) // 3 days

    val result = batched.mapWithState(stateSpec)
    result.print
    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDirectory = com.google.common.io.Files.createTempDir()
    checkpointDirectory.deleteOnExit()
    val checkpointDirectoryName = checkpointDirectory.getAbsolutePath

    val ssc = StreamingContext.getOrCreate(checkpointDirectoryName,
      () => {
        createContext(checkpointDirectoryName)
      })

    ssc.start()
    ssc.awaitTerminationOrTimeout(7000)
    ssc.stop()
    Thread.sleep(5000)

    val ssc2 = StreamingContext.getOrCreate(checkpointDirectoryName,
      () => {
        createContext(checkpointDirectoryName)
      })

    // terminates here with:
    // Exception in thread "main" org.apache.spark.SparkException: RDD
    // transformations and actions can only be invoked by the driver, not inside
    // of other transformations; for example, rdd1.map(x => rdd2.values.count() *
    // x) is invalid because the values transformation and count action cannot be
    // performed inside of the rdd1.map transformation. For more information, see
    // SPARK-5063.
    ssc2.start()
    ssc2.awaitTerminationOrTimeout(7000)
    ssc2.stop()
  }
}

----------
Andrey Yegorov

Re: mapWithState and context start when checkpoint exists

Posted by Andrey Yegorov <an...@gmail.com>.
Thank you!

What would be the best alternative for simulating a stream for testing
purposes, e.g. from a sequence or a text file?
In production I'll use Kafka as the source, but locally I wanted to mock it.
Worst case, I'll set up and tear down a Kafka cluster in the tests, but I
think having a mock will be faster.
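
A minimal sketch of the kind of mock I have in mind, assuming the test writes
its input files into a temp directory that the job watches via textFileStream
(directory names, timings and the object name are illustrative, not a tested
recipe); unlike ConstantInputDStream, the file-based stream keeps only file
metadata in the checkpoint, so it should be able to recover on restart:

import java.nio.file.{Files, Paths}

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object FileStreamMock {
  def main(args: Array[String]): Unit = {
    val inputDir = Files.createTempDirectory("stream-input").toString // watched directory
    val checkpointDir = Files.createTempDirectory("checkpoint").toString

    def create(): StreamingContext = {
      val conf = new SparkConf().setMaster("local[2]").setAppName("file-stream-mock")
      val ssc = new StreamingContext(conf, Durations.seconds(5))
      ssc.checkpoint(checkpointDir)
      // Each new file dropped into inputDir becomes part of the next batch.
      val lines = ssc.textFileStream(inputDir)
      lines.map(x => (x.substring(0, 1), x)).groupByKey().print()
      ssc
    }

    val ssc = StreamingContext.getOrCreate(checkpointDir, create _)
    ssc.start()

    // Simulate the source: write a file after the stream has started so its
    // modification time falls inside a batch window.
    Files.write(Paths.get(inputDir, "batch-1.txt"),
      java.util.Arrays.asList("1", "21", "321", "41", "42", "543", "67"))

    ssc.awaitTerminationOrTimeout(15000)
    ssc.stop()
  }
}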


----------
Andrey Yegorov

On Mon, Jan 25, 2016 at 1:26 PM, Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:

> Hey Andrey,
>
> `ConstantInputDStream` doesn't support checkpointing, as it contains an RDD
> field. It cannot resume from checkpoints.

Re: mapWithState and context start when checkpoint exists

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
Hey Andrey,

`ConstantInputDStream` doesn't support checkpointing, as it contains an RDD
field. It cannot resume from checkpoints.
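
For what it's worth, the error text is Spark's generic SPARK-5063 guard, which
fires whenever RDD operations are attempted somewhere the driver's SparkContext
is not available; the checkpoint-restored RDD held inside ConstantInputDStream
trips the same guard as the nested-RDD example quoted in the message. A minimal
standalone illustration of that rule (names are purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object Spark5063Demo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("spark-5063-demo"))
    val rdd1 = sc.parallelize(1 to 10)
    val rdd2 = sc.parallelize(Seq(("a", 1L), ("b", 2L)))

    // Invalid: rdd2 would be shipped inside rdd1's map closure and used on the
    // executors, where RDD transformations and actions are not allowed; this is
    // the pattern the SPARK-5063 message quotes.
    // rdd1.map(x => rdd2.values.count() * x).collect()

    // Valid: run the inner action on the driver first, then close over the result.
    val factor = rdd2.values.count()
    println(rdd1.map(x => factor * x).collect().mkString(", "))

    sc.stop()
  }
}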
