Posted to dev@hudi.apache.org by "Dubey, Raghu" <ra...@amazon.com.INVALID> on 2020/05/24 14:14:46 UTC

Hudi Global Bloom Index Issue

Hi Team,

I used DeltaStreamer to ingest data and ran a test in which the partition column changes. When the partition column in my dataset was updated, my Hive query on the Hudi dataset returned 2 rows for the same recordKey. This was expected, and I found the explanation in this issue: https://github.com/apache/incubator-hudi/issues/423

However, as per the FAQ (https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-HowdoestheHudiindexingwork&whatareitsbenefits), I tried setting hoodie.index.type=GLOBAL_BLOOM to circumvent this issue.
Once I did this, I no longer saw 2 records in the Hive query.

But I was expecting it to move the record from one partition to the other and update the partition column in Hudi (in my case store_id). I only have 2 store IDs (1 and 2). But if you compare the query results before and after the update, it seems to have just stopped updating the partition path altogether.

Am I doing something wrong here?
I want global uniqueness of data on recordKey, irrespective of partition path.
Please help.

Below is the updated incremental source data that was used (store_id is updated to 1 instead of 2).
{
    "Op": "U",
    "src_received_ts": "2020-05-22 15:03:51.000000",
    "tran_id": 18,
    "tran_date": "2019-03-16",
    "store_id": 1,
    "store_city": "CHICAGO",
    "store_state": "IL",
    "item_code": "XXXXXX",
    "quantity": 126
}

Before:##########

hive> select * from  XXXXX  where tran_id=18;
OK
_hoodie_commit_time _hoodie_commit_seqno        _hoodie_record_key  _hoodie_partition_path     _hoodie_file_name   op  src_received_ts     tran_id    tran_date   store_city  store_state item_code  quantity    total       store_id
20200519123838  20200519123838_1_1      18      2       4b683050-4151-4d76-a22a-355f73f61b15-0_1-23-21_20200519123838.parquet   I       2020-05-19 12:26:03.000000     18      2019-03-16      CHICAGO IL      XXXXXX  126     22.0    2

After:##########

select * from XXXXX where tran_id=18;
OK
_hoodie_commit_time _hoodie_commit_seqno        _hoodie_record_key  _hoodie_partition_path     _hoodie_file_name   op  src_received_ts     tran_id    tran_date   store_city  store_state item_code  quantity    total       store_id
20200522150752  20200522150752_0_1      18      2       4b683050-4151-4d76-a22a-355f73f61b15-0_0-23-20_20200522150752.parquet   U       2020-05-22 15:03:51.000000     18      2019-03-16      CHICAGO IL      XXXXXX  126     1.19E-43        2
Time taken: 0.952 seconds, Fetched: 1 row(s)

Hudi Command:

spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
--packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
--master yarn \
--deploy-mode cluster \
/usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type COPY_ON_WRITE \
--source-ordering-field src_received_ts \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--target-base-path s3://xxxx/xxxxxxx \
--target-table xxxxxxx \
--props s3://xxxx/xxx/hudi_base.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--enable-hive-sync

Here is the hudi_base.properties file:

hoodie.cleaner.commits.retained=2
hoodie.upsert.shuffle.parallelism=1
hoodie.insert.shuffle.parallelism=1
hoodie.bulkinsert.shuffle.parallelism=1
hoodie.datasource.write.recordkey.field=tran_id
hoodie.datasource.hive_sync.partition_fields=store_id
hoodie.datasource.write.partitionpath.field=store_id
hoodie.datasource.hive_sync.database=default
hoodie.datasource.hive_sync.table=XXXX
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.deltastreamer.schemaprovider.target.schema.file=s3://XXXX/XXXX/trgt.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=s3://XXXX/XXX/src.avsc
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.deltastreamer.transformer.sql=select Op,src_received_ts,tran_id,tran_date,store_id,store_city,store_state,item_code,quantity from <SRC>
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://XXX:XXXX
hoodie.deltastreamer.source.dfs.root=s3://XXXX/ XXX
hoodie.index.type=GLOBAL_BLOOM


Thanks,
Raghu


RE: Hudi Global Bloom Index Issue

Posted by "Dubey, Raghu" <ra...@amazon.com.INVALID>.
Thanks a lot Siva! Tested and it works as expected.


-----Original Message-----
From: Sivabalan <n....@gmail.com> 
Sent: Sunday, May 24, 2020 11:10 PM
To: dev@hudi.apache.org
Subject: RE: [EXTERNAL] Hudi Global Bloom Index Issue

Hi Raghu,
   Hudi has a property named "hoodie.bloom.index.update.partition.path".
You might want to try setting this to true if you need the behavior you are expecting. Here is the doc comment for this config. Its default value is false.

/**
 * Only applies if index type is GLOBAL_BLOOM.
 * <p>
 * When set to true, an update to a record with a different partition from its existing one
 * will insert the record to the new partition and delete it from the old partition.
 * <p>
 * When set to false, a record will be updated to the old partition.
 */

Let me know if this makes sense and is what you are expecting.




--
Regards,
-Sivabalan

Re: Hudi Global Bloom Index Issue

Posted by Sivabalan <n....@gmail.com>.
Hi Raghu,
   Hudi has a property named "hoodie.bloom.index.update.partition.path".
You might want to try setting this to true if you need the behavior you are
expecting. Here is the doc comment for this config. Its default value is
false.

/**
 * Only applies if index type is GLOBAL_BLOOM.
 * <p>
 * When set to true, an update to a record with a different partition from its existing one
 * will insert the record to the new partition and delete it from the old partition.
 * <p>
 * When set to false, a record will be updated to the old partition.
 */
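
To make the two cases in the comment above concrete, here is a toy Python model of the described behavior. It is only a sketch and not Hudi code: the `Table` type, the `global_upsert` function and the `update_partition_path` argument are hypothetical names made up for illustration, and the table is reduced to a dictionary from record key to (partition path, row).

```python
from typing import Dict, Tuple

# Toy model only (not Hudi code): record key -> (partition path, row).
# A global index means a key appears at most once across all partitions.
Table = Dict[str, Tuple[str, dict]]


def global_upsert(table: Table, key: str, incoming_partition: str, row: dict,
                  update_partition_path: bool) -> None:
    """Upsert one record, mimicking the two cases described in the doc comment."""
    if key in table:
        old_partition, _ = table[key]
        if incoming_partition != old_partition and not update_partition_path:
            # Flag is false: the record is updated in its old partition.
            table[key] = (old_partition, row)
            return
    # New key, unchanged partition, or flag is true: the record ends up in the
    # incoming partition. Overwriting the single entry stands in for
    # "delete from the old partition, insert into the new partition".
    table[key] = (incoming_partition, row)


for flag in (False, True):
    # tran_id 18 currently lives in partition store_id=2.
    table: Table = {"18": ("2", {"Op": "I"})}
    # The incoming update carries store_id=1.
    global_upsert(table, "18", "1", {"Op": "U"}, flag)
    print(f"update_partition_path={flag}: partition={table['18'][0]}")
```

With the flag off, the model keeps tran_id 18 under partition 2 while still applying the update, which matches the "After" query output in the original message. With the flag on, the record moves to partition 1.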

Let me know if this makes sense and is what you are expecting.
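
For reference, a minimal sketch of how the two index settings might sit together in hudi_base.properties, assuming the other entries from the original message stay unchanged:

```properties
# Enforce record-key uniqueness across all partitions
hoodie.index.type=GLOBAL_BLOOM
# Move a record to its new partition when its partition value changes (default is false)
hoodie.bloom.index.update.partition.path=true
```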




-- 
Regards,
-Sivabalan