Posted to dev@hudi.apache.org by "Dubey, Raghu" <ra...@amazon.com.INVALID> on 2020/05/24 14:14:46 UTC
Hudi Global Bloom Index Issue
Hi Team,
I used DeltaStreamer to ingest data and ran a test where the partition column changes. When the partition column in my dataset got updated, my Hive query on the Hudi dataset returned 2 rows for the same recordKey. This was expected, and the explanation is in this issue: https://github.com/apache/incubator-hudi/issues/423
However, per the FAQ (https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-HowdoestheHudiindexingwork&whatareitsbenefits), I tried setting hoodie.index.type=GLOBAL_BLOOM to circumvent this issue.
Once I do this, I do not see 2 records anymore in the Hive query.
But I was expecting it to move the record from one partition to another and update the partition column in Hudi (store_id, in my case). I only have 2 store IDs (1 and 2), yet if you compare the query results before and after the update statement, the partition path is not being updated at all.
Am I doing something wrong here?
I want global uniqueness on recordKey, irrespective of partition path.
Please help.
Below is the updated incremental source data that was used (store_id updated from 2 to 1).
{
  "Op": "U",
  "src_received_ts": "2020-05-22 15:03:51.000000",
  "tran_id": 18,
  "tran_date": "2019-03-16",
  "store_id": 1,
  "store_city": "CHICAGO",
  "store_state": "IL",
  "item_code": "XXXXXX",
  "quantity": 126
}
Before:##########
hive> select * from XXXXX where tran_id=18;
OK
_hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name op src_received_ts tran_id tran_date store_city store_state item_code quantity total store_id
20200519123838 20200519123838_1_1 18 2 4b683050-4151-4d76-a22a-355f73f61b15-0_1-23-21_20200519123838.parquet I 2020-05-19 12:26:03.000000 18 2019-03-16 CHICAGO IL XXXXXX 126 22.0 2
After:##########
select * from XXXXX where tran_id=18;
OK
_hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name op src_received_ts tran_id tran_date store_city store_state item_code quantity total store_id
20200522150752 20200522150752_0_1 18 2 4b683050-4151-4d76-a22a-355f73f61b15-0_0-23-20_20200522150752.parquet U 2020-05-22 15:03:51.000000 18 2019-03-16 CHICAGO IL XXXXXX 126 1.19E-43 2
Time taken: 0.952 seconds, Fetched: 1 row(s)
Hudi Command:
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
--packages org.apache.hudi:hudi-utilities-bundle_2.11:0.5.2-incubating,org.apache.spark:spark-avro_2.11:2.4.5 \
--master yarn \
--deploy-mode cluster \
/usr/lib/hudi/hudi-utilities-bundle_2.11-0.5.2-incubating.jar \
--table-type COPY_ON_WRITE \
--source-ordering-field src_received_ts \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--target-base-path s3://xxxx/xxxxxxx \
--target-table xxxxxxx \
--props s3://xxxx/xxx/hudi_base.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
--enable-hive-sync
Here is the hudi_base.properties file:
hoodie.cleaner.commits.retained=2
hoodie.upsert.shuffle.parallelism=1
hoodie.insert.shuffle.parallelism=1
hoodie.bulkinsert.shuffle.parallelism=1
hoodie.datasource.write.recordkey.field=tran_id
hoodie.datasource.hive_sync.partition_fields=store_id
hoodie.datasource.write.partitionpath.field=store_id
hoodie.datasource.hive_sync.database=default
hoodie.datasource.hive_sync.table=XXXX
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.deltastreamer.schemaprovider.target.schema.file=s3://XXXX/XXXX/trgt.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=s3://XXXX/XXX/src.avsc
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.deltastreamer.transformer.sql=select Op,src_received_ts,tran_id,tran_date,store_id,store_city,store_state,item_code,quantity from <SRC>
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://XXX:XXXX
hoodie.deltastreamer.source.dfs.root=s3://XXXX/XXX
hoodie.index.type=GLOBAL_BLOOM
Thanks,
Raghu
RE: Hudi Global Bloom Index Issue
Posted by "Dubey, Raghu" <ra...@amazon.com.INVALID>.
Thanks a lot Siva! Tested and it works as expected.
Re: Hudi Global Bloom Index Issue
Posted by Sivabalan <n....@gmail.com>.
Hi Raghu,
Hudi has a property named "hoodie.bloom.index.update.partition.path".
You might want to try setting this to true to get the behavior you are expecting. Here are the docs for this config; the default value is false.
/**
 * Only applies if index type is GLOBAL_BLOOM.
 * <p>
 * When set to true, an update to a record with a different partition from its existing one
 * will insert the record to the new partition and delete it from the old partition.
 * <p>
 * When set to false, a record will be updated to the old partition.
 */
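For reference, here is a minimal sketch of what the change could look like in your hudi_base.properties (the property name is the one quoted above; the comments are mine):
# existing global index setting from your props file
hoodie.index.type=GLOBAL_BLOOM
# when a record's partition path changes, delete it from the old partition
# and insert it into the new one (the default, false, updates the record
# in place in the old partition)
hoodie.bloom.index.update.partition.path=true
Alternatively, if your Hudi version supports it, DeltaStreamer can take the same setting on the command line via --hoodie-conf hoodie.bloom.index.update.partition.path=true.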
Let me know if this makes sense and is what you are expecting.
--
Regards,
-Sivabalan