Posted to commits@hudi.apache.org by "Bhavani Sudha (Jira)" <ji...@apache.org> on 2020/08/06 05:19:00 UTC

[jira] [Resolved] (HUDI-305) Presto MOR "_rt" queries only reads base parquet file

     [ https://issues.apache.org/jira/browse/HUDI-305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bhavani Sudha resolved HUDI-305.
--------------------------------
    Resolution: Fixed

> Presto MOR "_rt" queries only reads base parquet file 
> ------------------------------------------------------
>
>                 Key: HUDI-305
>                 URL: https://issues.apache.org/jira/browse/HUDI-305
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Presto Integration
>         Environment: On AWS EMR
>            Reporter: Brandon Scheller
>            Assignee: Bhavani Sudha
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 0.6.0
>
>
> Code example to reproduce the issue:
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.lit  // used for the update below
> import spark.implicits._                    // toDF / $ syntax (already in scope in spark-shell)
> val df = Seq(
>   ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
>   ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
>   ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
>   ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_type")
> val tableName = "hudi_events_mor_1"
> val tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
> // write hudi dataset
> df.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Overwrite)
>   .save(tablePath)
> // update a record with event_name "event_name_123" => "event_name_changed"
> val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
> val df2 = df1.filter($"event_id" === "104")
> val df3 = df2.withColumn("event_name", lit("event_name_changed"))
> // update hudi dataset
> df3.write.format("org.apache.hudi")
>    .option(HoodieWriteConfig.TABLE_NAME, tableName)
>    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
>    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>    .option("hoodie.compact.inline", "false")
>    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>    .mode(SaveMode.Append)
>    .save(tablePath)
> {code}
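> To double-check that the upsert landed in a delta log file rather than a new base file (i.e. that the problem is on the read/merge path, not the write path), the partition directory can be listed directly. This is a minimal sketch that is not part of the original reproduction; it assumes the same spark-shell session and tablePath as above, and the default key generator, so the partition folder for event_id 104 is just the event_type value "type1".
> {code:java}
> import org.apache.hadoop.fs.Path
> 
> // List the files Hudi wrote for the "type1" partition.
> val fs = new Path(tablePath).getFileSystem(spark.sparkContext.hadoopConfiguration)
> fs.listStatus(new Path(tablePath + "/type1"))
>   .map(_.getPath.getName)
>   .foreach(println)
> 
> // Expected: the base *.parquet file plus a log file (name containing ".log.")
> // written by the upsert; the changed event_name is only visible to readers
> // that merge that log file on top of the base file.
> {code}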
> Now when querying the real-time table from Hive, we have no issue seeing the updated value:
> {code:java}
> hive> select event_name from hudi_events_mor_1_rt;
> OK
> event_name_900
> event_name_changed
> event_name_546
> event_name_678
> Time taken: 0.103 seconds, Fetched: 4 row(s)
> {code}
> But when querying the real-time table from Presto, only the base parquet file is read, so the update that should be merged in from the log file is not visible:
> {code:java}
> presto:default> select event_name from hudi_events_mor_1_rt;
>    event_name
> ----------------
>  event_name_900
>  event_name_123
>  event_name_546
>  event_name_678
> (4 rows)
> {code}
> Our current understanding of this issue is that while HoodieParquetRealtimeInputFormat correctly generates the splits, the RealtimeCompactedRecordReader is never used, so the log file is not read and only the base parquet file is returned.
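> As a conceptual illustration of the merge the real-time reader is supposed to perform (a simplified sketch for this example table, not Hudi's actual RealtimeCompactedRecordReader code), the "_rt" view is a last-writer-wins overlay of log-file records on top of base-file records, keyed by the record key:
> {code:java}
> // Simplified, hypothetical merge-on-read semantics for the table above.
> case class Rec(eventId: String, eventName: String)
> 
> val baseFileRecords = Seq(Rec("100", "event_name_900"), Rec("104", "event_name_123"))
> val logFileRecords  = Seq(Rec("104", "event_name_changed"))
> 
> // Log records replace base records with the same key.
> val merged = (baseFileRecords.map(r => r.eventId -> r).toMap ++
>               logFileRecords.map(r => r.eventId -> r).toMap).values
> 
> merged.foreach(println)
> // Rec(100,event_name_900)
> // Rec(104,event_name_changed)
> {code}
> Hive goes through this merge path, which is why the "_rt" query above returns event_name_changed; Presto appears to read the parquet split directly without it, which matches the stale event_name_123 in its output.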
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)