You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/08/25 06:23:00 UTC

[jira] [Commented] (HUDI-2357) MERGE INTO doesn't work for tables created using CTAS

    [ https://issues.apache.org/jira/browse/HUDI-2357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404199#comment-17404199 ] 

ASF GitHub Bot commented on HUDI-2357:
--------------------------------------

pengzhiwei2018 opened a new pull request #3534:
URL: https://github.com/apache/hudi/pull/3534


   
   
   ## What is the purpose of the pull request
   
   *Fix the missing extraOptions passed by CTAS. This missing options will lead the Merge Into not work for CTAS.
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> MERGE INTO doesn't work for tables created using CTAS
> -----------------------------------------------------
>
>                 Key: HUDI-2357
>                 URL: https://issues.apache.org/jira/browse/HUDI-2357
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: Spark Integration
>            Reporter: Vinoth Govindarajan
>            Assignee: pengzhiwei
>            Priority: Major
>             Fix For: 0.9.0
>
>
> MERGE INTO command doesn't select the correct primary key for tables created using CTAS, whereas it works for tables created using CREATE TABLE command.
> I guess we are hitting this issue because the key generator class is set to SqlKeyGenerator for tables created using CTAS:
> working use-case:
> {code:java}
> create table h5 (id bigint, name string, ts bigint) using hudi
> options (type = "cow" , primaryKey="id" , preCombineField="ts" );
> merge into h5 as t0
> using (
>     select 5 as s_id, 'vinoth' as s_name, current_timestamp() as s_ts
> ) t1
> on t1.s_id = t0.id
> when matched then update set * 
> when not matched then insert *;
> {code}
> hoodie.properties for working use-case:
> {code:java}
> ➜  analytics.db git:(apache_hudi_support) cat h5/.hoodie/hoodie.properties
> #Properties saved on Wed Aug 25 04:10:33 UTC 2021
> #Wed Aug 25 04:10:33 UTC 2021
> hoodie.table.name=h5
> hoodie.table.recordkey.fields=id
> hoodie.table.type=COPY_ON_WRITE
> hoodie.table.precombine.field=ts
> hoodie.table.partition.fields=
> hoodie.archivelog.folder=archived
> hoodie.table.create.schema={"type"\:"record","name"\:"topLevelRecord","fields"\:[{"name"\:"_hoodie_commit_time","type"\:["string","null"]},{"name"\:"_hoodie_commit_seqno","type"\:["string","null"]},{"name"\:"_hoodie_record_key","type"\:["string","null"]},{"name"\:"_hoodie_partition_path","type"\:["string","null"]},{"name"\:"_hoodie_file_name","type"\:["string","null"]},{"name"\:"id","type"\:["long","null"]},{"name"\:"name","type"\:["string","null"]},{"name"\:"ts","type"\:["long","null"]}]}
> hoodie.timeline.layout.version=1
> hoodie.table.version=1{code}
>  
> Whereas this doesn't work:
> {code:java}
> create table h4 using hudi options (type = "cow" , primaryKey="id" , preCombineField="ts" ) as select 5 as id, cast(rand() as string) as name, current_timestamp();
> merge into h3 as t0u sing (select '5' as s_id, 'vinoth' as s_name, current_timestamp() as s_ts) t1 on t1.s_id = t0.id when matched then update set * when not matched then insert *;
> ========ERROR LOG====================
> 544702 [main] ERROR org.apache.spark.sql.hive.thriftserver.SparkSQLDriver  - Failed in [merge into analytics.h3 as t0using (    select '5' as s_id, 'vinoth' as s_name, current_timestamp() as s_ts) t1on t1.s_id = t0.idwhen matched then update set *when not matched then insert *]java.lang.IllegalArgumentException: Merge Key[id] is not Equal to the defined primary key[] in table h3 at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425) at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79) at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650) at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)java.lang.IllegalArgumentException: Merge Key[id] is not Equal to the defined primary key[] in table h3 at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425) at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79) at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97) at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602) at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650) at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490) at scala.collection.Iterator.foreach(Iterator.scala:941) at scala.collection.Iterator.foreach$(Iterator.scala:941) at scala.collection.AbstractIterator.foreach(Iterator.scala:1429) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.base/java.lang.reflect.Method.invoke(Unknown Source) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala){code}
> hoodie.properties for not working use-case (CTAS table):
> {code:java}
> ➜  analytics.db git:(apache_hudi_support) cat h4/.hoodie/hoodie.properties
> #Properties saved on Wed Aug 25 04:09:37 UTC 2021
> #Wed Aug 25 04:09:37 UTC 2021
> hoodie.table.name=h4
> hoodie.table.recordkey.fields=id
> hoodie.table.type=COPY_ON_WRITE
> hoodie.table.precombine.field=ts
> hoodie.table.partition.fields=
> hoodie.archivelog.folder=archived
> hoodie.populate.meta.fields=true
> hoodie.table.keygenerator.class=org.apache.spark.sql.hudi.command.SqlKeyGenerator
> hoodie.table.base.file.format=PARQUET
> hoodie.timeline.layout.version=1
> hoodie.table.version=1{code}
> This is a blocker for the dbt integration https://issues.apache.org/jira/browse/HUDI-2319
> Please try to fix it as part of the 0.9.0 release so that the dbt integration can be unblocked.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)