You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Srikanth <sr...@gmail.com> on 2016/02/17 22:13:07 UTC

Streaming with broadcast joins

Hello,

I have a streaming use case where I plan to keep a dataset broadcasted and
cached on each executor.
Every micro batch in streaming will create a DF out of the RDD and join the
batch.
The below code will perform the broadcast operation for each RDD. Is there
a way to broadcast it just once?

Alternate approachs are also welcome.

    val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)

    val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
                              .join(DF1, "id")
    metaDF.cache


  val lines = streamingcontext.textFileStream(path)

  lines.foreachRDD( rdd => {
      val recordDF = rdd.flatMap(r => Record(r)).toDF()
      val joinedDF = recordDF.join(broadcast(metaDF), "id")

      joinedDF.rdd.foreachPartition ( partition => {
        partition.foreach( row => {
             ...
             ...
        })
      })
  })

 streamingcontext.start

On a similar note, if the metaDF is too big for broadcast, can I partition
it(df.repartition($"col")) and also partition each streaming RDD?
This way I can avoid shuffling metaDF each time.

Let me know you thoughts.

Thanks

Re: Streaming with broadcast joins

Posted by Srikanth <sr...@gmail.com>.
Hmmm..OK.

Srikanth

On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu <se...@gmail.com>
wrote:

> I don't have the code with me now, and I ended moving everything to RDD in
> the end and using map operations to do some lookups, i.e. instead of
> broadcasting a Dataframe I ended broadcasting a Map
>
>
> On Fri, Feb 19, 2016 at 11:39 AM Srikanth <sr...@gmail.com> wrote:
>
>> It didn't fail. It wasn't broadcasting. I just ran the test again and
>> here are the logs.
>> Every batch is reading the metadata file.
>>
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>>
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>> If I remember, foreachRDD is executed in the driver's context. Not sure
>> how we'll be able to achieve broadcast in this approach(unless we use SQL
>> broadcast hint again)
>>
>> When you say "it worked before",  was it with an older version of spark?
>> I'm trying this on 1.6.
>> If you still have the streaming job running can you verify in spark UI
>> that broadcast join is being used. Also, if the files are read and
>> broadcasted each batch??
>>
>> Thanks for the help!
>>
>>
>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <se...@gmail.com>
>> wrote:
>>
>>> I don't see anything obviously wrong on your second approach, I've done
>>> it like that before and it worked. When you say that it didn't work what do
>>> you mean? did it fail? it didnt broadcast?
>>>
>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <sr...@gmail.com> wrote:
>>>
>>>> Code with SQL broadcast hint. This worked and I was able to see that
>>>> broadcastjoin was performed.
>>>>
>>>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>>> l(1))).toDF()
>>>>    val resultDF = recordDF.join(testDF, "Age")
>>>>
>>>>  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>>         }
>>>>
>>>> But for every batch this file was read and broadcast was performed.
>>>> Evaluating the entire DAG I guess.
>>>>   16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>>     16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>>
>>>> Then I changed code to broadcast the dataframe. This didn't work
>>>> either. Not sure if this is what you meant by broadcasting a dataframe.
>>>>
>>>> val testDF =
>>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>>              )
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>     val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>>> l(1))).toDF()
>>>>     val resultDF = recordDF.join(testDF.value, "Age")
>>>>
>>>>  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>>         }
>>>>
>>>>
>>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <
>>>> sebastian.piu@gmail.com> wrote:
>>>>
>>>>> Can you paste the code where you use sc.broadcast ?
>>>>>
>>>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <sr...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sebastian,
>>>>>>
>>>>>> I was able to broadcast using sql broadcast hint. Question is how to
>>>>>> prevent this broadcast for each RDD.
>>>>>> Is there a way where it can be broadcast once and used locally for
>>>>>> each RDD?
>>>>>> Right now every batch the metadata file is read and the DF is
>>>>>> broadcasted.
>>>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>>>
>>>>>> Srikanth
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <
>>>>>> sebastian.piu@gmail.com> wrote:
>>>>>>
>>>>>>> You should be able to broadcast that data frame using sc.broadcast
>>>>>>> and join against it.
>>>>>>>
>>>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <sr...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I have a streaming use case where I plan to keep a dataset
>>>>>>>> broadcasted and cached on each executor.
>>>>>>>> Every micro batch in streaming will create a DF out of the RDD and
>>>>>>>> join the batch.
>>>>>>>> The below code will perform the broadcast operation for each RDD.
>>>>>>>> Is there a way to broadcast it just once?
>>>>>>>>
>>>>>>>> Alternate approachs are also welcome.
>>>>>>>>
>>>>>>>>     val DF1 =
>>>>>>>> sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>>>
>>>>>>>>     val metaDF =
>>>>>>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>>>                               .join(DF1, "id")
>>>>>>>>     metaDF.cache
>>>>>>>>
>>>>>>>>
>>>>>>>>   val lines = streamingcontext.textFileStream(path)
>>>>>>>>
>>>>>>>>   lines.foreachRDD( rdd => {
>>>>>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>>>
>>>>>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>>>>>         partition.foreach( row => {
>>>>>>>>              ...
>>>>>>>>              ...
>>>>>>>>         })
>>>>>>>>       })
>>>>>>>>   })
>>>>>>>>
>>>>>>>>  streamingcontext.start
>>>>>>>>
>>>>>>>> On a similar note, if the metaDF is too big for broadcast, can I
>>>>>>>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>>>>>>>> This way I can avoid shuffling metaDF each time.
>>>>>>>>
>>>>>>>> Let me know you thoughts.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>

