You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Alexey Kudinkin (Jira)" <ji...@apache.org> on 2022/09/16 19:48:00 UTC

[jira] [Updated] (HUDI-4861) Relax MERGE INTO restrictions to permit casting of the matching condition

     [ https://issues.apache.org/jira/browse/HUDI-4861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Kudinkin updated HUDI-4861:
----------------------------------
    Fix Version/s: 0.12.1
                       (was: 0.13.0)

> Relax MERGE INTO restrictions to permit casting of the matching condition
> -------------------------------------------------------------------------
>
>                 Key: HUDI-4861
>                 URL: https://issues.apache.org/jira/browse/HUDI-4861
>             Project: Apache Hudi
>          Issue Type: Bug
>    Affects Versions: 0.12.0
>            Reporter: Alexey Kudinkin
>            Assignee: Alexey Kudinkin
>            Priority: Critical
>             Fix For: 0.12.1
>
>
> Reported by user:
> [https://github.com/apache/hudi/issues/6626]
>  
> Following code
>  
> {code:java}
>      target_df = spark.read.format("hudi").load(basePath)
>     print("###################################")
>     print(target_df.printSchema())
>     # # target_df.show()
>     target_datatype_map = {}
>     for name, dtype in target_df.dtypes:
>         target_datatype_map[name] = dtype
>     print(str(target_datatype_map))
>     print("###################################")
>     for col in columns:
>         if has_column(deflateDf, col):
>             deflateDf = deflateDf.withColumn(col, F.col(col))
>         else:
>             deflateDf = deflateDf.withColumn(col, F.lit(None))
>     deflateDf.createOrReplaceTempView("deflate_table")
>     create_sql = "create table RESULTDATA using hudi location '/tmp/RESULTDATA_mor'"
>     spark.sql(create_sql)
>     
>     merge_sql = """
>     merge into RESULTDATA as target
>         using (
>             select * from deflate_table as deflate
>         )
>         on target._context_id_ = deflate._context_id_ and target.id = deflate.id
>         when matched
>         then update set
>         target.CREATED = cast(if(array_contains(deflate.changed_cols, 'CREATED'), deflate.CREATED, target.CREATED) as timestamp),target.CREATEDBY = cast(if(array_contains(deflate.changed_cols, 'CREATEDBY'), deflate.CREATEDBY, target.CREATEDBY) as string),target.DELETED = cast(if(array_contains(deflate.changed_cols, 'DELETED'), deflate.DELETED, target.DELETED) as timestamp),target.DELETEDBY = cast(if(array_contains(deflate.changed_cols, 'DELETEDBY'), deflate.DELETEDBY, target.DELETEDBY) as string),target.EXPIRATIONDATE = cast(if(array_contains(deflate.changed_cols, 'EXPIRATIONDATE'), deflate.EXPIRATIONDATE, target.EXPIRATIONDATE) as timestamp),target.ID = cast(if(array_contains(deflate.changed_cols, 'ID'), deflate.ID, target.ID) as decimal(12,0)),target.KEY = cast(if(array_contains(deflate.changed_cols, 'KEY'), deflate.KEY, target.KEY) as string),target.LASTMODIFIED = cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIED'), deflate.LASTMODIFIED, target.LASTMODIFIED) as timestamp),target.LASTMODIFIEDBY = cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIEDBY'), deflate.LASTMODIFIEDBY, target.LASTMODIFIEDBY) as string),target.ORDERING = cast(if(array_contains(deflate.changed_cols, 'ORDERING'), deflate.ORDERING, target.ORDERING) as decimal(12,0)),target.RESULTID = cast(if(array_contains(deflate.changed_cols, 'RESULTID'), deflate.RESULTID, target.RESULTID) as decimal(12,0)),target.REPORTINGPERIODTYPE = cast(if(array_contains(deflate.changed_cols, 'REPORTINGPERIODTYPE'), deflate.REPORTINGPERIODTYPE, target.REPORTINGPERIODTYPE) as string),target.RESULTDATE = cast(if(array_contains(deflate.changed_cols, 'RESULTDATE'), deflate.RESULTDATE, target.RESULTDATE) as timestamp),target.SATISFYINGNUMERATOR = cast(if(array_contains(deflate.changed_cols, 'SATISFYINGNUMERATOR'), deflate.SATISFYINGNUMERATOR, target.SATISFYINGNUMERATOR) as decimal(12,0)),target.VALUE = cast(if(array_contains(deflate.changed_cols, 'VALUE'), deflate.VALUE, target.VALUE) as string),target._ETL_RUN_ID_ = cast(if(array_contains(deflate.changed_cols, '_ETL_RUN_ID_'), deflate._ETL_RUN_ID_, target._ETL_RUN_ID_) as decimal(38,0)),target._ETL_MODIFIED_ = cast(if(array_contains(deflate.changed_cols, '_ETL_MODIFIED_'), deflate._ETL_MODIFIED_, target._ETL_MODIFIED_) as timestamp),target._EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, '_EXTRACTED_'), deflate._EXTRACTED_, target._EXTRACTED_) as timestamp),target._SOURCE_EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, '_SOURCE_EXTRACTED_'), deflate._SOURCE_EXTRACTED_, target._SOURCE_EXTRACTED_) as timestamp),target._LAST_MODIFIED_SEQ_ = cast(if(array_contains(deflate.changed_cols, '_LAST_MODIFIED_SEQ_'), deflate._LAST_MODIFIED_SEQ_, target._LAST_MODIFIED_SEQ_) as decimal(38,0)),target._SCHEMA_CLASS_ = cast(if(array_contains(deflate.changed_cols, '_SCHEMA_CLASS_'), deflate._SCHEMA_CLASS_, target._SCHEMA_CLASS_) as string),target._CONTEXT_ID_ = cast(if(array_contains(deflate.changed_cols, '_CONTEXT_ID_'), deflate._CONTEXT_ID_, target._CONTEXT_ID_) as decimal(12,0)),target._IS_DELETED_ = cast(if(array_contains(deflate.changed_cols, '_IS_DELETED_'), deflate._IS_DELETED_, target._IS_DELETED_) as boolean)
>         when not matched
>         then insert
>         (CREATED,CREATEDBY,DELETED,DELETEDBY,EXPIRATIONDATE,ID,KEY,LASTMODIFIED,LASTMODIFIEDBY,ORDERING,RESULTID,REPORTINGPERIODTYPE,RESULTDATE,SATISFYINGNUMERATOR,VALUE,_ETL_RUN_ID_,_ETL_MODIFIED_,_EXTRACTED_,_SOURCE_EXTRACTED_,_LAST_MODIFIED_SEQ_,_SCHEMA_CLASS_,_CONTEXT_ID_,_IS_DELETED_) values (cast(deflate.CREATED as timestamp),cast(deflate.CREATEDBY as string),cast(deflate.DELETED as timestamp),cast(deflate.DELETEDBY as string),cast(deflate.EXPIRATIONDATE as timestamp),cast(deflate.ID as decimal(12,0)),cast(deflate.KEY as string),cast(deflate.LASTMODIFIED as timestamp),cast(deflate.LASTMODIFIEDBY as string),cast(deflate.ORDERING as decimal(12,0)),cast(deflate.RESULTID as decimal(12,0)),cast(deflate.REPORTINGPERIODTYPE as string),cast(deflate.RESULTDATE as timestamp),cast(deflate.SATISFYINGNUMERATOR as decimal(12,0)),cast(deflate.VALUE as string),cast(deflate._ETL_RUN_ID_ as decimal(38,0)),cast(deflate._ETL_MODIFIED_ as timestamp),cast(deflate._EXTRACTED_ as timestamp),cast(deflate._SOURCE_EXTRACTED_ as timestamp),cast(deflate._LAST_MODIFIED_SEQ_ as decimal(38,0)),cast(deflate._SCHEMA_CLASS_ as string),cast(deflate._CONTEXT_ID_ as decimal(12,0)),cast(deflate._IS_DELETED_ as boolean))
>         """
>     spark.sql(merge_sql) {code}
>  
>  
> Results in the exception being thrown:
>  
> {code:java}
> /09/07 18:47:12 INFO HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from file:///tmp/RESULTDATA_mor
> 22/09/07 18:47:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20220907150126010__deltacommit__COMPLETED]}
> Traceback (most recent call last):
>   File "/Users/parunkarthick/cdc-poc/main.py", line 971, in <module>
>     process_table(deflate_df, tableName, table_cols[tableNames[0]], concurrent_write_enabled, delete_insert_enabled)
>   File "/Users/parunkarthick/cdc-poc/main.py", line 767, in process_table
>     merge_into_hudi(table_name, df, table_cols)
>   File "/Users/parunkarthick/cdc-poc/main.py", line 599, in merge_into_hudi
>     target_rows = spark.sql(sql)
>   File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/session.py", line 723, in sql
>   File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
>   File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
> pyspark.sql.utils.AnalysisException: Invalidate Merge-On condition: (CAST(target.`id` AS DECIMAL(20,0)) = CAST(CAST(deflate.`id` AS DECIMAL(20,0)) AS DECIMAL(20,0))).The validate condition should be 'targetColumn = sourceColumnExpression', e.g. t.id = s.id and t.dt = from_unixtime(s.ts) {code}
>  
> This occurs due to the fact that current impl of {{MergeIntoHoodieTableCommand}} restricts target table's primary key to be just an {{{}AttributeReference{}}}, which in this case is wrapped into a {{Cast}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)