Posted to commits@hudi.apache.org by "jonvex (via GitHub)" <gi...@apache.org> on 2023/03/08 19:58:50 UTC

[GitHub] [hudi] jonvex commented on issue #8123: [SUPPORT] Merge Into can not be used

jonvex commented on issue #8123:
URL: https://github.com/apache/hudi/issues/8123#issuecomment-1460771740

   This does not seem to work in 0.12.2, but it does work on current master:
   ```
   spark.sql(
     s"""
        |create table tbl1 (
        |  id int,
        |  name string,
        |  data int,
        |  country string,
        |  ts bigint
        |) using hudi
        |tblproperties (
        |  type = 'cow',
        |  primaryKey = 'id',
        |  preCombineField = 'ts',
        |  'hoodie.datasource.meta.sync.enable' = 'false',
        |  'hoodie.datasource.hive_sync.enable' = 'false'
        | )
        |partitioned by (country)
        |location '/tmp/test8123/tbl1'
        |""".stripMargin)
   
   
   
   spark.sql(
     s"""
        |create table tbl1i (
        |  id int,
        |  name string,
        |  data int,
        |  country string,
        |  ts bigint
        |) using hudi
        |tblproperties (
        |  type = 'cow',
        |  primaryKey = 'id',
        |  preCombineField = 'ts',
        |  'hoodie.datasource.meta.sync.enable' = 'false',
        |  'hoodie.datasource.hive_sync.enable' = 'false'
        | )
        |partitioned by (country)
        |location '/tmp/test8123/tbl1i'
        |""".stripMargin)
   
     spark.sql("insert into tbl1 select 1, 'lb', 24, 195, 'shu'")
     spark.sql("insert into tbl1 select 2, 'gy', 12, 193, 'shu'")
     spark.sql("insert into tbl1 select 1, 'cc', 22, 193, 'wei'")
     spark.sql("insert into tbl1 select 2, 'xy', 23, 193, 'wei'")
   
     spark.sql("insert into tbl1i select 1, 'lb', 22, 194, 'shu'")
     spark.sql("insert into tbl1i select 2, 'gy', 12, 193, 'shu'")
     spark.sql("insert into tbl1i select 1, 'cc', 22, 193, 'wei'")
     spark.sql("insert into tbl1i select 2, 'xy', 23, 193, 'wei'")
   ```
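   For reference, a quick sanity check of the seeded data before running the merge (not part of the original repro, just querying the tables created above):
   ```
   // Sanity check: show the seeded contents of both tables before running the MERGE INTO.
   spark.sql("select id, name, data, ts, country from tbl1 order by country, id").show(false)
   spark.sql("select id, name, data, ts, country from tbl1i order by country, id").show(false)
   ```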
   Then, if you run the following MERGE INTO
   ```
     spark.sql(
     s"""
        |merge into tbl1i as target
        |using (
        |	select * from tbl1
        |) source
        |on source.id = target.id 
        |when matched and data < source.data then
        |update set data = source.data, ts = source.ts
        |when not matched then
        |insert *
        |""".stripMargin)
   ```
   it will not work. I changed the `<` to a `>` and then finally to `=`, and only with `=` did the update happen. That means that in the matched condition, `data` and `source.data` resolve to the same column. I then qualified it as `target.data < source.data`, and got the following error:
   
   <details>
   
   ```
   org.apache.spark.sql.AnalysisException: Cannot resolve 'target.data in (target.data < source.data), the input columns is: [_hoodie_commit_time#200, _hoodie_commit_seqno#201, _hoodie_record_key#202, _hoodie_partition_path#203, _hoodie_file_name#204, id#205, name#206, data#207, ts#208L, country#209]
     at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences.org$apache$spark$sql$hudi$analysis$HoodieResolveReferences$$resolveExpressionFrom(HoodieAnalysis.scala:572)
     at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences$$anonfun$apply$1.$anonfun$applyOrElse$4(HoodieAnalysis.scala:316)
     at scala.Option.map(Option.scala:230)
     at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences$$anonfun$apply$1.resolveConditionAssignments$1(HoodieAnalysis.scala:316)
     at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences$$anonfun$apply$1.$anonfun$applyOrElse$15(HoodieAnalysis.scala:361)
     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
     at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
     at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences$$anonfun$apply$1.applyOrElse(HoodieAnalysis.scala:358)
     at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences$$anonfun$apply$1.applyOrElse(HoodieAnalysis.scala:261)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:30)
     at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences.apply(HoodieAnalysis.scala:261)
     at org.apache.spark.sql.hudi.analysis.HoodieResolveReferences.apply(HoodieAnalysis.scala:257)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
     at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
     at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
     at scala.collection.immutable.List.foldLeft(List.scala:91)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
     at scala.collection.immutable.List.foreach(List.scala:431)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:222)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:218)
     at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:167)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:218)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:182)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
     at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:203)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:202)
     at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:75)
     at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
     at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
     at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:75)
     at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:73)
     at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:65)
     at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
     at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
     ... 58 elided
   ```
   </details>
   
   If you try this on master without qualifying the column as `target.data`, you get this exception instead:
   
   <details>
   
   ```
   org.apache.spark.sql.AnalysisException: Reference 'data' is ambiguous, could be: target.data, source.data.; line 7 pos 17
     at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:372)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:112)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$resolveExpressionByPlanChildren$1(Analyzer.scala:1880)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$resolveExpression$2(Analyzer.scala:1810)
     at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:60)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.innerResolve$1(Analyzer.scala:1817)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$resolveExpression$7(Analyzer.scala:1830)
     at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1148)
     at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1147)
     at org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:555)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.innerResolve$1(Analyzer.scala:1830)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.resolveExpression(Analyzer.scala:1835)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.resolveExpressionByPlanChildren(Analyzer.scala:1886)
     at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$17.$anonfun$applyOrElse$89(Analyzer.scala:1552)
     at scala.Option.map(Option.scala:230)
     at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$17.$anonfun$applyOrElse$87(Analyzer.scala:1552)
     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
     at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
     at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$17.applyOrElse(Analyzer.scala:1545)
     at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$17.applyOrElse(Analyzer.scala:1438)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$5(AnalysisHelper.scala:142)
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:142)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30)
     at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1438)
     at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1418)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
     at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
     at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
     at scala.collection.immutable.List.foldLeft(List.scala:91)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
     at scala.collection.immutable.List.foreach(List.scala:431)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:222)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:218)
     at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:167)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:218)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:182)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
     at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
     at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:203)
     at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
     at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:202)
     at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:75)
     at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
     at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:183)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:183)
     at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:75)
     at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:73)
     at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:65)
     at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:98)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
     at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
     at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
     ... 58 elided
   ```
   
   </details>
   
   In conclusion, on current master this works:
   ```
    spark.sql(
     s"""
        |merge into tbl1i as target
        |using (
        |	select * from tbl1
        |) source
        |on source.id = target.id 
        |when matched and target.data < source.data then
        |update set data = source.data, ts = source.ts
        |when not matched then
        |insert *
        |""".stripMargin)
   ```
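   And a quick way to confirm the matched rows were actually updated (again just a sketch against the tables created above, not part of the original repro):
   ```
   // Inspect tbl1i after the merge to see which rows picked up tbl1's data/ts values.
   spark.sql("select id, name, data, ts, country from tbl1i order by country, id").show(false)
   ```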
   
   

