Posted to commits@hudi.apache.org by "Vinoth Chandar (Jira)" <ji...@apache.org> on 2021/10/07 23:05:01 UTC
[jira] [Commented] (HUDI-2467) Delete data is not working with 0.9.0 and pySpark
[ https://issues.apache.org/jira/browse/HUDI-2467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17425826#comment-17425826 ]
Vinoth Chandar commented on HUDI-2467:
--------------------------------------
[~phi@gpm.com] could you share the error you got?
> Delete data is not working with 0.9.0 and pySpark
> -------------------------------------------------
>
> Key: HUDI-2467
> URL: https://issues.apache.org/jira/browse/HUDI-2467
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: Spark Integration
> Reporter: Phil Chen
> Priority: Major
>
> Following this spark guide:
> [https://hudi.apache.org/docs/quick-start-guide/]
> Everything works until delete data:
> I'm using Pyspark with Spark 3.1.2 with python 3.9
> {code:python}
> # pyspark
> # fetch total records count
> spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
> # fetch two records to be deleted
> ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
> # issue deletes
> hudi_delete_options = {
>   'hoodie.table.name': tableName,
>   'hoodie.datasource.write.recordkey.field': 'uuid',
>   'hoodie.datasource.write.partitionpath.field': 'partitionpath',
>   'hoodie.datasource.write.table.name': tableName,
>   'hoodie.datasource.write.operation': 'delete',
>   'hoodie.datasource.write.precombine.field': 'ts',
>   'hoodie.upsert.shuffle.parallelism': 2,
>   'hoodie.insert.shuffle.parallelism': 2
> }
> from pyspark.sql.functions import lit
> deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
> df = spark.sparkContext.parallelize(deletes).toDF(['uuid', 'partitionpath']).withColumn('ts', lit(0.0))
> df.write.format("hudi"). \
> options(**hudi_delete_options). \
> mode("append"). \
> save(basePath)
> # run the same read query as above.
> roAfterDeleteViewDF = spark. \
> read. \
> format("hudi"). \
> load(basePath)
> roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
> # fetch should return (total - 2) records
> spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
> {code}
> The count before the delete is 10, and after the delete it is still 10 (expected 8).
> {code:python}
> >>> df.show()
> +--------------------+--------------------+---+
> | partitionpath| uuid| ts|
> +--------------------+--------------------+---+
> |74bed794-c854-4aa...|americas/united_s...|0.0|
> |ce71c2dc-dedf-483...|americas/united_s...|0.0|
> +--------------------+--------------------+---+
> {code}
>
> These are the 2 records that were supposed to be deleted.
> Note, the
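One thing worth checking in the df.show() output above: the values appear transposed relative to the column headers, with uuid-looking strings under partitionpath and vice versa. If the DataFrame really was built that way, none of the record keys would match existing records, so the delete would be a silent no-op, which would explain the unchanged count. A plain-Python sketch (values and the wrong ordering are hypothetical, not taken from the report) of why positional tuple-to-column mapping is fragile:

```python
# Made-up rows shaped like ds.collect() in the snippet above:
# each tuple is (uuid, partitionpath).
rows = [
    ("74bed794-c854-4aa0", "americas/united_states"),
    ("ce71c2dc-dedf-4830", "americas/united_states"),
]

# If the column-name list is written in the wrong order, each uuid
# silently lands under the partitionpath header and vice versa:
swapped = [dict(zip(["partitionpath", "uuid"], r)) for r in rows]

# Keying by field name keeps each value attached to its column:
by_name = [{"uuid": u, "partitionpath": p} for (u, p) in rows]

print(swapped[0]["uuid"])   # americas/united_states  (wrong -> key never matches)
print(by_name[0]["uuid"])   # 74bed794-c854-4aa0      (right)
```

The same idea applies in PySpark: building the delete DataFrame from Row objects with named fields, or double-checking df.show() against the schema before writing, avoids this class of mismatch.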
--
This message was sent by Atlassian Jira
(v8.3.4#803005)