Re: Streaming with broadcast joins

Posted by Srikanth <sr...@gmail.com>.
Sabastian,

*Update:-*  This is not possible. Probably will remain this way for the
foreseeable future.
https://issues.apache.org/jira/browse/SPARK-3863

Srikanth

On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu <se...@gmail.com>
wrote:

> I don't have the code with me now, and I ended moving everything to RDD in
> the end and using map operations to do some lookups, i.e. instead of
> broadcasting a Dataframe I ended broadcasting a Map
>
>
> On Fri, Feb 19, 2016 at 11:39 AM Srikanth <sr...@gmail.com> wrote:
>
>> It didn't fail. It wasn't broadcasting. I just ran the test again and
>> here are the logs.
>> Every batch is reading the metadata file.
>>
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>>
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>> If I remember, foreachRDD is executed in the driver's context. Not sure
>> how we'll be able to achieve broadcast in this approach(unless we use SQL
>> broadcast hint again)
>>
>> When you say "it worked before",  was it with an older version of spark?
>> I'm trying this on 1.6.
>> If you still have the streaming job running can you verify in spark UI
>> that broadcast join is being used. Also, if the files are read and
>> broadcasted each batch??
>>
>> Thanks for the help!
>>
>>
>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <se...@gmail.com>
>> wrote:
>>
>>> I don't see anything obviously wrong on your second approach, I've done
>>> it like that before and it worked. When you say that it didn't work what do
>>> you mean? did it fail? it didnt broadcast?
>>>
>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <sr...@gmail.com> wrote:
>>>
>>>> Code with SQL broadcast hint. This worked and I was able to see that
>>>> broadcastjoin was performed.
>>>>
>>>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>>> l(1))).toDF()
>>>>    val resultDF = recordDF.join(testDF, "Age")
>>>>
>>>>  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>>         }
>>>>
>>>> But for every batch this file was read and broadcast was performed.
>>>> Evaluating the entire DAG I guess.
>>>>   16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>>     16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>>
>>>> Then I changed code to broadcast the dataframe. This didn't work
>>>> either. Not sure if this is what you meant by broadcasting a dataframe.
>>>>
>>>> val testDF =
>>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>>              )
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>     val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>>> l(1))).toDF()
>>>>     val resultDF = recordDF.join(testDF.value, "Age")
>>>>
>>>>  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>>         }
>>>>
>>>>
>>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <
>>>> sebastian.piu@gmail.com> wrote:
>>>>
>>>>> Can you paste the code where you use sc.broadcast ?
>>>>>
>>>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <sr...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sebastian,
>>>>>>
>>>>>> I was able to broadcast using sql broadcast hint. Question is how to
>>>>>> prevent this broadcast for each RDD.
>>>>>> Is there a way where it can be broadcast once and used locally for
>>>>>> each RDD?
>>>>>> Right now every batch the metadata file is read and the DF is
>>>>>> broadcasted.
>>>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>>>
>>>>>> Srikanth
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <
>>>>>> sebastian.piu@gmail.com> wrote:
>>>>>>
>>>>>>> You should be able to broadcast that data frame using sc.broadcast
>>>>>>> and join against it.
>>>>>>>
>>>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <sr...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I have a streaming use case where I plan to keep a dataset
>>>>>>>> broadcasted and cached on each executor.
>>>>>>>> Every micro batch in streaming will create a DF out of the RDD and
>>>>>>>> join the batch.
>>>>>>>> The below code will perform the broadcast operation for each RDD.
>>>>>>>> Is there a way to broadcast it just once?
>>>>>>>>
>>>>>>>> Alternate approachs are also welcome.
>>>>>>>>
>>>>>>>>     val DF1 =
>>>>>>>> sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>>>
>>>>>>>>     val metaDF =
>>>>>>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>>>                               .join(DF1, "id")
>>>>>>>>     metaDF.cache
>>>>>>>>
>>>>>>>>
>>>>>>>>   val lines = streamingcontext.textFileStream(path)
>>>>>>>>
>>>>>>>>   lines.foreachRDD( rdd => {
>>>>>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>>>
>>>>>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>>>>>         partition.foreach( row => {
>>>>>>>>              ...
>>>>>>>>              ...
>>>>>>>>         })
>>>>>>>>       })
>>>>>>>>   })
>>>>>>>>
>>>>>>>>  streamingcontext.start
>>>>>>>>
>>>>>>>> On a similar note, if the metaDF is too big for broadcast, can I
>>>>>>>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>>>>>>>> This way I can avoid shuffling metaDF each time.
>>>>>>>>
>>>>>>>> Let me know you thoughts.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>

Re: Streaming with broadcast joins

Posted by Srikanth <sr...@gmail.com>.
Sure. These may be unrelated.

On Fri, Feb 19, 2016 at 10:39 AM, Jerry Lam <ch...@gmail.com> wrote:

