Posted to user@spark.apache.org by "simone.robutti" <si...@gmail.com> on 2015/08/07 16:07:17 UTC

Issue when rebroadcasting a variable outside of the definition scope

Hello everyone,

this is my first message ever to a mailing list, so please pardon me if I'm
violating the etiquette in some way.

I have a problem with rebroadcasting a variable. It is not well documented,
and I could only find a few simple examples to understand how it should work.

What I'm trying to do is propagate an update to the options that control the
behaviour of my streaming transformations (in this case, the evaluation of
machine learning models). I have a listener on a Kafka queue that waits for
messages and updates the broadcast variable.

I got it to work, but the system doesn't rebroadcast anything if I pass the
DStream or the broadcast variable as a parameter.

So both must be defined in the same scope, and the rebroadcasting must happen
in that same scope too. Right now my main function looks like this:
----------------------------------------------------------------------------------------------------------------------
 // broadcast the initial configuration value
 var updateVar = sc.broadcast("test")

 // use the broadcast value inside the streaming transformation and the Kafka sink
 val stream = input.map(x => myTransformation(x, updateVar))
 stream.writeToKafka[String, String](outputProps,
   (m: String) => new KeyedMessage[String, String](configuration.outputTopic, m + updateVar.value))

 // listen on the Kafka control topic and rebroadcast whenever a message arrives
 val controlStream = connector.createMessageStreamsByFilter(filterSpec, 1,
   new DefaultDecoder(), new StringDecoder())(0)
 for (messageAndTopic <- controlStream) {
   println("received control message")
   updateVar.unpersist()
   updateVar = ssc.sparkContext.broadcast(messageAndTopic.message)
 }

 ssc.start()
 ssc.awaitTermination()

----------------------------------------------------------------------------------------------------------------------

"updateVar" is correctly updated both in "myTransformation" and in the main
scope and I can access the updated value.

But when I try to do the same thing with the logic moved into a class, it
fails. I have something like this (or the same queue listener as before, just
moved to another class):

class Listener(var updateVar: Broadcast[String]) { ...
    def someFunc() = {
       updateVar.unpersist()                  // the old broadcast is destroyed...
       updateVar = sc.broadcast("new value")  // ...but the reassignment never takes effect
    }
...
}

This fails: the variable can be destroyed but not updated.

Any suggestions on why this behaviour occurs? I would also like to know how
Spark notices the reassignment to the var and starts the rebroadcasting.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-when-rebroadcasting-a-variable-outside-of-the-definition-scope-tp24172.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: Issue when rebroadcasting a variable outside of the definition scope

Posted by "Ganelin, Ilya" <Il...@capitalone.com>.
Simone, here are some thoughts. First, please check out the "Understanding closures" section of the Spark Programming Guide. Secondly, broadcast variables do not propagate updates to the underlying data. You must either create a new broadcast variable or, alternatively, if you simply wish to accumulate results, you can use an Accumulator that stores an array or queue as a buffer, which you then read from and write to Kafka.
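
For the first option (creating a new broadcast variable), a pattern that is often suggested is to keep the current Broadcast handle in a driver-side holder and read it again inside foreachRDD, so every batch captures the latest handle. Below is only a minimal sketch of that idea against the DStream API; ConfigBroadcast, update, get and attach are hypothetical names, not part of Spark or of your code.

----------------------------------------------------------------------------------------------------------------------
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.DStream

// Hypothetical driver-side holder: the control-topic listener calls update(),
// and the streaming job re-reads get once per batch.
object ConfigBroadcast {
  @volatile private var current: Broadcast[String] = _

  def update(sc: SparkContext, newValue: String): Unit = synchronized {
    if (current != null) current.unpersist(blocking = false) // drop the stale executor copies
    current = sc.broadcast(newValue)                         // ship the new value as a fresh broadcast
  }

  def get: Broadcast[String] = current

  // foreachRDD's body runs on the driver once per batch, so each batch's closure
  // captures the latest Broadcast handle instead of the one captured when the
  // DStream was first defined.
  def attach(stream: DStream[String]): Unit =
    stream.foreachRDD { rdd =>
      val cfg = get
      rdd.foreach(record => println(record + cfg.value))
    }
}
----------------------------------------------------------------------------------------------------------------------

A plausible reading of the closures pointer above: the Listener class holds its own copy of the reference, so reassigning that field never changes the reference the map closure actually captured when the DStream was defined.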

You should also be able to send the results to a new DStream instead, and link that DStream to Kafka; a rough sketch of one way to do that is below. Hope this gives you some ideas to play with. Thanks!
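
For example, a derived DStream can be written out with foreachRDD plus a plain Kafka 0.8 producer, created once per partition. This is only an illustrative sketch, assuming the old kafka.producer API (the same API that provides KeyedMessage above) is on the classpath; KafkaSink, outputTopic and brokerList are placeholder names.

----------------------------------------------------------------------------------------------------------------------
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.streaming.dstream.DStream

object KafkaSink {
  // Write every record of every batch to Kafka, creating the producer inside
  // foreachPartition so nothing non-serializable is shipped to the executors.
  def write(results: DStream[String], outputTopic: String, brokerList: String): Unit =
    results.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val props = new Properties()
        props.put("metadata.broker.list", brokerList)
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        val producer = new Producer[String, String](new ProducerConfig(props))
        partition.foreach(m => producer.send(new KeyedMessage[String, String](outputTopic, m)))
        producer.close()
      }
    }
}
----------------------------------------------------------------------------------------------------------------------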



Thank you,
Ilya Ganelin


