Posted to user@spark.apache.org by Joe Panciera <jo...@gmail.com> on 2016/07/22 20:50:22 UTC

Fatal error when using broadcast variables and checkpointing in Spark Streaming

Hi,

I'm attempting to use broadcast variables to update stateful values used
across the cluster for processing. Essentially, I have a function executed
in .foreachRDD that updates the broadcast variable by calling unpersist()
and then rebroadcasting. This works without issue when I run the code
without checkpointing, but as soon as I enable checkpointing, Spark is
unable to pickle the function. I get this error:

It appears that you are attempting to reference SparkContext from a broadcast ...

  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line
268, in __getnewargs__
    "It appears that you are attempting to reference SparkContext from a
broadcast "
Exception: It appears that you are attempting to reference SparkContext
from a broadcast variable, action, or transformation. SparkContext can only
be used on the driver, not in code that it run on workers. For more
information, see SPARK-5063.

        at org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer$.serialize(PythonDStream.scala:144)
        at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:101)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
        ... 61 more


Here's some simple code that reproduces the issue:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream



sc = SparkContext(appName="FileAutomation")

# Create streaming context from existing spark context
ssc = StreamingContext(sc, 10)
alert_stream = KinesisUtils.createStream(ssc,
                                         "Events",  # App Name
                                         "Event_Test",  # Stream Name
                                         "https://kinesis.us-west-2.amazonaws.com",
                                         "us-west-2",
                                         InitialPositionInStream.LATEST,
                                         10000  # Checkpoint interval
                                         )

events = sc.broadcast(25)


def test(rdd):

    global events
    num = events.value
    print num

    events.unpersist()
    events = sc.broadcast(num + 1)


events.foreachRDD(test)

# Comment out this line and no error occurs
ssc.checkpoint('dir')
ssc.start()
ssc.awaitTermination()
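
The pickle failure is triggered by the checkpoint itself: ssc.checkpoint
forces Spark to serialize the DStream operation graph, including the
function passed to foreachRDD, and that closure captures the driver-side sc
through the global reference. One way to avoid capturing sc is PySpark's
rdd.context accessor inside the callback; the sketch below is illustrative
only and may not remove every driver-side reference the closure picks up:

def test(rdd):
    global events
    num = events.value
    print num

    events.unpersist()
    # rdd.context is the SparkContext the RDD was created on, so the
    # closure no longer references the module-level `sc`
    events = rdd.context.broadcast(num + 1)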

Re: Fatal error when using broadcast variables and checkpointing in Spark Streaming

Posted by Joe Panciera <jo...@gmail.com>.
I realized that there's an error in the code: foreachRDD was called on the
broadcast variable instead of the DStream. Corrected:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream



sc = SparkContext(appName="FileAutomation")

# Create streaming context from existing spark context
ssc = StreamingContext(sc, 10)
alert_stream = KinesisUtils.createStream(ssc,
                                         "Events",  # App Name
                                         "Event_Test",  # Stream Name
                                         "https://kinesis.us-west-2.amazonaws.com",
                                         "us-west-2",
                                         InitialPositionInStream.LATEST,
                                         10000  # Checkpoint interval
                                         )

events = sc.broadcast(25)


def test(rdd):

    global events
    num = events.value
    print num

    events.unpersist()
    events = sc.broadcast(num + 1)


alert_stream.foreachRDD(test)

# Comment out this line and no error occurs
ssc.checkpoint('dir')
ssc.start()
ssc.awaitTermination()
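
Also worth noting: the Spark Streaming programming guide states that
broadcast variables cannot be recovered from a checkpoint, and recommends
wrapping them in a lazily instantiated singleton that is re-created after
the driver restarts. A rough sketch of that pattern (the BroadcastCounter
helper and its initial value are made up for illustration):

class BroadcastCounter(object):
    # Lazily instantiated singleton, re-created on demand instead of
    # being serialized with the checkpointed DStream graph
    instance = None

    @classmethod
    def get(cls, sc):
        if cls.instance is None:
            cls.instance = sc.broadcast(25)
        return cls.instance

    @classmethod
    def update(cls, sc, value):
        cls.instance.unpersist()
        cls.instance = sc.broadcast(value)


def test(rdd):
    sc = rdd.context  # fetch the context from the RDD, not from a global
    num = BroadcastCounter.get(sc).value
    print num
    BroadcastCounter.update(sc, num + 1)

alert_stream.foreachRDD(test)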

