You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "Bhavani Sudha (Jira)" <ji...@apache.org> on 2019/11/27 00:54:00 UTC

[jira] [Commented] (HUDI-305) Presto MOR "_rt" queries only reads base parquet file

    [ https://issues.apache.org/jira/browse/HUDI-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983043#comment-16983043 ] 

Bhavani Sudha commented on HUDI-305:
------------------------------------

@[~bdscheller] I was able to confirm this is indeed the cause. The FileSplit returned from HoodieParquetRealtimeInputFormat is implemented in class HoodieRealtimeFileSplit. This only passes the base parquet file to the super class FileSplit and the merging actually happens in the write(DataOutput) and readFileds(DataInput) methods. These are indirectly used by RecordReaders in Hive world. However, Presto assumes every split is tangibly associated with a single FilePath. And also it uses only the *getPath(), getStart(), getLength()* APIs of the  FileSplit class. That reasons why we only see the results from the base Parquet file. 

This looks like a deep implementation change in the presto-hive connector the way it leverages FileSplits. Am trying to see if there is any way we can make a quick fix.

 

> Presto MOR "_rt" queries only reads base parquet file 
> ------------------------------------------------------
>
>                 Key: HUDI-305
>                 URL: https://issues.apache.org/jira/browse/HUDI-305
>             Project: Apache Hudi (incubating)
>          Issue Type: Bug
>          Components: Presto Integration
>         Environment: On AWS EMR
>            Reporter: Brandon Scheller
>            Assignee: Bhavani Sudha Saktheeswaran
>            Priority: Major
>             Fix For: 0.5.1
>
>
> Code example to reproduce.
> {code:java}
> import org.apache.hudi.DataSourceWriteOptions
> import org.apache.hudi.config.HoodieWriteConfig
> import org.apache.spark.sql.SaveMode
> val df = Seq(
>   ("100", "event_name_900", "2015-01-01T13:51:39.340396Z", "type1"),
>   ("101", "event_name_546", "2015-01-01T12:14:58.597216Z", "type2"),
>   ("104", "event_name_123", "2015-01-01T12:15:00.512679Z", "type1"),
>   ("105", "event_name_678", "2015-01-01T13:51:42.248818Z", "type2")
>   ).toDF("event_id", "event_name", "event_ts", "event_type")
> var tableName = "hudi_events_mor_1"
> var tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
> // write hudi dataset
> df.write.format("org.apache.hudi")
>   .option(HoodieWriteConfig.TABLE_NAME, tableName)
>   .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
>   .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>   .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>   .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
>   .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>   .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>   .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>   .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>   .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>   .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>   .mode(SaveMode.Overwrite)
>   .save(tablePath)
> // update a record with event_name "event_name_123" => "event_name_changed"
> val df1 = spark.read.format("org.apache.hudi").load(tablePath + "/*/*")
> val df2 = df1.filter($"event_id" === "104")
> val df3 = df2.withColumn("event_name", lit("event_name_changed"))
> // update hudi dataset
> df3.write.format("org.apache.hudi")
>    .option(HoodieWriteConfig.TABLE_NAME, tableName)
>    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
>    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
>    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "event_id")
>    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "event_type") 
>    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "event_ts")
>    .option("hoodie.compact.inline", "false")
>    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
>    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
>    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "event_type")
>    .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
>    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
>    .mode(SaveMode.Append)
>    .save(tablePath)
> {code}
> Now when querying the real-time table from Hive, we have no issue seeing the updated value:
> {code:java}
> hive> select event_name from hudi_events_mor_1_rt;
> OK
> event_name_900
> event_name_changed
> event_name_546
> event_name_678
> Time taken: 0.103 seconds, Fetched: 4 row(s)
> {code}
> But when querying the real-time table from Presto, we only read the base parquet file and do not see the update that should be merged in from the log file.
> {code:java}
> presto:default> select event_name from hudi_events_mor_1_rt;
>    event_name
> ----------------
>  event_name_900
>  event_name_123
>  event_name_546
>  event_name_678
> (4 rows)
> {code}
> Our current understanding of this issue is that while the HoodieParquetRealtimeInputFormat correctly generates the splits. The RealtimeCompactedRecordReader record reader is not used so it is not reading the log file and only reading the base parquet file.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)