Posted to user@spark.apache.org by karan alang <ka...@gmail.com> on 2022/02/07 21:05:07 UTC

StructuredStreaming - foreach/foreachBatch

Hello All,

I'm using Structured Streaming to read data from Kafka, and I need to apply a
transformation to each individual row.

I'm trying to use 'foreach' (or 'foreachBatch') and am running into issues.
Basic question: how is the row passed to the function when foreach is used?

Also, when I use foreachBatch, it seems the batchId is available in the
function that gets called. How do I access the individual rows?

Details are on Stack Overflow:
https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working

What is the best approach for this use case?

tia!

Re: StructuredStreaming - foreach/foreachBatch

Posted by karan alang <ka...@gmail.com>.
Thanks, Gourav - will check out the book.

regds,
Karan Alang

On Thu, Feb 17, 2022 at 9:05 AM Gourav Sengupta <go...@gmail.com>
wrote:

> Hi,
>
> The following excellent documentation may help as well:
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
>
> The book from Dr. Zaharia on SPARK does a fantastic job in explaining the
> fundamental thinking behind these concepts.
>
>
> Regards,
> Gourav Sengupta

Re: StructuredStreaming - foreach/foreachBatch

Posted by Danilo Sousa <da...@gmail.com>.
Hello Gourav,

I'll read this document.


Thanks.

> On 17 Feb 2022, at 14:05, Gourav Sengupta <go...@gmail.com> wrote:
> 
> Hi,
> 
> The following excellent documentation may help as well: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
> 
> The book from Dr. Zaharia on SPARK does a fantastic job in explaining the fundamental thinking behind these concepts.
> 
> 
> Regards,
> Gourav Sengupta


Re: StructuredStreaming - foreach/foreachBatch

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

The following excellent documentation may help as well:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
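
For the 'foreach' half of the question, here is a minimal sketch of the two
forms that guide describes for PySpark: a plain function that receives one
Row per call, and an object with a process(row) method plus optional open()
and close(). The names RowPrinter, process_row and result_df are hypothetical,
and the "ticker" and "price" fields are assumed from the select() posted
elsewhere in this thread.

```python
class RowPrinter:
    """Hypothetical writer object for writeStream.foreach().

    Spark calls open/process/close on the executors, once per partition
    of each micro-batch.
    """

    def open(self, partition_id, epoch_id):
        # Called before any rows of this partition; return True to process them.
        self.seen = 0
        return True

    def process(self, row):
        # 'row' is a pyspark.sql.Row; read fields by name or via asDict().
        record = row.asDict()
        self.seen += 1
        print(f"ticker={record['ticker']} price={record['price']}")

    def close(self, error):
        # Called after the partition is done; 'error' is None on success.
        if error is not None:
            print(f"partition failed: {error}")


def process_row(row):
    """Function form: foreach calls this once per row, passing a Row."""
    print(row["ticker"], row["price"])


# Wiring (not executed here); result_df stands for the streaming DataFrame:
#   result_df.writeStream.foreach(process_row).start()
#   result_df.writeStream.foreach(RowPrinter()).start()
```

The function form is enough for stateless per-row work; the object form is
the usual choice when a connection has to be opened and closed per partition.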

The book from Dr. Zaharia on Spark does a fantastic job of explaining the
fundamental thinking behind these concepts.


Regards,
Gourav Sengupta



On Wed, Feb 9, 2022 at 8:51 PM karan alang <ka...@gmail.com> wrote:

> Thanks, Mich .. will check it out
>
> regds,
> Karan Alang

Re: StructuredStreaming - foreach/foreachBatch

Posted by karan alang <ka...@gmail.com>.
Thanks, Mich .. will check it out

regds,
Karan Alang

On Tue, Feb 8, 2022 at 3:06 PM Mich Talebzadeh <mi...@gmail.com>
wrote:

> BTW you can check this Linkedin article of mine on Processing Change Data
> Capture with Spark Structured Streaming
> <https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D>
>
>
> It covers the concept of triggers including trigger(once = True) or
> one-time batch in Spark Structured Streaming
>
>
> HTH

Re: StructuredStreaming - foreach/foreachBatch

Posted by Mich Talebzadeh <mi...@gmail.com>.
BTW, you can check out this LinkedIn article of mine on Processing Change Data
Capture with Spark Structured Streaming
<https://www.linkedin.com/pulse/processing-change-data-capture-spark-structured-talebzadeh-ph-d-/?trackingId=pRPrKh1iQzq4EJ%2BuiAoWGQ%3D%3D>


It covers the concept of triggers, including trigger(once=True), i.e. a
one-time batch, in Spark Structured Streaming.
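
As a rough sketch of the one-time trigger mentioned above (not taken from the
article): the query processes whatever data is available as a single batch and
then stops. The helper name run_once and its parameters are hypothetical; the
chained calls are the standard DataStreamWriter methods. Newer Spark releases
also offer trigger(availableNow=True) for a similar purpose.

```python
def run_once(streaming_df, batch_fn, checkpoint_dir):
    """Start a query that handles the available data as one batch, then stops.

    streaming_df   -- a streaming DataFrame (e.g. one read from Kafka)
    batch_fn       -- a function(df, batchId), as used with foreachBatch
    checkpoint_dir -- checkpoint location, so a later run resumes where
                      this one left off
    """
    query = (
        streaming_df.writeStream
        .outputMode("append")
        .option("checkpointLocation", checkpoint_dir)
        .foreachBatch(batch_fn)
        .trigger(once=True)  # one-time batch instead of a processing interval
        .start()
    )
    # Block until the single batch has been processed and the query ends.
    query.awaitTermination()
    return query
```

Compared with trigger(processingTime='2 seconds'), this suits scheduled jobs
that should drain the topic and exit rather than run continuously.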


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 7 Feb 2022 at 23:06, karan alang <ka...@gmail.com> wrote:

> Thanks, Mich .. that worked fine!

Re: StructuredStreaming - foreach/foreachBatch

Posted by karan alang <ka...@gmail.com>.
Thanks, Mich .. that worked fine!


On Mon, Feb 7, 2022 at 1:55 PM Mich Talebzadeh <mi...@gmail.com>
wrote:

> read below

Re: StructuredStreaming - foreach/foreachBatch

Posted by Mich Talebzadeh <mi...@gmail.com>.
Read below.

"""
"foreach" performs custom write logic on each row, while "foreachBatch"
performs custom write logic on each micro-batch, here through the
SendToBigQuery function.
foreachBatch(SendToBigQuery) calls the function with 2 arguments: first, the
micro-batch as a DataFrame or Dataset, and second, a unique id for each
batch (batchId).
Using foreachBatch, we write each micro-batch to the storage defined in our
custom logic. In this case, we store the output of our streaming application
in a Google BigQuery table.
Note that we are appending data, and the column "rowkey" is defined as a UUID
so it can be used as the primary key.
"""
# Requires: from pyspark.sql.functions import col
result = (
    streamingDataFrame.select(
        col("parsed_value.rowkey").alias("rowkey"),
        col("parsed_value.ticker").alias("ticker"),
        col("parsed_value.timeissued").alias("timeissued"),
        col("parsed_value.price").alias("price"),
    )
    .writeStream
    .outputMode("append")
    .option("truncate", "false")
    .foreachBatch(SendToBigQuery)  # called once per micro-batch
    .trigger(processingTime="2 seconds")
    .start()
)

Now define your function SendToBigQuery():

def SendToBigQuery(df, batchId):
    # df is the micro-batch as a DataFrame; batchId is its unique id
    if len(df.take(1)) > 0:
        df.printSchema()
        print(f"batchId is {batchId}")
        rows = df.count()
        print(f"Total records processed in this run = {rows}")
        # ...... (custom write logic, e.g. the write to BigQuery)
    else:
        print("DataFrame is empty")
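
On the question of reaching individual rows inside foreachBatch, a minimal
sketch, assuming the columns from the select() above: the micro-batch is an
ordinary DataFrame, so its rows can be iterated on the driver. The names
transform_row and process_batch are hypothetical, and the transformation
itself is only a placeholder. For large batches, expressing the change with
DataFrame column operations is usually preferable to a Python loop.

```python
def transform_row(record):
    """Hypothetical per-row transformation on a plain dict."""
    out = dict(record)
    out["ticker"] = str(out["ticker"]).upper()
    out["price"] = round(float(out["price"]), 2)
    return out


def process_batch(df, batch_id):
    """Called by foreachBatch once per micro-batch with (DataFrame, batchId)."""
    # toLocalIterator() brings rows to the driver one partition at a time;
    # each item is a pyspark.sql.Row, converted here to a dict.
    processed = [transform_row(row.asDict()) for row in df.toLocalIterator()]
    print(f"batchId {batch_id}: processed {len(processed)} rows")
    # foreachBatch ignores the return value; it is returned only so the
    # function can be checked outside of Spark.
    return processed
```

It would be attached in the same way as SendToBigQuery, that is with
writeStream.foreachBatch(process_batch).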

*HTH*


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 7 Feb 2022 at 21:06, karan alang <ka...@gmail.com> wrote:

> Hello All,
>
> I'm using StructuredStreaming to read data from Kafka, and need to do
> transformation on each individual row.
>
> I'm trying to use 'foreach' (or foreachBatch), and running into issues.
> Basic question - how is the row passed to the function when foreach is
> used ?
>
> Also, when I use foreachBatch, seems the BatchId is available in the
> function called ? How do I access individual rows ?
>
> Details are in stackoverflow :
>
> https://stackoverflow.com/questions/71025312/structuredstreaming-foreach-foreachbatch-not-working
>
> What is the best approach for this use-case ?
>
> tia!
>