> Hi guys,
>
> I also encounter broadcast dataframe issue not for steaming jobs but
> regular dataframe join. In my case, the executors died probably due to OOM
> which I don't think it should use that much memory. Anyway, I'm going to
> craft an example and send it here to see if it is a bug or something I've
> misunderstood.
>
> Best Regards,
>
> Jerry
>
> Sent from my iPhone
>
> On 19 Feb, 2016, at 10:20 am, Sebastian Piu <se...@gmail.com>
> wrote:
>
> I don't have the code with me now, and I ended moving everything to RDD in
> the end and using map operations to do some lookups, i.e. instead of
> broadcasting a Dataframe I ended broadcasting a Map
>
>
> On Fri, Feb 19, 2016 at 11:39 AM Srikanth <sr...@gmail.com> wrote:
>
>> It didn't fail. It wasn't broadcasting. I just ran the test again and
>> here are the logs.
>> Every batch is reading the metadata file.
>>
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>>
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>> If I remember, foreachRDD is executed in the driver's context. Not sure
>> how we'll be able to achieve broadcast in this approach(unless we use SQL
>> broadcast hint again)
>>
>> When you say "it worked before",  was it with an older version of spark?
>> I'm trying this on 1.6.
>> If you still have the streaming job running can you verify in spark UI
>> that broadcast join is being used. Also, if the files are read and
>> broadcasted each batch??
>>
>> Thanks for the help!
>>
>>
>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <se...@gmail.com>
>> wrote:
>>
>>> I don't see anything obviously wrong on your second approach, I've done
>>> it like that before and it worked. When you say that it didn't work what do
>>> you mean? did it fail? it didnt broadcast?
>>>
>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <sr...@gmail.com> wrote:
>>>
>>>> Code with SQL broadcast hint. This worked and I was able to see that
>>>> broadcastjoin was performed.
>>>>
>>>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>>> l(1))).toDF()
>>>>    val resultDF = recordDF.join(testDF, "Age")
>>>>
>>>>  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>>         }
>>>>
>>>> But for every batch this file was read and broadcast was performed.
>>>> Evaluating the entire DAG I guess.
>>>>   16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>>     16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>>
>>>> Then I changed code to broadcast the dataframe. This didn't work
>>>> either. Not sure if this is what you meant by broadcasting a dataframe.
>>>>
>>>> val testDF =
>>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>>>
>>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>>              )
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>     val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>>> l(1))).toDF()
>>>>     val resultDF = recordDF.join(testDF.value, "Age")
>>>>
>>>>  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>>         }
>>>>
>>>>
>>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <
>>>> sebastian.piu@gmail.com> wrote:
>>>>
>>>>> Can you paste the code where you use sc.broadcast ?
>>>>>
>>>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <sr...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sebastian,
>>>>>>
>>>>>> I was able to broadcast using sql broadcast hint. Question is how to
>>>>>> prevent this broadcast for each RDD.
>>>>>> Is there a way where it can be broadcast once and used locally for
>>>>>> each RDD?
>>>>>> Right now every batch the metadata file is read and the DF is
>>>>>> broadcasted.
>>>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>>>
>>>>>> Srikanth
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <
>>>>>> sebastian.piu@gmail.com> wrote:
>>>>>>
>>>>>>> You should be able to broadcast that data frame using sc.broadcast
>>>>>>> and join against it.
>>>>>>>
>>>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <sr...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I have a streaming use case where I plan to keep a dataset
>>>>>>>> broadcasted and cached on each executor.
>>>>>>>> Every micro batch in streaming will create a DF out of the RDD and
>>>>>>>> join the batch.
>>>>>>>> The below code will perform the broadcast operation for each RDD.
>>>>>>>> Is there a way to broadcast it just once?
>>>>>>>>
>>>>>>>> Alternate approachs are also welcome.
>>>>>>>>
>>>>>>>>     val DF1 =
>>>>>>>> sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>>>
>>>>>>>>     val metaDF =
>>>>>>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>>>                               .join(DF1, "id")
>>>>>>>>     metaDF.cache
>>>>>>>>
>>>>>>>>
>>>>>>>>   val lines = streamingcontext.textFileStream(path)
>>>>>>>>
>>>>>>>>   lines.foreachRDD( rdd => {
>>>>>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>>>
>>>>>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>>>>>         partition.foreach( row => {
>>>>>>>>              ...
>>>>>>>>              ...
>>>>>>>>         })
>>>>>>>>       })
>>>>>>>>   })
>>>>>>>>
>>>>>>>>  streamingcontext.start
>>>>>>>>
>>>>>>>> On a similar note, if the metaDF is too big for broadcast, can I
>>>>>>>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>>>>>>>> This way I can avoid shuffling metaDF each time.
>>>>>>>>
>>>>>>>> Let me know you thoughts.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>

Re: Streaming with broadcast joins

Posted by Jerry Lam <ch...@gmail.com>.
Hi guys,

I also encounter broadcast dataframe issue not for steaming jobs but regular dataframe join. In my case, the executors died probably due to OOM which I don't think it should use that much memory. Anyway, I'm going to craft an example and send it here to see if it is a bug or something I've misunderstood.

Best Regards,

Jerry

Sent from my iPhone

