Posted to commits@hudi.apache.org by "Vinoth Chandar (Jira)" <ji...@apache.org> on 2021/10/04 14:53:00 UTC

[jira] [Updated] (HUDI-2467) Delete data is not working with 0.9.0 and pySpark

     [ https://issues.apache.org/jira/browse/HUDI-2467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinoth Chandar updated HUDI-2467:
---------------------------------
    Summary: Delete data is not working with 0.9.0 and pySpark  (was: Delete data is not working with 0.9.0 )

> Delete data is not working with 0.9.0 and pySpark
> -------------------------------------------------
>
>                 Key: HUDI-2467
>                 URL: https://issues.apache.org/jira/browse/HUDI-2467
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: Spark Integration
>            Reporter: Phil Chen
>            Priority: Major
>
> Following this Spark quick-start guide:
> [https://hudi.apache.org/docs/quick-start-guide/]
> Everything works until the "Delete data" step.
> I'm using PySpark with Spark 3.1.2 and Python 3.9.
> {code:python}
> # pyspark
> # fetch total records count
> spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
> # fetch two records to be deleted
> ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
> # issue deletes
> hudi_delete_options = {
>   'hoodie.table.name': tableName,
>   'hoodie.datasource.write.recordkey.field': 'uuid',
>   'hoodie.datasource.write.partitionpath.field': 'partitionpath',
>   'hoodie.datasource.write.table.name': tableName,
>   'hoodie.datasource.write.operation': 'delete',
>   'hoodie.datasource.write.precombine.field': 'ts',
>   'hoodie.upsert.shuffle.parallelism': 2,
>   'hoodie.insert.shuffle.parallelism': 2
> }
> from pyspark.sql.functions import lit
> deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
> df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
> df.write.format("hudi"). \  
>   options(**hudi_delete_options). \
>   mode("append"). \
>   save(basePath)
> # run the same read query as above.
> roAfterDeleteViewDF = spark. \
>   read. \
>   format("hudi"). \
>   load(basePath) 
> roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
> # fetch should return (total - 2) records
> spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count(){code}
> The count before the delete is 10 and after the delete it is still 10 (expected 8).
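> A hedged diagnostic sketch, assuming df, spark, and basePath from the snippet above are still in scope: cross-check the delete frame's keys against Hudi's _hoodie_record_key metadata column. If the semi-join below returns 0, the 'uuid' column never matched a stored record key, so the delete was a no-op.
> {code:python}
> # _hoodie_record_key is one of Hudi's standard metadata columns
> existing_keys = spark.read.format("hudi").load(basePath).select("_hoodie_record_key")
> matched = df.join(existing_keys, df["uuid"] == existing_keys["_hoodie_record_key"], "left_semi").count()
> print(matched)  # 0 means none of the delete frame's keys exist in the table
> {code}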
> {code:python}
> >>> df.show()
> +--------------------+--------------------+---+
> |       partitionpath|                uuid| ts|
> +--------------------+--------------------+---+
> |74bed794-c854-4aa...|americas/united_s...|0.0|
> |ce71c2dc-dedf-483...|americas/united_s...|0.0|
> +--------------------+--------------------+---+
> {code}
>  
> The 2 records to be deleted are shown above. Note: the uuid and partitionpath values appear swapped relative to the column headers, which would explain why the configured record key field ('uuid') never matches a stored key.
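> A minimal workaround sketch, assuming the positional tuple mapping is what swaps the columns: pick the Row fields by name so 'uuid' always holds the record keys, regardless of the select order. Variables ds, spark, hudi_delete_options, and basePath are taken from the snippet above.
> {code:python}
> from pyspark.sql.functions import lit
> # select fields by name so the column labels can never end up swapped
> deletes = [(row['uuid'], row['partitionpath']) for row in ds.collect()]
> df = spark.createDataFrame(deletes, ['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
> df.write.format("hudi"). \
>   options(**hudi_delete_options). \
>   mode("append"). \
>   save(basePath)
> {code}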



--
This message was sent by Atlassian Jira
(v8.3.4#803005)