Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/27 21:01:29 UTC
[GitHub] [hudi] gtwuser commented on issue #5701: [SUPPORT]hudi how to upsert a non null array data to a existing column with array of nulls,optional binary. java.lang.ClassCastException: optional binary element (UTF8) is not a group
gtwuser commented on issue #5701:
URL: https://github.com/apache/hudi/issues/5701#issuecomment-1140037494
@nsivabalan @n3nash @umehrot2 Kindly suggest what should be done in this use case; we have been stuck on this issue for a month now.
The existing column in the Hudi table was created via `bulk_insert`.
The values of this array column looked like this:
```
.....
{ "id":1, "NWDepStatus": [] }
{ "id":2, "NWDepStatus": null }
....
```
This resulted in the following schema for this column in the Hudi table during `bulk_insert`:
```text
root
 |-- NWDepStatus: array (nullable = true)
 |    |-- element: string (containsNull = true)
```
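Why the column came out as `array<string>`: with only `[]` and `null` values there is no array element to inspect, and, assuming the data was loaded with Spark's JSON reader, schema inference falls back to `string` for the element type. The function below is a deliberately simplified, pure-Python model of that behaviour (not Spark's implementation; `infer_element_type` is a hypothetical helper) showing how the `bulk_insert` and `upsert` samples lead to different element types.

```python
from typing import Any, Iterable, Optional, Sequence


def infer_element_type(rows: Iterable[Optional[Sequence[Any]]]) -> str:
    """Toy model of element-type inference for one array column.

    Simplified illustration only, not Spark's code: it tracks a few coarse
    type names and falls back to "string" when no element was ever observed
    or when the observed element types disagree.
    """
    seen = set()
    for arr in rows:
        if arr is None:  # a null array carries no element information
            continue
        for elem in arr:
            if elem is None:  # a null element carries no type information
                continue
            if isinstance(elem, dict):
                seen.add("struct")
            elif isinstance(elem, bool):  # check bool before int (bool is an int subclass)
                seen.add("boolean")
            elif isinstance(elem, int):
                seen.add("long")
            else:
                seen.add("string")
    if len(seen) == 1:
        return seen.pop()
    # Nothing observed (only [] and null) or conflicting types: use string.
    return "string"


# Values from the initial bulk_insert load vs. the later upsert.
print(infer_element_type([[], None]))                                     # string
print(infer_element_type([[{"ClassId": "metric.DepStatus", "Id": 21}]]))  # struct
```

The point of the sketch is that the first load never "sees" a struct, so the table schema is fixed to `array<string>` before any real element arrives.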
The new incoming record, which is meant to be written via `upsert`, has the following value for the same column:
```json
{
  "id": 1,
  "NWDepCount": 0,
  "NWDepStatus": [
    {
      "ClassId": "metric.DepStatus",
      "Id": 21,
      "Name": "MyNW_3",
      "ObjectType": "metric.DepStatus",
      "Status": "NA"
    },
    {
      "ClassId": "metric.DepStatus",
      "Id": 22,
      "Name": "MyNW2",
      "ObjectType": "metric.DepStatus",
      "Status": "NA"
    }
  ]
}
```
This results in the following schema, which differs from the existing schema stored in the Hudi table:
```text
root
 |-- NWDepStatus: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ClassId: string (nullable = true)
 |    |    |-- Id: long (nullable = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- ObjectType: string (nullable = true)
 |    |    |-- Status: string (nullable = true)
```
I also tried altering the existing column's schema before writing the new records, making it match the new records' schema (a non-empty array of structs) while retaining the null values, but with no success. The altered column contained:
```text
+------------------------+
|NWDepStatus             |
+------------------------+
|null                    |
|null                    |
+------------------------+
```
Configs are as follows:
```python
# `args` and `prefix` are defined earlier in the job (not shown).
commonConfig = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'MdTimestamp',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.table.name': 'hudi-table',
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': args['database_name'],
    'hoodie.datasource.write.reconcile.schema': 'true',
    'hoodie.datasource.hive_sync.table': 'hudi' + prefix.replace("/", "_").lower(),
    'hoodie.datasource.hive_sync.enable': 'true',
    'path': 's3://' + args['curated_bucket'] + '/hudi' + prefix,
    'hoodie.parquet.small.file.limit': '134217728'  # 1,024 * 1,024 * 128 = 134,217,728 (128 MB)
}
unpartitionDataConfig = {
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'
}
initLoadConfig = {
    'hoodie.bulkinsert.shuffle.parallelism': 68,
    'hoodie.datasource.write.operation': 'bulk_insert'
}
incrementalConfig = {
    'hoodie.upsert.shuffle.parallelism': 68,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 10
}
```
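`combinedConf`, which is passed to the `upsert` write call further down, is not shown; presumably it is these dicts merged together. A minimal sketch, assuming a plain left-to-right merge in which later dicts override earlier ones (`merge_configs` is a hypothetical helper, and only a subset of the keys above is repeated so the example runs on its own):

```python
from typing import Any, Dict


def merge_configs(*layers: Dict[str, Any]) -> Dict[str, str]:
    """Merge option dicts left to right; a later dict wins on duplicate keys.

    Values are converted to strings (booleans as "true"/"false"), the form
    in which Spark data source options are ultimately passed to the JVM.
    """
    merged: Dict[str, str] = {}
    for layer in layers:
        for key, value in layer.items():
            merged[key] = str(value).lower() if isinstance(value, bool) else str(value)
    return merged


# Subsets of the dicts above (keys that depend on args/prefix omitted).
commonConfig = {
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'MdTimestamp',
    'hoodie.datasource.write.reconcile.schema': 'true',
}
unpartitionDataConfig = {
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
}
initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': 68,
                  'hoodie.datasource.write.operation': 'bulk_insert'}
incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 68,
                     'hoodie.datasource.write.operation': 'upsert'}

initialConf = merge_configs(commonConfig, unpartitionDataConfig, initLoadConfig)
combinedConf = merge_configs(commonConfig, unpartitionDataConfig, incrementalConfig)
print(initialConf['hoodie.datasource.write.operation'])   # bulk_insert
print(combinedConf['hoodie.upsert.shuffle.parallelism'])  # 68
```

Converting values to strings up front is optional: PySpark's `DataFrameWriter.options` also converts each value to a string before handing it to the JVM. Doing it here just makes the merged dict easy to log and compare between the initial load and the incremental runs.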
I checked issue #2265 and the fix in #2927, but even with the configs given there as the solution, the write below still fails with the same error:
```python
inputDf.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .option("spark.hadoop.parquet.avro.write-old-list-structure", "false") \
    .option("parquet.avro.write-old-list-structure", "false") \
    .option("hoodie.parquet.avro.write-old-list-structure", "false") \
    .option("hoodie.datasource.write.reconcile.schema", "true") \
    .options(**combinedConf) \
    .mode('append') \
    .save()
```
```text
2022-05-27 18:22:12,568 WARN [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logWarning(69)): Lost task 0.0 in stage 363.0 (TID 8061) (172.36.166.181 executor 24): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:322)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$execute$ecf5068c$1(BaseSparkCommitActionExecutor.java:174)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:386)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1440)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:351)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:342)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:315)
    ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
    ... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieException: operation has failed
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
    ... 32 more
Caused by: org.apache.hudi.exception.HoodieException: operation has failed
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.throwExceptionIfFailed(BoundedInMemoryQueue.java:248)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.readNextRecord(BoundedInMemoryQueue.java:226)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.access$100(BoundedInMemoryQueue.java:52)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueue$QueueIterator.hasNext(BoundedInMemoryQueue.java:278)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:36)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:54)
    at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    ... 4 more
Caused by: java.lang.ClassCastException: optional binary element (UTF8) is not a group
    at org.apache.parquet.schema.Type.asGroupType(Type.java:207)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
    at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
    at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
    at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
    at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
    at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
    at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
    ... 8 more
2022-05-27 18:22:12,571 INFO [dispatcher-CoarseGrainedScheduler] scheduler.TaskSetManager (Logging.scala:logInfo(57)): Starting task 0.1 in stage 363.0 (TID 8062) (172.36.140.28, executor 15, partition 0, PROCESS_LOCAL, 4444 bytes) taskResourceAssignments Map()
```