Posted to commits@hudi.apache.org by "cdmikechen (Jira)" <ji...@apache.org> on 2020/10/13 13:05:00 UTC

[jira] [Comment Edited] (HUDI-480) Support a querying delete data method in incremental view

    [ https://issues.apache.org/jira/browse/HUDI-480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213102#comment-17213102 ] 

cdmikechen edited comment on HUDI-480 at 10/13/20, 1:04 PM:
------------------------------------------------------------

Sorry for taking so long to come back to this topic. I have considered a general Spark-based implementation of incremental queries for deleted data:
1. We need a new method that uses *HoodieWriteStat.getNumDeletes()* to find the files that contain deleted rows.
2. We also need a new method that uses *HoodieWriteStat.getPrevCommit()* to find the previous version of each file with the same fileId.
3. We need a new method that compares the current file with the previous file that has the same fileId and extracts the deleted rows. The implementation is similar to the Spark merge method: it builds an *ExternalSpillableMap* (covering both COW parquet files and MOR logs) from the current file's dataset, and then checks the keys of the previous file against it. If an old key does not exist in the *ExternalSpillableMap*, it is a deleted row.
4. Add a new column named _hoodie_delete_ that holds the commit time of the current file. It indicates the deletion time.
5. For the delete mode of the incremental view, we can add a new query type, *DataSourceReadOptions.QUERY_TYPE_DELETE_OPT_VAL*:

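{code:java}
import org.apache.hudi.DataSourceReadOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// QUERY_TYPE_DELETE_OPT_VAL is the query type proposed above; it does not exist yet.
// <beginInstantTime> is a placeholder for the instant time to start reading from.
Dataset<Row> hudiIncQueryDF = spark.read()
     .format("org.apache.hudi")
     .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_DELETE_OPT_VAL())
     .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), <beginInstantTime>)
     .load(tablePath); // For an incremental query, pass in the root/base path of the table

// Register the result as a temp view so it can be queried with SQL.
hudiIncQueryDF.createOrReplaceTempView("hudi_trips_incremental");
spark.sql("select * from hudi_trips_incremental where fare > 20.0").show();
{code}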
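
Steps 1, 3 and 4 of the list above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the Hudi implementation: *FileWriteStat* is a hypothetical stand-in for the fields read from *HoodieWriteStat*, plain in-memory maps stand in for *ExternalSpillableMap*, and the name of the delete column is passed in as a parameter. Loading the previous file through *getPrevCommit()* (step 2) is left to the caller.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DeletedRowsSketch {

    /** Hypothetical stand-in for the per-file fields taken from HoodieWriteStat. */
    static final class FileWriteStat {
        final String fileId;
        final String prevCommit;
        final long numDeletes;

        FileWriteStat(String fileId, String prevCommit, long numDeletes) {
            this.fileId = fileId;
            this.prevCommit = prevCommit;
            this.numDeletes = numDeletes;
        }
    }

    /** Step 1: keep only the files whose write stat reports at least one delete. */
    static List<FileWriteStat> filesWithDeletes(List<FileWriteStat> stats) {
        List<FileWriteStat> result = new ArrayList<>();
        for (FileWriteStat stat : stats) {
            if (stat.numDeletes > 0) {
                result.add(stat);
            }
        }
        return result;
    }

    /**
     * Steps 3 and 4: a row whose key is in the previous file but not in the
     * current file is treated as deleted. Each deleted row is copied and tagged
     * with the current commit time in the given delete column.
     */
    static List<Map<String, Object>> deletedRows(
            Map<String, Map<String, Object>> previousRowsByKey,
            Map<String, Map<String, Object>> currentRowsByKey,
            String deleteColumn,
            String currentCommitTime) {
        List<Map<String, Object>> deleted = new ArrayList<>();
        for (Map.Entry<String, Map<String, Object>> entry : previousRowsByKey.entrySet()) {
            if (!currentRowsByKey.containsKey(entry.getKey())) {
                Map<String, Object> row = new LinkedHashMap<>(entry.getValue());
                row.put(deleteColumn, currentCommitTime);
                deleted.add(row);
            }
        }
        return deleted;
    }
}
```

In this sketch only the keys of the current file are consulted, so the current side could be held as a key set. The previous file has to be read in full, because the deleted rows are returned from it.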




> Support a querying delete data method in incremental view
> ---------------------------------------------------------
>
>                 Key: HUDI-480
>                 URL: https://issues.apache.org/jira/browse/HUDI-480
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Incremental Pull
>            Reporter: cdmikechen
>            Priority: Minor
>
> As we know, Hudi supports many ways to query data from Spark, Hive, and Presto. It also provides a timeline for tracing changes to data, which can be used to query incremental data in the incremental view.
> Previously, we only had insert and update functions to upsert data; now we have added new functions to delete existing data.
> *[HUDI-328] Adding delete api to HoodieWriteClient* https://github.com/apache/incubator-hudi/pull/1004
> *[HUDI-377] Adding Delete() support to DeltaStreamer* https://github.com/apache/incubator-hudi/pull/1073
> Now that we have a delete API, should we add another method to get deleted data in the incremental view?
> I've looked at the methods that generate new parquet files. The main idea is to combine the old and new data and then filter out the rows that need to be deleted, so the deleted data does not exist in the new dataset. As a result, deleted rows leave no trace in the new dataset, and data tracing in the incremental view can only find inserted or modified rows through the existing timestamp field.
> If we decide to do this, I see two approaches to consider:
> 1. Trace the dataset in the same file at different checkpoints along the timeline, compare the two datasets by key, and filter out the deleted data. This adds no cost at write time, but the comparison has to run on demand for each query, which is expensive.
> 2. When writing data, record any deleted rows, for example in a file named like *.delete_filename_version_timestamp*. Queries can then be answered immediately based on the time, at the cost of additional processing at write time.
>  
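
The second approach in the description above (recording deletes at write time) could look roughly like the sketch below. It is an assumption-laden illustration, not Hudi code: *DeleteLogSketch* is a hypothetical in-memory stand-in for the proposed per-commit delete files, instant times are assumed to sort lexicographically, and the begin instant is assumed to be exclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class DeleteLogSketch {

    // Commit time -> record keys deleted by that commit, kept sorted by commit time.
    private final NavigableMap<String, List<String>> deletedKeysByCommit = new TreeMap<>();

    /** Write path: remember which keys a commit deleted. */
    void recordDeletes(String commitTime, List<String> deletedKeys) {
        deletedKeysByCommit
            .computeIfAbsent(commitTime, t -> new ArrayList<>())
            .addAll(deletedKeys);
    }

    /** Read path: keys deleted by commits strictly after beginInstantTime. */
    List<String> deletedSince(String beginInstantTime) {
        List<String> result = new ArrayList<>();
        for (List<String> keys : deletedKeysByCommit.tailMap(beginInstantTime, false).values()) {
            result.addAll(keys);
        }
        return result;
    }
}
```

The query side becomes a range lookup over commit times, which is the trade-off the description names: a cheap read in exchange for extra bookkeeping on every write that deletes rows.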


