Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/05 05:56:45 UTC

[GitHub] [hudi] Xiaohan-Shen opened a new issue, #6590: [SUPPORT] HoodieDeltaStreamer AWSDmsAvroPayload fails to handle deletes in MySQL

Xiaohan-Shen opened a new issue, #6590:
URL: https://github.com/apache/hudi/issues/6590

   **Describe the problem you faced**
   Inspired by this [blog](https://cwiki.apache.org/confluence/display/HUDI/2020/01/20/Change+Capture+Using+AWS+Database+Migration+Service+and+Hudi), I am trying to set up Hudi DeltaStreamer to continuously pick up changes from MySQL for a performance benchmark. My setup hosts MySQL on **AWS RDS**, captures MySQL changes as Parquet files in S3 with **AWS DMS**, and runs HoodieDeltaStreamer with `--continuous` on **AWS EMR** to write the changes into a Hudi table on S3.
   
   It works fine for updates and inserts but throws an exception on deletes: a row deleted in MySQL is not deleted in the Hudi table. I am new to Hudi, so it's possible I have misconfigured something.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Follow the setup steps in the [blog](https://cwiki.apache.org/confluence/display/HUDI/2020/01/20/Change+Capture+Using+AWS+Database+Migration+Service+and+Hudi)
   2. Use this command to start HoodieDeltaStreamer:
   ```
   spark-submit \
     --jars /usr/lib/spark/external/lib/spark-avro.jar,/usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hudi/hudi-utilities-bundle.jar \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     --packages org.apache.hudi:hudi-spark-bundle_2.12:0.11.0,org.apache.spark:spark-avro_2.12:3.2.1 \
     --master yarn --deploy-mode client /usr/lib/hudi/hudi-utilities-bundle.jar \
     --table-type COPY_ON_WRITE \
     --source-ordering-field updated_at \
     --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
     --target-base-path s3://mysql-data-replication/hudi_orders \
     --target-table hudi_orders \
     --transformer-class org.apache.hudi.utilities.transform.AWSDmsTransformer \
     --continuous \
     --hoodie-conf hoodie.datasource.write.recordkey.field=order_id \
     --hoodie-conf hoodie.datasource.write.partitionpath.field=customer_name \
     --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://mysql-data-replication/hudi_dms/orders \
     --payload-class org.apache.hudi.payload.AWSDmsAvroPayload
   ```
   3. Insert a few rows into the MySQL table (a JDBC sketch of steps 3 and 4 follows this list)
   4. Delete a row
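
   A minimal JDBC sketch of steps 3 and 4. Assumptions flagged here and in the comments: the `orders` table name is inferred from the DMS source path in the command above, the column names come from the record key, partition path, and ordering field configs, and the endpoint and credentials are placeholders:

   ```
   import java.sql.Connection;
   import java.sql.DriverManager;
   import java.sql.Statement;

   // Sketch of repro steps 3-4, assuming an "orders" table whose columns
   // match the DeltaStreamer configs above (order_id as record key,
   // customer_name as partition path, updated_at as ordering field).
   // The JDBC URL and credentials are placeholders for the RDS instance.
   public class ReproSteps {
       public static void main(String[] args) throws Exception {
           String url = "jdbc:mysql://<rds-endpoint>:3306/benchmark"; // hypothetical endpoint/database
           try (Connection conn = DriverManager.getConnection(url, "<user>", "<password>");
                Statement stmt = conn.createStatement()) {
               // Step 3: insert a few rows; DMS emits them as Op = "I" records.
               stmt.executeUpdate("INSERT INTO orders (order_id, customer_name, updated_at) "
                   + "VALUES (1, 'alice', NOW()), (2, 'bob', NOW())");
               // Step 4: delete a row; DMS emits an Op = "D" record, which is
               // where DeltaStreamer throws the exception below.
               stmt.executeUpdate("DELETE FROM orders WHERE order_id = 2");
           }
       }
   }
   ```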
   
   **Expected behavior**
   
   Hudi should monitor and capture any changes (inserts, updates, and deletes) in the MySQL table and write them into the Hudi table.
   
   I specified `--payload-class org.apache.hudi.payload.AWSDmsAvroPayload`, which should tell Hudi the right way to handle a row with `Op = D`: when a row in MySQL is deleted, Hudi should capture the change and delete the corresponding row in the Hudi table.
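
   To make the expected delete semantics concrete: DMS stamps every change record with an `Op` column (I/U/D), and a payload like `AWSDmsAvroPayload` is supposed to turn `Op = D` into an empty value so Hudi drops the row. Here is a simplified, self-contained sketch of that contract, using `java.util.Optional` and a plain `Map` in place of Hudi's `Option` and Avro records (an illustration of the intended behavior, not the actual Hudi source):

   ```
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;

   // Simplified model of the AWS DMS payload contract: a record carrying
   // Op = "D" should resolve to an empty value, which tells Hudi to delete
   // the corresponding row instead of writing it.
   public class DmsDeleteSemanticsSketch {
       static final String OP_FIELD = "Op";

       // Returns the record to write, or empty to signal a delete.
       static Optional<Map<String, Object>> handleDeleteOperation(Map<String, Object> record) {
           boolean isDelete = "D".equals(record.get(OP_FIELD));
           return isDelete ? Optional.empty() : Optional.of(record);
       }

       public static void main(String[] args) {
           Map<String, Object> deleteRecord = new HashMap<>();
           deleteRecord.put("order_id", 42);
           deleteRecord.put("customer_name", "alice");
           deleteRecord.put(OP_FIELD, "D");
           // Empty result => Hudi should remove order_id 42 from the table.
           System.out.println(handleDeleteOperation(deleteRecord)); // Optional.empty
       }
   }
   ```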
   
   **Environment Description**
   
   * Hudi version : 0.11.0
   
   * Spark version : 3.2.1
   
   * Hive version : should be irrelevant, but 3.1.3
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   The command I ran to start HoodieDeltaStreamer differs slightly from the one provided in the blog; the original did not work for me out of the box. Please let me know if any of the configs I passed in the command might have caused this issue.
   
   **Stacktrace**
   
   ```
   Caused by: java.util.NoSuchElementException: No value present in Option
   	at org.apache.hudi.common.util.Option.get(Option.java:89)
   	at org.apache.hudi.payload.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:74)
   	at org.apache.hudi.io.HoodieMergeHandle.writeInsertRecord(HoodieMergeHandle.java:272)
   	at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:380)
   	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:388)
   	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
   	... 28 more
   
   Driver stacktrace:
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:709)
   	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 156.0 failed 4 times, most recent failure: Lost task 0.3 in stage 156.0 (TID 6494) (ip-172-31-16-83.us-west-1.compute.internal executor 3): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
   	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
   	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:133)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   Caused by: java.util.NoSuchElementException: No value present in Option
   	at org.apache.hudi.common.util.Option.get(Option.java:89)
   	at org.apache.hudi.payload.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:74)
   	at org.apache.hudi.io.HoodieMergeHandle.writeInsertRecord(HoodieMergeHandle.java:272)
   	at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:380)
   	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:388)
   	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
   	... 28 more
   
   Driver stacktrace:
   	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558)
   	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
   	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
   	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
   	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200)
   	at scala.Option.foreach(Option.scala:407)
   	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729)
   	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
   	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2310)
   	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1183)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
   	at org.apache.spark.rdd.RDD.fold(RDD.scala:1177)
   	at org.apache.spark.rdd.DoubleRDDFunctions.$anonfun$sum$1(DoubleRDDFunctions.scala:35)
   	at scala.runtime.java8.JFunction0$mcD$sp.apply(JFunction0$mcD$sp.java:23)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
   	at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:35)
   	at org.apache.spark.api.java.JavaDoubleRDD.sum(JavaDoubleRDD.scala:165)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:603)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:331)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:675)
   	... 4 more
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
   	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
   	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:133)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
   	... 3 more
   Caused by: java.util.NoSuchElementException: No value present in Option
   	at org.apache.hudi.common.util.Option.get(Option.java:89)
   	at org.apache.hudi.payload.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:74)
   	at org.apache.hudi.io.HoodieMergeHandle.writeInsertRecord(HoodieMergeHandle.java:272)
   	at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:380)
   	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:388)
   	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
   	... 28 more
   22/08/29 03:56:20 INFO Javalin: Stopping Javalin ...
   22/08/29 03:56:20 ERROR Javalin: Javalin failed to stop gracefully
   java.lang.InterruptedException
   	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
   	at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
   	at org.apache.hudi.org.eclipse.jetty.server.AbstractConnector.doStop(AbstractConnector.java:371)
   	at org.apache.hudi.org.eclipse.jetty.server.AbstractNetworkConnector.doStop(AbstractNetworkConnector.java:88)
   	at org.apache.hudi.org.eclipse.jetty.server.ServerConnector.doStop(ServerConnector.java:246)
   	at org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:94)
   	at org.apache.hudi.org.eclipse.jetty.server.Server.doStop(Server.java:459)
   	at org.apache.hudi.org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:94)
   	at io.javalin.Javalin.stop(Javalin.java:203)
   	at org.apache.hudi.timeline.service.TimelineService.close(TimelineService.java:337)
   	at org.apache.hudi.client.embedded.EmbeddedTimelineService.stop(EmbeddedTimelineService.java:137)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.close(DeltaSync.java:876)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.close(HoodieDeltaStreamer.java:811)
   	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.onDeltaSyncShutdown(HoodieDeltaStreamer.java:222)
   	at org.apache.hudi.async.HoodieAsyncService.lambda$shutdownCallback$0(HoodieAsyncService.java:171)
   	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
   	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
   	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   22/08/29 03:56:20 INFO Javalin: Javalin has stopped
   22/08/29 03:56:20 INFO AbstractConnector: Stopped Spark@65a5357f{HTTP/1.1, (http/1.1)}{0.0.0.0:8090}
   22/08/29 03:56:20 INFO SparkUI: Stopped Spark web UI at http://ip-172-31-16-83.us-west-1.compute.internal:8090
   22/08/29 03:56:20 INFO YarnClientSchedulerBackend: Interrupting monitor thread
   22/08/29 03:56:20 INFO YarnClientSchedulerBackend: Shutting down all executors
   22/08/29 03:56:20 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
   22/08/29 03:56:20 INFO YarnClientSchedulerBackend: YARN client scheduler backend Stopped
   22/08/29 03:56:20 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
   22/08/29 03:56:20 INFO MemoryStore: MemoryStore cleared
   22/08/29 03:56:20 INFO BlockManager: BlockManager stopped
   22/08/29 03:56:20 INFO BlockManagerMaster: BlockManagerMaster stopped
   22/08/29 03:56:20 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
   22/08/29 03:56:20 INFO SparkContext: Successfully stopped SparkContext
   Exception in thread "main" org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: Job aborted due to stage failure: Task 0 in stage 156.0 failed 4 times, most recent failure: Lost task 0.3 in stage 156.0 (TID 6494) (ip-172-31-16-83.us-west-1.compute.internal executor 3): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
   	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
   	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:133)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   Caused by: java.util.NoSuchElementException: No value present in Option
   	at org.apache.hudi.common.util.Option.get(Option.java:89)
   	at org.apache.hudi.payload.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:74)
   	at org.apache.hudi.io.HoodieMergeHandle.writeInsertRecord(HoodieMergeHandle.java:272)
   	at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:380)
   	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:388)
   	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
   	... 28 more
   
   Driver stacktrace:
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:191)
   	at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:186)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:549)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
   	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
   	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
   	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
   	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
   	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
   	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
   	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
   Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: Job aborted due to stage failure: Task 0 in stage 156.0 failed 4 times, most recent failure: Lost task 0.3 in stage 156.0 (TID 6494) (ip-172-31-16-83.us-west-1.compute.internal executor 3): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
   	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
   	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:133)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   Caused by: java.util.NoSuchElementException: No value present in Option
   	at org.apache.hudi.common.util.Option.get(Option.java:89)
   	at org.apache.hudi.payload.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:74)
   	at org.apache.hudi.io.HoodieMergeHandle.writeInsertRecord(HoodieMergeHandle.java:272)
   	at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:380)
   	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:388)
   	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
   	... 28 more
   
   Driver stacktrace:
   	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
   	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
   	at org.apache.hudi.async.HoodieAsyncService.waitForShutdown(HoodieAsyncService.java:103)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$1(HoodieDeltaStreamer.java:189)
   	... 15 more
   Caused by: org.apache.hudi.exception.HoodieException: Job aborted due to stage failure: Task 0 in stage 156.0 failed 4 times, most recent failure: Lost task 0.3 in stage 156.0 (TID 6494) (ip-172-31-16-83.us-west-1.compute.internal executor 3): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
   	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
   	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:133)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   Caused by: java.util.NoSuchElementException: No value present in Option
   	at org.apache.hudi.common.util.Option.get(Option.java:89)
   	at org.apache.hudi.payload.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:74)
   	at org.apache.hudi.io.HoodieMergeHandle.writeInsertRecord(HoodieMergeHandle.java:272)
   	at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:380)
   	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:388)
   	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
   	... 28 more
   
   Driver stacktrace:
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:709)
   	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 156.0 failed 4 times, most recent failure: Lost task 0.3 in stage 156.0 (TID 6494) (ip-172-31-16-83.us-west-1.compute.internal executor 3): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
   	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
   	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:133)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   Caused by: java.util.NoSuchElementException: No value present in Option
   	at org.apache.hudi.common.util.Option.get(Option.java:89)
   	at org.apache.hudi.payload.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:74)
   	at org.apache.hudi.io.HoodieMergeHandle.writeInsertRecord(HoodieMergeHandle.java:272)
   	at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:380)
   	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:388)
   	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
   	... 28 more
   
   Driver stacktrace:
   	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558)
   	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
   	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
   	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
   	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200)
   	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200)
   	at scala.Option.foreach(Option.scala:407)
   	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740)
   	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729)
   	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
   	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215)
   	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2310)
   	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1183)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
   	at org.apache.spark.rdd.RDD.fold(RDD.scala:1177)
   	at org.apache.spark.rdd.DoubleRDDFunctions.$anonfun$sum$1(DoubleRDDFunctions.scala:35)
   	at scala.runtime.java8.JFunction0$mcD$sp.apply(JFunction0$mcD$sp.java:23)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
   	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
   	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
   	at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:35)
   	at org.apache.spark.api.java.JavaDoubleRDD.sum(JavaDoubleRDD.scala:165)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:603)
   	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:331)
   	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:675)
   	... 4 more
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
   	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498)
   	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:133)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1474)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
   	... 3 more
   Caused by: java.util.NoSuchElementException: No value present in Option
   	at org.apache.hudi.common.util.Option.get(Option.java:89)
   	at org.apache.hudi.payload.AWSDmsAvroPayload.getInsertValue(AWSDmsAvroPayload.java:74)
   	at org.apache.hudi.io.HoodieMergeHandle.writeInsertRecord(HoodieMergeHandle.java:272)
   	at org.apache.hudi.io.HoodieMergeHandle.writeIncomingRecords(HoodieMergeHandle.java:380)
   	at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:388)
   	at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:154)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
   	... 28 more
   ```
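
   Reading the trace, the immediate cause is the same in every `Caused by` chain: `AWSDmsAvroPayload.getInsertValue` (AWSDmsAvroPayload.java:74) ends up calling `get()` on an empty `Option` while `HoodieMergeHandle` writes the incoming records, and Hudi's `Option.get()` throws `NoSuchElementException: No value present`, just like `java.util.Optional.get()` does. Presumably the empty `Option` is the delete record itself: the payload signals the delete by carrying no value, and the merge path then dereferences it unconditionally. A minimal stand-alone demonstration of that failure pattern, with `java.util.Optional` standing in for `org.apache.hudi.common.util.Option`:

   ```
   import java.util.Optional;

   // Stand-in for the failing call chain: an empty Optional (e.g. a delete
   // record with no row image) whose value is fetched unconditionally.
   public class UnguardedOptionGet {
       public static void main(String[] args) {
           Optional<String> maybeRecord = Optional.empty();

           // Throws java.util.NoSuchElementException: No value present
           String record = maybeRecord.get();
           System.out.println(record); // never reached
       }
   }
   ```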
   
   




[GitHub] [hudi] yihua commented on issue #6590: [SUPPORT] HoodieDeltaStreamer AWSDmsAvroPayload fails to handle deletes in MySQL

Posted by GitBox <gi...@apache.org>.
yihua commented on issue #6590:
URL: https://github.com/apache/hudi/issues/6590#issuecomment-1240175913

   This is the same issue as #6552. cc @rahil-c




[GitHub] [hudi] nsivabalan closed issue #6590: [SUPPORT] HoodieDeltaStreamer AWSDmsAvroPayload fails to handle deletes in MySQL

Posted by GitBox <gi...@apache.org>.
nsivabalan closed issue #6590: [SUPPORT] HoodieDeltaStreamer AWSDmsAvroPayload fails to handle deletes in MySQL
URL: https://github.com/apache/hudi/issues/6590




[GitHub] [hudi] nsivabalan commented on issue #6590: [SUPPORT] HoodieDeltaStreamer AWSDmsAvroPayload fails to handle deletes in MySQL

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #6590:
URL: https://github.com/apache/hudi/issues/6590#issuecomment-1238879167

   @codope: this is similar to the other issue you were triaging last week.




[GitHub] [hudi] nsivabalan commented on issue #6590: [SUPPORT] HoodieDeltaStreamer AWSDmsAvroPayload fails to handle deletes in MySQL

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #6590:
URL: https://github.com/apache/hudi/issues/6590#issuecomment-1243932247

   Closing as a duplicate; we already have a fix for it:
   https://github.com/apache/hudi/pull/6637
   