> On 19 Feb, 2016, at 10:20 am, Sebastian Piu <se...@gmail.com> wrote:
> 
> I don't have the code with me now, and I ended moving everything to RDD in the end and using map operations to do some lookups, i.e. instead of broadcasting a Dataframe I ended broadcasting a Map 
> 
> 
>> On Fri, Feb 19, 2016 at 11:39 AM Srikanth <sr...@gmail.com> wrote:
>> It didn't fail. It wasn't broadcasting. I just ran the test again and here are the logs.
>> Every batch is reading the metadata file.
>> 
>> 	16/02/19 06:27:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>> 	16/02/19 06:27:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>> 
>> 	16/02/19 06:27:40 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>> 	16/02/19 06:27:40 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>> 
>> If I remember, foreachRDD is executed in the driver's context. Not sure how we'll be able to achieve broadcast in this approach(unless we use SQL broadcast hint again)
>> 
>> When you say "it worked before",  was it with an older version of spark? I'm trying this on 1.6.
>> If you still have the streaming job running can you verify in spark UI that broadcast join is being used. Also, if the files are read and broadcasted each batch??
>> 
>> Thanks for the help!
>> 
>> 
>>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <se...@gmail.com> wrote:
>>> I don't see anything obviously wrong on your second approach, I've done it like that before and it worked. When you say that it didn't work what do you mean? did it fail? it didnt broadcast? 
>>> 
>>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <sr...@gmail.com> wrote:
>>>> Code with SQL broadcast hint. This worked and I was able to see that broadcastjoin was performed.
>>>> 
>>>> 	val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>> 	                .schema(schema).load("file:///shared/data/test-data.txt") 
>>>> 
>>>> 	val lines = ssc.socketTextStream("DevNode", 9999)
>>>> 
>>>> 	lines.foreachRDD((rdd, timestamp) => {
>>>> 	    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
>>>> 	    val resultDF = recordDF.join(testDF, "Age")
>>>> 	    resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>>         }
>>>> 
>>>> But for every batch this file was read and broadcast was performed. Evaluating the entire DAG I guess.
>>>>     16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>>>>     16/02/18 12:24:02 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>>>> 
>>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:27+28
>>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split: file:/shared/data/test-data.txt:0+27
>>>> 
>>>> 
>>>> Then I changed code to broadcast the dataframe. This didn't work either. Not sure if this is what you meant by broadcasting a dataframe.
>>>> 
>>>> 	val testDF = sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>>> 	                .schema(schema).load("file:///shared/data/test-data.txt") 
>>>> 	             )
>>>> 
>>>> 	val lines = ssc.socketTextStream("DevNode", 9999)
>>>> 
>>>> 	lines.foreachRDD((rdd, timestamp) => {
>>>> 	    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, l(1))).toDF()
>>>> 	    val resultDF = recordDF.join(testDF.value, "Age")
>>>> 	    resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>>         }
>>>> 
>>>> 
>>>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <se...@gmail.com> wrote:
>>>>> Can you paste the code where you use sc.broadcast ?
>>>>> 
>>>>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <sr...@gmail.com> wrote:
>>>>>> Sebastian,
>>>>>> 
>>>>>> I was able to broadcast using sql broadcast hint. Question is how to prevent this broadcast for each RDD.
>>>>>> Is there a way where it can be broadcast once and used locally for each RDD?
>>>>>> Right now every batch the metadata file is read and the DF is broadcasted.
>>>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>>> 
>>>>>> Srikanth
>>>>>> 
>>>>>> 
>>>>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <se...@gmail.com> wrote:
>>>>>>> You should be able to broadcast that data frame using sc.broadcast and join against it.
>>>>>>> 
>>>>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <sr...@gmail.com> wrote:
>>>>>>>> Hello,
>>>>>>>> 
>>>>>>>> I have a streaming use case where I plan to keep a dataset broadcasted and cached on each executor.
>>>>>>>> Every micro batch in streaming will create a DF out of the RDD and join the batch.
>>>>>>>> The below code will perform the broadcast operation for each RDD. Is there a way to broadcast it just once?
>>>>>>>> 
>>>>>>>> Alternate approachs are also welcome.
>>>>>>>> 
>>>>>>>>     val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>>> 
>>>>>>>>     val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>>>                               .join(DF1, "id")
>>>>>>>>     metaDF.cache
>>>>>>>> 
>>>>>>>> 
>>>>>>>>     val lines = streamingcontext.textFileStream(path)
>>>>>>>> 
>>>>>>>>     lines.foreachRDD( rdd => {
>>>>>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>>> 
>>>>>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>>>>>         partition.foreach( row => {
>>>>>>>>              ...
>>>>>>>>              ...
>>>>>>>>         })
>>>>>>>>       })
>>>>>>>>     })
>>>>>>>> 
>>>>>>>>     streamingcontext.start
>>>>>>>> 
>>>>>>>> On a similar note, if the metaDF is too big for broadcast, can I partition it(df.repartition($"col")) and also partition each streaming RDD?
>>>>>>>> This way I can avoid shuffling metaDF each time.
>>>>>>>> 
>>>>>>>> Let me know you thoughts.
>>>>>>>> 
>>>>>>>> Thanks

Re: Streaming with broadcast joins

Posted by Sebastian Piu <se...@gmail.com>.
I don't have the code with me now, and I ended moving everything to RDD in
the end and using map operations to do some lookups, i.e. instead of
broadcasting a Dataframe I ended broadcasting a Map


