Posted to commits@hudi.apache.org by "Balaji Varadarajan (Jira)" <ji...@apache.org> on 2020/05/28 17:48:00 UTC

[jira] [Updated] (HUDI-934) Hive query does not work with realtime table which contain decimal type

     [ https://issues.apache.org/jira/browse/HUDI-934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Balaji Varadarajan updated HUDI-934:
------------------------------------
    Status: Open  (was: New)

> Hive query does not work with realtime table which contain decimal type
> -----------------------------------------------------------------------
>
>                 Key: HUDI-934
>                 URL: https://issues.apache.org/jira/browse/HUDI-934
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Wenning Ding
>            Priority: Major
>
> h3. Issue
> After updating a MOR table that has a decimal-typed column, Hive queries on its realtime (_rt) table fail with a ClassCastException.
> The bug appears to be that the decimal type is not handled correctly here: https://github.com/apache/hudi/blob/master/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java#L303
> h3. Reproduction steps
> Create a MOR table with a decimal-typed column:
> {code:scala}
> // Run in spark-shell; spark.implicits._ provides toDF, types._ provides DecimalType
> import org.apache.spark.sql.types._
> import spark.implicits._
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.common.model.HoodieTableType
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> var df = Seq(
>   (100, "event_name_16", "2015-01-01T13:51:39.340396Z", 1.32, "type1"),
>   (101, "event_name_546", "2015-01-01T12:14:58.597216Z", 2.57, "type2"),
>   (104, "event_name_123", "2015-01-01T12:15:00.512679Z", 3.45, "type1"),
>   (105, "event_name_678", "2015-01-01T13:51:42.248818Z", 6.78, "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_value", "event_type")
>   
> df = df.withColumn("event_value", df.col("event_value").cast(DecimalType(4,2)))
> val tableType = HoodieTableType.MERGE_ON_READ.name
> val tableName = "test8"
> val tablePath = "s3://xxx/hudi/tables/" + tableName + "/"
> df.write.format("org.apache.hudi")
>    .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
>    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>    .option(HoodieWriteConfig.TABLE_NAME, tableName)
>    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
>    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>    .option("hoodie.compact.inline", "false")
>    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>    .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
>    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>    .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default")
>    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>    .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator")
>    .mode(SaveMode.Overwrite)
>    .save(tablePath)
> {code}
> Update this table without inline compaction:
> {code:scala}
> var update_df = Seq(
>   (100, "event_name_16", "2015-01-01T13:51:39.340396Z", 9.00, "type1"),
>   (101, "event_name_546", "2015-01-01T12:14:58.597216Z", 2.57, "type2"),
>   (104, "event_name_123", "2015-01-01T12:15:00.512679Z", 8.00, "type1"),
>   (105, "event_name_678", "2015-01-01T13:51:42.248818Z", 6.78, "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_value", "event_type")
>   
> update_df = update_df.withColumn("event_value", update_df.col("event_value").cast(DecimalType(4,2)))
> update_df.write.format("org.apache.hudi")
>              .option(HoodieWriteConfig.TABLE_NAME, tableName)
>              .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>              .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
>              .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>              .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>              .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type")
>              .option("hoodie.compact.inline", "false")
>              .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY, "true")
>              .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>              .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>              .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "default")
>              .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>              .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>              .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.keygen.ComplexKeyGenerator")
>              .mode(SaveMode.Append)
>              .save(tablePath)
> {code}
> Query the _rt table with Hive:
> {code:java}
> hive>  select * from test8_rt;
> {code}
> The query fails with this error:
> {code:java}
> Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.hive.serde2.io.HiveDecimalWritabl
> {code}
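
For context on the cast failure reported above: Avro represents its decimal logical type as raw bytes holding the unscaled integer in big-endian two's-complement form, which would likely explain a BytesWritable arriving where Hive expects a decimal writable. The following is a minimal, standalone sketch of that encoding and its reversal using only java.math. The class and method names are hypothetical, and this is not the Hudi fix itself. In the record reader, the decoded value would presumably still need to be wrapped in Hive's decimal writable type, using the precision and scale declared in the schema.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical illustration class; not part of Hudi or Hive.
public class AvroDecimalSketch {

    // Encode a decimal the way Avro's decimal logical type stores it in a
    // bytes field: the unscaled integer, big-endian two's complement.
    // Assumes the value fits the given scale without rounding.
    public static byte[] encode(BigDecimal value, int scale) {
        return value.setScale(scale).unscaledValue().toByteArray();
    }

    // Reverse the encoding: raw bytes plus the scale taken from the schema
    // give back the original decimal value.
    public static BigDecimal decode(byte[] raw, int scale) {
        return new BigDecimal(new BigInteger(raw), scale);
    }

    public static void main(String[] args) {
        int scale = 2; // matches DecimalType(4,2) in the reproduction steps
        byte[] raw = encode(new BigDecimal("9.00"), scale);
        System.out.println(decode(raw, scale)); // prints 9.00
    }
}
```

A reader that passes the raw bytes through unchanged hands Hive a byte array rather than a decimal, so the scale has to be applied at read time as in decode above.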


