You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2021/11/01 15:46:00 UTC

[jira] [Commented] (HUDI-2500) Spark datasource delete not working on Spark SQL created table

    [ https://issues.apache.org/jira/browse/HUDI-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17436907#comment-17436907 ] 

sivabalan narayanan commented on HUDI-2500:
-------------------------------------------

Same comment as on the other ticket: once the PR has landed, make sure you assign the ticket back to Yann before closing it out.

> Spark datasource delete not working on Spark SQL created table
> --------------------------------------------------------------
>
>                 Key: HUDI-2500
>                 URL: https://issues.apache.org/jira/browse/HUDI-2500
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: Spark Integration
>            Reporter: Raymond Xu
>            Assignee: Raymond Xu
>            Priority: Blocker
>              Labels: sev:critical
>             Fix For: 0.10.0
>
>
> Original issue [https://github.com/apache/hudi/issues/3670]
>  
> Script to reproduce:
> {code:java}
> val sparkSourceTablePath = s"${tmp.getCanonicalPath}/test_spark_table"
> val sparkSourceTableName = "test_spark_table"
> val hudiTablePath = s"${tmp.getCanonicalPath}/test_hudi_table"
> val hudiTableName = "test_hudi_table"
> println("0 - prepare source data")
> spark.createDataFrame(Seq(
>   ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
>   ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
>   ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
>   ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
>   ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
>   ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
> )).toDF("id", "creation_date", "last_update_time")
>   .withColumn("creation_date", expr("cast(creation_date as date)"))
>   .withColumn("id", expr("cast(id as bigint)"))
>   .write
>   .option("path", sparkSourceTablePath)
>   .mode("overwrite")
>   .format("parquet")
>   .saveAsTable(sparkSourceTableName)
> println("1 - CTAS to load data to Hudi")
> val hudiOptions = Map[String, String](
>   HoodieWriteConfig.TBL_NAME.key() -> hudiTableName,
>   DataSourceWriteOptions.TABLE_NAME.key() -> hudiTableName,
>   DataSourceWriteOptions.TABLE_TYPE.key() -> "COPY_ON_WRITE",
>   DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "id",
>   DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[ComplexKeyGenerator].getCanonicalName,
>   DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getCanonicalName,
>   DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "creation_date",
>   DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "last_update_time",
>   HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key() -> "1",
>   HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key() -> "1",
>   HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key() -> "1",
>   HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key() -> "1",
>   HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key() -> "1"
> )
> spark.sql(
>   s"""create table if not exists $hudiTableName using hudi
>      |          location '$hudiTablePath'
>      |          options (
>      |          type = 'cow',
>      |          primaryKey = 'id',
>      |          preCombineField = 'last_update_time'
>      |          )
>      |          partitioned by (creation_date)
>      |          AS
>      |          select id, last_update_time, creation_date from $sparkSourceTableName
>      |          """.stripMargin)
> println("2 - Hudi table has all records")
> spark.sql(s"select * from $hudiTableName").show(100)
> println("3 - pick 105 to delete")
> val rec105 = spark.sql(s"select * from $hudiTableName where id = 105")
> rec105.show()
> println("4 - issue delete (Spark SQL)")
> spark.sql(s"delete from $hudiTableName where id = 105")
> println("5 - 105 is deleted")
> spark.sql(s"select * from $hudiTableName").show(100)
> println("6 - pick 104 to delete")
> val rec104 = spark.sql(s"select * from $hudiTableName where id = 104")
> rec104.show()
> println("7 - issue delete (DataSource)")
> rec104.write
>   .format("hudi")
>   .options(hudiOptions)
>   .option(DataSourceWriteOptions.OPERATION.key(), "delete")
>   .option(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), classOf[EmptyHoodieRecordPayload].getCanonicalName)
>   .mode(SaveMode.Append)
>   .save(hudiTablePath)
> println("8 - 104 should be deleted")
> spark.sql(s"select * from $hudiTableName").show(100)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)