On Fri, Feb 19, 2016 at 11:39 AM Srikanth <sr...@gmail.com> wrote:

> It didn't fail. It wasn't broadcasting. I just ran the test again and here
> are the logs.
> Every batch is reading the metadata file.
>
> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
> file:/shared/data/test-data.txt:0+27
> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
> file:/shared/data/test-data.txt:27+28
>
> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
> file:/shared/data/test-data.txt:27+28
> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
> file:/shared/data/test-data.txt:0+27
>
> If I remember, foreachRDD is executed in the driver's context. Not sure
> how we'll be able to achieve broadcast in this approach(unless we use SQL
> broadcast hint again)
>
> When you say "it worked before",  was it with an older version of spark?
> I'm trying this on 1.6.
> If you still have the streaming job running can you verify in spark UI
> that broadcast join is being used. Also, if the files are read and
> broadcasted each batch??
>
> Thanks for the help!
>
>
> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <se...@gmail.com>
> wrote:
>
>> I don't see anything obviously wrong on your second approach, I've done
>> it like that before and it worked. When you say that it didn't work what do
>> you mean? did it fail? it didnt broadcast?
>>
>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <sr...@gmail.com> wrote:
>>
>>> Code with SQL broadcast hint. This worked and I was able to see that
>>> broadcastjoin was performed.
>>>
>>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>                .schema(schema).load("file:///shared/data/test-data.txt")
>>>
>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>
>>> lines.foreachRDD((rdd, timestamp) => {
>>>    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>> l(1))).toDF()
>>>    val resultDF = recordDF.join(testDF, "Age")
>>>
>>>  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>         }
>>>
>>> But for every batch this file was read and broadcast was performed.
>>> Evaluating the entire DAG I guess.
>>>   16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>> file:/shared/data/test-data.txt:27+28
>>>     16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>> file:/shared/data/test-data.txt:0+27
>>>
>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>> file:/shared/data/test-data.txt:27+28
>>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>> file:/shared/data/test-data.txt:0+27
>>>
>>>
>>> Then I changed code to broadcast the dataframe. This didn't work either.
>>> Not sure if this is what you meant by broadcasting a dataframe.
>>>
>>> val testDF =
>>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>>
>>>  .schema(schema).load("file:///shared/data/test-data.txt")
>>>              )
>>>
>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>
>>> lines.foreachRDD((rdd, timestamp) => {
>>>     val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>>> l(1))).toDF()
>>>     val resultDF = recordDF.join(testDF.value, "Age")
>>>
>>>  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>>         }
>>>
>>>
>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian.piu@gmail.com
>>> > wrote:
>>>
>>>> Can you paste the code where you use sc.broadcast ?
>>>>
>>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <sr...@gmail.com> wrote:
>>>>
>>>>> Sebastian,
>>>>>
>>>>> I was able to broadcast using sql broadcast hint. Question is how to
>>>>> prevent this broadcast for each RDD.
>>>>> Is there a way where it can be broadcast once and used locally for
>>>>> each RDD?
>>>>> Right now every batch the metadata file is read and the DF is
>>>>> broadcasted.
>>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>>
>>>>> Srikanth
>>>>>
>>>>>
>>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <
>>>>> sebastian.piu@gmail.com> wrote:
>>>>>
>>>>>> You should be able to broadcast that data frame using sc.broadcast
>>>>>> and join against it.
>>>>>>
>>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <sr...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> I have a streaming use case where I plan to keep a dataset
>>>>>>> broadcasted and cached on each executor.
>>>>>>> Every micro batch in streaming will create a DF out of the RDD and
>>>>>>> join the batch.
>>>>>>> The below code will perform the broadcast operation for each RDD. Is
>>>>>>> there a way to broadcast it just once?
>>>>>>>
>>>>>>> Alternate approachs are also welcome.
>>>>>>>
>>>>>>>     val DF1 =
>>>>>>> sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>>
>>>>>>>     val metaDF =
>>>>>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>>                               .join(DF1, "id")
>>>>>>>     metaDF.cache
>>>>>>>
>>>>>>>
>>>>>>>   val lines = streamingcontext.textFileStream(path)
>>>>>>>
>>>>>>>   lines.foreachRDD( rdd => {
>>>>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>>
>>>>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>>>>         partition.foreach( row => {
>>>>>>>              ...
>>>>>>>              ...
>>>>>>>         })
>>>>>>>       })
>>>>>>>   })
>>>>>>>
>>>>>>>  streamingcontext.start
>>>>>>>
>>>>>>> On a similar note, if the metaDF is too big for broadcast, can I
>>>>>>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>>>>>>> This way I can avoid shuffling metaDF each time.
>>>>>>>
>>>>>>> Let me know you thoughts.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>
>>>>>
>>>
>

Re: Streaming with broadcast joins

Posted by Srikanth <sr...@gmail.com>.
It didn't fail. It wasn't broadcasting. I just ran the test again and here
are the logs.
Every batch is reading the metadata file.

16/02/19 06:27:02 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:0+27
16/02/19 06:27:02 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:27+28

16/02/19 06:27:40 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:27+28
16/02/19 06:27:40 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:0+27

If I remember, foreachRDD is executed in the driver's context. Not sure how
we'll be able to achieve broadcast in this approach(unless we use SQL
broadcast hint again)

