Posted to user@spark.apache.org by Karthick Nk <kc...@gmail.com> on 2023/04/28 11:59:29 UTC

***pyspark.sql.functions.monotonically_increasing_id()***

Hi @all,

I am using the PySpark function monotonically_increasing_id() to remove one
field from a JSON column in a Delta table. Please refer to the code below:

from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

df = spark.sql(f"SELECT * FROM {database}.{table}")

# Parse the JSON column into its own dataframe, drop the unwanted key,
# and serialize the remaining fields back into a single JSON string column.
df1 = spark.read.json(df.rdd.map(lambda x: x.data), multiLine=True)
df1 = df1.drop('fw_createdts')
df1 = df1.selectExpr('to_json(struct(*)) as data')

# Generate a positional index on both dataframes and join on it.
df = df.withColumn('row_index',
                   row_number().over(Window.orderBy(monotonically_increasing_id()))) \
       .withColumnRenamed("data", "data1")
df1 = df1.withColumn('row_index',
                     row_number().over(Window.orderBy(monotonically_increasing_id())))
df = df.join(df1, on=["row_index"]).drop("row_index", "data1")
df.createOrReplaceTempView('tempdf')

Business Requirement:
Need to remove one key-value pair from the JSON field in the Delta table.

Steps done:
1. read the data from the Delta table
2. read the JSON column alone into a separate dataframe and split it using
spark.read.json
3. drop the unwanted column from that dataframe
4. convert it back into a JSON field
5. join the two dataframes using monotonically_increasing_id(), since there
is no unique column shared between the JSON column and the other primary
columns that could be used as a mapping key
(also, some of the fields inside the JSON field are already present at the
top level, so I am not able to split the JSON field within the same
dataframe)

Issue Faced:
1. For a small database and dataset it works as expected.
2. For a big database and dataset it does not work as expected: rows end up
mapped to different records in the same table.

When I referred to the documentation, I could see that the IDs are not
guaranteed to be consecutive. Is there any limit?
https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html#pyspark-sql-functions-monotonically-increasing-id

Could you explain whether there are any constraints on it?
How can we achieve this requirement using any alternate method?


Thanks in advance🙂

Re: ***pyspark.sql.functions.monotonically_increasing_id()***

Posted by Winston Lai <we...@gmail.com>.
Hi Karthick,

A few points that may help you:

As stated in the URL you posted, "The function is non-deterministic because its result depends on partition IDs." Hence, the generated ID depends on the partition IDs. In the code snippet you provided, I didn't see any partition columns selected for the window function, so your generated IDs will vary across runs because the partition ID assigned to a particular record may change on each run.
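
A minimal sketch that makes the partition dependence visible (the column names here are illustrative, not from your table):

from pyspark.sql.functions import monotonically_increasing_id, spark_partition_id

# Six rows spread over three partitions. The generated ID puts the
# partition ID in the upper 31 bits and the record number within the
# partition in the lower 33 bits, so the IDs jump by 2^33 between
# partitions.
demo = spark.range(6).repartition(3)
demo.withColumn("part", spark_partition_id()) \
    .withColumn("gen_id", monotonically_increasing_id()) \
    .show()
# If the next run shuffles a row into a different partition, that row
# gets a different gen_id, which is why a positional join on these IDs
# can pair up different records.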

You mentioned that it worked on a small database. The small dataset may happen to fit into one partition and thus give you the same output by chance. Once your data scales up and gets assigned different partition IDs, your result will likely differ. As a starting point for a solution, partition your data deliberately before applying window functions.
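
For example, a sketch of applying the window function within explicit partitions; 'country' and 'event_ts' below are hypothetical placeholder columns, since I don't know your schema:

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Partition by a real business key and order by a stable data column so
# that every run sees the rows in the same order within each partition.
w = Window.partitionBy("country").orderBy("event_ts")
df = df.withColumn("row_index", row_number().over(w))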

Besides, to achieve what you want, you may use row_number() alone to generate the so-called index, or use monotonically_increasing_id() together with rank() to assign the IDs and rank them, which makes the result more deterministic.
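
A sketch of the row_number()-only variant, assuming both dataframes contain some column whose ordering is identical on both sides ('order_col' below is a hypothetical stand-in for such a column):

from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

# Order by a real data column instead of monotonically_increasing_id(),
# so the index is reproducible across runs and repartitionings.
w = Window.orderBy("order_col")
left = df.withColumn("row_index", row_number().over(w))
right = df1.withColumn("row_index", row_number().over(w))
joined = left.join(right, on=["row_index"]).drop("row_index")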

Thank You & Best Regards
Winston Lai

Thank You & Best Regards
Winston Lai