Posted to commits@hudi.apache.org by "Ethan Guo (Jira)" <ji...@apache.org> on 2021/11/30 20:11:00 UTC

[jira] [Updated] (HUDI-2896) Exception is thrown from HoodieAvroUtils.getNestedFieldVal() with returnNullIfNotFound as true

     [ https://issues.apache.org/jira/browse/HUDI-2896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ethan Guo updated HUDI-2896:
----------------------------
    Description: 
Based on Alexey's testing, when 
{code:java}
getNestedFieldVal(record, properties.getProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY), true){code}
 gets called, the following exception is thrown. However, the method is designed to return null if the field is not found in the schema.

 
{code:java}
21/11/30 10:22:42 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=R2I675JE64OFU1 partitionPath=default}, currentLocation='null', newLocation='null'}
org.apache.avro.AvroRuntimeException: Not a valid schema field: ts
        at org.apache.avro.generic.GenericData$Record.get(GenericData.java:256)
        at org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal(HoodieAvroUtils.java:462)
        at org.apache.hudi.common.model.DefaultHoodieRecordPayload.updateEventTime(DefaultHoodieRecordPayload.java:90)
        at org.apache.hudi.common.model.DefaultHoodieRecordPayload.getInsertValue(DefaultHoodieRecordPayload.java:84)
        at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90)
        at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103)
        at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190)
        at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46)
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748) {code}
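
For context: Avro's GenericData.Record.get(String) throws AvroRuntimeException when the requested field is absent from the schema, rather than returning null, so the returnNullIfNotFound flag has to be checked before that call is reached. A minimal sketch of the kind of guard the leaf lookup in getNestedFieldVal() needs (illustration only, not the actual Hudi patch; getFieldValOrNull is a hypothetical helper name):
{code:java}
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

// Illustrative guard only: Schema.getField() returns null for a missing
// field, whereas GenericData.Record.get(String) throws AvroRuntimeException,
// so the branch on returnNullIfNotFound must happen here.
static Object getFieldValOrNull(GenericRecord record, String fieldName,
                                boolean returnNullIfNotFound) {
  Schema.Field field = record.getSchema().getField(fieldName);
  if (field == null) {
    if (returnNullIfNotFound) {
      return null;  // honor the flag instead of letting record.get() throw
    }
    throw new AvroRuntimeException("Not a valid schema field: " + fieldName);
  }
  return record.get(field.pos());
}
{code}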

Code to reproduce:

{code:java}
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieClusteringConfig
import org.apache.hudi.config.HoodieWriteConfig._

val layoutOptStrategy = "z-order";

val inputPath = s"file:///${System.getProperty("user.home")}/datasets/amazon_reviews_parquet/product_category=Personal_Care_Appliances"
val tableName = s"amazon_reviews_${layoutOptStrategy}"
val outputPath = s"file:///tmp/hudi/$tableName"

val commonOpts =
  Map(
    "hoodie.compact.inline" -> "false",
    "hoodie.bulk_insert.shuffle.parallelism" -> "10"
  )

spark.sparkContext.setLogLevel("DEBUG")

val df = spark.read.parquet(inputPath)

df.write.format("hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL)
  .option("hoodie.table.name", tableName)
  .option(PRECOMBINE_FIELD.key(), "review_id")
  .option(RECORDKEY_FIELD.key(), "review_id")
  // TODO obliterate
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "product_category")
  .option("hoodie.clustering.inline", "true")
  .option("hoodie.clustering.inline.max.commits", "1")
  .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_ENABLE.key, "true")
  .option(HoodieClusteringConfig.LAYOUT_OPTIMIZE_STRATEGY.key, layoutOptStrategy)
  .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "product_id,customer_id")
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
  .option(BULK_INSERT_SORT_MODE.key(), "NONE")
  .options(commonOpts)
  .mode(Overwrite)
  .save(outputPath) {code}
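
The exception fires because DefaultHoodieRecordPayload.updateEventTime() looks up the configured event-time field (resolving to "ts" here, per the stack trace) in a schema that has no such column. The failure can also be reproduced in isolation; a minimal sketch, assuming only the 3-argument getNestedFieldVal() signature quoted above and a hypothetical single-field schema:
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

// Hypothetical single-field schema standing in for the Amazon reviews data;
// like the failing dataset, it has no "ts" column.
Schema schema = SchemaBuilder.record("Review").fields()
    .requiredString("review_id")
    .endRecord();
GenericRecord rec = new GenericData.Record(schema);
rec.put("review_id", "R2I675JE64OFU1");

// Expected: null, since returnNullIfNotFound is true.
// Actual: org.apache.avro.AvroRuntimeException: Not a valid schema field: ts
Object eventTime = HoodieAvroUtils.getNestedFieldVal(rec, "ts", true);
{code}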
 

Full stacktrace:

[https://gist.github.com/alexeykudinkin/8ec70674f23c797ab285fb6d2e2f14ad]

 

> Exception is thrown from HoodieAvroUtils.getNestedFieldVal() with returnNullIfNotFound as true
> ----------------------------------------------------------------------------------------------
>
>                 Key: HUDI-2896
>                 URL: https://issues.apache.org/jira/browse/HUDI-2896
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Ethan Guo
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)