When you say "it worked before",  was it with an older version of spark?
I'm trying this on 1.6.
If you still have the streaming job running can you verify in spark UI that
broadcast join is being used. Also, if the files are read and broadcasted
each batch??

Thanks for the help!


On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <se...@gmail.com>
wrote:

> I don't see anything obviously wrong on your second approach, I've done it
> like that before and it worked. When you say that it didn't work what do
> you mean? did it fail? it didnt broadcast?
>
> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <sr...@gmail.com> wrote:
>
>> Code with SQL broadcast hint. This worked and I was able to see that
>> broadcastjoin was performed.
>>
>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>                .schema(schema).load("file:///shared/data/test-data.txt")
>>
>> val lines = ssc.socketTextStream("DevNode", 9999)
>>
>> lines.foreachRDD((rdd, timestamp) => {
>>    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>> l(1))).toDF()
>>    val resultDF = recordDF.join(testDF, "Age")
>>
>>  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>         }
>>
>> But for every batch this file was read and broadcast was performed.
>> Evaluating the entire DAG I guess.
>>   16/02/18 12:24:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>>     16/02/18 12:24:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>>
>> Then I changed code to broadcast the dataframe. This didn't work either.
>> Not sure if this is what you meant by broadcasting a dataframe.
>>
>> val testDF =
>> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>>                 .schema(schema).load("file:///shared/data/test-data.txt")
>>              )
>>
>> val lines = ssc.socketTextStream("DevNode", 9999)
>>
>> lines.foreachRDD((rdd, timestamp) => {
>>     val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
>> l(1))).toDF()
>>     val resultDF = recordDF.join(testDF.value, "Age")
>>
>>  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>>         }
>>
>>
>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <se...@gmail.com>
>> wrote:
>>
>>> Can you paste the code where you use sc.broadcast ?
>>>
>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <sr...@gmail.com> wrote:
>>>
>>>> Sebastian,
>>>>
>>>> I was able to broadcast using sql broadcast hint. Question is how to
>>>> prevent this broadcast for each RDD.
>>>> Is there a way where it can be broadcast once and used locally for each
>>>> RDD?
>>>> Right now every batch the metadata file is read and the DF is
>>>> broadcasted.
>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>
>>>> Srikanth
>>>>
>>>>
>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian.piu@gmail.com
>>>> > wrote:
>>>>
>>>>> You should be able to broadcast that data frame using sc.broadcast and
>>>>> join against it.
>>>>>
>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <sr...@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a streaming use case where I plan to keep a dataset
>>>>>> broadcasted and cached on each executor.
>>>>>> Every micro batch in streaming will create a DF out of the RDD and
>>>>>> join the batch.
>>>>>> The below code will perform the broadcast operation for each RDD. Is
>>>>>> there a way to broadcast it just once?
>>>>>>
>>>>>> Alternate approachs are also welcome.
>>>>>>
>>>>>>     val DF1 =
>>>>>> sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>
>>>>>>     val metaDF =
>>>>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>                               .join(DF1, "id")
>>>>>>     metaDF.cache
>>>>>>
>>>>>>
>>>>>>   val lines = streamingcontext.textFileStream(path)
>>>>>>
>>>>>>   lines.foreachRDD( rdd => {
>>>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>
>>>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>>>         partition.foreach( row => {
>>>>>>              ...
>>>>>>              ...
>>>>>>         })
>>>>>>       })
>>>>>>   })
>>>>>>
>>>>>>  streamingcontext.start
>>>>>>
>>>>>> On a similar note, if the metaDF is too big for broadcast, can I
>>>>>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>>>>>> This way I can avoid shuffling metaDF each time.
>>>>>>
>>>>>> Let me know you thoughts.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>
>>

Re: Streaming with broadcast joins

Posted by Sebastian Piu <se...@gmail.com>.
I don't see anything obviously wrong on your second approach, I've done it
like that before and it worked. When you say that it didn't work what do
you mean? did it fail? it didnt broadcast?

On Thu, Feb 18, 2016 at 11:43 PM Srikanth <sr...@gmail.com> wrote:

