Posted to user@spark.apache.org by James Cole <ja...@binarism.net> on 2015/07/03 08:47:18 UTC

Streaming: updating broadcast variables

Hi all,

I'm filtering a DStream using a function. I need to be able to change this
function while the application is running (I'm polling a service to see if
a user has changed their filtering). The filter function is a
transformation and runs on the workers, so that's where the updates need to
go. I'm not sure of the best way to do this.
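
For concreteness, here's a minimal sketch of the shape of the thing (the
names and types are illustrative, not my real code):

    import org.apache.spark.streaming.dstream.DStream

    // polled periodically from the external service
    def fetchUserFilter(): String => Boolean = { line => line.contains("keep") }

    var userFilter: String => Boolean = fetchUserFilter()

    // events is the DStream built from the Kafka direct stream
    def applyFilter(events: DStream[String]): DStream[String] =
      events.filter(e => userFilter(e)) // the predicate executes on the workers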

Initially broadcasting seemed like the way to go: the filter is actually
quite large. But I don't think I can update something I've broadcasted.
I've tried unpersisting and re-creating the broadcast variable but it
became obvious this wasn't updating the reference on the worker. So am I
correct in thinking I can't use broadcasted variables for this purpose?
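
Roughly what I tried, continuing the sketch above (again illustrative, not
the exact code):

    // ssc is the StreamingContext, events the DStream from the sketch above
    val filterBroadcast = ssc.sparkContext.broadcast(fetchUserFilter())

    val filtered = events.filter(e => filterBroadcast.value(e))

    // later, when polling detects a change:
    filterBroadcast.unpersist()
    val newFilterBroadcast = ssc.sparkContext.broadcast(fetchUserFilter())
    // ... but `filtered` was defined against filterBroadcast, so the workers
    // never see the new value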

The next option seems to be: stopping the JavaStreamingContext, creating a
new one from the SparkContext, updating the filter function, and
re-creating the DStreams (I'm using direct streams from Kafka).
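
Something like this is what I have in mind (my real code is Java; the Kafka
parameters and helper names below are placeholders):

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    def startStreams(sc: SparkContext, filter: String => Boolean): StreamingContext = {
      val ssc = new StreamingContext(sc, Seconds(10))
      val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
      val topics = Set("events")
      val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topics)
      stream.map(_._2).filter(filter).print()
      ssc.start()
      ssc
    }

    var ssc = startStreams(sc, fetchUserFilter())

    // when polling detects a changed filter:
    ssc.stop(stopSparkContext = false) // keep the SparkContext alive
    ssc = startStreams(sc, fetchUserFilter())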

If I re-created the JavaStreamingContext would the accumulators (which are
created from the SparkContext) keep working? (Obviously I'm going to try
this soon)
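
By "keep working" I mean something like this (untested, just illustrating
what I'd check; matchedEvents and filtered are made-up names):

    // created once from the SparkContext, before any streaming context
    val matchedEvents = sc.accumulator(0L, "matchedEvents")

    // in the DStreams defined on the *new* StreamingContext after the restart:
    filtered.foreachRDD(rdd => {
      rdd.foreach(_ => matchedEvents += 1L) // incremented inside the tasks
    })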

In summary:

1) Can broadcasted variables be updated?

2) Is there a better way than re-creating the JavaStreamingContext and
DStreams?

Thanks,

James

Re: Streaming: updating broadcast variables

Posted by Conor Fennell <co...@altocloud.com>.
Hi James,

The code below shows one way you can update the broadcast variable on the
executors: the check and re-broadcast run on the driver inside foreachRDD,
and the downstream transformation picks up the new Broadcast handle on later
batches:

    // ... event stream setup (eventStream is the DStream being processed)

    import java.util.Date
    import scala.collection.immutable.HashMap

    var startTime = new Date().getTime()

    val hashMap = HashMap("1" -> ("1", 1), "2" -> ("2", 2))

    var hashMapBroadcast = eventStream.context.sparkContext.broadcast(hashMap)

    val TWO_MINUTES = 120000

    eventStream.foreachRDD(rdd => {

      // Executed on the driver, not the executors

      if (new Date().getTime() - startTime > TWO_MINUTES) {

        // remove the old broadcast variable from the executors

        hashMapBroadcast.unpersist()

        // create a new one with the updated values

        hashMapBroadcast = eventStream.context.sparkContext.broadcast(
          HashMap("1" -> ("1", 1000), "2" -> ("2", 2000)))

        // reset the timer so the re-broadcast happens only once

        startTime = new Date().getTime()

      }

    })

    val broadcastValuesFromStream = eventStream.map(event =>
      hashMapBroadcast.value("1"))

    // should print (1, 1000) after 2 minutes, once the broadcast is updated

    broadcastValuesFromStream.print()


Regards,

Conor





Re: Streaming: updating broadcast variables

Posted by Raghavendra Pandey <ra...@gmail.com>.
You cannot update the broadcast variable. It won't get reflected on the
workers.