You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/01 09:13:34 UTC

[GitHub] [hudi] joao-miranda commented on issue #6552: [SUPPORT] AWSDmsAvroPayload does not work correctly with any version above 0.10.0

joao-miranda commented on issue #6552:
URL: https://github.com/apache/hudi/issues/6552#issuecomment-1233993157

   Thank you for your quick reply.  It doesn't seem to be either of those options for this scenario. I created a unit test and added the full code so you can understand our use case better:
   
   ```
   // Reproduction test: writes a small change-log style DataFrame to a Hudi table using
   // AWSDmsAvroPayload as the payload class, reads the table back, and compares the
   // ("ts", "id", "name", "alias") projection against a hand-built expected DataFrame.
   test("test_hudi_upgrade") {
       println("=============test_hudi_upgrade=============")
   
       // Values used for the "ts" column. today_date is yesterday_date + 86,400,000;
       // tomorrow_date is only today_date + 1,000,000 (it is simply "later than today").
       // Presumably epoch milliseconds — TODO confirm.
       val yesterday_date = 1654520683000L
       val today_date = 1654607083000L
       val tomorrow_date = 1654608083000L
   
       // Input rows, columns (Op, ts, id, name, alias).
       // Presumably Op follows the AWS DMS convention (null = initial load, I = insert,
       // U = update, D = delete) — verify against the AWSDmsAvroPayload documentation.
       val testDF_upgrade: DataFrame = sparkSession.createDataFrame(
         List(
           // Rows with a null Op, all stamped with yesterday_date.
           (null, yesterday_date, 1, "Clark Kent", "Superman"),
           (null, yesterday_date, 2, "Bruce Wayne", "Batman"),
           (null, yesterday_date, 3, "Diana Prince", "Wonder Woman"),
           (null, yesterday_date, 4, "Hal Jordan", "Green Lantern"),
           // New ids 5 and 6.
           ("I", today_date, 5, "Barry Allen", "The Flash"),
           ("I", today_date, 6, "Arthur Curry", "Aquaman"),
           // id 2 gets a "D" row with a later ts than its initial row.
           ("D", today_date, 2, "Bruce Wayne", "Detective Comics"),
           // id 4 gets two "U" rows; the second has the largest ts of the three id-4 rows.
           ("U", today_date, 4, "John Stewart", "Green Lantern"),
           ("U", tomorrow_date, 4, "Guy Gardner", "Green Lantern")
         )
       ).toDF("Op", "ts", "id", "name", "alias")
   
       // Expected table contents after the write: one row per surviving id.
       // id 2 is absent (its latest input row carries Op = "D") and id 4 carries the
       // values of its tomorrow_date row.
       val expectedDF_upgrade = sparkSession.createDataFrame(
         List(
           (yesterday_date, 1, "Clark Kent", "Superman"),
           (yesterday_date, 3, "Diana Prince", "Wonder Woman"),
           (tomorrow_date, 4, "Guy Gardner", "Green Lantern"),
           (today_date, 5, "Barry Allen", "The Flash"),
           (today_date, 6, "Arthur Curry", "Aquaman"),
         )
       )
         .toDF("ts", "id", "name", "alias")
         .orderBy("id")
   
       // Location pieces; the table is written to s"$testDirectory/$testDatabase/$tableName".
       // NOTE(review): testTable is declared but not used anywhere in this test.
       val testDirectory = "file:/tmp/hudi/"
       val testDatabase = "testDatabase"
       val testTable = "testTable"
       val tableName = "upgrade_table"
   
   
       // Hudi writer options: upsert into a copy-on-write table, record key "id",
       // precombine field "ts", AWSDmsAvroPayload as payload class, CustomKeyGenerator
       // as key generator, and an empty partition path field.
       // NOTE(review): hudiOptions is a var but is never reassigned in this test.
       var hudiOptions = scala.collection.mutable.Map[String, String](
         HoodieWriteConfig.TABLE_NAME -> tableName,
         DataSourceWriteOptions.OPERATION_OPT_KEY -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
         DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
         DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
         DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY ->  "ts",
         DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> classOf[AWSDmsAvroPayload].getName,
         DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[CustomKeyGenerator].getName,
         DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> ""
       )
   
       // Write the whole input (after dropping exact duplicate rows) as a Hudi dataset in a
       // single write with SaveMode.Append, so all Op variants for an id arrive in one batch.
       testDF_upgrade
         .dropDuplicates()
         .write
         .format("org.apache.hudi")
         .options(hudiOptions)
         .mode(SaveMode.Append)
         .save(s"$testDirectory/$testDatabase/$tableName")
   
   
   
       // Read the table back from the same path, ordered by id to line up with expectedDF_upgrade.
       val actualDF_upgrade = sparkSession.read.format("org.apache.hudi").load(
         s"$testDirectory/$testDatabase/$tableName"
       )
         .orderBy("id")
   
       // Print both DataFrames untruncated for visual comparison in the test log.
       println("=============Expected DF=============")
       expectedDF_upgrade.show(false)
       println("=============Actual DF=============")
       actualDF_upgrade.show(false)
   
       // Compare only the four data columns (the read-back DataFrame is projected first).
       // NOTE(review): the meaning of the three positional booleans is not visible here —
       // confirm against the assertSmallDatasetEquality signature in use.
       assertSmallDatasetEquality(actualDF_upgrade.select("ts", "id", "name", "alias"), expectedDF_upgrade, true, false, true)
     }
   ```
     
   This runs successfully with Hudi 0.10.0:
   
   ```
   =============test_hudi_upgrade=============
   =============Expected DF=============
   +-------------+---+------------+-------------+
   |ts           |id |name        |alias        |
   +-------------+---+------------+-------------+
   |1654520683000|1  |Clark Kent  |Superman     |
   |1654520683000|3  |Diana Prince|Wonder Woman |
   |1654608083000|4  |Guy Gardner |Green Lantern|
   |1654607083000|5  |Barry Allen |The Flash    |
   |1654607083000|6  |Arthur Curry|Aquaman      |
   +-------------+---+------------+-------------+
   
   =============Actual DF=============
   +-------------------+---------------------+------------------+----------------------+--------------------------------------------------------------------------+----+-------------+---+------------+-------------+
   |_hoodie_commit_time|_hoodie_commit_seqno |_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name                                                         |Op  |ts           |id |name        |alias        |
   +-------------------+---------------------+------------------+----------------------+--------------------------------------------------------------------------+----+-------------+---+------------+-------------+
   |20220901100706768  |20220901100706768_0_3|1                 |                      |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|null|1654520683000|1  |Clark Kent  |Superman     |
   |20220901100706768  |20220901100706768_0_4|3                 |                      |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|null|1654520683000|3  |Diana Prince|Wonder Woman |
   |20220901100706768  |20220901100706768_0_5|4                 |                      |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|U   |1654608083000|4  |Guy Gardner |Green Lantern|
   |20220901100706768  |20220901100706768_0_1|5                 |                      |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|I   |1654607083000|5  |Barry Allen |The Flash    |
   |20220901100706768  |20220901100706768_0_2|6                 |                      |431a7e13-bc66-404a-8193-b1c8a1adb194-0_0-32-2205_20220901100706768.parquet|I   |1654607083000|6  |Arthur Curry|Aquaman      |
   +-------------------+---------------------+------------------+----------------------+--------------------------------------------------------------------------+----+-------------+---+------------+-------------+
   
   [info] RawCDCMergeTest:
   [info] - test_hudi_upgrade
   ```
   
   and fails with Hudi 0.12.0:
   ```
   =============test_hudi_upgrade=============
   32131 [consumer-thread-1] ERROR org.apache.hudi.io.HoodieWriteHandle  - Error writing record HoodieRecord{key=HoodieKey { recordKey=2 partitionPath=}, currentLocation='null', newLocation='null'}
   java.util.NoSuchElementException: No value present in Option
           at org.apache.hudi.common.util.Option.get(Option.java:89)
           at org.apache.hudi.common.model.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:72)
           at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
           at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
           at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
           at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
           at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
           at java.util.concurrent.FutureTask.run(FutureTask.java:266)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
           at java.lang.Thread.run(Thread.java:748)
   32345 [pool-6-thread-3-ScalaTest-running-RawCDCMergeTest] ERROR org.apache.hudi.HoodieSparkSqlWriter$  - UPSERT failed with errors
   [info] RawCDCMergeTest:
   [info] - test_hudi_upgrade *** FAILED ***
   [info]   org.apache.spark.sql.AnalysisException: cannot resolve '`id`' given input columns: [];
   [info] 'Sort ['id ASC NULLS FIRST], true
   [info] +- Relation[] org.apache.hudi.EmptyRelation@5144bb56
   [info]   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
   [info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
   [info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
   [info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:341)
   [info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
   [info]   at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:341)
   [info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:338)
   [info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:407)
   [info]   at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:243)
   [info]   at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:405)
   [info]   ...
   ```
   
   Please let me know if you need any other information.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org