> Code with SQL broadcast hint. This worked and I was able to see that
> broadcastjoin was performed.
>
> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>                .schema(schema).load("file:///shared/data/test-data.txt")
>
> val lines = ssc.socketTextStream("DevNode", 9999)
>
> lines.foreachRDD((rdd, timestamp) => {
>    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
> l(1))).toDF()
>    val resultDF = recordDF.join(testDF, "Age")
>
>  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>         }
>
> But for every batch this file was read and broadcast was performed.
> Evaluating the entire DAG I guess.
>   16/02/18 12:24:02 INFO HadoopRDD: Input split:
> file:/shared/data/test-data.txt:27+28
>     16/02/18 12:24:02 INFO HadoopRDD: Input split:
> file:/shared/data/test-data.txt:0+27
>
>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
> file:/shared/data/test-data.txt:27+28
>     16/02/18 12:25:00 INFO HadoopRDD: Input split:
> file:/shared/data/test-data.txt:0+27
>
>
> Then I changed code to broadcast the dataframe. This didn't work either.
> Not sure if this is what you meant by broadcasting a dataframe.
>
> val testDF =
> sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
>                 .schema(schema).load("file:///shared/data/test-data.txt")
>              )
>
> val lines = ssc.socketTextStream("DevNode", 9999)
>
> lines.foreachRDD((rdd, timestamp) => {
>     val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
> l(1))).toDF()
>     val resultDF = recordDF.join(testDF.value, "Age")
>
>  resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
>         }
>
>
> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <se...@gmail.com>
> wrote:
>
>> Can you paste the code where you use sc.broadcast ?
>>
>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <sr...@gmail.com> wrote:
>>
>>> Sebastian,
>>>
>>> I was able to broadcast using sql broadcast hint. Question is how to
>>> prevent this broadcast for each RDD.
>>> Is there a way where it can be broadcast once and used locally for each
>>> RDD?
>>> Right now every batch the metadata file is read and the DF is
>>> broadcasted.
>>> I tried sc.broadcast and that did not provide this behavior.
>>>
>>> Srikanth
>>>
>>>
>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <se...@gmail.com>
>>> wrote:
>>>
>>>> You should be able to broadcast that data frame using sc.broadcast and
>>>> join against it.
>>>>
>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <sr...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have a streaming use case where I plan to keep a dataset broadcasted
>>>>> and cached on each executor.
>>>>> Every micro batch in streaming will create a DF out of the RDD and
>>>>> join the batch.
>>>>> The below code will perform the broadcast operation for each RDD. Is
>>>>> there a way to broadcast it just once?
>>>>>
>>>>> Alternate approachs are also welcome.
>>>>>
>>>>>     val DF1 =
>>>>> sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>
>>>>>     val metaDF =
>>>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>                               .join(DF1, "id")
>>>>>     metaDF.cache
>>>>>
>>>>>
>>>>>   val lines = streamingcontext.textFileStream(path)
>>>>>
>>>>>   lines.foreachRDD( rdd => {
>>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>
>>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>>         partition.foreach( row => {
>>>>>              ...
>>>>>              ...
>>>>>         })
>>>>>       })
>>>>>   })
>>>>>
>>>>>  streamingcontext.start
>>>>>
>>>>> On a similar note, if the metaDF is too big for broadcast, can I
>>>>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>>>>> This way I can avoid shuffling metaDF each time.
>>>>>
>>>>> Let me know you thoughts.
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>
>

Re: Streaming with broadcast joins

Posted by Srikanth <sr...@gmail.com>.
Code with SQL broadcast hint. This worked and I was able to see that
broadcastjoin was performed.

val testDF = sqlContext.read.format("com.databricks.spark.csv")
               .schema(schema).load("file:///shared/data/test-data.txt")

val lines = ssc.socketTextStream("DevNode", 9999)

lines.foreachRDD((rdd, timestamp) => {
   val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
l(1))).toDF()
   val resultDF = recordDF.join(testDF, "Age")

 resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
        }

But for every batch this file was read and broadcast was performed.
Evaluating the entire DAG I guess.
  16/02/18 12:24:02 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:27+28
    16/02/18 12:24:02 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:0+27

    16/02/18 12:25:00 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:27+28
    16/02/18 12:25:00 INFO HadoopRDD: Input split:
file:/shared/data/test-data.txt:0+27


Then I changed code to broadcast the dataframe. This didn't work either.
Not sure if this is what you meant by broadcasting a dataframe.

val testDF = sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")
                .schema(schema).load("file:///shared/data/test-data.txt")
             )

val lines = ssc.socketTextStream("DevNode", 9999)

lines.foreachRDD((rdd, timestamp) => {
    val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt,
l(1))).toDF()
    val resultDF = recordDF.join(testDF.value, "Age")

 resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
        }


On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <se...@gmail.com>
wrote:

> Can you paste the code where you use sc.broadcast ?
>
> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <sr...@gmail.com> wrote:
>
>> Sebastian,
>>
>> I was able to broadcast using sql broadcast hint. Question is how to
>> prevent this broadcast for each RDD.
>> Is there a way where it can be broadcast once and used locally for each
>> RDD?
>> Right now every batch the metadata file is read and the DF is broadcasted.
>> I tried sc.broadcast and that did not provide this behavior.
>>
>> Srikanth
>>
>>
>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <se...@gmail.com>
>> wrote:
>>
>>> You should be able to broadcast that data frame using sc.broadcast and
>>> join against it.
>>>
>>> On Wed, 17 Feb 2016, 21:13 Srikanth <sr...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a streaming use case where I plan to keep a dataset broadcasted
>>>> and cached on each executor.
>>>> Every micro batch in streaming will create a DF out of the RDD and join
>>>> the batch.
>>>> The below code will perform the broadcast operation for each RDD. Is
>>>> there a way to broadcast it just once?
>>>>
>>>> Alternate approachs are also welcome.
>>>>
>>>>     val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>>
>>>>     val metaDF =
>>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>>                               .join(DF1, "id")
>>>>     metaDF.cache
>>>>
>>>>
>>>>   val lines = streamingcontext.textFileStream(path)
>>>>
>>>>   lines.foreachRDD( rdd => {
>>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>
>>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>>         partition.foreach( row => {
>>>>              ...
>>>>              ...
>>>>         })
>>>>       })
>>>>   })
>>>>
>>>>  streamingcontext.start
>>>>
>>>> On a similar note, if the metaDF is too big for broadcast, can I
>>>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>>>> This way I can avoid shuffling metaDF each time.
>>>>
>>>> Let me know you thoughts.
>>>>
>>>> Thanks
>>>>
>>>>
>>

