Posted to user@spark.apache.org by Marco Costantini <ma...@rocketfncl.com> on 2023/04/24 21:55:58 UTC

What is the best way to organize a join within a foreach?

I have two tables: {users, orders}. In this example, let's say that for
each User in the users table, there are 100,000 Orders in the orders table.

I have to use pyspark to generate a statement of Orders for each User. So,
a single user will need his/her own list of Orders. Additionally, I need to
send this statement to the real-world user via email (for example).

My first intuition was to apply a DataFrame.foreach() on the users
DataFrame. This way, I can rely on the spark workers to handle the email
sending individually. However, I now do not know the best way to get each
User's Orders.

I will soon try the following (pseudo-code):

```
users_df = <my entire users DataFrame>
orders_df = <my entire orders DataFrame>

#this is poorly named for max understandability in this context
def foreach_function(row):
  user_id = row.user_id
  user_orders_df = orders_df.where(f'user_id = {user_id}')

  #here, I'd get any User info from 'row'
  #then, I'd convert all 'user_orders' to JSON
  #then, I'd prepare the email and send it

users_df.foreach(foreach_function)
```

It is my understanding that if I do my user-specific work in the foreach
function, I will capitalize on Spark's scalability when doing that work.
However, I am worried about two things:

If I take all Orders up front...

Will that work?
Will I be taking too much? Will I be taking Orders on partitions that won't
handle them (a different User)?

If I create the orders_df (filtered) within the foreach function...

Will it work?
Will that be too much I/O against the database?
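
(On the DB I/O worry specifically: rather than issuing one query per user inside
the foreach, a single read can be partitioned by user_id so each executor pulls
one bounded slice. A minimal sketch, assuming the orders live behind JDBC; the
URL, credentials, table name and bounds below are placeholders:)

```
# Hypothetical connection details -- adjust to the real source.
jdbc_url = "jdbc:postgresql://dbhost:5432/shop"
props = {"user": "spark", "password": "***", "driver": "org.postgresql.Driver"}

# One logical read, split into numPartitions ranges of user_id, so each
# executor issues one bounded query instead of one query per user.
orders_df = spark.read.jdbc(
    url=jdbc_url,
    table="orders",
    column="user_id",
    lowerBound=1,
    upperBound=1000000,
    numPartitions=200,
    properties=props,
)
```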

The question ultimately is: How can I achieve this goal efficiently?

I have not yet tried anything here. I am doing so as we speak, but am
suffering from choice-paralysis.

Please and thank you.

Re: What is the best way to organize a join within a foreach?

Posted by Mich Talebzadeh <mi...@gmail.com>.
Again, one try is worth many opinions. Try it, gather metrics from the Spark
UI, and see how it performs.

On Wed, 26 Apr 2023 at 14:57, Marco Costantini <
marco.costantini@rocketfncl.com> wrote:

> Thanks team,
> Email was just an example. The point was to illustrate that some actions
> could be chained using Spark's foreach. In reality, this is an S3 write and
> a Kafka message production, which I think is quite reasonable for spark to
> do.
>
> To answer Ayan's first question: yes, all of a user's orders, prepared
> for each and every user.
>
> Other than the remarks that email transmission is unwise (which, as I've
> now clarified, is beside the point) I am not seeing an alternative to using
> Spark's foreach. Unless your proposal is for the Spark job to target 1 user
> and just run the job 1000s of times taking the user_id as input. That
> doesn't sound attractive.
>
> Also, while we say that foreach is not optimal, I cannot find any evidence
> of it; neither here nor online. If there are any docs about the inner
> workings of this functionality, please pass them to me. I continue to
> search for them. Even late last night!
>
> Thanks for your help team,
> Marco.
>
> On Wed, Apr 26, 2023 at 6:21 AM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Indeed, very valid points by Ayan. How is email going to handle 1000s of
>> records? As a solution architect I tend to replace Users with customers,
>> and for each order there must be products, a sort of many-to-many
>> relationship. If I were a customer I would also be interested in the
>> product details as well. Sending via email sounds like a Jurassic Park
>> solution 😗
>>
>> On Wed, 26 Apr 2023 at 10:24, ayan guha <gu...@gmail.com> wrote:
>>
>>> Adding to what Mich said,
>>>
>>> 1. Are you trying to send statements of all orders to all users? Or the
>>> latest order only?
>>>
>>> 2. Sending email is not a good use of Spark. Instead, I suggest using a
>>> notification service or function. Spark should write to a queue (Kafka,
>>> SQS... pick your choice here).
>>>
>>> Best regards
>>> Ayan
>>>
>>> On Wed, 26 Apr 2023 at 7:01 pm, Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> Well, OK. In a nutshell, you want the result set for every user prepared
>>>> and emailed to that user, right?
>>>>
>>>> This is a form of ETL where those result sets need to be posted
>>>> somewhere. Say you create a table based on the result set prepared for each
>>>> user. You may have many raw target tables at the end of the first ETL. How
>>>> does this differ from using forEach? Performance wise forEach may not be
>>>> optimal.
>>>>
>>>> Can you take the sample tables and try your method?
>>>>
>>>> HTH
>>>>
>>>>
>>>> On Wed, 26 Apr 2023 at 04:10, Marco Costantini <
>>>> marco.costantini@rocketfncl.com> wrote:
>>>>
>>>>> Hi Mich,
>>>>> First, thank you for that. Great effort put into helping.
>>>>>
>>>>> Second, I don't think this tackles the technical challenge here. I
>>>>> understand the windowing as it serves those ranks you created, but I don't
>>>>> see how the ranks contribute to the solution.
>>>>> Third, the core of the challenge is about performing this kind of
>>>>> 'statement' but for all users. In this example we target Mich, but that
>>>>> reduces the complexity by a lot! In fact, a simple join and filter would
>>>>> solve that one.
>>>>>
>>>>> Any thoughts on that? For me, the foreach is desirable because I can
>>>>> have the workers chain other actions to each iteration (send email, send
>>>>> HTTP request, etc).
>>>>>
>>>>> Thanks Mich,
>>>>> Marco.
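
(For the all-users case raised above, a minimal sketch is to do the join once
and collapse each user's orders into one JSON array per user, instead of
filtering per user; column names follow the sample schema used elsewhere in
this thread:)

```
from pyspark.sql.functions import collect_list, struct, to_json

# One row per user, carrying that user's orders as a JSON array.
per_user_df = (
    orders_df.join(users_df, orders_df.user_id == users_df.id)
             .groupBy(users_df.id, users_df.name)
             .agg(to_json(collect_list(struct("description", "amount")))
                  .alias("orders_json"))
)
```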
>>>>>
>>>>> On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh <
>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>
>>>>>> Hi Marco,
>>>>>>
>>>>>> First thoughts.
>>>>>>
>>>>>> foreach() is an action that iterates/loops over each element in the
>>>>>> dataset, i.e. it is cursor based. That is different from operating over
>>>>>> the dataset as a set, which is far more efficient.
>>>>>>
>>>>>> So in your case, if I understand it correctly, you want to get the
>>>>>> orders for each user (say Mich), convert the result set to JSON and
>>>>>> send it to Mich via email.
>>>>>>
>>>>>> Let us try this based on sample data
>>>>>>
>>>>>> Put your csv files into HDFS directory
>>>>>>
>>>>>> hdfs dfs -put users.csv /data/stg/test
>>>>>> hdfs dfs -put orders.csv /data/stg/test
>>>>>>
>>>>>> Then create dataframes from csv files, create temp views and do a
>>>>>> join on result sets with some slicing and dicing on orders table
>>>>>>
>>>>>> #! /usr/bin/env python3
>>>>>> from __future__ import print_function
>>>>>> import sys
>>>>>> import findspark
>>>>>> findspark.init()
>>>>>> from pyspark.sql import SparkSession
>>>>>> from pyspark import SparkContext
>>>>>> from pyspark.sql import SQLContext, HiveContext
>>>>>> from pyspark.sql.window import Window
>>>>>>
>>>>>> def spark_session(appName):
>>>>>>   return SparkSession.builder \
>>>>>>         .appName(appName) \
>>>>>>         .enableHiveSupport() \
>>>>>>         .getOrCreate()
>>>>>>
>>>>>> def main():
>>>>>>     appName = "ORDERS"
>>>>>>     spark =spark_session(appName)
>>>>>>     # get the sample
>>>>>>     users_file="hdfs://rhes75:9000/data/stg/test/users.csv"
>>>>>>     orders_file="hdfs://rhes75:9000/data/stg/test/orders.csv"
>>>>>>     users_df = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true").load(users_file)
>>>>>>     users_df.printSchema()
>>>>>>     """
>>>>>>     root
>>>>>>     |-- id: integer (nullable = true)
>>>>>>     |-- name: string (nullable = true)
>>>>>>     """
>>>>>>
>>>>>>     print(f"""\n Reading from  {users_file}\n""")
>>>>>>     users_df.show(5,False)
>>>>>>     orders_df = spark.read.format("com.databricks.spark.csv").option("inferSchema", "true").option("header", "true").load(orders_file)
>>>>>>     orders_df.printSchema()
>>>>>>     """
>>>>>>     root
>>>>>>     |-- id: integer (nullable = true)
>>>>>>     |-- description: string (nullable = true)
>>>>>>     |-- amount: double (nullable = true)
>>>>>>     |-- user_id: integer (nullable = true)
>>>>>>      """
>>>>>>     print(f"""\n Reading from  {orders_file}\n""")
>>>>>>     orders_df.show(50,False)
>>>>>>     users_df.createOrReplaceTempView("users")
>>>>>>     orders_df.createOrReplaceTempView("orders")
>>>>>>     # Create a list of orders for each user
>>>>>>     print(f"""\n Doing a join on two temp views\n""")
>>>>>>
>>>>>>     sqltext = """
>>>>>>     SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
>>>>>>     FROM
>>>>>>     (
>>>>>>     SELECT
>>>>>>             user_id AS user_id
>>>>>>         ,   id as order_id
>>>>>>         ,   description as description
>>>>>>         ,   amount AS amount
>>>>>>         ,  DENSE_RANK() OVER (PARTITION by user_id ORDER BY amount)
>>>>>> AS RANK
>>>>>>         ,  MAX(amount) OVER (PARTITION by user_id ORDER BY id) AS
>>>>>> maxorders
>>>>>>     FROM orders
>>>>>>     ) t
>>>>>>     INNER JOIN users u ON t.user_id = u.id
>>>>>>     AND  u.name = 'Mich'
>>>>>>     ORDER BY t.order_id
>>>>>>     """
>>>>>>     spark.sql(sqltext).show(50)
>>>>>> if __name__ == '__main__':
>>>>>>     main()
>>>>>>
>>>>>> Final outcome displaying orders for user Mich
>>>>>>
>>>>>> Doing a join on two temp views
>>>>>>
>>>>>>  Doing a join on two temp views
>>>>>>
>>>>>> +----+--------+-----------------+------+---------+
>>>>>> |name|order_id|      description|amount|maxorders|
>>>>>> +----+--------+-----------------+------+---------+
>>>>>> |Mich|   50001| Mich's 1st order|101.11|   101.11|
>>>>>> |Mich|   50002| Mich's 2nd order|102.11|   102.11|
>>>>>> |Mich|   50003| Mich's 3rd order|103.11|   103.11|
>>>>>> |Mich|   50004| Mich's 4th order|104.11|   104.11|
>>>>>> |Mich|   50005| Mich's 5th order|105.11|   105.11|
>>>>>> |Mich|   50006| Mich's 6th order|106.11|   106.11|
>>>>>> |Mich|   50007| Mich's 7th order|107.11|   107.11|
>>>>>> |Mich|   50008| Mich's 8th order|108.11|   108.11|
>>>>>> |Mich|   50009| Mich's 9th order|109.11|   109.11|
>>>>>> |Mich|   50010|Mich's 10th order|210.11|   210.11|
>>>>>> +----+--------+-----------------+------+---------+
>>>>>>
>>>>>> You can start on this.  Happy coding
>>>>>>
>>>>>> On Tue, 25 Apr 2023 at 18:50, Marco Costantini <
>>>>>> marco.costantini@rocketfncl.com> wrote:
>>>>>>
>>>>>>> Thanks Mich,
>>>>>>>
>>>>>>> Great idea. I have done it. Those files are attached. I'm interested
>>>>>>> to know your thoughts. Let's imagine this same structure, but with huge
>>>>>>> amounts of data as well.
>>>>>>>
>>>>>>> Please and thank you,
>>>>>>> Marco.
>>>>>>>
>>>>>>> On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh <
>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Marco,
>>>>>>>>
>>>>>>>> Let us start simple,
>>>>>>>>
>>>>>>>> Provide a csv file of 5 rows for the users table. Each row has a
>>>>>>>> unique user_id and one or two other columns like fictitious email etc.
>>>>>>>>
>>>>>>>> Also, for each user_id, provide 10 rows of the orders table, meaning
>>>>>>>> that the orders table has 5 x 10 = 50 rows in total.
>>>>>>>>
>>>>>>>> both as comma separated csv files
>>>>>>>>
>>>>>>>> HTH
>>>>>>>>
>>>>>>>> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
>>>>>>>> marco.costantini@rocketfncl.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Mich,
>>>>>>>>> I have not but I will certainly read up on this today.
>>>>>>>>>
>>>>>>>>> To your point that all of the essential data is in the 'orders'
>>>>>>>>> table; I agree! That distills the problem nicely. Yet, I still have some
>>>>>>>>> questions on which someone may be able to shed some light.
>>>>>>>>>
>>>>>>>>> 1) If my 'orders' table is very large, and will need to be
>>>>>>>>> aggregated by 'user_id', how will Spark intelligently optimize on that
>>>>>>>>> constraint (only read data for the relevant 'user_id's)? Is that something
>>>>>>>>> I have to instruct Spark to do?
>>>>>>>>>
>>>>>>>>> 2) Without #1, even with windowing, am I asking each partition to
>>>>>>>>> search too much?
>>>>>>>>>
>>>>>>>>> Please, if you have any links to documentation I can read on *how*
>>>>>>>>> Spark works under the hood for these operations, I would appreciate it if
>>>>>>>>> you give them. Spark has become a pillar on my team and knowing it in more
>>>>>>>>> detail is warranted.
>>>>>>>>>
>>>>>>>>> Slightly pivoting the subject here; I have tried something. It was
>>>>>>>>> a suggestion by an AI chat bot and it seemed reasonable. In my main Spark
>>>>>>>>> script I now have the line:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> grouped_orders_df = orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id', 'timestamp', 'total', 'description'))).alias('orders'))
>>>>>>>>> ```
>>>>>>>>> (json is ultimately needed)
>>>>>>>>>
>>>>>>>>> This actually achieves my goal by putting all of the 'orders' in a
>>>>>>>>> single Array column. Now my worry is: will this column become too large if
>>>>>>>>> there are a great many orders? Is there a limit? I have searched for
>>>>>>>>> documentation on such a limit but could not find any.
>>>>>>>>>
>>>>>>>>> I truly appreciate your help Mich and team,
>>>>>>>>> Marco.
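
(On the worry above about one very large Array column per user: an alternative
sketch is to skip collect_list and write the order rows out partitioned by
user_id, so each user ends up with a directory of JSON-lines files and no
single row ever holds the full order history; the output path is a placeholder:)

```
# Each user_id value becomes its own sub-directory of JSON-lines files.
(orders_df
    .select("user_id", "timestamp", "total", "description")
    .write
    .partitionBy("user_id")
    .mode("overwrite")
    .json("s3a://my-bucket/statements/"))
```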
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>>>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Have you thought of using windowing functions
>>>>>>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/> to
>>>>>>>>>> achieve this?
>>>>>>>>>>
>>>>>>>>>> Effectively all your information is in the orders table.
>>>>>>>>>>
>>>>>>>>>> HTH
>>>>>>>>>>
>>>>>>>>>> Mich Talebzadeh,
>>>>>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>>>>>> Palantir Technologies Limited
>>>>>>>>>> London
>>>>>>>>>> United Kingdom
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>    view my Linkedin profile
>>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all
>>>>>>>>>> responsibility for any loss, damage or destruction of data or any other
>>>>>>>>>> property which may arise from relying on this email's technical content is
>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for any
>>>>>>>>>> monetary damages arising from such loss, damage or destruction.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
> --
Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Re: What is the best way to organize a join within a foreach?

Posted by Amit Joshi <ma...@gmail.com>.
Hi Marco,

I am not sure you will get access to the DataFrame inside the foreach, as
the Spark context is not serializable, if I remember correctly.

One thing you can do:
use the cogroup operation on both datasets.
This will give you (key, (iter(v1), iter(v2))) pairs.
Then use foreachPartition to perform your task of converting to JSON and
more.

Performance-wise, you can then batch the records per user and also share
the same connection within each partition if needed.

Hope this will help.

Regards
Amit
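
(A minimal sketch of the cogroup idea at the RDD level, since a DataFrame
cannot be used inside foreach itself; keys follow the sample schema, and
send_statement is a hypothetical side effect such as an email, S3 put or
Kafka send:)

```
import json

def handle_partition(pairs):
    # pairs is an iterator of (user_id, (users_iter, orders_iter)).
    # A real job would open one connection/producer here and reuse it
    # for every user in the partition.
    for user_id, (users_iter, orders_iter) in pairs:
        users = list(users_iter)
        if not users:
            continue                      # orders with no matching user
        user = users[0]
        statement = json.dumps([o.asDict() for o in orders_iter])
        # send_statement(user, statement)  # hypothetical helper

users_kv  = users_df.rdd.map(lambda r: (r.id, r))
orders_kv = orders_df.rdd.map(lambda r: (r.user_id, r))

users_kv.cogroup(orders_kv).foreachPartition(handle_partition)
```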



Re: What is the best way to organize a join within a foreach?

Posted by Marco Costantini <ma...@rocketfncl.com>.
Thanks team,
Email was just an example. The point was to illustrate that some actions
could be chained using Spark's foreach. In reality, this is an S3 write and
a Kafka message production, which I think is quite reasonable for spark to
do.

To answer Ayan's first question: yes, all of a user's orders, prepared for
each and every user.

Other than the remarks that email transmission is unwise (which, as I've now
clarified, is beside the point) I am not seeing an alternative to using
Spark's foreach. Unless your proposal is for the Spark job to target 1 user
and just run the job 1000s of times taking the user_id as input. That
doesn't sound attractive.

Also, while we say that foreach is not optimal, I cannot find any evidence
of it; neither here nor online. If there are any docs about the inner
workings of this functionality, please pass them to me. I continue to
search for them. Even late last night!

Thanks for your help team,
Marco.
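
(For the S3 write and Kafka production mentioned above, a hedged sketch of the
batch write path, building on the grouped_orders_df idea from earlier in the
thread; the bucket, topic and bootstrap servers are placeholders, and the
Kafka sink needs the spark-sql-kafka package on the classpath:)

```
from pyspark.sql.functions import col, collect_list, struct, to_json

per_user = (orders_df.groupBy("user_id")
            .agg(to_json(collect_list(struct("user_id", "timestamp", "total",
                                             "description"))).alias("statement")))

# One statement per user on S3, laid out by user_id.
per_user.write.partitionBy("user_id").mode("overwrite").json("s3a://my-bucket/statements/")

# One Kafka record per user: key = user_id, value = the JSON statement.
(per_user
    .select(col("user_id").cast("string").alias("key"),
            col("statement").alias("value"))
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("topic", "user-statements")
    .save())
```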


Re: What is the best way to organize a join within a foreach?

Posted by Mich Talebzadeh <mi...@gmail.com>.
Indeed, very valid points by Ayan. How is email going to handle 1000s of
records? As a solution architect I tend to replace Users with customers, and
for each order there must be products, a sort of many-to-many relationship.
If I were a customer I would also be interested in the product details as
well. Sending via email sounds like a Jurassic Park solution 😗

On Wed, 26 Apr 2023 at 10:24, ayan guha <gu...@gmail.com> wrote:

> Adding to what Mich said,
>
> 1. Are you trying to send statements of all orders to all users? Or the
> latest order only?
>
> 2. Sending email is not a good use of Spark. Instead, I suggest using a
> notification service or function. Spark should write to a queue (Kafka,
> SQS... pick your choice here).
>
> Best regards
> Ayan
>
> On Wed, 26 Apr 2023 at 7:01 pm, Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Well OK in a nutshell you want the result set for every user prepared and
>> email to that user right.
>>
>> This is a form of ETL where those result sets need to be posted
>> somewhere. Say you create a table based on the result set prepared for each
>> user. You may have many raw target tables at the end of the first ETL. How
>> does this differ from using forEach? Performance wise forEach may not be
>> optimal.
>>
>> Can you take the sample tables and try your method?
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 26 Apr 2023 at 04:10, Marco Costantini <
>> marco.costantini@rocketfncl.com> wrote:
>>
>>> Hi Mich,
>>> First, thank you for that. Great effort put into helping.
>>>
>>> Second, I don't think this tackles the technical challenge here. I
>>> understand the windowing as it serves those ranks you created, but I don't
>>> see how the ranks contribute to the solution.
>>> Third, the core of the challenge is about performing this kind of
>>> 'statement' but for all users. In this example we target Mich, but that
>>> reduces the complexity by a lot! In fact, a simple join and filter would
>>> solve that one.
>>>
>>> Any thoughts on that? For me, the foreach is desirable because I can
>>> have the workers chain other actions to each iteration (send email, send
>>> HTTP request, etc).
>>>
>>> Thanks Mich,
>>> Marco.
>>>
>>> On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> Hi Marco,
>>>>
>>>> First thoughts.
>>>>
>>>> foreach() is an action operation that is to iterate/loop over each
>>>> element in the dataset, meaning cursor based. That is different from
>>>> operating over the dataset as a set which is far more efficient.
>>>>
>>>> So in your case as I understand it correctly, you want to get order for
>>>> each user (say Mich), convert the result set to json and send it to Mich
>>>> via email
>>>>
>>>> Let us try this based on sample data
>>>>
>>>> Put your csv files into HDFS directory
>>>>
>>>> hdfs dfs -put users.csv /data/stg/test
>>>> hdfs dfs -put orders.csv /data/stg/test
>>>>
>>>> Then create dataframes from csv files, create temp views and do a join
>>>> on result sets with some slicing and dicing on orders table
>>>>
>>>> #! /usr/bin/env python3
>>>> from __future__ import print_function
>>>> import sys
>>>> import findspark
>>>> findspark.init()
>>>> from pyspark.sql import SparkSession
>>>> from pyspark import SparkContext
>>>> from pyspark.sql import SQLContext, HiveContext
>>>> from pyspark.sql.window import Window
>>>>
>>>> def spark_session(appName):
>>>>   return SparkSession.builder \
>>>>         .appName(appName) \
>>>>         .enableHiveSupport() \
>>>>         .getOrCreate()
>>>>
>>>> def main():
>>>>     appName = "ORDERS"
>>>>     spark =spark_session(appName)
>>>>     # get the sample
>>>>     users_file="hdfs://rhes75:9000/data/stg/test/users.csv"
>>>>     orders_file="hdfs://rhes75:9000/data/stg/test/orders.csv"
>>>>     users_df =
>>>> spark.read.format("com.databricks.spark.csv").option("inferSchema",
>>>> "true").option("header", "true").load(users_file)
>>>>     users_df.printSchema()
>>>>     """
>>>>     root
>>>>     |-- id: integer (nullable = true)
>>>>     |-- name: string (nullable = true)
>>>>     """
>>>>
>>>>     print(f"""\n Reading from  {users_file}\n""")
>>>>     users_df.show(5,False)
>>>>     orders_df =
>>>> spark.read.format("com.databricks.spark.csv").option("inferSchema",
>>>> "true").option("header", "true").load(orders_file)
>>>>     orders_df.printSchema()
>>>>     """
>>>>     root
>>>>     |-- id: integer (nullable = true)
>>>>     |-- description: string (nullable = true)
>>>>     |-- amount: double (nullable = true)
>>>>     |-- user_id: integer (nullable = true)
>>>>      """
>>>>     print(f"""\n Reading from  {orders_file}\n""")
>>>>     orders_df.show(50,False)
>>>>     users_df.createOrReplaceTempView("users")
>>>>     orders_df.createOrReplaceTempView("orders")
>>>>     # Create a list of orders for each user
>>>>     print(f"""\n Doing a join on two temp views\n""")
>>>>
>>>>     sqltext = """
>>>>     SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
>>>>     FROM
>>>>     (
>>>>     SELECT
>>>>             user_id AS user_id
>>>>         ,   id as order_id
>>>>         ,   description as description
>>>>         ,   amount AS amount
>>>>         ,  DENSE_RANK() OVER (PARTITION by user_id ORDER BY amount) AS
>>>> RANK
>>>>         ,  MAX(amount) OVER (PARTITION by user_id ORDER BY id) AS
>>>> maxorders
>>>>     FROM orders
>>>>     ) t
>>>>     INNER JOIN users u ON t.user_id = u.id
>>>>     AND  u.name = 'Mich'
>>>>     ORDER BY t.order_id
>>>>     """
>>>>     spark.sql(sqltext).show(50)
>>>> if __name__ == '__main__':
>>>>     main()
>>>>
>>>> Final outcome displaying orders for user Mich
>>>>
>>>> Doing a join on two temp views
>>>>
>>>>  Doing a join on two temp views
>>>>
>>>> +----+--------+-----------------+------+---------+
>>>> |name|order_id|      description|amount|maxorders|
>>>> +----+--------+-----------------+------+---------+
>>>> |Mich|   50001| Mich's 1st order|101.11|   101.11|
>>>> |Mich|   50002| Mich's 2nd order|102.11|   102.11|
>>>> |Mich|   50003| Mich's 3rd order|103.11|   103.11|
>>>> |Mich|   50004| Mich's 4th order|104.11|   104.11|
>>>> |Mich|   50005| Mich's 5th order|105.11|   105.11|
>>>> |Mich|   50006| Mich's 6th order|106.11|   106.11|
>>>> |Mich|   50007| Mich's 7th order|107.11|   107.11|
>>>> |Mich|   50008| Mich's 8th order|108.11|   108.11|
>>>> |Mich|   50009| Mich's 9th order|109.11|   109.11|
>>>> |Mich|   50010|Mich's 10th order|210.11|   210.11|
>>>> +----+--------+-----------------+------+---------+
>>>>
>>>> You can start on this.  Happy coding
>>>>
>>>> Mich Talebzadeh,
>>>> Lead Solutions Architect/Engineering Lead
>>>> Palantir Technologies Limited
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, 25 Apr 2023 at 18:50, Marco Costantini <
>>>> marco.costantini@rocketfncl.com> wrote:
>>>>
>>>>> Thanks Mich,
>>>>>
>>>>> Great idea. I have done it. Those files are attached. I'm interested
>>>>> to know your thoughts. Let's imagine this same structure, but with huge
>>>>> amounts of data as well.
>>>>>
>>>>> Please and thank you,
>>>>> Marco.
>>>>>
>>>>> On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh <
>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>
>>>>>> Hi Marco,
>>>>>>
>>>>>> Let us start simple,
>>>>>>
>>>>>> Provide a csv file of 5 rows for the users table. Each row has a
>>>>>> unique user_id and one or two other columns like fictitious email etc.
>>>>>>
>>>>>> Also for each user_id, provide 10 rows of orders table, meaning that
>>>>>> orders table has 5 x 10 rows for each user_id.
>>>>>>
>>>>>> both as comma separated csv file
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>> Mich Talebzadeh,
>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>> Palantir Technologies Limited
>>>>>> London
>>>>>> United Kingdom
>>>>>>
>>>>>>
>>>>>>    view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
>>>>>> marco.costantini@rocketfncl.com> wrote:
>>>>>>
>>>>>>> Thanks Mich,
>>>>>>> I have not but I will certainly read up on this today.
>>>>>>>
>>>>>>> To your point that all of the essential data is in the 'orders'
>>>>>>> table; I agree! That distills the problem nicely. Yet, I still have some
>>>>>>> questions on which someone may be able to shed some light.
>>>>>>>
>>>>>>> 1) If my 'orders' table is very large, and will need to be
>>>>>>> aggregated by 'user_id', how will Spark intelligently optimize on that
>>>>>>> constraint (only read data for relevent 'user_id's). Is that something I
>>>>>>> have to instruct Spark to do?
>>>>>>>
>>>>>>> 2) Without #1, even with windowing, am I asking each partition to
>>>>>>> search too much?
>>>>>>>
>>>>>>> Please, if you have any links to documentation I can read on *how*
>>>>>>> Spark works under the hood for these operations, I would appreciate it if
>>>>>>> you give them. Spark has become a pillar on my team and knowing it in more
>>>>>>> detail is warranted.
>>>>>>>
>>>>>>> Slightly pivoting the subject here; I have tried something. It was a
>>>>>>> suggestion by an AI chat bot and it seemed reasonable. In my main Spark
>>>>>>> script I now have the line:
>>>>>>>
>>>>>>> ```
>>>>>>> grouped_orders_df =
>>>>>>> orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
>>>>>>> 'timestamp', 'total', 'description'))).alias('orders'))
>>>>>>> ```
>>>>>>> (json is ultimately needed)
>>>>>>>
>>>>>>> This actually achieves my goal by putting all of the 'orders' in a
>>>>>>> single Array column. Now my worry is, will this column become too large if
>>>>>>> there are a great many orders. Is there a limit? I have search for
>>>>>>> documentation on such a limit but could not find any.
>>>>>>>
>>>>>>> I truly appreciate your help Mich and team,
>>>>>>> Marco.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>
>>>>>>>> Have you thought of using  windowing function
>>>>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>>>>>>>> achieve this?
>>>>>>>>
>>>>>>>> Effectively all your information is in the orders table.
>>>>>>>>
>>>>>>>> HTH
>>>>>>>>
>>>>>>>> Mich Talebzadeh,
>>>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>>>> Palantir Technologies Limited
>>>>>>>> London
>>>>>>>> United Kingdom
>>>>>>>>
>>>>>>>>
>>>>>>>>    view my Linkedin profile
>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>
>>>>>>>>
>>>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>>>>>>>> marco.costantini@rocketfncl.com> wrote:
>>>>>>>>
>>>>>>>>> I have two tables: {users, orders}. In this example, let's say
>>>>>>>>> that for each 1 User in the users table, there are 100000 Orders in the
>>>>>>>>> orders table.
>>>>>>>>>
>>>>>>>>> I have to use pyspark to generate a statement of Orders for each
>>>>>>>>> User. So, a single user will need his/her own list of Orders. Additionally,
>>>>>>>>> I need to send this statement to the real-world user via email (for
>>>>>>>>> example).
>>>>>>>>>
>>>>>>>>> My first intuition was to apply a DataFrame.foreach() on the users
>>>>>>>>> DataFrame. This way, I can rely on the spark workers to handle the email
>>>>>>>>> sending individually. However, I now do not know the best way to get each
>>>>>>>>> User's Orders.
>>>>>>>>>
>>>>>>>>> I will soon try the following (pseudo-code):
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> users_df = <my entire users DataFrame>
>>>>>>>>> orders_df = <my entire orders DataFrame>
>>>>>>>>>
>>>>>>>>> #this is poorly named for max understandability in this context
>>>>>>>>> def foreach_function(row):
>>>>>>>>>   user_id = row.user_id
>>>>>>>>>   user_orders_df = orders_df.select(f'user_id = {user_id}')
>>>>>>>>>
>>>>>>>>>   #here, I'd get any User info from 'row'
>>>>>>>>>   #then, I'd convert all 'user_orders' to JSON
>>>>>>>>>   #then, I'd prepare the email and send it
>>>>>>>>>
>>>>>>>>> users_df.foreach(foreach_function)
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> It is my understanding that if I do my user-specific work in the
>>>>>>>>> foreach function, I will capitalize on Spark's scalability when doing that
>>>>>>>>> work. However, I am worried of two things:
>>>>>>>>>
>>>>>>>>> If I take all Orders up front...
>>>>>>>>>
>>>>>>>>> Will that work?
>>>>>>>>> Will I be taking too much? Will I be taking Orders on partitions
>>>>>>>>> who won't handle them (different User).
>>>>>>>>>
>>>>>>>>> If I create the orders_df (filtered) within the foreach function...
>>>>>>>>>
>>>>>>>>> Will it work?
>>>>>>>>> Will that be too much IO to DB?
>>>>>>>>>
>>>>>>>>> The question ultimately is: How can I achieve this goal
>>>>>>>>> efficiently?
>>>>>>>>>
>>>>>>>>> I have not yet tried anything here. I am doing so as we speak, but
>>>>>>>>> am suffering from choice-paralysis.
>>>>>>>>>
>>>>>>>>> Please and thank you.
>>>>>>>>>
>>>>>>>> --
> Best Regards,
> Ayan Guha
>
-- 
Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.

Re: What is the best way to organize a join within a foreach?

Posted by ayan guha <gu...@gmail.com>.
Adding to what Mich said,

1. Are you trying to send statements of all orders to all users? Or the
latest order only?

2. Sending email is not a good use of Spark. Instead, I suggest using a
notification service or function. Spark should write to a queue (Kafka,
SQS... pick your choice here); a minimal sketch of that follows below.
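
For illustration only, a rough sketch of the queue idea. The broker address, the
topic name and having the spark-sql-kafka package on the classpath are all my
assumptions here, not something established in this thread:

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, struct, to_json

spark = SparkSession.builder.appName("order-statements").getOrCreate()

# orders.csv columns from the thread sample: id, description, amount, user_id
orders_df = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("hdfs://rhes75:9000/data/stg/test/orders.csv"))

# One row per user: the whole order statement serialised as a JSON string
statements_df = (orders_df
    .groupBy("user_id")
    .agg(collect_list(struct("id", "description", "amount")).alias("orders"))
    .select(col("user_id").cast("string").alias("key"),
            to_json(struct("user_id", "orders")).alias("value")))

# Push the statements to a queue; a downstream notification service does the
# actual sending (email, SNS, whatever), keeping Spark out of the delivery path.
(statements_df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")   # assumed broker address
    .option("topic", "user-order-statements")            # assumed topic name
    .save())
```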

Best regards
Ayan

On Wed, 26 Apr 2023 at 7:01 pm, Mich Talebzadeh <mi...@gmail.com>
wrote:

> Well OK in a nutshell you want the result set for every user prepared and
> email to that user right.
>
> This is a form of ETL where those result sets need to be posted somewhere.
> Say you create a table based on the result set prepared for each user. You
> may have many raw target tables at the end of the first ETL. How does this
> differ from using forEach? Performance wise forEach may not be optimal.
>
> Can you take the sample tables and try your method?
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 26 Apr 2023 at 04:10, Marco Costantini <
> marco.costantini@rocketfncl.com> wrote:
>
>> Hi Mich,
>> First, thank you for that. Great effort put into helping.
>>
>> Second, I don't think this tackles the technical challenge here. I
>> understand the windowing as it serves those ranks you created, but I don't
>> see how the ranks contribute to the solution.
>> Third, the core of the challenge is about performing this kind of
>> 'statement' but for all users. In this example we target Mich, but that
>> reduces the complexity by a lot! In fact, a simple join and filter would
>> solve that one.
>>
>> Any thoughts on that? For me, the foreach is desirable because I can have
>> the workers chain other actions to each iteration (send email, send HTTP
>> request, etc).
>>
>> Thanks Mich,
>> Marco.
>>
>> On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh <
>> mich.talebzadeh@gmail.com> wrote:
>>
>>> Hi Marco,
>>>
>>> First thoughts.
>>>
>>> foreach() is an action operation that is to iterate/loop over each
>>> element in the dataset, meaning cursor based. That is different from
>>> operating over the dataset as a set which is far more efficient.
>>>
>>> So in your case as I understand it correctly, you want to get order for
>>> each user (say Mich), convert the result set to json and send it to Mich
>>> via email
>>>
>>> Let us try this based on sample data
>>>
>>> Put your csv files into HDFS directory
>>>
>>> hdfs dfs -put users.csv /data/stg/test
>>> hdfs dfs -put orders.csv /data/stg/test
>>>
>>> Then create dataframes from csv files, create temp views and do a join
>>> on result sets with some slicing and dicing on orders table
>>>
>>> #! /usr/bin/env python3
>>> from __future__ import print_function
>>> import sys
>>> import findspark
>>> findspark.init()
>>> from pyspark.sql import SparkSession
>>> from pyspark import SparkContext
>>> from pyspark.sql import SQLContext, HiveContext
>>> from pyspark.sql.window import Window
>>>
>>> def spark_session(appName):
>>>   return SparkSession.builder \
>>>         .appName(appName) \
>>>         .enableHiveSupport() \
>>>         .getOrCreate()
>>>
>>> def main():
>>>     appName = "ORDERS"
>>>     spark =spark_session(appName)
>>>     # get the sample
>>>     users_file="hdfs://rhes75:9000/data/stg/test/users.csv"
>>>     orders_file="hdfs://rhes75:9000/data/stg/test/orders.csv"
>>>     users_df =
>>> spark.read.format("com.databricks.spark.csv").option("inferSchema",
>>> "true").option("header", "true").load(users_file)
>>>     users_df.printSchema()
>>>     """
>>>     root
>>>     |-- id: integer (nullable = true)
>>>     |-- name: string (nullable = true)
>>>     """
>>>
>>>     print(f"""\n Reading from  {users_file}\n""")
>>>     users_df.show(5,False)
>>>     orders_df =
>>> spark.read.format("com.databricks.spark.csv").option("inferSchema",
>>> "true").option("header", "true").load(orders_file)
>>>     orders_df.printSchema()
>>>     """
>>>     root
>>>     |-- id: integer (nullable = true)
>>>     |-- description: string (nullable = true)
>>>     |-- amount: double (nullable = true)
>>>     |-- user_id: integer (nullable = true)
>>>      """
>>>     print(f"""\n Reading from  {orders_file}\n""")
>>>     orders_df.show(50,False)
>>>     users_df.createOrReplaceTempView("users")
>>>     orders_df.createOrReplaceTempView("orders")
>>>     # Create a list of orders for each user
>>>     print(f"""\n Doing a join on two temp views\n""")
>>>
>>>     sqltext = """
>>>     SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
>>>     FROM
>>>     (
>>>     SELECT
>>>             user_id AS user_id
>>>         ,   id as order_id
>>>         ,   description as description
>>>         ,   amount AS amount
>>>         ,  DENSE_RANK() OVER (PARTITION by user_id ORDER BY amount) AS
>>> RANK
>>>         ,  MAX(amount) OVER (PARTITION by user_id ORDER BY id) AS
>>> maxorders
>>>     FROM orders
>>>     ) t
>>>     INNER JOIN users u ON t.user_id = u.id
>>>     AND  u.name = 'Mich'
>>>     ORDER BY t.order_id
>>>     """
>>>     spark.sql(sqltext).show(50)
>>> if __name__ == '__main__':
>>>     main()
>>>
>>> Final outcome displaying orders for user Mich
>>>
>>> Doing a join on two temp views
>>>
>>>  Doing a join on two temp views
>>>
>>> +----+--------+-----------------+------+---------+
>>> |name|order_id|      description|amount|maxorders|
>>> +----+--------+-----------------+------+---------+
>>> |Mich|   50001| Mich's 1st order|101.11|   101.11|
>>> |Mich|   50002| Mich's 2nd order|102.11|   102.11|
>>> |Mich|   50003| Mich's 3rd order|103.11|   103.11|
>>> |Mich|   50004| Mich's 4th order|104.11|   104.11|
>>> |Mich|   50005| Mich's 5th order|105.11|   105.11|
>>> |Mich|   50006| Mich's 6th order|106.11|   106.11|
>>> |Mich|   50007| Mich's 7th order|107.11|   107.11|
>>> |Mich|   50008| Mich's 8th order|108.11|   108.11|
>>> |Mich|   50009| Mich's 9th order|109.11|   109.11|
>>> |Mich|   50010|Mich's 10th order|210.11|   210.11|
>>> +----+--------+-----------------+------+---------+
>>>
>>> You can start on this.  Happy coding
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 25 Apr 2023 at 18:50, Marco Costantini <
>>> marco.costantini@rocketfncl.com> wrote:
>>>
>>>> Thanks Mich,
>>>>
>>>> Great idea. I have done it. Those files are attached. I'm interested to
>>>> know your thoughts. Let's imagine this same structure, but with huge
>>>> amounts of data as well.
>>>>
>>>> Please and thank you,
>>>> Marco.
>>>>
>>>> On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Hi Marco,
>>>>>
>>>>> Let us start simple,
>>>>>
>>>>> Provide a csv file of 5 rows for the users table. Each row has a
>>>>> unique user_id and one or two other columns like fictitious email etc.
>>>>>
>>>>> Also for each user_id, provide 10 rows of orders table, meaning that
>>>>> orders table has 5 x 10 rows for each user_id.
>>>>>
>>>>> both as comma separated csv file
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Lead Solutions Architect/Engineering Lead
>>>>> Palantir Technologies Limited
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
>>>>> marco.costantini@rocketfncl.com> wrote:
>>>>>
>>>>>> Thanks Mich,
>>>>>> I have not but I will certainly read up on this today.
>>>>>>
>>>>>> To your point that all of the essential data is in the 'orders'
>>>>>> table; I agree! That distills the problem nicely. Yet, I still have some
>>>>>> questions on which someone may be able to shed some light.
>>>>>>
>>>>>> 1) If my 'orders' table is very large, and will need to be aggregated
>>>>>> by 'user_id', how will Spark intelligently optimize on that constraint
>>>>>> (only read data for relevent 'user_id's). Is that something I have to
>>>>>> instruct Spark to do?
>>>>>>
>>>>>> 2) Without #1, even with windowing, am I asking each partition to
>>>>>> search too much?
>>>>>>
>>>>>> Please, if you have any links to documentation I can read on *how*
>>>>>> Spark works under the hood for these operations, I would appreciate it if
>>>>>> you give them. Spark has become a pillar on my team and knowing it in more
>>>>>> detail is warranted.
>>>>>>
>>>>>> Slightly pivoting the subject here; I have tried something. It was a
>>>>>> suggestion by an AI chat bot and it seemed reasonable. In my main Spark
>>>>>> script I now have the line:
>>>>>>
>>>>>> ```
>>>>>> grouped_orders_df =
>>>>>> orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
>>>>>> 'timestamp', 'total', 'description'))).alias('orders'))
>>>>>> ```
>>>>>> (json is ultimately needed)
>>>>>>
>>>>>> This actually achieves my goal by putting all of the 'orders' in a
>>>>>> single Array column. Now my worry is, will this column become too large if
>>>>>> there are a great many orders. Is there a limit? I have search for
>>>>>> documentation on such a limit but could not find any.
>>>>>>
>>>>>> I truly appreciate your help Mich and team,
>>>>>> Marco.
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>
>>>>>>> Have you thought of using  windowing function
>>>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>>>>>>> achieve this?
>>>>>>>
>>>>>>> Effectively all your information is in the orders table.
>>>>>>>
>>>>>>> HTH
>>>>>>>
>>>>>>> Mich Talebzadeh,
>>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>>> Palantir Technologies Limited
>>>>>>> London
>>>>>>> United Kingdom
>>>>>>>
>>>>>>>
>>>>>>>    view my Linkedin profile
>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>
>>>>>>>
>>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>>>>>>> marco.costantini@rocketfncl.com> wrote:
>>>>>>>
>>>>>>>> I have two tables: {users, orders}. In this example, let's say that
>>>>>>>> for each 1 User in the users table, there are 100000 Orders in the orders
>>>>>>>> table.
>>>>>>>>
>>>>>>>> I have to use pyspark to generate a statement of Orders for each
>>>>>>>> User. So, a single user will need his/her own list of Orders. Additionally,
>>>>>>>> I need to send this statement to the real-world user via email (for
>>>>>>>> example).
>>>>>>>>
>>>>>>>> My first intuition was to apply a DataFrame.foreach() on the users
>>>>>>>> DataFrame. This way, I can rely on the spark workers to handle the email
>>>>>>>> sending individually. However, I now do not know the best way to get each
>>>>>>>> User's Orders.
>>>>>>>>
>>>>>>>> I will soon try the following (pseudo-code):
>>>>>>>>
>>>>>>>> ```
>>>>>>>> users_df = <my entire users DataFrame>
>>>>>>>> orders_df = <my entire orders DataFrame>
>>>>>>>>
>>>>>>>> #this is poorly named for max understandability in this context
>>>>>>>> def foreach_function(row):
>>>>>>>>   user_id = row.user_id
>>>>>>>>   user_orders_df = orders_df.select(f'user_id = {user_id}')
>>>>>>>>
>>>>>>>>   #here, I'd get any User info from 'row'
>>>>>>>>   #then, I'd convert all 'user_orders' to JSON
>>>>>>>>   #then, I'd prepare the email and send it
>>>>>>>>
>>>>>>>> users_df.foreach(foreach_function)
>>>>>>>> ```
>>>>>>>>
>>>>>>>> It is my understanding that if I do my user-specific work in the
>>>>>>>> foreach function, I will capitalize on Spark's scalability when doing that
>>>>>>>> work. However, I am worried of two things:
>>>>>>>>
>>>>>>>> If I take all Orders up front...
>>>>>>>>
>>>>>>>> Will that work?
>>>>>>>> Will I be taking too much? Will I be taking Orders on partitions
>>>>>>>> who won't handle them (different User).
>>>>>>>>
>>>>>>>> If I create the orders_df (filtered) within the foreach function...
>>>>>>>>
>>>>>>>> Will it work?
>>>>>>>> Will that be too much IO to DB?
>>>>>>>>
>>>>>>>> The question ultimately is: How can I achieve this goal efficiently?
>>>>>>>>
>>>>>>>> I have not yet tried anything here. I am doing so as we speak, but
>>>>>>>> am suffering from choice-paralysis.
>>>>>>>>
>>>>>>>> Please and thank you.
>>>>>>>>
>>>>>>> --
Best Regards,
Ayan Guha

Re: What is the best way to organize a join within a foreach?

Posted by Mich Talebzadeh <mi...@gmail.com>.
Well OK, in a nutshell you want the result set for every user prepared and
emailed to that user, right?

This is a form of ETL where those result sets need to be posted somewhere.
Say you create a table based on the result set prepared for each user. You
may have many raw target tables at the end of the first ETL. How does this
differ from using foreach()? Performance-wise foreach() may not be optimal.
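
For example, a minimal sketch of that first ETL step, as one set-based write
with no foreach(). The output path is my assumption and the csv layout is the
sample used earlier in the thread:

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, struct

spark = SparkSession.builder.appName("statements-etl").getOrCreate()

orders_df = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("hdfs://rhes75:9000/data/stg/test/orders.csv"))

# One statement per user, landed under a per-user directory in a single
# set-based write; a downstream process can then pick each file up and mail it.
(orders_df
    .groupBy("user_id")
    .agg(collect_list(struct("id", "description", "amount")).alias("orders"))
    .write
    .mode("overwrite")
    .partitionBy("user_id")
    .json("hdfs://rhes75:9000/data/stg/test/statements"))   # assumed output path
```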

Can you take the sample tables and try your method?

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 26 Apr 2023 at 04:10, Marco Costantini <
marco.costantini@rocketfncl.com> wrote:

> Hi Mich,
> First, thank you for that. Great effort put into helping.
>
> Second, I don't think this tackles the technical challenge here. I
> understand the windowing as it serves those ranks you created, but I don't
> see how the ranks contribute to the solution.
> Third, the core of the challenge is about performing this kind of
> 'statement' but for all users. In this example we target Mich, but that
> reduces the complexity by a lot! In fact, a simple join and filter would
> solve that one.
>
> Any thoughts on that? For me, the foreach is desirable because I can have
> the workers chain other actions to each iteration (send email, send HTTP
> request, etc).
>
> Thanks Mich,
> Marco.
>
> On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Hi Marco,
>>
>> First thoughts.
>>
>> foreach() is an action operation that is to iterate/loop over each
>> element in the dataset, meaning cursor based. That is different from
>> operating over the dataset as a set which is far more efficient.
>>
>> So in your case as I understand it correctly, you want to get order for
>> each user (say Mich), convert the result set to json and send it to Mich
>> via email
>>
>> Let us try this based on sample data
>>
>> Put your csv files into HDFS directory
>>
>> hdfs dfs -put users.csv /data/stg/test
>> hdfs dfs -put orders.csv /data/stg/test
>>
>> Then create dataframes from csv files, create temp views and do a join on
>> result sets with some slicing and dicing on orders table
>>
>> #! /usr/bin/env python3
>> from __future__ import print_function
>> import sys
>> import findspark
>> findspark.init()
>> from pyspark.sql import SparkSession
>> from pyspark import SparkContext
>> from pyspark.sql import SQLContext, HiveContext
>> from pyspark.sql.window import Window
>>
>> def spark_session(appName):
>>   return SparkSession.builder \
>>         .appName(appName) \
>>         .enableHiveSupport() \
>>         .getOrCreate()
>>
>> def main():
>>     appName = "ORDERS"
>>     spark =spark_session(appName)
>>     # get the sample
>>     users_file="hdfs://rhes75:9000/data/stg/test/users.csv"
>>     orders_file="hdfs://rhes75:9000/data/stg/test/orders.csv"
>>     users_df =
>> spark.read.format("com.databricks.spark.csv").option("inferSchema",
>> "true").option("header", "true").load(users_file)
>>     users_df.printSchema()
>>     """
>>     root
>>     |-- id: integer (nullable = true)
>>     |-- name: string (nullable = true)
>>     """
>>
>>     print(f"""\n Reading from  {users_file}\n""")
>>     users_df.show(5,False)
>>     orders_df =
>> spark.read.format("com.databricks.spark.csv").option("inferSchema",
>> "true").option("header", "true").load(orders_file)
>>     orders_df.printSchema()
>>     """
>>     root
>>     |-- id: integer (nullable = true)
>>     |-- description: string (nullable = true)
>>     |-- amount: double (nullable = true)
>>     |-- user_id: integer (nullable = true)
>>      """
>>     print(f"""\n Reading from  {orders_file}\n""")
>>     orders_df.show(50,False)
>>     users_df.createOrReplaceTempView("users")
>>     orders_df.createOrReplaceTempView("orders")
>>     # Create a list of orders for each user
>>     print(f"""\n Doing a join on two temp views\n""")
>>
>>     sqltext = """
>>     SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
>>     FROM
>>     (
>>     SELECT
>>             user_id AS user_id
>>         ,   id as order_id
>>         ,   description as description
>>         ,   amount AS amount
>>         ,  DENSE_RANK() OVER (PARTITION by user_id ORDER BY amount) AS
>> RANK
>>         ,  MAX(amount) OVER (PARTITION by user_id ORDER BY id) AS
>> maxorders
>>     FROM orders
>>     ) t
>>     INNER JOIN users u ON t.user_id = u.id
>>     AND  u.name = 'Mich'
>>     ORDER BY t.order_id
>>     """
>>     spark.sql(sqltext).show(50)
>> if __name__ == '__main__':
>>     main()
>>
>> Final outcome displaying orders for user Mich
>>
>> Doing a join on two temp views
>>
>>  Doing a join on two temp views
>>
>> +----+--------+-----------------+------+---------+
>> |name|order_id|      description|amount|maxorders|
>> +----+--------+-----------------+------+---------+
>> |Mich|   50001| Mich's 1st order|101.11|   101.11|
>> |Mich|   50002| Mich's 2nd order|102.11|   102.11|
>> |Mich|   50003| Mich's 3rd order|103.11|   103.11|
>> |Mich|   50004| Mich's 4th order|104.11|   104.11|
>> |Mich|   50005| Mich's 5th order|105.11|   105.11|
>> |Mich|   50006| Mich's 6th order|106.11|   106.11|
>> |Mich|   50007| Mich's 7th order|107.11|   107.11|
>> |Mich|   50008| Mich's 8th order|108.11|   108.11|
>> |Mich|   50009| Mich's 9th order|109.11|   109.11|
>> |Mich|   50010|Mich's 10th order|210.11|   210.11|
>> +----+--------+-----------------+------+---------+
>>
>> You can start on this.  Happy coding
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 25 Apr 2023 at 18:50, Marco Costantini <
>> marco.costantini@rocketfncl.com> wrote:
>>
>>> Thanks Mich,
>>>
>>> Great idea. I have done it. Those files are attached. I'm interested to
>>> know your thoughts. Let's imagine this same structure, but with huge
>>> amounts of data as well.
>>>
>>> Please and thank you,
>>> Marco.
>>>
>>> On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> Hi Marco,
>>>>
>>>> Let us start simple,
>>>>
>>>> Provide a csv file of 5 rows for the users table. Each row has a unique
>>>> user_id and one or two other columns like fictitious email etc.
>>>>
>>>> Also for each user_id, provide 10 rows of orders table, meaning that
>>>> orders table has 5 x 10 rows for each user_id.
>>>>
>>>> both as comma separated csv file
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Lead Solutions Architect/Engineering Lead
>>>> Palantir Technologies Limited
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
>>>> marco.costantini@rocketfncl.com> wrote:
>>>>
>>>>> Thanks Mich,
>>>>> I have not but I will certainly read up on this today.
>>>>>
>>>>> To your point that all of the essential data is in the 'orders' table;
>>>>> I agree! That distills the problem nicely. Yet, I still have some questions
>>>>> on which someone may be able to shed some light.
>>>>>
>>>>> 1) If my 'orders' table is very large, and will need to be aggregated
>>>>> by 'user_id', how will Spark intelligently optimize on that constraint
>>>>> (only read data for relevent 'user_id's). Is that something I have to
>>>>> instruct Spark to do?
>>>>>
>>>>> 2) Without #1, even with windowing, am I asking each partition to
>>>>> search too much?
>>>>>
>>>>> Please, if you have any links to documentation I can read on *how*
>>>>> Spark works under the hood for these operations, I would appreciate it if
>>>>> you give them. Spark has become a pillar on my team and knowing it in more
>>>>> detail is warranted.
>>>>>
>>>>> Slightly pivoting the subject here; I have tried something. It was a
>>>>> suggestion by an AI chat bot and it seemed reasonable. In my main Spark
>>>>> script I now have the line:
>>>>>
>>>>> ```
>>>>> grouped_orders_df =
>>>>> orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
>>>>> 'timestamp', 'total', 'description'))).alias('orders'))
>>>>> ```
>>>>> (json is ultimately needed)
>>>>>
>>>>> This actually achieves my goal by putting all of the 'orders' in a
>>>>> single Array column. Now my worry is, will this column become too large if
>>>>> there are a great many orders. Is there a limit? I have search for
>>>>> documentation on such a limit but could not find any.
>>>>>
>>>>> I truly appreciate your help Mich and team,
>>>>> Marco.
>>>>>
>>>>>
>>>>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>
>>>>>> Have you thought of using  windowing function
>>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>>>>>> achieve this?
>>>>>>
>>>>>> Effectively all your information is in the orders table.
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>> Mich Talebzadeh,
>>>>>> Lead Solutions Architect/Engineering Lead
>>>>>> Palantir Technologies Limited
>>>>>> London
>>>>>> United Kingdom
>>>>>>
>>>>>>
>>>>>>    view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>>>>>> marco.costantini@rocketfncl.com> wrote:
>>>>>>
>>>>>>> I have two tables: {users, orders}. In this example, let's say that
>>>>>>> for each 1 User in the users table, there are 100000 Orders in the orders
>>>>>>> table.
>>>>>>>
>>>>>>> I have to use pyspark to generate a statement of Orders for each
>>>>>>> User. So, a single user will need his/her own list of Orders. Additionally,
>>>>>>> I need to send this statement to the real-world user via email (for
>>>>>>> example).
>>>>>>>
>>>>>>> My first intuition was to apply a DataFrame.foreach() on the users
>>>>>>> DataFrame. This way, I can rely on the spark workers to handle the email
>>>>>>> sending individually. However, I now do not know the best way to get each
>>>>>>> User's Orders.
>>>>>>>
>>>>>>> I will soon try the following (pseudo-code):
>>>>>>>
>>>>>>> ```
>>>>>>> users_df = <my entire users DataFrame>
>>>>>>> orders_df = <my entire orders DataFrame>
>>>>>>>
>>>>>>> #this is poorly named for max understandability in this context
>>>>>>> def foreach_function(row):
>>>>>>>   user_id = row.user_id
>>>>>>>   user_orders_df = orders_df.select(f'user_id = {user_id}')
>>>>>>>
>>>>>>>   #here, I'd get any User info from 'row'
>>>>>>>   #then, I'd convert all 'user_orders' to JSON
>>>>>>>   #then, I'd prepare the email and send it
>>>>>>>
>>>>>>> users_df.foreach(foreach_function)
>>>>>>> ```
>>>>>>>
>>>>>>> It is my understanding that if I do my user-specific work in the
>>>>>>> foreach function, I will capitalize on Spark's scalability when doing that
>>>>>>> work. However, I am worried of two things:
>>>>>>>
>>>>>>> If I take all Orders up front...
>>>>>>>
>>>>>>> Will that work?
>>>>>>> Will I be taking too much? Will I be taking Orders on partitions who
>>>>>>> won't handle them (different User).
>>>>>>>
>>>>>>> If I create the orders_df (filtered) within the foreach function...
>>>>>>>
>>>>>>> Will it work?
>>>>>>> Will that be too much IO to DB?
>>>>>>>
>>>>>>> The question ultimately is: How can I achieve this goal efficiently?
>>>>>>>
>>>>>>> I have not yet tried anything here. I am doing so as we speak, but
>>>>>>> am suffering from choice-paralysis.
>>>>>>>
>>>>>>> Please and thank you.
>>>>>>>
>>>>>>

Re: What is the best way to organize a join within a foreach?

Posted by Marco Costantini <ma...@rocketfncl.com>.
Hi Mich,
First, thank you for that. Great effort put into helping.

Second, I don't think this tackles the technical challenge here. I
understand that the windowing serves the ranks you created, but I don't
see how the ranks contribute to the solution.
Third, the core of the challenge is about performing this kind of
'statement' but for all users. In this example we target Mich, but that
reduces the complexity by a lot! In fact, a simple join and filter would
solve that one.

Any thoughts on that? For me, the foreach is desirable because I can have
the workers chain other actions onto each iteration (send an email, send an
HTTP request, etc.), along the lines of the sketch below.
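
To make that concrete, here is a rough sketch of what I have in mind. The HTTP
endpoint and the send_email helper are hypothetical, purely to show the
chaining, and requests is assumed to be installed on the workers:

```
import json

import requests  # assumed available on the workers, for illustration only
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, struct, to_json

spark = SparkSession.builder.appName("statement-foreach").getOrCreate()

orders_df = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("hdfs://rhes75:9000/data/stg/test/orders.csv"))

# One row per user, with the statement already serialised as JSON
statements_df = (orders_df
    .groupBy("user_id")
    .agg(to_json(collect_list(struct("id", "description", "amount")))
         .alias("statement")))

def send_statements(rows):
    # Runs once per partition on a worker: set up connections here, then
    # chain whatever per-user actions are needed for each row.
    session = requests.Session()
    for row in rows:
        payload = {"user_id": row.user_id, "statement": json.loads(row.statement)}
        session.post("https://example.invalid/notify", json=payload)  # hypothetical endpoint
        # send_email(row.user_id, row.statement)                      # hypothetical helper

statements_df.foreachPartition(send_statements)
```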

Thanks Mich,
Marco.

On Tue, Apr 25, 2023 at 6:06 PM Mich Talebzadeh <mi...@gmail.com>
wrote:

> Hi Marco,
>
> First thoughts.
>
> foreach() is an action operation that is to iterate/loop over each
> element in the dataset, meaning cursor based. That is different from
> operating over the dataset as a set which is far more efficient.
>
> So in your case as I understand it correctly, you want to get order for
> each user (say Mich), convert the result set to json and send it to Mich
> via email
>
> Let us try this based on sample data
>
> Put your csv files into HDFS directory
>
> hdfs dfs -put users.csv /data/stg/test
> hdfs dfs -put orders.csv /data/stg/test
>
> Then create dataframes from csv files, create temp views and do a join on
> result sets with some slicing and dicing on orders table
>
> #! /usr/bin/env python3
> from __future__ import print_function
> import sys
> import findspark
> findspark.init()
> from pyspark.sql import SparkSession
> from pyspark import SparkContext
> from pyspark.sql import SQLContext, HiveContext
> from pyspark.sql.window import Window
>
> def spark_session(appName):
>   return SparkSession.builder \
>         .appName(appName) \
>         .enableHiveSupport() \
>         .getOrCreate()
>
> def main():
>     appName = "ORDERS"
>     spark =spark_session(appName)
>     # get the sample
>     users_file="hdfs://rhes75:9000/data/stg/test/users.csv"
>     orders_file="hdfs://rhes75:9000/data/stg/test/orders.csv"
>     users_df =
> spark.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load(users_file)
>     users_df.printSchema()
>     """
>     root
>     |-- id: integer (nullable = true)
>     |-- name: string (nullable = true)
>     """
>
>     print(f"""\n Reading from  {users_file}\n""")
>     users_df.show(5,False)
>     orders_df =
> spark.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load(orders_file)
>     orders_df.printSchema()
>     """
>     root
>     |-- id: integer (nullable = true)
>     |-- description: string (nullable = true)
>     |-- amount: double (nullable = true)
>     |-- user_id: integer (nullable = true)
>      """
>     print(f"""\n Reading from  {orders_file}\n""")
>     orders_df.show(50,False)
>     users_df.createOrReplaceTempView("users")
>     orders_df.createOrReplaceTempView("orders")
>     # Create a list of orders for each user
>     print(f"""\n Doing a join on two temp views\n""")
>
>     sqltext = """
>     SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
>     FROM
>     (
>     SELECT
>             user_id AS user_id
>         ,   id as order_id
>         ,   description as description
>         ,   amount AS amount
>         ,  DENSE_RANK() OVER (PARTITION by user_id ORDER BY amount) AS RANK
>         ,  MAX(amount) OVER (PARTITION by user_id ORDER BY id) AS maxorders
>     FROM orders
>     ) t
>     INNER JOIN users u ON t.user_id = u.id
>     AND  u.name = 'Mich'
>     ORDER BY t.order_id
>     """
>     spark.sql(sqltext).show(50)
> if __name__ == '__main__':
>     main()
>
> Final outcome displaying orders for user Mich
>
> Doing a join on two temp views
>
>  Doing a join on two temp views
>
> +----+--------+-----------------+------+---------+
> |name|order_id|      description|amount|maxorders|
> +----+--------+-----------------+------+---------+
> |Mich|   50001| Mich's 1st order|101.11|   101.11|
> |Mich|   50002| Mich's 2nd order|102.11|   102.11|
> |Mich|   50003| Mich's 3rd order|103.11|   103.11|
> |Mich|   50004| Mich's 4th order|104.11|   104.11|
> |Mich|   50005| Mich's 5th order|105.11|   105.11|
> |Mich|   50006| Mich's 6th order|106.11|   106.11|
> |Mich|   50007| Mich's 7th order|107.11|   107.11|
> |Mich|   50008| Mich's 8th order|108.11|   108.11|
> |Mich|   50009| Mich's 9th order|109.11|   109.11|
> |Mich|   50010|Mich's 10th order|210.11|   210.11|
> +----+--------+-----------------+------+---------+
>
> You can start on this.  Happy coding
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 25 Apr 2023 at 18:50, Marco Costantini <
> marco.costantini@rocketfncl.com> wrote:
>
>> Thanks Mich,
>>
>> Great idea. I have done it. Those files are attached. I'm interested to
>> know your thoughts. Let's imagine this same structure, but with huge
>> amounts of data as well.
>>
>> Please and thank you,
>> Marco.
>>
>> On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh <
>> mich.talebzadeh@gmail.com> wrote:
>>
>>> Hi Marco,
>>>
>>> Let us start simple,
>>>
>>> Provide a csv file of 5 rows for the users table. Each row has a unique
>>> user_id and one or two other columns like fictitious email etc.
>>>
>>> Also for each user_id, provide 10 rows of orders table, meaning that
>>> orders table has 5 x 10 rows for each user_id.
>>>
>>> both as comma separated csv file
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
>>> marco.costantini@rocketfncl.com> wrote:
>>>
>>>> Thanks Mich,
>>>> I have not but I will certainly read up on this today.
>>>>
>>>> To your point that all of the essential data is in the 'orders' table;
>>>> I agree! That distills the problem nicely. Yet, I still have some questions
>>>> on which someone may be able to shed some light.
>>>>
>>>> 1) If my 'orders' table is very large, and will need to be aggregated
>>>> by 'user_id', how will Spark intelligently optimize on that constraint
>>>> (only read data for relevent 'user_id's). Is that something I have to
>>>> instruct Spark to do?
>>>>
>>>> 2) Without #1, even with windowing, am I asking each partition to
>>>> search too much?
>>>>
>>>> Please, if you have any links to documentation I can read on *how*
>>>> Spark works under the hood for these operations, I would appreciate it if
>>>> you give them. Spark has become a pillar on my team and knowing it in more
>>>> detail is warranted.
>>>>
>>>> Slightly pivoting the subject here; I have tried something. It was a
>>>> suggestion by an AI chat bot and it seemed reasonable. In my main Spark
>>>> script I now have the line:
>>>>
>>>> ```
>>>> grouped_orders_df =
>>>> orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
>>>> 'timestamp', 'total', 'description'))).alias('orders'))
>>>> ```
>>>> (json is ultimately needed)
>>>>
>>>> This actually achieves my goal by putting all of the 'orders' in a
>>>> single Array column. Now my worry is, will this column become too large if
>>>> there are a great many orders. Is there a limit? I have search for
>>>> documentation on such a limit but could not find any.
>>>>
>>>> I truly appreciate your help Mich and team,
>>>> Marco.
>>>>
>>>>
>>>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> Have you thought of using  windowing function
>>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>>>>> achieve this?
>>>>>
>>>>> Effectively all your information is in the orders table.
>>>>>
>>>>> HTH
>>>>>
>>>>> Mich Talebzadeh,
>>>>> Lead Solutions Architect/Engineering Lead
>>>>> Palantir Technologies Limited
>>>>> London
>>>>> United Kingdom
>>>>>
>>>>>
>>>>>    view my Linkedin profile
>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>
>>>>>
>>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>>>>> marco.costantini@rocketfncl.com> wrote:
>>>>>
>>>>>> I have two tables: {users, orders}. In this example, let's say that
>>>>>> for each 1 User in the users table, there are 100000 Orders in the orders
>>>>>> table.
>>>>>>
>>>>>> I have to use pyspark to generate a statement of Orders for each
>>>>>> User. So, a single user will need his/her own list of Orders. Additionally,
>>>>>> I need to send this statement to the real-world user via email (for
>>>>>> example).
>>>>>>
>>>>>> My first intuition was to apply a DataFrame.foreach() on the users
>>>>>> DataFrame. This way, I can rely on the spark workers to handle the email
>>>>>> sending individually. However, I now do not know the best way to get each
>>>>>> User's Orders.
>>>>>>
>>>>>> I will soon try the following (pseudo-code):
>>>>>>
>>>>>> ```
>>>>>> users_df = <my entire users DataFrame>
>>>>>> orders_df = <my entire orders DataFrame>
>>>>>>
>>>>>> #this is poorly named for max understandability in this context
>>>>>> def foreach_function(row):
>>>>>>   user_id = row.user_id
>>>>>>   user_orders_df = orders_df.select(f'user_id = {user_id}')
>>>>>>
>>>>>>   #here, I'd get any User info from 'row'
>>>>>>   #then, I'd convert all 'user_orders' to JSON
>>>>>>   #then, I'd prepare the email and send it
>>>>>>
>>>>>> users_df.foreach(foreach_function)
>>>>>> ```
>>>>>>
>>>>>> It is my understanding that if I do my user-specific work in the
>>>>>> foreach function, I will capitalize on Spark's scalability when doing that
>>>>>> work. However, I am worried of two things:
>>>>>>
>>>>>> If I take all Orders up front...
>>>>>>
>>>>>> Will that work?
>>>>>> Will I be taking too much? Will I be taking Orders on partitions who
>>>>>> won't handle them (different User).
>>>>>>
>>>>>> If I create the orders_df (filtered) within the foreach function...
>>>>>>
>>>>>> Will it work?
>>>>>> Will that be too much IO to DB?
>>>>>>
>>>>>> The question ultimately is: How can I achieve this goal efficiently?
>>>>>>
>>>>>> I have not yet tried anything here. I am doing so as we speak, but am
>>>>>> suffering from choice-paralysis.
>>>>>>
>>>>>> Please and thank you.
>>>>>>
>>>>>

Re: What is the best way to organize a join within a foreach?

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Marco,

First thoughts.

foreach() is an action operation that iterates/loops over each element
in the dataset, meaning it is cursor based. That is different from operating
over the dataset as a set, which is far more efficient.

So in your case, if I understand it correctly, you want to get the orders for
each user (say Mich), convert the result set to JSON and send it to Mich
via email.

Let us try this based on sample data.

Put your csv files into an HDFS directory:

hdfs dfs -put users.csv /data/stg/test
hdfs dfs -put orders.csv /data/stg/test

Then create dataframes from the csv files, create temp views and do a join on
the result sets with some slicing and dicing on the orders table:

#! /usr/bin/env python3
from __future__ import print_function
import sys
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql.window import Window

def spark_session(appName):
    return SparkSession.builder \
        .appName(appName) \
        .enableHiveSupport() \
        .getOrCreate()

def main():
    appName = "ORDERS"
    spark = spark_session(appName)
    # get the sample files from HDFS
    users_file = "hdfs://rhes75:9000/data/stg/test/users.csv"
    orders_file = "hdfs://rhes75:9000/data/stg/test/orders.csv"
    users_df = spark.read.format("com.databricks.spark.csv") \
        .option("inferSchema", "true") \
        .option("header", "true") \
        .load(users_file)
    users_df.printSchema()
    """
    root
    |-- id: integer (nullable = true)
    |-- name: string (nullable = true)
    """
    print(f"""\n Reading from  {users_file}\n""")
    users_df.show(5, False)
    orders_df = spark.read.format("com.databricks.spark.csv") \
        .option("inferSchema", "true") \
        .option("header", "true") \
        .load(orders_file)
    orders_df.printSchema()
    """
    root
    |-- id: integer (nullable = true)
    |-- description: string (nullable = true)
    |-- amount: double (nullable = true)
    |-- user_id: integer (nullable = true)
    """
    print(f"""\n Reading from  {orders_file}\n""")
    orders_df.show(50, False)
    users_df.createOrReplaceTempView("users")
    orders_df.createOrReplaceTempView("orders")
    # Create a list of orders for each user
    print(f"""\n Doing a join on two temp views\n""")

    # The inner query ranks each user's orders by amount and keeps a running
    # maximum amount per user; the outer query joins to users and restricts
    # the output to one user (Mich), ordered by order id.
    sqltext = """
    SELECT u.name, t.order_id, t.description, t.amount, t.maxorders
    FROM
    (
    SELECT
            user_id AS user_id
        ,   id AS order_id
        ,   description AS description
        ,   amount AS amount
        ,   DENSE_RANK() OVER (PARTITION BY user_id ORDER BY amount) AS RANK
        ,   MAX(amount) OVER (PARTITION BY user_id ORDER BY id) AS maxorders
    FROM orders
    ) t
    INNER JOIN users u ON t.user_id = u.id
    AND  u.name = 'Mich'
    ORDER BY t.order_id
    """
    spark.sql(sqltext).show(50)

if __name__ == '__main__':
    main()

Final outcome, displaying the orders for user Mich:

 Doing a join on two temp views

+----+--------+-----------------+------+---------+
|name|order_id|      description|amount|maxorders|
+----+--------+-----------------+------+---------+
|Mich|   50001| Mich's 1st order|101.11|   101.11|
|Mich|   50002| Mich's 2nd order|102.11|   102.11|
|Mich|   50003| Mich's 3rd order|103.11|   103.11|
|Mich|   50004| Mich's 4th order|104.11|   104.11|
|Mich|   50005| Mich's 5th order|105.11|   105.11|
|Mich|   50006| Mich's 6th order|106.11|   106.11|
|Mich|   50007| Mich's 7th order|107.11|   107.11|
|Mich|   50008| Mich's 8th order|108.11|   108.11|
|Mich|   50009| Mich's 9th order|109.11|   109.11|
|Mich|   50010|Mich's 10th order|210.11|   210.11|
+----+--------+-----------------+------+---------+

You can start from this. Happy coding.
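
As a possible next step (a sketch only, reusing users_df and orders_df from
the script above; send_statement is a hypothetical placeholder for the
email/S3/Kafka side), you can collapse the result to one JSON document per
user and push the sending work to the executors partition by partition:

```
from pyspark.sql.functions import collect_list, struct, to_json

# One row per user: all of that user's orders serialised as a JSON array.
agg_df = orders_df.groupBy("user_id") \
    .agg(to_json(collect_list(struct("id", "description", "amount")))
         .alias("orders_json"))

statements_df = agg_df \
    .join(users_df, agg_df["user_id"] == users_df["id"], "inner") \
    .select(agg_df["user_id"], users_df["name"], "orders_json")

def send_partition(rows):
    # Runs on the executors; open any connection (SMTP, S3, Kafka) once per
    # partition rather than once per row.
    for row in rows:
        send_statement(row.user_id, row.name, row.orders_json)  # hypothetical

statements_df.foreachPartition(send_partition)
```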

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 25 Apr 2023 at 18:50, Marco Costantini <
marco.costantini@rocketfncl.com> wrote:

> Thanks Mich,
>
> Great idea. I have done it. Those files are attached. I'm interested to
> know your thoughts. Let's imagine this same structure, but with huge
> amounts of data as well.
>
> Please and thank you,
> Marco.
>
> On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh <
> mich.talebzadeh@gmail.com> wrote:
>
>> Hi Marco,
>>
>> Let us start simple,
>>
>> Provide a csv file of 5 rows for the users table. Each row has a unique
>> user_id and one or two other columns like fictitious email etc.
>>
>> Also for each user_id, provide 10 rows of orders table, meaning that
>> orders table has 5 x 10 rows for each user_id.
>>
>> both as comma separated csv file
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
>> marco.costantini@rocketfncl.com> wrote:
>>
>>> Thanks Mich,
>>> I have not but I will certainly read up on this today.
>>>
>>> To your point that all of the essential data is in the 'orders' table; I
>>> agree! That distills the problem nicely. Yet, I still have some questions
>>> on which someone may be able to shed some light.
>>>
>>> 1) If my 'orders' table is very large, and will need to be aggregated by
>>> 'user_id', how will Spark intelligently optimize on that constraint (only
>>> read data for relevent 'user_id's). Is that something I have to instruct
>>> Spark to do?
>>>
>>> 2) Without #1, even with windowing, am I asking each partition to search
>>> too much?
>>>
>>> Please, if you have any links to documentation I can read on *how* Spark
>>> works under the hood for these operations, I would appreciate it if you
>>> give them. Spark has become a pillar on my team and knowing it in more
>>> detail is warranted.
>>>
>>> Slightly pivoting the subject here; I have tried something. It was a
>>> suggestion by an AI chat bot and it seemed reasonable. In my main Spark
>>> script I now have the line:
>>>
>>> ```
>>> grouped_orders_df =
>>> orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
>>> 'timestamp', 'total', 'description'))).alias('orders'))
>>> ```
>>> (json is ultimately needed)
>>>
>>> This actually achieves my goal by putting all of the 'orders' in a
>>> single Array column. Now my worry is, will this column become too large if
>>> there are a great many orders. Is there a limit? I have search for
>>> documentation on such a limit but could not find any.
>>>
>>> I truly appreciate your help Mich and team,
>>> Marco.
>>>
>>>
>>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>>> mich.talebzadeh@gmail.com> wrote:
>>>
>>>> Have you thought of using  windowing function
>>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>>>> achieve this?
>>>>
>>>> Effectively all your information is in the orders table.
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Lead Solutions Architect/Engineering Lead
>>>> Palantir Technologies Limited
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>>>> marco.costantini@rocketfncl.com> wrote:
>>>>
>>>>> I have two tables: {users, orders}. In this example, let's say that
>>>>> for each 1 User in the users table, there are 100000 Orders in the orders
>>>>> table.
>>>>>
>>>>> I have to use pyspark to generate a statement of Orders for each User.
>>>>> So, a single user will need his/her own list of Orders. Additionally, I
>>>>> need to send this statement to the real-world user via email (for example).
>>>>>
>>>>> My first intuition was to apply a DataFrame.foreach() on the users
>>>>> DataFrame. This way, I can rely on the spark workers to handle the email
>>>>> sending individually. However, I now do not know the best way to get each
>>>>> User's Orders.
>>>>>
>>>>> I will soon try the following (pseudo-code):
>>>>>
>>>>> ```
>>>>> users_df = <my entire users DataFrame>
>>>>> orders_df = <my entire orders DataFrame>
>>>>>
>>>>> #this is poorly named for max understandability in this context
>>>>> def foreach_function(row):
>>>>>   user_id = row.user_id
>>>>>   user_orders_df = orders_df.select(f'user_id = {user_id}')
>>>>>
>>>>>   #here, I'd get any User info from 'row'
>>>>>   #then, I'd convert all 'user_orders' to JSON
>>>>>   #then, I'd prepare the email and send it
>>>>>
>>>>> users_df.foreach(foreach_function)
>>>>> ```
>>>>>
>>>>> It is my understanding that if I do my user-specific work in the
>>>>> foreach function, I will capitalize on Spark's scalability when doing that
>>>>> work. However, I am worried of two things:
>>>>>
>>>>> If I take all Orders up front...
>>>>>
>>>>> Will that work?
>>>>> Will I be taking too much? Will I be taking Orders on partitions who
>>>>> won't handle them (different User).
>>>>>
>>>>> If I create the orders_df (filtered) within the foreach function...
>>>>>
>>>>> Will it work?
>>>>> Will that be too much IO to DB?
>>>>>
>>>>> The question ultimately is: How can I achieve this goal efficiently?
>>>>>
>>>>> I have not yet tried anything here. I am doing so as we speak, but am
>>>>> suffering from choice-paralysis.
>>>>>
>>>>> Please and thank you.
>>>>>
>>>>

Re: What is the best way to organize a join within a foreach?

Posted by Marco Costantini <ma...@rocketfncl.com>.
Thanks Mich,

Great idea. I have done it. Those files are attached. I'm interested to
know your thoughts. Let's imagine this same structure, but with huge
amounts of data as well.

Please and thank you,
Marco.

On Tue, Apr 25, 2023 at 12:12 PM Mich Talebzadeh <mi...@gmail.com>
wrote:

> Hi Marco,
>
> Let us start simple,
>
> Provide a csv file of 5 rows for the users table. Each row has a unique
> user_id and one or two other columns like fictitious email etc.
>
> Also for each user_id, provide 10 rows of orders table, meaning that
> orders table has 5 x 10 rows for each user_id.
>
> both as comma separated csv file
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
> marco.costantini@rocketfncl.com> wrote:
>
>> Thanks Mich,
>> I have not but I will certainly read up on this today.
>>
>> To your point that all of the essential data is in the 'orders' table; I
>> agree! That distills the problem nicely. Yet, I still have some questions
>> on which someone may be able to shed some light.
>>
>> 1) If my 'orders' table is very large, and will need to be aggregated by
>> 'user_id', how will Spark intelligently optimize on that constraint (only
>> read data for relevent 'user_id's). Is that something I have to instruct
>> Spark to do?
>>
>> 2) Without #1, even with windowing, am I asking each partition to search
>> too much?
>>
>> Please, if you have any links to documentation I can read on *how* Spark
>> works under the hood for these operations, I would appreciate it if you
>> give them. Spark has become a pillar on my team and knowing it in more
>> detail is warranted.
>>
>> Slightly pivoting the subject here; I have tried something. It was a
>> suggestion by an AI chat bot and it seemed reasonable. In my main Spark
>> script I now have the line:
>>
>> ```
>> grouped_orders_df =
>> orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
>> 'timestamp', 'total', 'description'))).alias('orders'))
>> ```
>> (json is ultimately needed)
>>
>> This actually achieves my goal by putting all of the 'orders' in a single
>> Array column. Now my worry is, will this column become too large if there
>> are a great many orders. Is there a limit? I have search for documentation
>> on such a limit but could not find any.
>>
>> I truly appreciate your help Mich and team,
>> Marco.
>>
>>
>> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <
>> mich.talebzadeh@gmail.com> wrote:
>>
>>> Have you thought of using  windowing function
>>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>>> achieve this?
>>>
>>> Effectively all your information is in the orders table.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>> London
>>> United Kingdom
>>>
>>>
>>>    view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>>> marco.costantini@rocketfncl.com> wrote:
>>>
>>>> I have two tables: {users, orders}. In this example, let's say that for
>>>> each 1 User in the users table, there are 100000 Orders in the orders table.
>>>>
>>>> I have to use pyspark to generate a statement of Orders for each User.
>>>> So, a single user will need his/her own list of Orders. Additionally, I
>>>> need to send this statement to the real-world user via email (for example).
>>>>
>>>> My first intuition was to apply a DataFrame.foreach() on the users
>>>> DataFrame. This way, I can rely on the spark workers to handle the email
>>>> sending individually. However, I now do not know the best way to get each
>>>> User's Orders.
>>>>
>>>> I will soon try the following (pseudo-code):
>>>>
>>>> ```
>>>> users_df = <my entire users DataFrame>
>>>> orders_df = <my entire orders DataFrame>
>>>>
>>>> #this is poorly named for max understandability in this context
>>>> def foreach_function(row):
>>>>   user_id = row.user_id
>>>>   user_orders_df = orders_df.select(f'user_id = {user_id}')
>>>>
>>>>   #here, I'd get any User info from 'row'
>>>>   #then, I'd convert all 'user_orders' to JSON
>>>>   #then, I'd prepare the email and send it
>>>>
>>>> users_df.foreach(foreach_function)
>>>> ```
>>>>
>>>> It is my understanding that if I do my user-specific work in the
>>>> foreach function, I will capitalize on Spark's scalability when doing that
>>>> work. However, I am worried of two things:
>>>>
>>>> If I take all Orders up front...
>>>>
>>>> Will that work?
>>>> Will I be taking too much? Will I be taking Orders on partitions who
>>>> won't handle them (different User).
>>>>
>>>> If I create the orders_df (filtered) within the foreach function...
>>>>
>>>> Will it work?
>>>> Will that be too much IO to DB?
>>>>
>>>> The question ultimately is: How can I achieve this goal efficiently?
>>>>
>>>> I have not yet tried anything here. I am doing so as we speak, but am
>>>> suffering from choice-paralysis.
>>>>
>>>> Please and thank you.
>>>>
>>>

Re: What is the best way to organize a join within a foreach?

Posted by Mich Talebzadeh <mi...@gmail.com>.
Hi Marco,

Let us start simple,

Provide a csv file of 5 rows for the users table. Each row has a unique
user_id and one or two other columns like fictitious email etc.

Also, for each user_id, provide 10 rows of the orders table, meaning that the
orders table has 5 x 10 = 50 rows in total.

Both as comma-separated csv files.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 25 Apr 2023 at 14:07, Marco Costantini <
marco.costantini@rocketfncl.com> wrote:

> Thanks Mich,
> I have not but I will certainly read up on this today.
>
> To your point that all of the essential data is in the 'orders' table; I
> agree! That distills the problem nicely. Yet, I still have some questions
> on which someone may be able to shed some light.
>
> 1) If my 'orders' table is very large, and will need to be aggregated by
> 'user_id', how will Spark intelligently optimize on that constraint (only
> read data for relevent 'user_id's). Is that something I have to instruct
> Spark to do?
>
> 2) Without #1, even with windowing, am I asking each partition to search
> too much?
>
> Please, if you have any links to documentation I can read on *how* Spark
> works under the hood for these operations, I would appreciate it if you
> give them. Spark has become a pillar on my team and knowing it in more
> detail is warranted.
>
> Slightly pivoting the subject here; I have tried something. It was a
> suggestion by an AI chat bot and it seemed reasonable. In my main Spark
> script I now have the line:
>
> ```
> grouped_orders_df =
> orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
> 'timestamp', 'total', 'description'))).alias('orders'))
> ```
> (json is ultimately needed)
>
> This actually achieves my goal by putting all of the 'orders' in a single
> Array column. Now my worry is, will this column become too large if there
> are a great many orders. Is there a limit? I have search for documentation
> on such a limit but could not find any.
>
> I truly appreciate your help Mich and team,
> Marco.
>
>
> On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <mi...@gmail.com>
> wrote:
>
>> Have you thought of using  windowing function
>> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
>> achieve this?
>>
>> Effectively all your information is in the orders table.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
>> marco.costantini@rocketfncl.com> wrote:
>>
>>> I have two tables: {users, orders}. In this example, let's say that for
>>> each 1 User in the users table, there are 100000 Orders in the orders table.
>>>
>>> I have to use pyspark to generate a statement of Orders for each User.
>>> So, a single user will need his/her own list of Orders. Additionally, I
>>> need to send this statement to the real-world user via email (for example).
>>>
>>> My first intuition was to apply a DataFrame.foreach() on the users
>>> DataFrame. This way, I can rely on the spark workers to handle the email
>>> sending individually. However, I now do not know the best way to get each
>>> User's Orders.
>>>
>>> I will soon try the following (pseudo-code):
>>>
>>> ```
>>> users_df = <my entire users DataFrame>
>>> orders_df = <my entire orders DataFrame>
>>>
>>> #this is poorly named for max understandability in this context
>>> def foreach_function(row):
>>>   user_id = row.user_id
>>>   user_orders_df = orders_df.select(f'user_id = {user_id}')
>>>
>>>   #here, I'd get any User info from 'row'
>>>   #then, I'd convert all 'user_orders' to JSON
>>>   #then, I'd prepare the email and send it
>>>
>>> users_df.foreach(foreach_function)
>>> ```
>>>
>>> It is my understanding that if I do my user-specific work in the foreach
>>> function, I will capitalize on Spark's scalability when doing that work.
>>> However, I am worried of two things:
>>>
>>> If I take all Orders up front...
>>>
>>> Will that work?
>>> Will I be taking too much? Will I be taking Orders on partitions who
>>> won't handle them (different User).
>>>
>>> If I create the orders_df (filtered) within the foreach function...
>>>
>>> Will it work?
>>> Will that be too much IO to DB?
>>>
>>> The question ultimately is: How can I achieve this goal efficiently?
>>>
>>> I have not yet tried anything here. I am doing so as we speak, but am
>>> suffering from choice-paralysis.
>>>
>>> Please and thank you.
>>>
>>

Re: What is the best way to organize a join within a foreach?

Posted by Marco Costantini <ma...@rocketfncl.com>.
Thanks Mich,
I have not but I will certainly read up on this today.

To your point that all of the essential data is in the 'orders' table; I
agree! That distills the problem nicely. Yet, I still have some questions
on which someone may be able to shed some light.

1) If my 'orders' table is very large and needs to be aggregated by
'user_id', how will Spark intelligently optimize on that constraint (only
read data for the relevant 'user_id's)? Is that something I have to instruct
Spark to do?

2) Without #1, even with windowing, am I asking each partition to search
too much?

Please, if you have any links to documentation I can read on *how* Spark
works under the hood for these operations, I would appreciate it if you
could share them. Spark has become a pillar for my team, and knowing it in
more detail is warranted.

Slightly pivoting the subject here; I have tried something. It was a
suggestion by an AI chat bot and it seemed reasonable. In my main Spark
script I now have the line:

```
grouped_orders_df =
orders_df.groupBy('user_id').agg(collect_list(to_json(struct('user_id',
'timestamp', 'total', 'description'))).alias('orders'))
```
(json is ultimately needed)

This actually achieves my goal by putting all of the 'orders' in a single
Array column. Now my worry is: will this column become too large if there
are a great many orders? Is there a limit? I have searched for documentation
on such a limit but could not find any.
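
For completeness, here is that line in a self-contained form (the schema
columns and the input path shown are only my assumptions for this thread):

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, struct, to_json

spark = SparkSession.builder.appName("grouped_orders").getOrCreate()

# orders_df is assumed to have columns: user_id, timestamp, total, description
orders_df = spark.read.option("header", "true").option("inferSchema", "true") \
    .csv("/data/stg/test/orders.csv")  # illustrative path

# One row per user_id; 'orders' is an array<string> column where each element
# is a single order serialised as a JSON string.
grouped_orders_df = orders_df.groupBy("user_id") \
    .agg(collect_list(to_json(struct("user_id", "timestamp", "total",
                                     "description"))).alias("orders"))
```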

I truly appreciate your help Mich and team,
Marco.


On Tue, Apr 25, 2023 at 5:40 AM Mich Talebzadeh <mi...@gmail.com>
wrote:

> Have you thought of using  windowing function
> <https://sparkbyexamples.com/spark/spark-sql-window-functions/>s to
> achieve this?
>
> Effectively all your information is in the orders table.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>    view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
> marco.costantini@rocketfncl.com> wrote:
>
>> I have two tables: {users, orders}. In this example, let's say that for
>> each 1 User in the users table, there are 100000 Orders in the orders table.
>>
>> I have to use pyspark to generate a statement of Orders for each User.
>> So, a single user will need his/her own list of Orders. Additionally, I
>> need to send this statement to the real-world user via email (for example).
>>
>> My first intuition was to apply a DataFrame.foreach() on the users
>> DataFrame. This way, I can rely on the spark workers to handle the email
>> sending individually. However, I now do not know the best way to get each
>> User's Orders.
>>
>> I will soon try the following (pseudo-code):
>>
>> ```
>> users_df = <my entire users DataFrame>
>> orders_df = <my entire orders DataFrame>
>>
>> #this is poorly named for max understandability in this context
>> def foreach_function(row):
>>   user_id = row.user_id
>>   user_orders_df = orders_df.select(f'user_id = {user_id}')
>>
>>   #here, I'd get any User info from 'row'
>>   #then, I'd convert all 'user_orders' to JSON
>>   #then, I'd prepare the email and send it
>>
>> users_df.foreach(foreach_function)
>> ```
>>
>> It is my understanding that if I do my user-specific work in the foreach
>> function, I will capitalize on Spark's scalability when doing that work.
>> However, I am worried of two things:
>>
>> If I take all Orders up front...
>>
>> Will that work?
>> Will I be taking too much? Will I be taking Orders on partitions who
>> won't handle them (different User).
>>
>> If I create the orders_df (filtered) within the foreach function...
>>
>> Will it work?
>> Will that be too much IO to DB?
>>
>> The question ultimately is: How can I achieve this goal efficiently?
>>
>> I have not yet tried anything here. I am doing so as we speak, but am
>> suffering from choice-paralysis.
>>
>> Please and thank you.
>>
>

Re: What is the best way to organize a join within a foreach?

Posted by Mich Talebzadeh <mi...@gmail.com>.
Have you thought of using windowing functions
<https://sparkbyexamples.com/spark/spark-sql-window-functions/> to
achieve this?

Effectively all your information is in the orders table.
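
For illustration, a minimal sketch of that idea (the orders schema here,
id/user_id/description/amount, is an assumption): a window partitioned by
user_id lets you rank and aggregate each user's orders without touching the
users table until the very end.

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, dense_rank, max as max_, sum as sum_
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("orders_window").getOrCreate()

# Assumes an 'orders' table/view with columns: id, user_id, description, amount
orders_df = spark.table("orders")

by_amount = Window.partitionBy("user_id").orderBy(col("amount"))
per_user = Window.partitionBy("user_id")

ranked_df = orders_df \
    .withColumn("rank_by_amount", dense_rank().over(by_amount)) \
    .withColumn("max_amount", max_("amount").over(per_user)) \
    .withColumn("total_spend", sum_("amount").over(per_user))
```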

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 25 Apr 2023 at 00:15, Marco Costantini <
marco.costantini@rocketfncl.com> wrote:

> I have two tables: {users, orders}. In this example, let's say that for
> each 1 User in the users table, there are 100000 Orders in the orders table.
>
> I have to use pyspark to generate a statement of Orders for each User. So,
> a single user will need his/her own list of Orders. Additionally, I need to
> send this statement to the real-world user via email (for example).
>
> My first intuition was to apply a DataFrame.foreach() on the users
> DataFrame. This way, I can rely on the spark workers to handle the email
> sending individually. However, I now do not know the best way to get each
> User's Orders.
>
> I will soon try the following (pseudo-code):
>
> ```
> users_df = <my entire users DataFrame>
> orders_df = <my entire orders DataFrame>
>
> #this is poorly named for max understandability in this context
> def foreach_function(row):
>   user_id = row.user_id
>   user_orders_df = orders_df.select(f'user_id = {user_id}')
>
>   #here, I'd get any User info from 'row'
>   #then, I'd convert all 'user_orders' to JSON
>   #then, I'd prepare the email and send it
>
> users_df.foreach(foreach_function)
> ```
>
> It is my understanding that if I do my user-specific work in the foreach
> function, I will capitalize on Spark's scalability when doing that work.
> However, I am worried of two things:
>
> If I take all Orders up front...
>
> Will that work?
> Will I be taking too much? Will I be taking Orders on partitions who won't
> handle them (different User).
>
> If I create the orders_df (filtered) within the foreach function...
>
> Will it work?
> Will that be too much IO to DB?
>
> The question ultimately is: How can I achieve this goal efficiently?
>
> I have not yet tried anything here. I am doing so as we speak, but am
> suffering from choice-paralysis.
>
> Please and thank you.
>