Posted to dev@hudi.apache.org by aakash aakash <em...@gmail.com> on 2022/07/15 05:51:34 UTC

need help with Hudi Delete

Hi,

We have a use case to perform a soft delete over some record keys: we
nullify the non-key fields and ignore any later updates for those records.
We thought of using a Hudi-style meta field, "_hoodie_is_soft_deleted", the
way Hudi hard delete uses _hoodie_is_deleted, to make it simple to identify
whether the platform performed a soft delete. However, I am getting an Avro
"field not found" exception when we perform another soft delete on the same
index. Please let me know if you have any advice on how to fix this, or
whether this is the wrong approach. We wanted to avoid adding an extra field
to the customer schema and instead filter the soft-deleted records behind
the scenes, as is done for hard deletes, while still keeping the records in
the system.


Hudi : 0.8.0
Exception stacktrace:

2/07/14 22:08:21 WARN TaskSetManager: Lost task 5.0 in stage 93.0 (TID 33283, 172.25.31.77, executor 3):
org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :5
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:288)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:139)
  at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
  at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
  at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
  at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:317)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:308)
  at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:281)
  ... 30 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
  at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
  ... 33 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
  at java.util.concurrent.FutureTask.report(FutureTask.java:122)
  at java.util.concurrent.FutureTask.get(FutureTask.java:192)
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
  ... 34 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:247)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:277)
  at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  ... 3 more
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field '_hoodie_is_soft_deleted' not found
  at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:225)
  at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:130)
  at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
  at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
  at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
  at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
  at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
  at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
  at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
  at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  ... 4 more







How we add this column to the Spark dataframe:

import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{lit, udf}

object SoftDeleteColInfo {
  val softDeleteHudiMetaCol = "_hoodie_is_soft_deleted"
  val softDeleteStrVal = "true"

  // identity UDF; the actual marker value is supplied at the call site via lit(...)
  val softDeletedUDF = udf(softDeleted())

  def softDeleted() = (arg: String) => arg
}

sparkSession.udf.register("softDeletedUDF", SoftDeleteColInfo.softDeletedUDF)

df.withColumn(SoftDeleteColInfo.softDeleteHudiMetaCol,
  functions.callUDF("softDeletedUDF", lit("true")))
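
To illustrate the "filter behind the scenes" part, here is a minimal sketch of
what a platform-side reader could do with this marker (illustrative only;
hudiDf is a placeholder for a dataframe read back from the Hudi table):

import org.apache.spark.sql.functions.col

// hide soft-deleted records from consumers, the way hard-deleted records
// simply disappear, while the rows physically remain in the table
val visibleDf = hudiDf.filter(col(SoftDeleteColInfo.softDeleteHudiMetaCol).isNull)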

Re: need help with Hudi Delete

Posted by Sivabalan <n....@gmail.com>.
Yes, from the pasted schema, there is no default set for the newly added
column.

{
    "name" : "_hoodie_is_soft_deleted",
    "type" : [ "string", "null" ]
  } ]

If you can fix that and give it a try, it should work.
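
For example, the corrected field entry could look like this (a sketch only,
keeping the string marker from your schema; note "null" listed first in the
union and an explicit default):

{
    "name" : "_hoodie_is_soft_deleted",
    "type" : [ "null", "string" ],
    "default" : null
  } ]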



-- 
Regards,
-Sivabalan

Re: need help with Hudi Delete

Posted by Pratyaksh Sharma <pr...@gmail.com>.
Hi Aakash,

For the field to behave as a nullable extra field, you need to add a default
value of null to the schema and make "null" the first type in the union
schema for `_hoodie_is_soft_deleted`. Hope that helps.


Re: need help with Hudi Delete

Posted by aakash aakash <em...@gmail.com>.
Thanks for the response Pratyaksh!

We add this column to the Spark dataframe before calling the Hudi upsert
and delete. It should work like an extra nullable column in the schema, but
it is not behaving like that, so I am wondering whether any column with the
'_hoodie' prefix gets stripped in the Hudi code. We wanted this to be part
of the platform so that every team does not have to add an extra field to
their prod schema, since it is not supposed to be visible to everyone.


Here is an excerpt of the code:

object SoftDeleteColInfo {
  val softDeleteHudiMetaCol = "_hoodie_is_soft_deleted"
  val softDeleteStrVal = "true"

  val softDeletedUDF = udf(softDeleted)

  def softDeleted() = (arg: String) => arg
}

sparkSession.udf.register("softDeletedUDF", SoftDeleteColInfo.softDeletedUDF)

df.withColumn(softDeleteHudiMetaCol, functions.callUDF("softDeletedUDF", lit("true")))

and here is an excerpt of the dataframe schema before calling the Hudi operation:
}, {
    "name" : "end_time_utc",
    "type" : [ {
      "type" : "long",
      "logicalType" : "timestamp-micros"
    }, "null" ]
  }, {
    "name" : "date_created_utc",
    "type" : [ {
      "type" : "long",
      "logicalType" : "timestamp-micros"
    }, "null" ]
  }, {
    "name" : "date_updated_utc",
    "type" : [ {
      "type" : "long",
      "logicalType" : "timestamp-micros"
    }, "null" ]
  }, {
    "name" : "_hoodie_is_soft_deleted",
    "type" : [ "string", "null" ]
  } ]
}
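
For reference, one way to double-check whether the column survives into the
written files would be to read the table back and inspect its schema (sketch
only; the base path is a placeholder):

val written = sparkSession.read.format("hudi").load("<base-path-of-table>/*")
written.printSchema()  // is _hoodie_is_soft_deleted present here?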


Re: need help with Hudi Delete

Posted by Pratyaksh Sharma <pr...@gmail.com>.
Hi,

Hudi is complaining because '_hoodie_is_soft_deleted' is present in the
parquet file's schema but is not present in your incoming schema.

From my experience, I would say it is standard practice to add an extra
field which acts as a marker for soft deletion and is persisted with every
record. So I would suggest adding an extra field to the schema to solve your
use case.
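
A rough sketch of what that could look like when preparing the incoming
dataframe (record_key and softDeleteKeys are placeholders, not Hudi APIs):

import org.apache.spark.sql.functions.{col, lit, when}

// the marker is part of every record: null by default, "true" only for the
// keys being soft deleted, so the field is always present in the schema
val withMarker = incomingDf.withColumn("_hoodie_is_soft_deleted",
  when(col("record_key").isin(softDeleteKeys: _*), lit("true"))
    .otherwise(lit(null).cast("string")))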

@Sivabalan <n....@gmail.com> can probably add more here.