Re: Streaming with broadcast joins

Posted by Sebastian Piu <se...@gmail.com>.
Can you paste the code where you use sc.broadcast ?

On Thu, Feb 18, 2016 at 5:32 PM Srikanth <sr...@gmail.com> wrote:

> Sebastian,
>
> I was able to broadcast using sql broadcast hint. Question is how to
> prevent this broadcast for each RDD.
> Is there a way where it can be broadcast once and used locally for each
> RDD?
> Right now every batch the metadata file is read and the DF is broadcasted.
> I tried sc.broadcast and that did not provide this behavior.
>
> Srikanth
>
>
> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <se...@gmail.com>
> wrote:
>
>> You should be able to broadcast that data frame using sc.broadcast and
>> join against it.
>>
>> On Wed, 17 Feb 2016, 21:13 Srikanth <sr...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have a streaming use case where I plan to keep a dataset broadcasted
>>> and cached on each executor.
>>> Every micro batch in streaming will create a DF out of the RDD and join
>>> the batch.
>>> The below code will perform the broadcast operation for each RDD. Is
>>> there a way to broadcast it just once?
>>>
>>> Alternate approachs are also welcome.
>>>
>>>     val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>
>>>     val metaDF =
>>> sqlContext.read.format("json").schema(schema1).load(file2)
>>>                               .join(DF1, "id")
>>>     metaDF.cache
>>>
>>>
>>>   val lines = streamingcontext.textFileStream(path)
>>>
>>>   lines.foreachRDD( rdd => {
>>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>
>>>       joinedDF.rdd.foreachPartition ( partition => {
>>>         partition.foreach( row => {
>>>              ...
>>>              ...
>>>         })
>>>       })
>>>   })
>>>
>>>  streamingcontext.start
>>>
>>> On a similar note, if the metaDF is too big for broadcast, can I
>>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>>> This way I can avoid shuffling metaDF each time.
>>>
>>> Let me know you thoughts.
>>>
>>> Thanks
>>>
>>>
>

Re: Streaming with broadcast joins

Posted by Srikanth <sr...@gmail.com>.
Sebastian,

I was able to broadcast using sql broadcast hint. Question is how to
prevent this broadcast for each RDD.
Is there a way where it can be broadcast once and used locally for each RDD?
Right now every batch the metadata file is read and the DF is broadcasted.
I tried sc.broadcast and that did not provide this behavior.

Srikanth


On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <se...@gmail.com>
wrote:

> You should be able to broadcast that data frame using sc.broadcast and
> join against it.
>
> On Wed, 17 Feb 2016, 21:13 Srikanth <sr...@gmail.com> wrote:
>
>> Hello,
>>
>> I have a streaming use case where I plan to keep a dataset broadcasted
>> and cached on each executor.
>> Every micro batch in streaming will create a DF out of the RDD and join
>> the batch.
>> The below code will perform the broadcast operation for each RDD. Is
>> there a way to broadcast it just once?
>>
>> Alternate approachs are also welcome.
>>
>>     val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>
>>     val metaDF =
>> sqlContext.read.format("json").schema(schema1).load(file2)
>>                               .join(DF1, "id")
>>     metaDF.cache
>>
>>
>>   val lines = streamingcontext.textFileStream(path)
>>
>>   lines.foreachRDD( rdd => {
>>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>
>>       joinedDF.rdd.foreachPartition ( partition => {
>>         partition.foreach( row => {
>>              ...
>>              ...
>>         })
>>       })
>>   })
>>
>>  streamingcontext.start
>>
>> On a similar note, if the metaDF is too big for broadcast, can I
>> partition it(df.repartition($"col")) and also partition each streaming RDD?
>> This way I can avoid shuffling metaDF each time.
>>
>> Let me know you thoughts.
>>
>> Thanks
>>
>>

Re: Streaming with broadcast joins

Posted by Sebastian Piu <se...@gmail.com>.
You should be able to broadcast that data frame using sc.broadcast and join
against it.

On Wed, 17 Feb 2016, 21:13 Srikanth <sr...@gmail.com> wrote:

> Hello,
>
> I have a streaming use case where I plan to keep a dataset broadcasted and
> cached on each executor.
> Every micro batch in streaming will create a DF out of the RDD and join
> the batch.
> The below code will perform the broadcast operation for each RDD. Is there
> a way to broadcast it just once?
>
> Alternate approachs are also welcome.
>
>     val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>
>     val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>                               .join(DF1, "id")
>     metaDF.cache
>
>
>   val lines = streamingcontext.textFileStream(path)
>
>   lines.foreachRDD( rdd => {
>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>
>       joinedDF.rdd.foreachPartition ( partition => {
>         partition.foreach( row => {
>              ...
>              ...
>         })
>       })
>   })
>
>  streamingcontext.start
>
> On a similar note, if the metaDF is too big for broadcast, can I partition
> it(df.repartition($"col")) and also partition each streaming RDD?
> This way I can avoid shuffling metaDF each time.
>
> Let me know you thoughts.
>
> Thanks
>
>