Posted to commits@hudi.apache.org by "Alexey Kudinkin (Jira)" <ji...@apache.org> on 2022/10/04 00:46:00 UTC

[jira] [Updated] (HUDI-4588) Ingestion failing if source column is dropped

     [ https://issues.apache.org/jira/browse/HUDI-4588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Kudinkin updated HUDI-4588:
----------------------------------
    Sprint: 2022/08/08, 2022/08/22, 2022/09/05, 2022/10/04  (was: 2022/08/08, 2022/08/22, 2022/09/05)

> Ingestion failing if source column is dropped
> ---------------------------------------------
>
>                 Key: HUDI-4588
>                 URL: https://issues.apache.org/jira/browse/HUDI-4588
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: deltastreamer
>            Reporter: Vamshi Gudavarthi
>            Assignee: Alexey Kudinkin
>            Priority: Blocker
>              Labels: pull-request-available, schema, schema-evolution
>             Fix For: 0.12.1
>
>         Attachments: schema_stage1.avsc, schema_stage2.avsc, stage_1.json, stage_2.json
>
>
> Ingestion using Deltastreamer fails if columns are dropped from the source. I reproduced this with the docker-demo setup. Steps to reproduce:
>  # Create the data file `stage_1.json` (attached), publish it to Kafka, and ingest it from Kafka into a Hudi table with a Deltastreamer job (using `FilebasedSchemaProvider` with `schema_stage1.avsc`).
>  # The next step simulates a column being dropped from the source.
>  # Repeat step 1 with the stage-2 files. The stage-2 files do not have the `day` column, and the ingestion job fails. The detailed stack trace is below; a sketch of the repro commands follows it.
> {code:java}
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
>     at scala.Option.foreach(Option.scala:257)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
>     at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1098)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>     at org.apache.spark.rdd.RDD.fold(RDD.scala:1092)
>     at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply$mcD$sp(DoubleRDDFunctions.scala:35)
>     at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
>     at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sum$1.apply(DoubleRDDFunctions.scala:35)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>     at org.apache.spark.rdd.DoubleRDDFunctions.sum(DoubleRDDFunctions.scala:34)
>     at org.apache.spark.api.java.JavaDoubleRDD.sum(JavaDoubleRDD.scala:165)
>     at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:607)
>     at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:335)
>     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:201)
>     at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
>     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:199)
>     at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:557)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>     at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
>     at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
>     at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
>     at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>     at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:244)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:853)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>     at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>     at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
>     at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:358)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:349)
>     at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
>     ... 30 more
> Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:161)
>     at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:147)
>     ... 33 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:155)
>     ... 34 more
> Caused by: org.apache.hudi.exception.HoodieException: operation has failed
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:135)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     ... 3 more
> Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
>     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53)
>     at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>     at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     ... 4 more
> Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'day' not found
>     at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:225)
>     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:130)
>     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
>     at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
>     at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
>     at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
>     at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48)
>     ... 8 more {code}
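> For reference, the ingestion in step 1 follows the standard docker-demo Deltastreamer flow. A minimal sketch of the commands, assuming the demo's default broker, topic, and paths and a `ts` ordering field (the actual run used the attached stage files and schemas):
> {code:bash}
> # Publish the stage-1 records to Kafka (broker/topic names are the docker-demo defaults, assumed here)
> cat stage_1.json | kafkacat -b kafkabroker -t stock_ticks -P
>
> # Ingest from Kafka into a COPY_ON_WRITE Hudi table, sourcing the target schema from a file
> spark-submit \
>   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
>   --table-type COPY_ON_WRITE \
>   --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
>   --source-ordering-field ts \
>   --target-base-path /user/hive/warehouse/stock_ticks_cow \
>   --target-table stock_ticks_cow \
>   --props /var/demo/config/kafka-source.properties \
>   --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
>
> # kafka-source.properties points the schema provider at the stage-1 schema, e.g.:
> #   hoodie.deltastreamer.schemaprovider.source.schema.file=file:///var/demo/config/schema_stage1.avsc
> {code}
> Re-running both commands with `stage_2.json` and `schema_stage2.avsc` (with `day` removed) produces the failure above.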
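> The root cause is at the bottom of the trace: during the merge, HoodieMergeHelper reads the existing base parquet file (written in stage 1, so it still contains `day`) using the new stage-2 Avro schema, and parquet-avro's AvroRecordConverter rejects any parquet column that has no counterpart in the Avro read schema. A self-contained sketch of that mismatch, using parquet-avro directly with made-up field names (not Hudi code):
> {code:java}
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.parquet.avro.AvroParquetReader;
> import org.apache.parquet.avro.AvroParquetWriter;
> import org.apache.parquet.avro.AvroReadSupport;
> import org.apache.parquet.hadoop.ParquetFileWriter;
> import org.apache.parquet.hadoop.ParquetReader;
> import org.apache.parquet.hadoop.ParquetWriter;
>
> public class DroppedColumnRepro {
>   public static void main(String[] args) throws Exception {
>     // Stage-1 schema: has the 'day' field.
>     Schema v1 = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
>         + "{\"name\":\"id\",\"type\":\"string\"},"
>         + "{\"name\":\"day\",\"type\":\"string\"}]}");
>     // Stage-2 schema: 'day' has been dropped.
>     Schema v2 = new Schema.Parser().parse(
>         "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
>         + "{\"name\":\"id\",\"type\":\"string\"}]}");
>
>     // Write a parquet file with the v1 schema, like the stage-1 base file.
>     Path file = new Path("/tmp/stage1_base.parquet");
>     try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
>         .<GenericRecord>builder(file)
>         .withSchema(v1)
>         .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>         .build()) {
>       GenericRecord rec = new GenericData.Record(v1);
>       rec.put("id", "1");
>       rec.put("day", "2022-08-08");
>       writer.write(rec);
>     }
>
>     // Read it back with the v2 schema, like the stage-2 merge does. The file
>     // still contains 'day' but the read schema does not, so the first read()
>     // throws InvalidRecordException: "Avro field 'day' not found".
>     Configuration conf = new Configuration();
>     AvroReadSupport.setAvroReadSchema(conf, v2);
>     try (ParquetReader<GenericRecord> reader = AvroParquetReader
>         .<GenericRecord>builder(file)
>         .withConf(conf)
>         .build()) {
>       reader.read();
>     }
>   }
> }
> {code}
> In the Deltastreamer path this surfaces inside HoodieMergeHelper.runMerge, which is why the first, insert-only run succeeds and only the second run (upserting against the existing base file) fails.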



--
This message was sent by Atlassian Jira
(v8.20.10#820010)