Posted to user@spark.apache.org by ayan guha <gu...@gmail.com> on 2020/01/21 21:41:30 UTC

Re: Performance tuning on the Databricks pyspark 2.4.4

For case 1, you can create three notebooks and three jobs in Databricks and
then run those jobs in parallel; a rough sketch of that pattern follows.
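
A minimal sketch of that pattern, assuming a driver notebook running on
Databricks that launches the child notebooks with dbutils.notebook.run (the
notebook paths, the pool size and the timeout below are placeholders):

from concurrent.futures import ThreadPoolExecutor

# Placeholder paths to the three child notebooks.
notebook_paths = ["/jobs/count_table_a",
                  "/jobs/count_table_b",
                  "/jobs/count_table_c"]

def run_notebook(path):
    # dbutils.notebook.run blocks until the child notebook finishes, so
    # each call gets its own thread; 3600 is the timeout in seconds.
    return dbutils.notebook.run(path, 3600, {})

# Three threads submit the three notebook runs concurrently, and the Spark
# scheduler shares the cluster between their jobs.
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(run_notebook, notebook_paths))

print(results)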

On Wed, 22 Jan 2020 at 3:50 am, anbutech <an...@outlook.com> wrote:

> Hi sir,
>
> Could you please help me with the two cases below? In Databricks PySpark I
> am processing terabytes of JSON data read from an AWS S3 bucket.
>
> Case 1:
>
> Currently I am reading multiple tables sequentially to get the count for a
> given year and month from each table.
>
> For example, table_list.csv has a single column containing the table names:
>
> from pyspark.sql.functions import expr, lit
>
> year = 2019
> month = 12
>
> tablesDF = (spark.read.format("csv")
>             .option("header", "false")
>             .load("s3a://bucket//source/table_list.csv"))
> tabList = tablesDF.toPandas().values.tolist()
>
> for table in tabList:
>     tab_name = table[0]
>
>     # Snowflake settings for the Snowflake table count()
>     sfOptions = {
>         "sfURL": "",
>         "sfAccount": "",
>         "sfUser": "",
>         "sfPassword": "",
>         "sfDatabase": "",
>         "sfSchema": "",
>         "sfWarehouse": "",
>     }
>
>     # Read the Snowflake count as a dataframe
>     sf_query = ("select y as year, m as month, count(*) as sCount "
>                 "from {} where y={} and m={} "
>                 "group by year, month").format(tab_name, year, month)
>     sfxdf = (spark.read
>              .format("snowflake")
>              .options(**sfOptions)
>              .option("query", sf_query)
>              .load())
>
>     # Databricks Delta Lake count
>     dbx_query = ("select y as year, m as month, count(*) as dCount "
>                  "from db.{} where y={} and m={} "
>                  "group by year, month").format(tab_name, year, month)
>     dbxDF = spark.sql(dbx_query)
>
>     # Compare the two counts per (year, month)
>     resultDF = (dbxDF.join(sfxdf, on=["year", "month"], how="left_outer")
>                 .na.fill(0)
>                 .withColumn("flag_col", expr("dCount == sCount")))
>
>     finalDF = (resultDF.withColumn("table_name", lit(tab_name))
>                .select("table_name", "year", "month",
>                        "dCount", "sCount", "flag_col"))
>
>     finalDF.coalesce(1).write.format("csv").option("header", "true") \
>         .save("s3a://outputs/reportcsv")
>
>         Questions:
>
>         1) Instead of running the count query sequentially, table by table,
>         how can I read all of the tables listed in the CSV file on S3 in
>         parallel and distribute the jobs across the cluster?
>
>         2) How can I optimize the above PySpark code so that all of the
>         count queries run at the same time?
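>
> To make question 1 concrete, this is roughly the shape of the parallel
> submission I have in mind (compare_counts is a hypothetical wrapper around
> the loop body above, not existing code):
>
> from concurrent.futures import ThreadPoolExecutor
>
> def compare_counts(tab_name):
>     # Hypothetical helper: would run the Snowflake count, the Delta count
>     # and the join from the loop body above for one table and return the
>     # resulting rows.
>     ...
>
> table_names = [row[0] for row in tabList]
>
> # Each call only submits Spark jobs, so threads should be enough and the
> # Spark scheduler can run jobs from different threads concurrently.
> with ThreadPoolExecutor(max_workers=8) as pool:
>     results = list(pool.map(compare_counts, table_names))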
>
>
>
> Case 2:
>
> Multiprocessing case:
> ---------------------
>
>         Could you please help me achieve multiprocessing for the above
>         PySpark query so that it runs in parallel in the distributed
>         environment?
>
>         Using the snippet below, is there any way to run the PySpark code
>         in parallel on the cluster?
>
> import multiprocessing
> import time
>
> import pandas as pd
>
> # Create a pool of 20 processes. Set this according to the intended
> # parallelism and the available resources.
> start = time.time()
> pool = multiprocessing.Pool(20)
>
> # This executes get_counts() in parallel, once per element of input_paths.
> # result (a list of dictionaries) is built when all executions complete.
> # (get_counts and input_paths are assumed to be defined elsewhere.)
> result = pool.map(get_counts, input_paths)
>
> end = time.time()
>
> result_df = pd.DataFrame(result)
> # result_df.to_csv() can be used to store the results in a CSV file.
> print(result_df)
> print('Time taken: {}'.format(end - start))
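>
> For reference, get_counts is meant to look roughly like this (a
> hypothetical sketch; the S3 path handling is a placeholder):
>
> def get_counts(path):
>     # Read one slice of the JSON data from S3 and return its row count as
>     # a dictionary, so pool.map() can collect the results.
>     # (I am not sure the SparkSession is usable from worker processes,
>     # which is part of why I am asking.)
>     df = spark.read.json(path)
>     return {"path": path, "count": df.count()}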
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
--
Best Regards,
Ayan Guha