You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Udit Mehrotra (Jira)" <ji...@apache.org> on 2021/08/25 08:55:00 UTC

[jira] [Closed] (HUDI-1021) [Bug] Unable to update bootstrapped table using rows from the written bootstrapped table

     [ https://issues.apache.org/jira/browse/HUDI-1021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Udit Mehrotra closed HUDI-1021.
-------------------------------
    Resolution: Fixed

> [Bug] Unable to update bootstrapped table using rows from the written bootstrapped table
> ----------------------------------------------------------------------------------------
>
>                 Key: HUDI-1021
>                 URL: https://issues.apache.org/jira/browse/HUDI-1021
>             Project: Apache Hudi
>          Issue Type: Sub-task
>          Components: bootstrap
>    Affects Versions: 0.9.0
>            Reporter: Udit Mehrotra
>            Assignee: Wenning Ding
>            Priority: Major
>             Fix For: 0.9.0
>
>
> Reproduction Steps:
>  
> {code:java}
> import spark.implicits._
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.DataSourceReadOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.hudi.HoodieDataSourceHelpers
> import org.apache.hudi.common.model.HoodieTableType
> import org.apache.spark.sql.SaveMode
> val sourcePath = "s3://uditme-iad/hudi/tables/events/events_data_partitioned_non_null"
> val sourceDf = spark.read.parquet(sourcePath + "/*")
> var tableName = "events_data_partitioned_non_null_00"
> var tablePath = "s3://emr-users/uditme/hudi/tables/events/" + tableName
> sourceDf.write.format("org.apache.hudi")
>  .option(HoodieWriteConfig.TABLE_NAME, tableName)
>  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
>  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
>  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
>  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>  .mode(SaveMode.Overwrite)
>  .save(tablePath)
> val readDf = spark.read.format("org.apache.hudi").load(tablePath + "/*")
> val updateDf = readDf.filter($"event_id" === "106")
>                  .withColumn("event_name", lit("udit_event_106"))
>                  
> updateDf.write.format("org.apache.hudi")
>  .option(HoodieWriteConfig.TABLE_NAME, tableName)
>  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
>  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
>  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>  .mode(SaveMode.Append)
>  .save(tablePath)
> {code}
>  
> Full Stack trace:
> {noformat}
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
>  at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:276)
>  at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.lambda$execute$caffe4c4$1(BaseCommitActionExecutor.java:102)
>  at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>  at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
>  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>  at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
>  at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
>  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1181)
>  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
>  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
>  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
>  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>  at org.apache.spark.scheduler.Task.run(Task.scala:123)
>  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>  at org.apache.hudi.table.action.commit.MergeHelper.runMerge(MergeHelper.java:134)
>  at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdateInternal(CommitActionExecutor.java:90)
>  at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdate(CommitActionExecutor.java:74)
>  at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:269)
>  ... 30 more
>  Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
>  at org.apache.hudi.table.action.commit.MergeHelper.runMerge(MergeHelper.java:132)
>  ... 33 more
>  Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
>  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
>  ... 34 more
>  Caused by: java.lang.NullPointerException
>  at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:222)
>  at org.apache.hudi.table.action.commit.MergeHelper$UpdateHandler.consumeOneRecord(MergeHelper.java:159)
>  at org.apache.hudi.table.action.commit.MergeHelper$UpdateHandler.consumeOneRecord(MergeHelper.java:149)
>  at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
>  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  ... 3 more
> {noformat}
>  
> Here, as you can see, *updateDf* is being formed by reading rows from the *bootstrapped hudi table* itself. If, however, we form the *updateDf* from the source data, it works fine:
> val readDf = spark.read.parquet(sourcePath + "/*")
>  val updateDf = readDf.filter($"event_id" === "106").withColumn("event_name", lit("udit_event_106"))
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)