You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Raymond Xu (Jira)" <ji...@apache.org> on 2021/11/03 15:29:00 UTC
[jira] [Updated] (HUDI-2500) Spark datasource delete not working on
Spark SQL created table
[ https://issues.apache.org/jira/browse/HUDI-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Raymond Xu updated HUDI-2500:
-----------------------------
Reviewers: Raymond Xu
> Spark datasource delete not working on Spark SQL created table
> --------------------------------------------------------------
>
> Key: HUDI-2500
> URL: https://issues.apache.org/jira/browse/HUDI-2500
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: Spark Integration
> Reporter: Raymond Xu
> Assignee: Yann Byron
> Priority: Blocker
> Labels: sev:critical
> Fix For: 0.10.0
>
>
> Original issue [https://github.com/apache/hudi/issues/3670]
>
> Script to reproduce
> {code:java}
> val sparkSourceTablePath = s"${tmp.getCanonicalPath}/test_spark_table"
> val sparkSourceTableName = "test_spark_table"
> val hudiTablePath = s"${tmp.getCanonicalPath}/test_hudi_table"
> val hudiTableName = "test_hudi_table"
> println("0 - prepare source data")
> spark.createDataFrame(Seq(
> ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
> ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
> ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
> ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
> ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
> ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
> )).toDF("id", "creation_date", "last_update_time")
> .withColumn("creation_date", expr("cast(creation_date as date)"))
> .withColumn("id", expr("cast(id as bigint)"))
> .write
> .option("path", sparkSourceTablePath)
> .mode("overwrite")
> .format("parquet")
> .saveAsTable(sparkSourceTableName)
> println("1 - CTAS to load data to Hudi")
> val hudiOptions = Map[String, String](
> HoodieWriteConfig.TBL_NAME.key() -> hudiTableName,
> DataSourceWriteOptions.TABLE_NAME.key() -> hudiTableName,
> DataSourceWriteOptions.TABLE_TYPE.key() -> "COPY_ON_WRITE",
> DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "id",
> DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[ComplexKeyGenerator].getCanonicalName,
> DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getCanonicalName,
> DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "creation_date",
> DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "last_update_time",
> HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key() -> "1",
> HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key() -> "1",
> HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key() -> "1",
> HoodieWriteConfig.FINALIZE_WRITE_PARALLELISM_VALUE.key() -> "1",
> HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key() -> "1"
> )
> spark.sql(
> s"""create table if not exists $hudiTableName using hudi
> | location '$hudiTablePath'
> | options (
> | type = 'cow',
> | primaryKey = 'id',
> | preCombineField = 'last_update_time'
> | )
> | partitioned by (creation_date)
> | AS
> | select id, last_update_time, creation_date from $sparkSourceTableName
> | """.stripMargin)
> println("2 - Hudi table has all records")
> spark.sql(s"select * from $hudiTableName").show(100)
> println("3 - pick 105 to delete")
> val rec105 = spark.sql(s"select * from $hudiTableName where id = 105")
> rec105.show()
> println("4 - issue delete (Spark SQL)")
> spark.sql(s"delete from $hudiTableName where id = 105")
> println("5 - 105 is deleted")
> spark.sql(s"select * from $hudiTableName").show(100)
> println("6 - pick 104 to delete")
> val rec104 = spark.sql(s"select * from $hudiTableName where id = 104")
> rec104.show()
> println("7 - issue delete (DataSource)")
> rec104.write
> .format("hudi")
> .options(hudiOptions)
> .option(DataSourceWriteOptions.OPERATION.key(), "delete")
> .option(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key(), classOf[EmptyHoodieRecordPayload].getCanonicalName)
> .mode(SaveMode.Append)
> .save(hudiTablePath)
> println("8 - 104 should be deleted")
> spark.sql(s"select * from $hudiTableName").show(100)
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)