You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by "Dipl.-Inf. Rico Bergmann" <in...@ricobergmann.de> on 2022/09/28 15:10:34 UTC
Updating Broadcast Variable in Spark Streaming 2.4.4
Hi folks!
I'm trying to implement an update of a broadcast var in Spark Streaming.
The idea is that whenever some configuration value has changed (this is
periodically checked by the driver) the existing broadcast variable is
unpersisted and then (re-)broadcasted.
In a local test setup (using a local Spark) it works fine but on a real
cluster it doesn't work. The broadcast variable never gets updated. What
I can see after adding some log messages is that the BroadcastUpdater
thread is only called twice and then never again. Does anyone have any
idea why this happens?
Code snippet:
@RequiredArgsConstructor
public class BroadcastUpdater implements Runnable {
    private final transient JavaSparkContext sparkContext;
    @Getter
    private transient volatile Broadcast<Map<String, String>> broadcastVar;
    // Last configuration snapshot that was broadcast; compared against the
    // freshly loaded map to decide whether a re-broadcast is needed.
    private transient volatile Map<String, String> configMap;

    /**
     * Invoked periodically by a ScheduledExecutorService on the driver.
     * Re-broadcasts the configuration map whenever its content changed.
     *
     * IMPORTANT: scheduleWithFixedDelay suppresses ALL subsequent executions
     * as soon as one run throws an uncaught exception. That is the likely
     * reason the updater was observed to run "only twice and then never
     * again" on a real cluster: destroy(true) on a broadcast still referenced
     * by in-flight tasks can throw, killing the scheduled task silently.
     * The whole body is therefore guarded by a try/catch.
     */
    public void run() {
        try {
            Map<String, String> latest = getConfigMap();
            if (this.broadcastVar == null || !latest.equals(this.configMap)) {
                this.configMap = latest;
                Broadcast<Map<String, String>> previous = this.broadcastVar;
                // Publish the replacement BEFORE tearing down the old
                // broadcast, so a concurrent reader of getBroadcastVar()
                // never observes an unpersisted/destroyed handle.
                this.broadcastVar = this.sparkContext.broadcast(this.configMap);
                if (previous != null) {
                    previous.unpersist(true); // blocking: wait for executors to drop blocks
                    previous.destroy(true);   // blocking: release driver-side state
                }
            }
        } catch (RuntimeException e) {
            // Swallow so the scheduler keeps invoking us on the next period.
            // TODO(review): replace with proper logging (e.g. SLF4J).
            e.printStackTrace();
        }
    }

    /**
     * Loads the current configuration map from its external source.
     * (Implementation elided in the original posting.)
     */
    private Map<String, String> getConfigMap() {
        //impl details
    }
}
public class StreamingFunction implements Serializable {
private transient volatile BroadcastUpdater broadcastUpdater;
private transient ScheduledThreadPoolExecutor
scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
protected JavaStreamingContext startStreaming(JavaStreamingContext
context, ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
broadcastUpdater = new BroadcastUpdater(context.sparkContext());
scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater, 0,
3, TimeUnit.SECONDS);
final JavaInputDStream<ConsumerRecord<String,
ChangeDataRecord>> inputStream = KafkaUtils.createDirectStream(context,
LocationStrategies.PreferConsistent(), consumerStrategy);
inputStream.foreachRDD(rdd -> {
Broadcast<Map<String, String>> broadcastVar =
broadcastUpdater.getBroadcastVar();
rdd.foreachPartition(partition -> {
if (partition.hasNext()) {
Map<String, String> configMap =
broadcastVar.getValue();
// iterate
while (partition.hasNext()) {
//impl logic using broadcast variable
}
}
}
}
}
}
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org
Re: Updating Broadcast Variable in Spark Streaming 2.4.4
Posted by Sean Owen <sr...@gmail.com>.
I don't think that can work. Your BroadcastUpdater is copied to the task,
with a reference to an initial broadcast. When that is later updated on the
driver, this does not affect the broadcast inside the copy in the tasks.
On Wed, Sep 28, 2022 at 10:11 AM Dipl.-Inf. Rico Bergmann <
info@ricobergmann.de> wrote:
> Hi folks!
>
>
> I'm trying to implement an update of a broadcast var in Spark Streaming.
> The idea is that whenever some configuration value has changed (this is
> periodically checked by the driver) the existing broadcast variable is
> unpersisted and then (re-)broadcasted.
>
> In a local test setup (using a local Spark) it works fine but on a real
> cluster it doesn't work. The broadcast variable never gets updated. What
> I can see after adding some log messages is that the BroadcastUpdater
> thread is only called twice and then never again. Anyone any idea why
> this happens?
>
> Code snippet:
>
> @RequiredArgsConstructor
> public class BroadcastUpdater implements Runnable {
> private final transient JavaSparkContext sparkContext;
> @Getter
> private transient volatile Broadcast<Map<String, String>>
> broadcastVar;
> private transient volatile Map<String, String> configMap;
>
> public void run() {
> Map<String, String> configMap = getConfigMap();
> if (this.broadcastVar == null ||
> !configMap.equals(this.configMap)) {
> this.configMap = configMap;
> if (broadcastVar != null) {
> broadcastVar.unpersist(true);
> broadcastVar.destroy(true);
> }
> this.broadcastVar =
> this.sparkContext.broadcast(this.configMap);
> }
> }
>
> private Map<String, String> getConfigMap() {
> //impl details
> }
> }
>
> public class StreamingFunction implements Serializable {
>
> private transient volatile BroadcastUpdater broadcastUpdater;
> private transient ScheduledThreadPoolExecutor
> scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
>
> protected JavaStreamingContext startStreaming(JavaStreamingContext
> context, ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
> broadcastUpdater = new BroadcastUpdater(context.sparkContext());
> scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater, 0,
> 3, TimeUnit.SECONDS);
>
> final JavaInputDStream<ConsumerRecord<String,
> ChangeDataRecord>> inputStream = KafkaUtils.createDirectStream(context,
> LocationStrategies.PreferConsistent(), consumerStrategy);
>
> inputStream.foreachRDD(rdd -> {
> Broadcast<Map<String, String>> broadcastVar =
> broadcastUpdater.getBroadcastVar();
> rdd.foreachPartition(partition -> {
> if (partition.hasNext()) {
> Map<String, String> configMap =
> broadcastVar.getValue();
>
> // iterate
> while (partition.hasNext()) {
> //impl logic using broadcast variable
> }
> }
> }
> }
> }
> }
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>