You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/25 13:22:33 UTC

[GitHub] [hudi] MikeTipico opened a new issue, #5683: [SUPPORT] org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file

MikeTipico opened a new issue, #5683:
URL: https://github.com/apache/hudi/issues/5683

   **Problem Faced**
   
   We currently have a series of pipelines doing the following: 
   A. NiFi
   	1. consume avro data from Kafka 
   	2. attach schema information (downloaded from registry) 
   	3. save to S3 'raw'
   B. Spark Application (structured streaming)
   	4. consume from S3 'raw'
   	5. do some basic cleaning and schema flattening 
   	6. persist to Hudi S3 'data'
   
   For one pipeline we encountered a schema change from source with some of the data replayed in the new schema format. This has unfortunately caused our Hudi pipeline to break and give the exception 'org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file' (full stack-trace attached). What we found strange in this case is that we prepared for the change and were no longer selecting the now dropped integer field to be inserted into Hudi. 
   Kindly advice us if something was missed and what can be done in such cases to be able to process the data with the new schema irrespective of the way it was originally sent and processed. 
   	
   Schema change:	[string,integer] -> string
   stack-trace: 
   ```
   Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key xyz_topic from old 
   				file s3a://data_raw/xyz_topic/calendardate=2022-05-19/6a7a5f41-b43f-47ac-8109-6215c6119ff5-0_0-8956-834609_20220519080508668.parquet 
   to new 	file s3a://data_raw/xyz_topic/calendardate=2022-05-19/6a7a5f41-b43f-47ac-8109-6215c6119ff5-0_0-27-385_20220519145322564.parquet with writerSchema {
     "type" : "record",
     "name" : "em_dm_v1_topic_record",
     "namespace" : "hoodie.em_dm_v1_topic",
     "fields" : [ {
       "name" : "_hoodie_commit_time",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_commit_seqno",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_record_key",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_partition_path",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_file_name",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "key",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "calendardate",
       "type" : [ "null", {
         "type" : "int",
         "logicalType" : "date"
       } ],
       "default" : null
     }, {
       "name" : "eventtime",
       "type" : [ "null", "long" ],
       "default" : null
     }, {
       "name" : "userid_string",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "userid_integer",
       "type" : "int"
     }]
   }
   	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
   	at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
   	... 31 more
   }
   	at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:356)
   	at org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:122)
   	at org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:112)
   	at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
   	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   	... 3 more
   Caused by: java.lang.RuntimeException: Null-value for required field: userid_integer
   	at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:194)
   	at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
   	at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
   	at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
   	at org.apache.hudi.io.storage.HoodieParquetWriter.writeAvro(HoodieParquetWriter.java:95)
   	at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
   	... 8 more
   
   Driver stacktrace:
   === Streaming Query ===
   Identifier: [dpl-spark-streaming] query for hudi 'dm_v1_GmUserInfo_GmCoreSelfExclusionMessage' [id = 3fb941dc-f450-4306-8eab-6af98062c0b6, runId = afc85301-938a-4141-9e31-a84bc24094e0]
   Current Committed Offsets: {FileStreamSource[s3a://data_raw/xyz_topic/calendardate=2*]: {"logOffset":170}}
   Current Available Offsets: {FileStreamSource[s3a://data_raw/xyz_topic/calendardate=2*]: {"logOffset":171}}
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   Project [key#74, calendardate#75, eventtime#76L, userid_string#77, userid_integer#78, domainid#27, exclusiontype#28, expirydate#29, exclusionperiod#30, cooloffreason#31, cooloffdescription#32, unsatisfiedreason#33, unsatisfieddescription#34, selfexclusionreason#35, messagekey_operationtype#36, messagekey_messageversion#37, messagekey_messagetype#38, messagekey_domainid#39, messagekey_sourcename#40, messagekey_sourcecategory#41, messagekey_processingdate#42, messagekey_streamingdate#43, selfexclusionid#44]
   +- SubqueryAlias t
      +- Project [userid#26 AS key#74, cast(messagekey_streamingdate#43 as date) AS calendardate#75, unix_micros(cast(messagekey_processingdate#42 as timestamp)) AS eventtime#76L, userid#26 AS userid_string#77, -1 AS userid_integer#78, domainid#27, exclusiontype#28, expirydate#29, exclusionperiod#30, cooloffreason#31, cooloffdescription#32, unsatisfiedreason#33, unsatisfieddescription#34, selfexclusionreason#35, messagekey_operationtype#36, messagekey_messageversion#37, messagekey_messagetype#38, messagekey_domainid#39, messagekey_sourcename#40, messagekey_sourcecategory#41, messagekey_processingdate#42, messagekey_streamingdate#43, selfexclusionid#44]
         +- SubqueryAlias view_dm_v1_gmuserinfo_gmcoreselfexclusionmessage
            +- Project [userid#0 AS userid#26, domainid#1 AS domainid#27, exclusiontype#2 AS exclusiontype#28, expirydate#3 AS expirydate#29, exclusionperiod#4 AS exclusionperiod#30, cooloffreason#5 AS cooloffreason#31, cooloffdescription#6 AS cooloffdescription#32, unsatisfiedreason#7 AS unsatisfiedreason#33, unsatisfieddescription#8 AS unsatisfieddescription#34, selfexclusionreason#9 AS selfexclusionreason#35, messagekey#10.operationtype AS messagekey_operationtype#36, messagekey#10.messageversion AS messagekey_messageversion#37, messagekey#10.messagetype AS messagekey_messagetype#38, messagekey#10.domainid AS messagekey_domainid#39, messagekey#10.sourcename AS messagekey_sourcename#40, messagekey#10.sourcecategory AS messagekey_sourcecategory#41, messagekey#10.processingdate AS messagekey_processingdate#42, messagekey#10.streamingdate AS messagekey_streamingdate#43, selfexclusionid#11 AS selfexclusionid#44, username#12 AS username#45]
               +- StreamingExecutionRelation FileStreamSource[s3a://data_raw/xyz_topic/calendardate=2*], [UserID#0, DomainID#1, ExclusionType#2, ExpiryDate#3, ExclusionPeriod#4, CoolOffReason#5, CoolOffDescription#6, UnsatisfiedReason#7, UnsatisfiedDescription#8, SelfExclusionReason#9, MessageKey#10, SelfExclusionID#11, Username#12]
   	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:354)
   	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
   Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 11 times, most recent failure: Lost task 0.10 in stage 27.0 (TID 385) (ip-10-102-9-167.eu-central-1.compute.internal executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:320)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:172)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
   	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
   	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:131)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on issue #5683: [SUPPORT] org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file

Posted by GitBox <gi...@apache.org>.
xushiyan commented on issue #5683:
URL: https://github.com/apache/hudi/issues/5683#issuecomment-1296312285

   > What we found strange in this case is that we prepared for the change and were no longer selecting the now dropped integer field to be inserted into Hudi.
   
   is this in step 5?
   
   > After this batch we successfully loaded newer data using the new schema and default values.
   
   @MikeTipico not sure what "After this batch" meant: did you skip the batch or just proceed ingesting newer data and ignore the exception? would like to have reproducible steps (hudi / spark versions, old/new schemas, configs & snippet of the structured streaming job)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #5683: [SUPPORT] org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5683:
URL: https://github.com/apache/hudi/issues/5683#issuecomment-1229359229

   @MikeTipico : can you respond to my above comment when you can. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] MikeTipico commented on issue #5683: [SUPPORT] org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file

Posted by GitBox <gi...@apache.org>.
MikeTipico commented on issue #5683:
URL: https://github.com/apache/hudi/issues/5683#issuecomment-1138235130

   @xushiyan we also tried using a default value of either null or -1 but in both cases still got exceptions on the merging for a particular batch of data. After this batch we successfully loaded newer data using the new schema and default values.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #5683: [SUPPORT] org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5683:
URL: https://github.com/apache/hudi/issues/5683#issuecomment-1289937013

   @MikeTipico : gentle ping. if you can give us a reproducible script, would be great. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] xushiyan commented on issue #5683: [SUPPORT] org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file

Posted by GitBox <gi...@apache.org>.
xushiyan commented on issue #5683:
URL: https://github.com/apache/hudi/issues/5683#issuecomment-1138188799

   @MikeTipico dropping column is not a backward-compatible change. Before 0.11, only adding column is supported. you can read more in https://hudi.apache.org/docs/schema_evolution


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on issue #5683: [SUPPORT] org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on issue #5683:
URL: https://github.com/apache/hudi/issues/5683#issuecomment-1149282029

   may I know how did you set the default value. Did you verify that default was reflected. I have seen from some users in the community that they did not set the default properly and hence it did not kick in. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org