Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/27 19:16:39 UTC

[GitHub] [hudi] kapjoshi-cisco commented on issue #5701: [SUPPORT] Hudi: how to upsert non-null array data to an existing column holding an array of nulls (optional binary). java.lang.ClassCastException: optional binary element (UTF8) is not a group

kapjoshi-cisco commented on issue #5701:
URL: https://github.com/apache/hudi/issues/5701#issuecomment-1139958926

   @nsivabalan @n3nash @umehrot2 Kindly suggest what should be done in this use case; we have been stuck on this issue for a month now.
   The existing column schema in the Hudi table was created via `bulk_insert`.
   The values for this array column looked like:
   ```
   .....
   { "id":1, "NWDepStatus": [] }
   { "id":2, "NWDepStatus": null }
   ....
   ```
   
   This resulted in the following schema for the column in the Hudi table after the `bulk_insert`:
   
   ```bash
   root
    |-- NWDepStatus: array (nullable = true)
    |    |-- element: string (containsNull = true)
   ```
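
   (For context, `element: string` is just what Spark's JSON schema inference falls back to when an array is only ever empty or null; a minimal repro sketch, assuming a plain `spark.read.json` load:)
   ```python
   # Repro sketch (assumed spark.read.json ingestion): an array column that only
   # ever holds [] or null is inferred as array<string>.
   sample = spark.sparkContext.parallelize([
       '{"id": 1, "NWDepStatus": []}',
       '{"id": 2, "NWDepStatus": null}',
   ])
   spark.read.json(sample).printSchema()
   # Expected (roughly):
   # root
   #  |-- NWDepStatus: array (nullable = true)
   #  |    |-- element: string (containsNull = true)
   #  |-- id: long (nullable = true)
   ```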
   
   The new incoming record for the same column is meant to be written via `upsert`, with a value such as:
   ```json
   {
       "id": 1,
       "NWDepCount": 0,
       "NWDepStatus": [
           {
               "ClassId": "metric.DepStatus",
               "Id": 21,
               "Name": "MyNW_3",
               "ObjectType": "metric.DepStatus",
               "Status": "NA"
           },
           {
               "ClassId": "metric.DepStatus",
               "Id": 22,
               "Name": "MyNW2",
               "ObjectType": "metric.DepStatus",
               "Status": "NA"
           }
       ]
   }
   ```
   
   This produces the schema below, which differs from the existing schema stored in the Hudi table:
   ```bash
   root
    |-- NWDepStatus: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- ClassId: string (nullable = true)
    |    |    |-- Id: long (nullable = true)
    |    |    |-- Name: string (nullable = true)
    |    |    |-- ObjectType: string (nullable = true)
    |    |    |-- Status: string (nullable = true)
   ```
   I even tried altering the existing column schema before writing the new records, recasting it to match the new records' struct-array schema while keeping the null values, but with no success (a rough sketch of that attempt follows the output below):
   ```bash
   +-----------+
   |NWDepStatus|
   +-----------+
   |null       |
   |null       |
   +-----------+
   ```
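
   Roughly, that attempt looked like the sketch below (`base_path` and the target struct fields are illustrative; since every existing value was null or empty, the column was recast as a whole rather than converted element by element):
   ```python
   from pyspark.sql import functions as F
   from pyspark.sql.types import (ArrayType, LongType, StringType,
                                  StructField, StructType)

   # Target element type taken from the new incoming records (illustrative).
   dep_status_type = ArrayType(StructType([
       StructField("ClassId", StringType(), True),
       StructField("Id", LongType(), True),
       StructField("Name", StringType(), True),
       StructField("ObjectType", StringType(), True),
       StructField("Status", StringType(), True),
   ]), True)

   existing_df = spark.read.format("org.apache.hudi").load(base_path)

   # All existing values are null/empty, so replace the array<string> column
   # with a typed null of the new array<struct<...>> schema before rewriting.
   migrated_df = existing_df.withColumn(
       "NWDepStatus", F.lit(None).cast(dep_status_type)
   )
   ```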
   
   The configs are as follows (these dicts are merged into the `combinedConf` used by the write; see the sketch after this block):
   ```python
   commonConfig = {
       'className': 'org.apache.hudi',
       'hoodie.datasource.hive_sync.use_jdbc': 'false',
       'hoodie.datasource.write.precombine.field': 'MdTimestamp',
       'hoodie.datasource.write.recordkey.field': 'id',
       'hoodie.table.name': 'hudi-table',
       'hoodie.consistency.check.enabled': 'true',
       'hoodie.datasource.hive_sync.database': args['database_name'],
       'hoodie.datasource.write.reconcile.schema': 'true',
       'hoodie.datasource.hive_sync.table': 'hudi' + prefix.replace("/", "_").lower(),
       'hoodie.datasource.hive_sync.enable': 'true',
       'path': 's3://' + args['curated_bucket'] + '/hudi' + prefix,
       'hoodie.parquet.small.file.limit': '134217728'  # 1024 * 1024 * 128 = 134,217,728 (128 MB)
   }
   unpartitionDataConfig = {
       'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
       'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
   }
   initLoadConfig = {
       'hoodie.bulkinsert.shuffle.parallelism': 68,
       'hoodie.datasource.write.operation': 'bulk_insert'
   }
   incrementalConfig = {
       'hoodie.upsert.shuffle.parallelism': 68,
       'hoodie.datasource.write.operation': 'upsert',
       'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
       'hoodie.cleaner.commits.retained': 10
   }
   ```
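
   For completeness, these dicts are merged into the `combinedConf` passed to the writer below (a sketch, assuming the non-partitioned incremental path):
   ```python
   # Sketch of how combinedConf is assembled for the upsert path
   # (bulk_insert runs use initLoadConfig instead of incrementalConfig).
   combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig}
   ```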
   
   I checked issue #2265 and the fix #2927, but even with the configs suggested there as the solution, the write still fails with the same error:
   ```python
   inputDf.write \
       .format('org.apache.hudi') \
       .option('hoodie.datasource.write.operation', 'upsert') \
       .option("spark.hadoop.parquet.avro.write-old-list-structure", "false") \
       .option("parquet.avro.write-old-list-structure", "false") \
       .option("hoodie.parquet.avro.write-old-list-structure", "false") \
       .option("hoodie.datasource.write.reconcile.schema", "true") \
       .options(**combinedConf) \
       .mode('append') \
       .save()
   ```
   ```
   2022-05-27 18:22:12,568 WARN [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logWarning(69)): Lost task 0.0 in stage 363.0 (TID 8061) (172.36.166.181 executor 24): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:174)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
   	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
   	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
   	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
   	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
   	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
   	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
   	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
   	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
   	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
   	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:131)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:750)
   Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
   	at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:351)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:342)
   	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:315)
   	... 28 more
   Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
   	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
   	at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
   	... 31 more
   Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
   	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
   	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
   	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
   	... 32 more
   Caused by: org.apache.hudi.exception.HoodieException: operation has failed
   	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
   	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
   	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
   	at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
   	at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
   	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   	... 3 more
   Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file 
   	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
   	at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
   	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   	... 4 more
   Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
   	at org.apache.parquet.schema.Type.asGroupType(Type.java:207)
   	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
   	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
   	at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
   	at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
   	at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
   	at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
   	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
   	at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
   	at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
   	at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
   	at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
   	at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
   	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
   	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
   	... 8 more
   2022-05-27 18:22:12,571 INFO [dispatcher-CoarseGrainedScheduler] scheduler.TaskSetManager (Logging.scala:logInfo(57)): Starting task 0.1 in stage 363.0 (TID 8062) (172.36.140.28, executor 15, partition 0, PROCESS_LOCAL, 4444 bytes) taskResourceAssignments Map()
   ```
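
   One more thing I am unsure about: does `parquet.avro.write-old-list-structure` need to be set on the Hadoop configuration of the SparkSession (rather than as a DataFrameWriter option) for it to take effect? A sketch of what I mean:
   ```python
   # Sketch: pushing the flag into the Hadoop configuration before the write,
   # instead of (or in addition to) passing it as a writer option.
   spark.sparkContext._jsc.hadoopConfiguration().set(
       "parquet.avro.write-old-list-structure", "false"
   )
   ```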
   
   

