Posted to commits@hudi.apache.org by "Udit Mehrotra (Jira)" <ji...@apache.org> on 2021/08/25 08:55:00 UTC
[jira] [Closed] (HUDI-1021) [Bug] Unable to update bootstrapped table using rows from the written bootstrapped table
[ https://issues.apache.org/jira/browse/HUDI-1021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Udit Mehrotra closed HUDI-1021.
-------------------------------
Resolution: Fixed
> [Bug] Unable to update bootstrapped table using rows from the written bootstrapped table
> ----------------------------------------------------------------------------------------
>
> Key: HUDI-1021
> URL: https://issues.apache.org/jira/browse/HUDI-1021
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: bootstrap
> Affects Versions: 0.9.0
> Reporter: Udit Mehrotra
> Assignee: Wenning Ding
> Priority: Major
> Fix For: 0.9.0
>
>
> Reproduction Steps:
>
> {code:java}
> import spark.implicits._
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.DataSourceReadOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.hudi.HoodieDataSourceHelpers
> import org.apache.hudi.common.model.HoodieTableType
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.lit
>
> val sourcePath = "s3://uditme-iad/hudi/tables/events/events_data_partitioned_non_null"
> val sourceDf = spark.read.parquet(sourcePath + "/*")
>
> var tableName = "events_data_partitioned_non_null_00"
> var tablePath = "s3://emr-users/uditme/hudi/tables/events/" + tableName
>
> // Initial insert: write the source data as a copy-on-write Hudi table.
> sourceDf.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .mode(SaveMode.Overwrite)
>   .save(tablePath)
>
> // Read rows back from the written Hudi table and update one of them.
> val readDf = spark.read.format("org.apache.hudi").load(tablePath + "/*")
> val updateDf = readDf.filter($"event_id" === "106")
>   .withColumn("event_name", lit("udit_event_106"))
>
> // Upsert the updated row back into the same table: this fails with the NPE below.
> updateDf.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .mode(SaveMode.Append)
>   .save(tablePath)
> {code}
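>
> As an aside, a minimal sketch (hypothetical usage, assuming the driver can reach the table's timeline on S3) for confirming that the initial insert committed before attempting the upsert, using the HoodieDataSourceHelpers imported above:
> {code:java}
> import java.net.URI
> import org.apache.hadoop.fs.FileSystem
>
> // Hypothetical sanity check, not part of the original repro:
> // print the latest completed commit instant on the table's timeline.
> val fs = FileSystem.get(new URI(tablePath), spark.sparkContext.hadoopConfiguration)
> println(HoodieDataSourceHelpers.latestCommit(fs, tablePath))
> {code}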
>
> Full stack trace:
> {noformat}
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
> at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:276)
> at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.lambda$execute$caffe4c4$1(BaseCommitActionExecutor.java:102)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
> at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:875)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:359)
> at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1181)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:123)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
> at org.apache.hudi.table.action.commit.MergeHelper.runMerge(MergeHelper.java:134)
> at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdateInternal(CommitActionExecutor.java:90)
> at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdate(CommitActionExecutor.java:74)
> at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:269)
> ... 30 more
> Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
> at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
> at org.apache.hudi.table.action.commit.MergeHelper.runMerge(MergeHelper.java:132)
> ... 33 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
> ... 34 more
> Caused by: java.lang.NullPointerException
> at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:222)
> at org.apache.hudi.table.action.commit.MergeHelper$UpdateHandler.consumeOneRecord(MergeHelper.java:159)
> at org.apache.hudi.table.action.commit.MergeHelper$UpdateHandler.consumeOneRecord(MergeHelper.java:149)
> at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
> at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> ... 3 more
> {noformat}
>
> Here, as you can see, *updateDf* is formed by reading rows from the *bootstrapped hudi table* itself. If, however, we form *updateDf* from the source data, it works fine:
> {code:java}
> val readDf = spark.read.parquet(sourcePath + "/*")
> val updateDf = readDf.filter($"event_id" === "106").withColumn("event_name", lit("udit_event_106"))
> {code}
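>
> Since the upsert only fails when the input comes from a Hudi read, one guess is that the Hudi metadata columns (_hoodie_commit_time, _hoodie_record_key, etc.) carried along by readDf play a role. A minimal sketch of stripping them before the upsert (an untested workaround idea, not a confirmed fix):
> {code:java}
> import org.apache.hudi.common.model.HoodieRecord
> import scala.collection.JavaConverters._
>
> // Drop the _hoodie_* metadata columns that a Hudi read adds, so the
> // upsert input matches the original source schema.
> val cleanUpdateDf = updateDf.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
> {code}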
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)