Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/07 16:34:57 UTC

[GitHub] [hudi] arunb2w opened a new issue, #6626: [SUPPORT] HUDI merge into via spark sql not working

arunb2w opened a new issue, #6626:
URL: https://github.com/apache/hudi/issues/6626

   Getting the error `pyspark.sql.utils.AnalysisException: Invalidate Merge-On condition` when running the code below.
   
   Hudi Version: 0.10
   Spark version: 3.1.2
   
   
   **Sample code**
   ```
    target_df = spark.read.format("hudi").load(basePath)
    print("###################################")
    # printSchema() prints to stdout and returns None, so call it directly
    target_df.printSchema()
    # target_df.show()
       target_datatype_map = {}
       for name, dtype in target_df.dtypes:
           target_datatype_map[name] = dtype
       print(str(target_datatype_map))
       print("###################################")
   
    # `columns`, `has_column`, and `deflateDf` are defined earlier in the script
    for col in columns:
           if has_column(deflateDf, col):
               deflateDf = deflateDf.withColumn(col, F.col(col))
           else:
               deflateDf = deflateDf.withColumn(col, F.lit(None))
       deflateDf.createOrReplaceTempView("deflate_table")
       create_sql = "create table RESULTDATA using hudi location '/tmp/RESULTDATA_mor'"
       spark.sql(create_sql)
       
       merge_sql = """
       merge into RESULTDATA as target
           using (
               select * from deflate_table as deflate
           )
           on target._context_id_ = deflate._context_id_ and target.id = deflate.id
           when matched
           then update set
        target.CREATED = cast(if(array_contains(deflate.changed_cols, 'CREATED'), deflate.CREATED, target.CREATED) as timestamp),
        target.CREATEDBY = cast(if(array_contains(deflate.changed_cols, 'CREATEDBY'), deflate.CREATEDBY, target.CREATEDBY) as string),
        target.DELETED = cast(if(array_contains(deflate.changed_cols, 'DELETED'), deflate.DELETED, target.DELETED) as timestamp),
        target.DELETEDBY = cast(if(array_contains(deflate.changed_cols, 'DELETEDBY'), deflate.DELETEDBY, target.DELETEDBY) as string),
        target.EXPIRATIONDATE = cast(if(array_contains(deflate.changed_cols, 'EXPIRATIONDATE'), deflate.EXPIRATIONDATE, target.EXPIRATIONDATE) as timestamp),
        target.ID = cast(if(array_contains(deflate.changed_cols, 'ID'), deflate.ID, target.ID) as decimal(12,0)),
        target.KEY = cast(if(array_contains(deflate.changed_cols, 'KEY'), deflate.KEY, target.KEY) as string),
        target.LASTMODIFIED = cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIED'), deflate.LASTMODIFIED, target.LASTMODIFIED) as timestamp),
        target.LASTMODIFIEDBY = cast(if(array_contains(deflate.changed_cols, 'LASTMODIFIEDBY'), deflate.LASTMODIFIEDBY, target.LASTMODIFIEDBY) as string),
        target.ORDERING = cast(if(array_contains(deflate.changed_cols, 'ORDERING'), deflate.ORDERING, target.ORDERING) as decimal(12,0)),
        target.RESULTID = cast(if(array_contains(deflate.changed_cols, 'RESULTID'), deflate.RESULTID, target.RESULTID) as decimal(12,0)),
        target.REPORTINGPERIODTYPE = cast(if(array_contains(deflate.changed_cols, 'REPORTINGPERIODTYPE'), deflate.REPORTINGPERIODTYPE, target.REPORTINGPERIODTYPE) as string),
        target.RESULTDATE = cast(if(array_contains(deflate.changed_cols, 'RESULTDATE'), deflate.RESULTDATE, target.RESULTDATE) as timestamp),
        target.SATISFYINGNUMERATOR = cast(if(array_contains(deflate.changed_cols, 'SATISFYINGNUMERATOR'), deflate.SATISFYINGNUMERATOR, target.SATISFYINGNUMERATOR) as decimal(12,0)),
        target.VALUE = cast(if(array_contains(deflate.changed_cols, 'VALUE'), deflate.VALUE, target.VALUE) as string),
        target._ETL_RUN_ID_ = cast(if(array_contains(deflate.changed_cols, '_ETL_RUN_ID_'), deflate._ETL_RUN_ID_, target._ETL_RUN_ID_) as decimal(38,0)),
        target._ETL_MODIFIED_ = cast(if(array_contains(deflate.changed_cols, '_ETL_MODIFIED_'), deflate._ETL_MODIFIED_, target._ETL_MODIFIED_) as timestamp),
        target._EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, '_EXTRACTED_'), deflate._EXTRACTED_, target._EXTRACTED_) as timestamp),
        target._SOURCE_EXTRACTED_ = cast(if(array_contains(deflate.changed_cols, '_SOURCE_EXTRACTED_'), deflate._SOURCE_EXTRACTED_, target._SOURCE_EXTRACTED_) as timestamp),
        target._LAST_MODIFIED_SEQ_ = cast(if(array_contains(deflate.changed_cols, '_LAST_MODIFIED_SEQ_'), deflate._LAST_MODIFIED_SEQ_, target._LAST_MODIFIED_SEQ_) as decimal(38,0)),
        target._SCHEMA_CLASS_ = cast(if(array_contains(deflate.changed_cols, '_SCHEMA_CLASS_'), deflate._SCHEMA_CLASS_, target._SCHEMA_CLASS_) as string),
        target._CONTEXT_ID_ = cast(if(array_contains(deflate.changed_cols, '_CONTEXT_ID_'), deflate._CONTEXT_ID_, target._CONTEXT_ID_) as decimal(12,0)),
        target._IS_DELETED_ = cast(if(array_contains(deflate.changed_cols, '_IS_DELETED_'), deflate._IS_DELETED_, target._IS_DELETED_) as boolean)
           when not matched
           then insert
        (CREATED,CREATEDBY,DELETED,DELETEDBY,EXPIRATIONDATE,ID,KEY,LASTMODIFIED,LASTMODIFIEDBY,ORDERING,RESULTID,REPORTINGPERIODTYPE,RESULTDATE,SATISFYINGNUMERATOR,VALUE,_ETL_RUN_ID_,_ETL_MODIFIED_,_EXTRACTED_,_SOURCE_EXTRACTED_,_LAST_MODIFIED_SEQ_,_SCHEMA_CLASS_,_CONTEXT_ID_,_IS_DELETED_)
        values (cast(deflate.CREATED as timestamp), cast(deflate.CREATEDBY as string), cast(deflate.DELETED as timestamp), cast(deflate.DELETEDBY as string),
            cast(deflate.EXPIRATIONDATE as timestamp), cast(deflate.ID as decimal(12,0)), cast(deflate.KEY as string), cast(deflate.LASTMODIFIED as timestamp),
            cast(deflate.LASTMODIFIEDBY as string), cast(deflate.ORDERING as decimal(12,0)), cast(deflate.RESULTID as decimal(12,0)), cast(deflate.REPORTINGPERIODTYPE as string),
            cast(deflate.RESULTDATE as timestamp), cast(deflate.SATISFYINGNUMERATOR as decimal(12,0)), cast(deflate.VALUE as string), cast(deflate._ETL_RUN_ID_ as decimal(38,0)),
            cast(deflate._ETL_MODIFIED_ as timestamp), cast(deflate._EXTRACTED_ as timestamp), cast(deflate._SOURCE_EXTRACTED_ as timestamp), cast(deflate._LAST_MODIFIED_SEQ_ as decimal(38,0)),
            cast(deflate._SCHEMA_CLASS_ as string), cast(deflate._CONTEXT_ID_ as decimal(12,0)), cast(deflate._IS_DELETED_ as boolean))
           """
       spark.sql(merge_sql)
       
   ```
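
   The long SET clause above looks machine-generated from the target's column/type map built earlier in the script. A minimal sketch of such a generator (the helper name and the exact shape of the map are assumptions, not part of the original script) could be:

```python
def build_set_clause(datatype_map, source="deflate", target="target"):
    """Build the WHEN MATCHED ... UPDATE SET assignments: each target column
    takes the source value only when listed in `changed_cols`, and the result
    is cast back to the target column's declared type."""
    return ",".join(
        f"{target}.{col} = cast(if(array_contains({source}.changed_cols, "
        f"'{col}'), {source}.{col}, {target}.{col}) as {dtype})"
        for col, dtype in datatype_map.items()
    )

# Example with two columns from the schema above
clause = build_set_clause({"CREATED": "timestamp", "ID": "decimal(12,0)"})
```

   Generating the clause from `target_df.dtypes` keeps the casts in sync with the table schema instead of hand-maintaining a multi-thousand-character string.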
   Running the sample code above with the command below produces the error:
   
   ```
spark-submit --master local --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2 \
--jars file:///Users/parunkarthick/Downloads/spark-snowflake_2.12-2.10.0-spark_3.1.jar,file:///Users/parunkarthick/Downloads/snowflake-jdbc-3.13.14.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--driver-memory 1g --executor-memory 1g main.py
   ```
   
   **Error stack trace**
   ```
   22/09/07 18:47:12 INFO HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from file:///tmp/RESULTDATA_mor
   22/09/07 18:47:12 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20220907150126010__deltacommit__COMPLETED]}
   Traceback (most recent call last):
     File "/Users/parunkarthick/cdc-poc/main.py", line 971, in <module>
       process_table(deflate_df, tableName, table_cols[tableNames[0]], concurrent_write_enabled, delete_insert_enabled)
     File "/Users/parunkarthick/cdc-poc/main.py", line 767, in process_table
       merge_into_hudi(table_name, df, table_cols)
     File "/Users/parunkarthick/cdc-poc/main.py", line 599, in merge_into_hudi
       target_rows = spark.sql(sql)
     File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/session.py", line 723, in sql
     File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
     File "/opt/homebrew/Cellar/apache-spark/3.1.2/libexec/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
   pyspark.sql.utils.AnalysisException: Invalidate Merge-On condition: (CAST(target.`id` AS DECIMAL(20,0)) = CAST(CAST(deflate.`id` AS DECIMAL(20,0)) AS DECIMAL(20,0))).The validate condition should be 'targetColumn = sourceColumnExpression', e.g. t.id = s.id and t.dt = from_unixtime(s.ts)
   ```
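
   The hint at the end of the error message is the key: Hudi 0.10 requires the merge condition to be `targetColumn = sourceColumnExpression`, but Spark's analyzer has widened both `id` columns to DECIMAL(20,0), wrapping the target key in a CAST that the validation rejects. A possible workaround (a sketch only, assuming the target keys are decimal(12,0) as the UPDATE clause above suggests) is to put the cast explicitly on the source side, so both sides of the equality already share a type and no implicit cast lands on the target column:

```python
def cast_free_on_clause(key_types, source="deflate", target="target"):
    """Build a MERGE ON clause of the form 'target.k = cast(source.k as T)'.
    Keeping the cast on the source side satisfies Hudi 0.10's rule that the
    condition be 'targetColumn = sourceColumnExpression'. `key_types` maps
    each join key to the *target* column's declared type."""
    return " and ".join(
        f"{target}.{k} = cast({source}.{k} as {t})" for k, t in key_types.items()
    )

# Hypothetical usage when assembling merge_sql:
on_clause = cast_free_on_clause({"_context_id_": "decimal(12,0)", "id": "decimal(12,0)"})
```

   Alternatively, casting the source DataFrame's key columns with `withColumn(..., F.col(...).cast("decimal(12,0)"))` before `createOrReplaceTempView` should have the same effect.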
   
   I would really appreciate it if anyone could help with this issue, or point me in the right direction in case I've missed anything.
   Thanks.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] alexeykudinkin commented on issue #6626: [SUPPORT] HUDI merge into via spark sql not working

alexeykudinkin commented on issue #6626:
URL: https://github.com/apache/hudi/issues/6626#issuecomment-1248287122

   @xushiyan you can assign this one to me




[GitHub] [hudi] xushiyan closed issue #6626: [SUPPORT] HUDI merge into via spark sql not working

xushiyan closed issue #6626: [SUPPORT] HUDI merge into via spark sql not working
URL: https://github.com/apache/hudi/issues/6626




[GitHub] [hudi] alexeykudinkin commented on issue #6626: [SUPPORT] HUDI merge into via spark sql not working

alexeykudinkin commented on issue #6626:
URL: https://github.com/apache/hudi/issues/6626#issuecomment-1249744358

   Filed HUDI-4861, we can close this one




[GitHub] [hudi] arunb2w commented on issue #6626: [SUPPORT] HUDI merge into via spark sql not working

arunb2w commented on issue #6626:
URL: https://github.com/apache/hudi/issues/6626#issuecomment-1240154803

   @nsivabalan Can you please provide some help on this issue




[GitHub] [hudi] xushiyan commented on issue #6626: [SUPPORT] HUDI merge into via spark sql not working

xushiyan commented on issue #6626:
URL: https://github.com/apache/hudi/issues/6626#issuecomment-1250519878

   > Filed [HUDI-4861](https://issues.apache.org/jira/browse/HUDI-4861), we can close this one
   
   As per the ticket, a merge condition that casts the target primary key, i.e. CAST(target.`id` AS DECIMAL(20,0)), needs to be supported.




[GitHub] [hudi] xushiyan commented on issue #6626: [SUPPORT] HUDI merge into via spark sql not working

xushiyan commented on issue #6626:
URL: https://github.com/apache/hudi/issues/6626#issuecomment-1247449888

   @arunb2w Noticed that you're on Hudi 0.10. Would you also verify whether 0.12 has the same behavior?

