You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Alexey Kudinkin (Jira)" <ji...@apache.org> on 2022/09/16 19:48:00 UTC
[jira] [Created] (HUDI-4861) Relax MERGE INTO restrictions to permit casting of the matching condition
Alexey Kudinkin created HUDI-4861:
-------------------------------------
Summary: Relax MERGE INTO restrictions to permit casting of the matching condition
Key: HUDI-4861
URL: https://issues.apache.org/jira/browse/HUDI-4861
Project: Apache Hudi
Issue Type: Bug
Affects Versions: 0.12.0
Reporter: Alexey Kudinkin
Assignee: Alexey Kudinkin
Fix For: 0.13.0
Reported by user:
[https://github.com/apache/hudi/issues/6626]
Following code
{code:java}
target_df = spark.read.format("hudi").load(basePath)
print("###################################")
print(target_df.printSchema())
# # target_df.show()
target_datatype_map = {}
for name, dtype in target_df.dtypes:
target_datatype_map[name] = dtype
print(str(target_datatype_map))
print("###################################")
for col in columns:
if has_column(deflateDf, col):
deflateDf = deflateDf.withColumn(col, F.col(col))
else:
deflateDf = deflateDf.withColumn(col, F.lit(None))
deflateDf.createOrReplaceTempView("deflate_table")
create_sql = "create table RESULTDATA using hudi location '/tmp/RESULTDATA_mor'"
spark.sql(create_sql)
merge_sql = """
merge into RESULTDATA as target
using (
select * from deflate_table as deflate
)
on target._context_id_ = deflate._context_id_ and target.id = deflate.id
when matched
then update set
target.CREATED = cast(if(array_contains(deflate.changed_cols, 'CREATED'), deflate.CREATED, target.CREATED) as timestamp),target.CREATEDBY = cast(if(array_contains(deflate.changed_cols, 'CREATEDBY'), deflate.CREATEDBY, target.CREATEDBY) as string),target.DELETED = cast(if(array_contains(deflate.changed_cols, 'DELETED'), deflate.DELETED, target.DELETED) as timestamp),target.DELETEDBY = cast(if(array_contains(deflate.changed_cols, 'DELETEDBY'), deflate.DELETEDBY, target.DELETEDBY) as string),target.EXPIRATIONDATE = cast(if(array_contains(deflate.changed_cols, 'EXPIRATIONDATE'), deflate.EXPIRATIONDATE, target.EXPIRATIONDATE) as timestamp),target.ID = cast(if(array_contains(deflate.changed_cols, 'ID'), deflate.ID, target.ID) as decimal(12,0)),target.KEY = cast(if(array_contains(deflate.changed_cols, 'KEY'), deflate.KEY, target.KEY) as string),target.LASTMODIFIED = cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIED'), deflate.LASTMODIFIED, target.LASTMODIFIED) as timestamp),target.LASTMODIFIEDBY = cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIEDBY'), deflate.LASTMODIFIEDBY, target.LASTMODIFIEDBY) as string),target.ORDERING = cast(if(array_contains(deflate.changed_cols, 'ORDERING'), deflate.ORDERING, target.ORDERING) as decimal(12,0)),target.RESULTID = cast(if(array_contains(deflate.changed_cols, 'RESULTID'), deflate.RESULTID, target.RESULTID) as decimal(12,0)),target.REPORTINGPERIODTYPE = cast(if(array_contains(deflate.changed_cols, 'REPORTINGPERIODTYPE'), deflate.REPORTINGPERIODTYPE, target.REPORTINGPERIODTYPE) as string),target.RESULTDATE = cast(if(array_contains(deflate.changed_cols, 'RESULTDATE'), deflate.RESULTDATE, target.RESULTDATE) as timestamp),target.SATISFYINGNUMERATOR = cast(if(array_contains(deflate.changed_cols, 'SATISFYINGNUMERATOR'), deflate.SATISFYINGNUMERATOR, target.SATISFYINGNUMERATOR) as decimal(12,0)),target.VALUE = cast(if(array_contains(deflate.changed_cols, 'VALUE'), deflate.VALUE, target.VALUE) as string),target._ETL_RUN_ID_ = cast(if(array_contains(deflate.changed_cols, '_ETL_RUN_ID_'), deflate._ETL_RUN_ID_, target._ETL_RUN_ID_) as decimal(38,0)),target._ETL_MODIFIED_ = cast(if(array_contains(deflate.changed_cols, '_ETL_MODIFIED_'), deflate._ETL_MODIFIED_, target._ETL_MODIFIED_) as timestamp),target._EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, '_EXTRACTED_'), deflate._EXTRACTED_, target._EXTRACTED_) as timestamp),target._SOURCE_EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, '_SOURCE_EXTRACTED_'), deflate._SOURCE_EXTRACTED_, target._SOURCE_EXTRACTED_) as timestamp),target._LAST_MODIFIED_SEQ_ = cast(if(array_contains(deflate.changed_cols, '_LAST_MODIFIED_SEQ_'), deflate._LAST_MODIFIED_SEQ_, target._LAST_MODIFIED_SEQ_) as decimal(38,0)),target._SCHEMA_CLASS_ = cast(if(array_contains(deflate.changed_cols, '_SCHEMA_CLASS_'), deflate._SCHEMA_CLASS_, target._SCHEMA_CLASS_) as string),target._CONTEXT_ID_ = cast(if(array_contains(deflate.changed_cols, '_CONTEXT_ID_'), deflate._CONTEXT_ID_, target._CONTEXT_ID_) as decimal(12,0)),target._IS_DELETED_ = cast(if(array_contains(deflate.changed_cols, '_IS_DELETED_'), deflate._IS_DELETED_, target._IS_DELETED_) as boolean)
when not matched
then insert
(CREATED,CREATEDBY,DELETED,DELETEDBY,EXPIRATIONDATE,ID,KEY,LASTMODIFIED,LASTMODIFIEDBY,ORDERING,RESULTID,REPORTINGPERIODTYPE,RESULTDATE,SATISFYINGNUMERATOR,VALUE,_ETL_RUN_ID_,_ETL_MODIFIED_,_EXTRACTED_,_SOURCE_EXTRACTED_,_LAST_MODIFIED_SEQ_,_SCHEMA_CLASS_,_CONTEXT_ID_,_IS_DELETED_) values (cast(deflate.CREATED as timestamp),cast(deflate.CREATEDBY as string),cast(deflate.DELETED as timestamp),cast(deflate.DELETEDBY as string),cast(deflate.EXPIRATIONDATE as timestamp),cast(deflate.ID as decimal(12,0)),cast(deflate.KEY as string),cast(deflate.LASTMODIFIED as timestamp),cast(deflate.LASTMODIFIEDBY as string),cast(deflate.ORDERING as decimal(12,0)),cast(deflate.RESULTID as decimal(12,0)),cast(deflate.REPORTINGPERIODTYPE as string),cast(deflate.RESULTDATE as timestamp),cast(deflate.SATISFYINGNUMERATOR as decimal(12,0)),cast(deflate.VALUE as string),cast(deflate._ETL_RUN_ID_ as decimal(38,0)),cast(deflate._ETL_MODIFIED_ as timestamp),cast(deflate._EXTRACTED_ as timestamp),cast(deflate._SOURCE_EXTRACTED_ as timestamp),cast(deflate._LAST_MODIFIED_SEQ_ as decimal(38,0)),cast(deflate._SCHEMA_CLASS_ as string),cast(deflate._CONTEXT_ID_ as decimal(12,0)),cast(deflate._IS_DELETED_ as boolean))
"""
spark.sql(merge_sql) {code}
Results in the exception being thrown:
{code:java}
/09/07 18:47:12 INFO HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from file:///tmp/RESULTDATA_mor
22/09/07 18:47:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20220907150126010__deltacommit__COMPLETED]}
Traceback (most recent call last):
File "/Users/parunkarthick/cdc-poc/main.py", line 971, in <module>
process_table(deflate_df, tableName, table_cols[tableNames[0]], concurrent_write_enabled, delete_insert_enabled)
File "/Users/parunkarthick/cdc-poc/main.py", line 767, in process_table
merge_into_hudi(table_name, df, table_cols)
File "/Users/parunkarthick/cdc-poc/main.py", line 599, in merge_into_hudi
target_rows = spark.sql(sql)
File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/session.py", line 723, in sql
File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.AnalysisException: Invalidate Merge-On condition: (CAST(target.`id` AS DECIMAL(20,0)) = CAST(CAST(deflate.`id` AS DECIMAL(20,0)) AS DECIMAL(20,0))).The validate condition should be 'targetColumn = sourceColumnExpression', e.g. t.id = s.id and t.dt = from_unixtime(s.ts) {code}
This occurs due to the fact that current impl of {{MergeIntoHoodieTableCommand}} restricts target table's primary key to be just an {{{}AttributeReference{}}}, which in this case is wrapped into a {{Cast}}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)