Posted to commits@hudi.apache.org by "Volodymyr Burenin (Jira)" <ji...@apache.org> on 2022/07/20 21:02:00 UTC

[jira] [Updated] (HUDI-4430) Incorrect type casting while reading a HUDI table created with CustomKeyGenerator and a unix timestamp partitioning field

     [ https://issues.apache.org/jira/browse/HUDI-4430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Volodymyr Burenin updated HUDI-4430:
------------------------------------
    Description: 
Hi,

I have discovered an issue with custom key generators: basically, anything that is not TimestampBasedKeyGenerator or TimestampBasedAvroKeyGenerator doesn't play nicely.

While trying to read a table that was created with these parameters (the rest don't matter):
{quote}{{hoodie.datasource.write.recordkey.field=query_id,event_type}}
{{hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator}}
{{hoodie.datasource.write.partitionpath.field=create_time_epoch_seconds:timestamp}}
{quote}
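For reference, here is a minimal Scala sketch of the kind of write that produces such a table. The table name, path, sample data, and the timestamp-format options are my assumptions, chosen so the partition path comes out like the "2022/07/13" seen in the log below:
{code:scala}
import org.apache.spark.sql.SaveMode
import spark.implicits._

// Sample row; 1657670400 is 2022-07-13 00:00:00 UTC.
val df = Seq(("q1", "click", 1657670400L))
  .toDF("query_id", "event_type", "create_time_epoch_seconds")

df.write.format("hudi").
  option("hoodie.table.name", "events").
  option("hoodie.datasource.write.recordkey.field", "query_id,event_type").
  option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.CustomKeyGenerator").
  option("hoodie.datasource.write.partitionpath.field", "create_time_epoch_seconds:timestamp").
  // Timestamp keygen settings (assumed) that render the epoch as yyyy/MM/dd paths.
  option("hoodie.deltastreamer.keygen.timebased.timestamp.type", "UNIX_TIMESTAMP").
  option("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd").
  mode(SaveMode.Append).
  save("/path/to/table")
{code}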
I get an error that looks like:
{quote}22/07/20 20:32:48 DEBUG Spark32HoodieParquetFileFormat: Appending StructType(StructField(create_time_epoch_seconds,LongType,true)) [2022/07/13]
22/07/20 20:32:48 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
    at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:66)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:245)
{quote}
Apparently the issue is in the _partitionSchemaFromProperties function in hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala.

That function checks the configured key generator class and substitutes a StringType partition schema only for the known timestamp-based key generators. For any other key generator class it keeps the column's original data type (LongType here), and reading then fails because the value extracted from the partition path is a string.
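To make the failure concrete, here is a self-contained Scala sketch (using Spark's internal row API directly, not Hudi code) of the same mismatch:
{code:scala}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

// The inferred partition schema says LongType for create_time_epoch_seconds,
// but the value recovered from the partition path is the string "2022/07/13".
val row = InternalRow(UTF8String.fromString("2022/07/13"))

// Spark trusts the declared schema and calls getLong, which tries to unbox
// the UTF8String to java.lang.Long: the same ClassCastException as in the log.
row.getLong(0)
{code}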

I have a proposal here, in two parts: give the user a way to force a string type for partition columns when needed, and add the ability to expose a prefixed column that contains the processed partition value. These could be done as two separate features; a hypothetical sketch follows.
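Purely as an illustration of the proposal, with hypothetical option names that do not exist in Hudi today:
{code:scala}
// Hypothetical read options, for illustration only; neither exists in Hudi.
val df = spark.read.format("hudi").
  // force partition columns to come back as StringType
  option("hoodie.datasource.read.partition.fields.as.string", "true").
  // additionally expose the processed partition value under a prefixed column,
  // e.g. _hoodie_create_time_epoch_seconds = "2022/07/13"
  option("hoodie.datasource.read.partition.fields.prefixed", "true").
  load("/path/to/table")
{code}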

This problem is critical for me, so I have temporarily changed the Hoodie source code on my end to make it work.

Here is roughly how I changed the referenced function:

{code:scala}
/**
 * Get the partition schema from hoodie.properties.
 * Workaround: always surface partition columns as StringType, under a
 * "_hoodie_"-prefixed name, so the raw data column keeps its own type.
 */
private lazy val _partitionSchemaFromProperties: StructType = {
  val tableConfig = metaClient.getTableConfig
  val partitionColumns = tableConfig.getPartitionFields

  if (partitionColumns.isPresent) {
    val partitionFields = partitionColumns.get().map(column => StructField("_hoodie_" + column, StringType))
    StructType(partitionFields)
  } else {
    // If the partition columns are not stored in hoodie.properties (a table
    // that was created earlier), we treat it as a non-partitioned table.
    logWarning("No partition columns available from hoodie.properties." +
      " Partition pruning will not work")
    new StructType()
  }
} {code}
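
With that patched build, reading the table back looks like this (a sketch; the path matches the write example above):
{code:scala}
// The partition value now surfaces as a plain string column named
// _hoodie_create_time_epoch_seconds, so no cast to Long is attempted.
val df = spark.read.format("hudi").load("/path/to/table")
df.select("_hoodie_create_time_epoch_seconds").distinct().show()
// e.g. 2022/07/13
{code}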

> Incorrect type casting while reading a HUDI table created with CustomKeyGenerator and a unix timestamp partitioning field
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-4430
>                 URL: https://issues.apache.org/jira/browse/HUDI-4430
>             Project: Apache Hudi
>          Issue Type: Improvement
>            Reporter: Volodymyr Burenin
>            Priority: Critical
>


--
This message was sent by Atlassian Jira
(v8.20.10#820010)