Posted to user@spark.apache.org by Eric Beabes <ma...@gmail.com> on 2021/05/03 16:36:18 UTC

Stream which needs to be “joined” with another Stream of “Reference” data.

I would like to develop a Spark Structured Streaming job that reads
messages in a Stream which needs to be “joined” with another Stream of
“Reference” data.

For example, let’s say I’m reading messages from Kafka coming in from (lots
of) IOT devices. This message has a ‘device_id’. We have a DEVICE table on
a relational database. What I need to do is “join” the ‘device_id’ in the
message with the ‘device_id’ on the table to enrich the incoming message.
Somewhere I read that this can be done by joining two streams. I guess we
can create a “Stream” that reads the DEVICE table once every hour or so.

Questions:
1) Is this the right way to solve this use case?
2) Should we use a Stateful Stream for reading DEVICE table with State
timeout set to an hour?
3) What would happen while the DEVICE state is getting updated from the
table on the relational database?

Guidance would be greatly appreciated. Thanks.

Re: Stream which needs to be “joined” with another Stream of “Reference” data.

Posted by Mich Talebzadeh <mi...@gmail.com>.
Well, that versioned table is a set of CDC trail files that are landed on
external storage as immutable data.

What happens is that you read the table itself at time T0 and then keep
reading the committed transaction changes as trail files.

Kafka can do that as well: read the CDC changes (say, from Oracle redo
logs for committed transactions) and land them somewhere as trail files.
Then you can use a SQL query to determine the latest version of each row
before you apply further transformations.

WITH ranked AS (
  SELECT
    *,
    ROW_NUMBER() OVER (
      PARTITION BY device_id
      ORDER BY <event_timestamp> DESC
    ) AS rank
  FROM device_table
)
SELECT *
FROM ranked
WHERE rank = 1
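
To make the "latest version per device" query concrete, here is a small
sketch that runs the same pattern against an in-memory SQLite database
instead of Spark. The column names `location` and `op_time` (standing in
for <event_timestamp>), the sample rows and the alias `rn` are assumptions
for illustration only, and it needs SQLite 3.25 or later for window
functions.

```python
import sqlite3

# Hypothetical CDC trail: every committed change to a device row is appended,
# never updated in place. `op_time` stands in for <event_timestamp>.
rows = [
    (1, "warehouse", "2021-05-03 09:00:00"),
    (1, "loading-dock", "2021-05-03 10:30:00"),
    (2, "office", "2021-05-03 08:15:00"),
]

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE device_table (device_id INTEGER, location TEXT, op_time TEXT)"
)
conn.executemany("INSERT INTO device_table VALUES (?, ?, ?)", rows)

# Number the versions of each device from newest to oldest, keep the newest.
latest = conn.execute(
    """
    WITH ranked AS (
      SELECT
        device_id,
        location,
        op_time,
        ROW_NUMBER() OVER (
          PARTITION BY device_id
          ORDER BY op_time DESC
        ) AS rn
      FROM device_table
    )
    SELECT device_id, location, op_time
    FROM ranked
    WHERE rn = 1
    ORDER BY device_id
    """
).fetchall()
conn.close()

for row in latest:
    print(row)
```

Device 1 has two versions in the trail, and only the 10:30 one survives the
filter. In Spark the same statement could be run with spark.sql() over a
view of the trail files.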


If the data in your reference table does not change within an hour, then
you will be OK reading it as a snapshot at time T0.


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.





Re: Stream which needs to be “joined” with another Stream of “Reference” data.

Posted by Eric Beabes <ma...@gmail.com>.
I was looking for something like "Processing Time Temporal Join" in Flink
as described here:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#processing-time-temporal-join



Re: Stream which needs to be “joined” with another Stream of “Reference” data.

Posted by Mich Talebzadeh <mi...@gmail.com>.
You are welcome Yuri. However, I stand corrected :)



Re: Stream which needs to be “joined” with another Stream of “Reference” data.

Posted by "Yuri Oleynikov (‫יורי אולייניקוב‬‎)" <yu...@gmail.com>.
Always nice to learn something new about jdbc.
Thanks, Mich **thumbsup**



Re: Stream which needs to be “joined” with another Stream of “Reference” data.

Posted by Mich Talebzadeh <mi...@gmail.com>.
I would have assumed that reference data like device_id is pretty static,
so a snapshot will do.

The JDBC read is lazy, so it will not materialise until the join uses it.
The data will then be collected from the underlying RDBMS table for
COMMITTED transactions.
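
One way to get an hourly snapshot inside a long-running query is sketched
below. It is only an illustration, not something proposed in this thread:
RefreshingSnapshot and make_batch_handler are hypothetical helpers, and the
loader, sink and column name device_id are assumptions. The helper keeps a
snapshot and reloads it once it is older than a given age; the batch handler
joins each micro-batch against whatever snapshot is current.

```python
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class RefreshingSnapshot(Generic[T]):
    """Holds a snapshot and reloads it once it is older than max_age_seconds."""

    def __init__(
        self,
        loader: Callable[[], T],
        max_age_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._loader = loader
        self._max_age = max_age_seconds
        self._clock = clock
        self._value: Optional[T] = None
        self._loaded_at: Optional[float] = None

    def get(self) -> T:
        now = self._clock()
        # Load on first use, then again whenever the snapshot has expired.
        if self._loaded_at is None or now - self._loaded_at >= self._max_age:
            self._value = self._loader()
            self._loaded_at = now
        return self._value


def make_batch_handler(devices: RefreshingSnapshot, sink: Callable):
    """Returns a function with the (batch_df, batch_id) shape foreachBatch expects."""

    def handle(batch_df, batch_id):
        # Left join so that events from unknown devices are kept, not dropped.
        enriched = batch_df.join(devices.get(), on="device_id", how="left")
        sink(enriched, batch_id)

    return handle


# Possible PySpark wiring (not executed here; jdbc_url, events and
# send_to_sink are placeholders):
#   devices = RefreshingSnapshot(
#       lambda: spark.read.format("jdbc")
#                    .option("url", jdbc_url)
#                    .option("dbtable", "DEVICE")
#                    .load()
#                    .cache(),
#       max_age_seconds=3600,
#   )
#   events.writeStream.foreachBatch(
#       make_batch_handler(devices, send_to_sink)
#   ).start()
```

Because the JDBC DataFrame is lazy, the loader would have to cache it (and
the previous snapshot would have to be unpersisted) for the hourly reuse to
have any effect; without caching, the table is simply queried again whenever
the join runs.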

However, this is something that I discussed in another thread,
*Spark Streaming with Files*.

There is an option to trigger the query once:

    from pyspark.sql.functions import col

    # streamingDataFrame, sendToSink, checkpoint_path and data_path are
    # defined elsewhere in the job.
    result = (
        streamingDataFrame.select(
            col("parsed_value.rowkey").alias("rowkey"),
            col("parsed_value.ticker").alias("ticker"),
            col("parsed_value.timeissued").alias("timeissued"),
            col("parsed_value.price").alias("price"),
        )
        .writeStream
        .outputMode("append")
        .option("truncate", "false")
        .foreachBatch(sendToSink)
        .queryName("trailFiles")
        # The two settings that matter here: run once, and checkpoint.
        .trigger(once=True)
        .option("checkpointLocation", checkpoint_path)
        .start(data_path)
    )

This means that the streaming job will process all the data available at
that point and then terminate. The JDBC read is therefore refreshed at
whatever interval you restart the streaming job: each run picks up the
unprocessed data and, critically, re-reads the JDBC snapshot.

This can be scheduled through Airflow etc. You won't lose data, as the
checkpoint marks the records already processed.
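
The run-once pattern can be pictured with a plain-Python simulation. This is
not Spark code: run_once, the dict used as a checkpoint and the sample data
are all made up for illustration, and events are assumed to arrive in offset
order. Each scheduled run loads a fresh reference snapshot and handles only
the events after the last recorded offset.

```python
from typing import Callable, Dict, List, Tuple

Event = Tuple[int, str]  # (offset, device_id)


def run_once(
    events: List[Event],
    checkpoint: Dict[str, int],
    load_devices: Callable[[], Dict[str, str]],
) -> List[Tuple[int, str, str]]:
    """One scheduled run: fresh reference snapshot, then only unprocessed events."""
    devices = load_devices()  # the snapshot is re-read on every run
    last = checkpoint.get("offset", -1)
    out = []
    for offset, device_id in events:
        if offset <= last:
            continue  # already handled by an earlier run
        out.append((offset, device_id, devices.get(device_id, "unknown")))
        last = offset
    checkpoint["offset"] = last  # stands in for Spark's checkpoint location
    return out


reference = {"d1": "warehouse"}
checkpoint: Dict[str, int] = {}
log: List[Event] = [(0, "d1"), (1, "d2")]

first = run_once(log, checkpoint, lambda: dict(reference))

reference["d2"] = "office"  # the table changes between runs
log.append((2, "d2"))

second = run_once(log, checkpoint, lambda: dict(reference))
print(first)
print(second)
```

The second run skips offsets 0 and 1 because of the checkpoint, and it sees
the new "d2" row because the snapshot was loaded again.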

That might be an option.

HTH



Re: Stream which needs to be “joined” with another Stream of “Reference” data.

Posted by "Yuri Oleynikov (‫יורי אולייניקוב‬‎)" <yu...@gmail.com>.
You can do the enrichment with a stream (events) to static (device table) join when the device table is a slowly changing dimension (say it changes once a day) and is stored in Delta format. For every micro-batch, the stream-static join rescans the device table, so up-to-date device data is loaded.

If the device table is not a slowly changing dimension (say it changes once an hour), then you would probably need a stream-stream join, but I'm not sure whether an RDBMS source (i.e. JDBC) in Spark supports streaming mode.
So I would rather sync the JDBC table to Parquet/Delta periodically in order to emulate a streaming source.
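
To make the "rescanned for every micro-batch" point concrete, here is a small Spark-free toy model (plain Python, not Spark code). The names enrich_batch, load_devices and device_table are made up for illustration. Each call stands for one micro-batch and reloads the reference snapshot before doing a left join on device_id, so a change to the device table shows up in the next batch. In Spark the same shape would be a join of the streaming DataFrame with a batch DataFrame, e.g. events.join(devices, "device_id", "left").

```python
from typing import Callable, Dict, Iterable, List

Row = Dict[str, object]


def enrich_batch(events: Iterable[Row],
                 load_devices: Callable[[], Dict[str, Row]]) -> List[Row]:
    """Left-join one micro-batch of events to a freshly loaded device snapshot."""
    devices = load_devices()  # re-read for every micro-batch, like the rescan
    enriched = []
    for event in events:
        device = devices.get(event["device_id"], {})
        # event fields win on a name clash; unmatched events pass through as-is
        enriched.append({**device, **event})
    return enriched


# hypothetical in-memory stand-in for the DEVICE table
device_table = {"d1": {"device_id": "d1", "site": "London"}}

batch_1 = enrich_batch([{"device_id": "d1", "temp": 20.5}],
                       lambda: dict(device_table))

# the reference data changes between two micro-batches
device_table["d1"] = {"device_id": "d1", "site": "Leeds"}
device_table["d2"] = {"device_id": "d2", "site": "York"}

batch_2 = enrich_batch(
    [{"device_id": "d1", "temp": 21.0}, {"device_id": "d3", "temp": 19.0}],
    lambda: dict(device_table),
)
```

The second batch sees the updated site for d1 without any restart, and the event for the unknown device d3 is kept without enrichment, which is what a left join gives you.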



Re: Stream which needs to be “joined” with another Stream of “Reference” data.

Posted by Mich Talebzadeh <mi...@gmail.com>.
Let us look at your Kafka stream.

Say we read it as below.

First, read data from the topic:

# module-level imports used by the snippets in this message
import sys
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import FloatType, StringType, StructType, TimestampType

        # schema of the JSON payload carried in the Kafka message value
        schema = StructType() \
            .add("rowkey", StringType()) \
            .add("ticker", StringType()) \
            .add("timeissued", TimestampType()) \
            .add("price", FloatType())
        try:
            # construct a streaming dataframe streamingDataFrame that subscribes
            # to topic config['MDVariables']['topic'] -> md (market data)
            # note: Spark's Kafka source forwards consumer settings given with a
            # "kafka." prefix, so the un-prefixed settings below (kept as posted)
            # are probably ignored
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

            # then do writeStream: flatten the parsed JSON and hand every
            # micro-batch to SendToBigQuery
            result = streamingDataFrame.select(
                         col("parsed_value.rowkey").alias("rowkey"),
                         col("parsed_value.ticker").alias("ticker"),
                         col("parsed_value.timeissued").alias("timeissued"),
                         col("parsed_value.price").alias("price")) \
                     .writeStream \
                     .outputMode('append') \
                     .option("truncate", "false") \
                     .foreachBatch(SendToBigQuery) \
                     .trigger(processingTime='2 seconds') \
                     .start()
        except Exception as e:
            print(f"{e}, quitting")
            sys.exit(1)

        result.awaitTermination()


Now you have a micro-batch every 2 seconds that calls the function
SendToBigQuery(), and that is where you need to do your work:


# s, config, bigQueryAverages, priceComparison and writeHighValueData come
# from my own modules and are not shown in this message
def SendToBigQuery(df, batchId):
    # df is the micro-batch of messages as a dataframe, batchId is its id
    if len(df.take(1)) > 0:
        # df.printSchema()
        df.persist()
        spark_session = s.spark_session(config['common']['appName'])
        spark_session = s.setSparkConfBQ(spark_session)
        # read from BigQuery; this stands for the reference data that you
        # would read from your RDBMS table
        read_df = s.loadTableFromBQ(spark_session, config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
        # look for high value tickers. This is where you go through every row
        # of the streaming dataframe and get your device_id (rowkey is your
        # device_id here). collect() pulls the whole micro-batch to the
        # driver, so this only suits small batches
        for row in df.rdd.collect():
            rowkey = row.rowkey
            ticker = row.ticker
            price = row.price
            values = bigQueryAverages(ticker, price, read_df)
            Average = values["average"]
            standardDeviation = values["standardDeviation"]
            lower = values["lower"]
            upper = values["upper"]
            if lower is not None and upper is not None:
                hvTicker = priceComparison(ticker, price, lower, upper)
                if hvTicker == 1:
                    writeHighValueData(df, rowkey)
        df.unpersist()
    else:
        print("DataFrame is empty")
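
SendToBigQuery above reloads the reference table on every micro-batch. If the table only needs to be refreshed every hour, one option is a small time-based cache that the batch function asks for the reference data. The sketch below is plain Python and only an illustration: RefreshingReference, load_devices and the fake clock are made-up names, and the loader is whatever reads your table (for example a function wrapping a JDBC read).

```python
import time
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class RefreshingReference(Generic[T]):
    """Caches a reference dataset and reloads it once it is older than max_age_s."""

    def __init__(self, loader: Callable[[], T], max_age_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._loader = loader
        self._max_age_s = max_age_s
        self._clock = clock
        self._value: Optional[T] = None
        self._loaded_at: Optional[float] = None

    def get(self) -> T:
        now = self._clock()
        stale = self._loaded_at is None or now - self._loaded_at >= self._max_age_s
        if stale:
            # first call, or the cached copy has reached max_age_s: reload
            self._value = self._loader()
            self._loaded_at = now
        return self._value


# demonstration with a fake clock and a counting loader (both hypothetical)
fake_now = [0.0]
loads = []


def load_devices():
    loads.append(fake_now[0])  # remember when each reload happened
    return {"d1": "London"}


reference = RefreshingReference(load_devices, max_age_s=3600,
                                clock=lambda: fake_now[0])

reference.get()        # first micro-batch: loads the table
fake_now[0] = 1800.0
reference.get()        # 30 minutes later: served from the cache
fake_now[0] = 3600.0
reference.get()        # one hour later: reloads
```

Since the foreachBatch function runs on the driver, a module-level instance can be shared across micro-batches. One caveat if the loader returns a Spark dataframe: dataframes are lazy, so keeping only the handle does not avoid re-reading the source. The loader would have to materialise the data, e.g. by caching the dataframe and running an action on it, and release the old copy on reload.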


HTH







Re: Stream which needs to be “joined” with another Stream of “Reference” data.

Posted by Eric Beabes <ma...@gmail.com>.
1) Device_id might be different for messages in a batch.
2) It's a Streaming application. The IOT messages are getting read in a
Structured Streaming job in a "Stream". The Dataframe would need to be
updated every hour. Have you done something similar in the past? Do you
have an example to share?


Re: Stream which needs to be “joined” with another Stream of “Reference” data.

Posted by Mich Talebzadeh <mi...@gmail.com>.
Can you please clarify:


   1. Do the IoT messages in one batch all have the same device_id, or can
   every row have a different device_id?
   2. The RDBMS table can be read through JDBC in Spark and a dataframe can
   be created on it. Does that work for you? You do not really need to stream
   the reference table.


HTH






