Posted to dev@hudi.apache.org by Purushotham Pushpavanthar <pu...@gmail.com> on 2019/11/15 13:03:34 UTC

Spark v2.3.2 : Duplicate entries found for each primary Key

Hi,

Below is the CREATE statement for my Hudi dataset.

CREATE EXTERNAL TABLE `inventory`.`customer`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `id` bigint,
  `sales` bigint,
  `merchant` bigint,
  `item_status` bigint,
  `tem_shipment` bigint)
PARTITIONED BY (`dt` string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1')
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<warehouse-bucket>/<path>/inventory/customer'
TBLPROPERTIES (
  'bucketing_version' = '2',
  'transient_lastDdlTime' = '1572952974',
  'last_commit_time_sync' = '20191114192136')

I've added *hudi-hive-bundle-0.5.1-SNAPSHOT.jar* to Hive,
*hudi-presto-bundle-0.5.1-SNAPSHOT.jar* to Presto, and
*hudi-spark-bundle-0.5.1-SNAPSHOT.jar* to Spark (all three share a common
Metastore).
We are running Hudi in COW (copy-on-write) mode, and we noticed that multiple
versions of the .parquet files are written per partition, depending on the
number of updates arriving for them in each batch execution. When queried
from Hive or Presto, any primary key with multiple updates returns a single
record with the latest state (I assume *HoodieParquetInputFormat* does the
magic of taking care of duplicates). However, when I run the same query in
Spark SQL, I get duplicate records for any primary key with multiple
updates.

Can someone help me understand why Spark does not deduplicate records
across multiple commits, when Presto and Hive are able to do so?
I've provided hudi-spark-bundle-0.5.1-SNAPSHOT.jar when starting
spark-shell. Is there something I'm missing?

Thanks in advance.

Regards,
Purushotham Pushpavanth

Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Purushotham Pushpavanthar <pu...@gmail.com>.
Figured it out.
The command below worked for me in PySpark.

spark._jsc.hadoopConfiguration().set('mapreduce.input.pathFilter.class','org.apache.hudi.hadoop.HoodieROTablePathFilter')
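
For reuse across notebooks or jobs, the same call could be wrapped in a small helper. This is only a sketch: the function name `register_hudi_path_filter` and the two constants are made up for illustration, and it assumes `spark` is an existing PySpark SparkSession with the Hudi Spark bundle already on the classpath. The configuration key and class name are the ones from the command above.

```python
# Hadoop configuration key and Hudi filter class, taken from the command above.
HUDI_PATH_FILTER_KEY = "mapreduce.input.pathFilter.class"
HUDI_PATH_FILTER_CLASS = "org.apache.hudi.hadoop.HoodieROTablePathFilter"


def register_hudi_path_filter(spark):
    """Set the Hudi path filter on the session's Hadoop configuration.

    `spark` is assumed to be a live SparkSession. Returns the value read
    back from the configuration so the caller can confirm it was applied.
    """
    hadoop_conf = spark._jsc.hadoopConfiguration()
    hadoop_conf.set(HUDI_PATH_FILTER_KEY, HUDI_PATH_FILTER_CLASS)
    return hadoop_conf.get(HUDI_PATH_FILTER_KEY)


# Hypothetical usage inside a pyspark shell, before querying the table:
#   register_hudi_path_filter(spark)
#   spark.sql("select * from inventory.customer where id = 1").show()
```

The filter has to be registered before the query runs, since it only affects how input files are listed for later reads.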

Regards,
Purushotham Pushpavanth



On Mon, 18 Nov 2019 at 16:47, Purushotham Pushpavanthar <
pushpavanthar@gmail.com> wrote:

> Kabeer, can you please share the *PySpark* command to register the path
> filter class?
>
> Regards,
> Purushotham Pushpavanth
>
>
>
> On Mon, 18 Nov 2019 at 13:46, Pratyaksh Sharma <pr...@gmail.com> wrote:
>
>> Hi Vinoth/Kabeer,
>>
>> I have a small question about the fix you proposed. Why is the
>> HoodieParquetInputFormat class not able to handle deduplication of records
>> in the case of Spark, while it is able to do so for Presto and Hive?
>>
>> On Sun, Nov 17, 2019 at 4:08 AM Vinoth Chandar <vi...@apache.org> wrote:
>>
>> > Sweet!
>> >
>> > On Sat, Nov 16, 2019 at 10:16 AM Purushotham Pushpavanthar <
>> > pushpavanthar@gmail.com> wrote:
>> >
>> > > Thanks Vinoth and Kabeer. It resolved my problem.
>> > >
>> > > Regards,
>> > > Purushotham Pushpavanth
>> > >
>> > >
>> > >
>> > > On Fri, 15 Nov 2019 at 20:16, Kabeer Ahmed <ka...@linuxmail.org> wrote:
>> > >
>> > > > Adding to Vinoth's response: in spark-shell you just need to copy and
>> > > > paste the line below. Let us know if it still doesn't work.
>> > > >
>> > > > spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
>> > > >   classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
>> > > >   classOf[org.apache.hadoop.fs.PathFilter]);
>> > > >
>> > > > On Nov 15 2019, at 1:37 pm, Vinoth Chandar <vi...@apache.org> wrote:
>> > > > > Hi,
>> > > > >
>> > > > > Are you setting the path filters when you query the Hudi Hive table
>> > > > > via Spark? See
>> > > > > http://hudi.apache.org/querying_data.html#spark-ro-view (or
>> > > > > http://hudi.apache.org/querying_data.html#spark-rt-view alternatively).
>> > > > >
>> > > > > - Vinoth

Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Purushotham Pushpavanthar <pu...@gmail.com>.
Kabeer, can you please share the *PySpark* command to register the path
filter class?

Regards,
Purushotham Pushpavanth




Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Bhavani Sudha <bh...@gmail.com>.
Thanks Kabeer. Will take a look and merge.

- Sudha

On Sun, Nov 24, 2019 at 1:11 PM Kabeer Ahmed <ka...@linuxmail.org> wrote:

> Dear Sudha,
>
> I have now added a new question and answer in the comments section (the
> last one on this page: https://cwiki.apache.org/confluence/display/HUDI/FAQ).
> Kindly review it, and let me know if there are any questions.
>
> Thanks
> Kabeer.
>
>
> On Nov 20 2019, at 10:58 pm, Bhavani Sudha <bh...@gmail.com> wrote:
>
> Thanks Kabeer. I think this would be a great FAQ entry for new users.
>
> I think you should already be able to contribute to the FAQ as described
> here:
> https://hudi.incubator.apache.org/community.html#accounts
> https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-ContributingtoFAQ
> Please let me know if you run into any trouble.
>
> Thanks,
> Sudha
>
> On Wed, Nov 20, 2019 at 9:20 AM Kabeer Ahmed <ka...@linuxmail.org> wrote:
>
> Sudha,
>
> Do you think this is a good addition to the FAQ? It is likely to be a
> common question for new users reading the Hudi documentation. You could
> add it, or I am happy to do it if you give me access. My Apache JIRA id is:
> smdahmed.
>
> Thanks,
> Kabeer.
>
> On Nov 20 2019, at 7:23 am, Pratyaksh Sharma <pr...@gmail.com> wrote:
>
> Thank you for the explanation, Kabeer/Sudha.
>
> Let me go through the flow and get back to you in case of any further
> queries.
>
> On Wed, Nov 20, 2019 at 6:21 AM Kabeer Ahmed <ka...@linuxmail.org> wrote:
>
> Pratyaksh,
>
> +1 to what Sudha has written. Let's zoom in a bit closer.
>
> For Hive, as you said, we explicitly set the input format to
> HoodieParquetInputFormat.
> - HoodieParquetInputFormat extends MapredParquetInputFormat, which is
>   simply an input format for Hive. Hive and Presto depend on this class to
>   retrieve the dataset from Hudi.
>
> For Spark, there is no such option to set this explicitly. Instead, Spark
> reads the paths directly from the file system (HDFS or S3). From Spark,
> the calls would be as below:
> - org.apache.spark.rdd.NewHadoopRDD.getPartitions
> - org.apache.parquet.hadoop.ParquetInputFormat.getSplits
> - org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits
>
> So it is evident that we can't plug in HoodieParquetInputFormat here.
> Instead, we rely on the PathFilter class, which allows us to filter out
> paths (and files). So we explicitly set this in the Spark Hadoop
> Configuration (note that Spark uses the Hadoop FS S3 implementation to
> read from S3).
>
> If you look into HoodieROTablePathFilter, you will see that it has logic
> to ensure that, for Hoodie-related folders (paths) and files, only the
> latest path/file is selected. Thus you do not see duplicate entries when
> you set it. Without it, Spark simply reads all the parquet files and
> displays the data within them.
>
> It may take some time to go through these paths and digest the flow. But
> should you still have any questions, please do not hesitate to get back
> to me.
>
> Hope this helps,
> Kabeer.
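
The "keep only the latest file" idea described for HoodieROTablePathFilter can be illustrated with a toy filter. This is a sketch, not Hudi's implementation: it assumes data files are named `<fileId>_<writeToken>_<instantTime>.parquet` with fixed-width timestamp instants, and the function names `parse_base_file` and `latest_base_files` are hypothetical. As far as I understand, the real class works from Hudi's own table metadata rather than from file names alone.

```python
from typing import Dict, Iterable, List, Tuple


def parse_base_file(name: str) -> Tuple[str, str]:
    """Split an assumed '<fileId>_<writeToken>_<instantTime>.parquet' name
    into (file_id, instant_time)."""
    stem = name[: -len(".parquet")]
    file_id, _write_token, instant_time = stem.rsplit("_", 2)
    return file_id, instant_time


def latest_base_files(names: Iterable[str]) -> List[str]:
    """Keep one file per file id: the one with the greatest instant time."""
    latest: Dict[str, Tuple[str, str]] = {}
    for name in names:
        if not name.endswith(".parquet"):
            continue  # ignore anything that is not a parquet data file
        file_id, instant = parse_base_file(name)
        # Instants are fixed-width timestamps, so string comparison orders them.
        if file_id not in latest or instant > latest[file_id][0]:
            latest[file_id] = (instant, name)
    return sorted(name for _instant, name in latest.values())


# Hypothetical partition listing: file group f1 was rewritten by a later commit.
files = [
    "f1_1-0-1_20191113101500.parquet",
    "f1_1-0-1_20191114192136.parquet",
    "f2_1-0-1_20191113101500.parquet",
]
print(latest_base_files(files))
```

With this listing, the older version of `f1` is dropped and one file per file group remains, which is the effect the path filter has on what Spark reads.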
>
>
>
> Sent: Monday, November 18, 2019 at 7:47 PM
> From: "Bhavani Sudha" <bh...@gmail.com>
> To: dev@hudi.apache.org
> Subject: Re: Spark v2.3.2 : Duplicate entries found for each primary Key
>
> Hi Pratyaksh,
>
> Let me try to answer this. I believe Spark does not natively invoke
> HoodieParquetInputFormat.getSplits() the way Hive and Presto do. So when
> queried, Spark just loads all the data files in that partition without
> applying the Hoodie filtering logic. That's why we need to instruct Spark
> to read in the appropriate format in one of the two ways suggested by
> Vinoth/Kabeer earlier.
>
> Thanks,
> Sudha
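
To make the effect of loading every data file concrete, here is a toy model of one copy-on-write file group. The sample rows and the helper names `read_all_versions` and `read_latest_version` are invented for illustration; the only assumption carried over from the thread is that each update batch writes a new version of the parquet file while older versions stay on disk.

```python
# Each inner list stands for the rows in one version of the same parquet
# file, oldest first (hypothetical sample data).
file_versions = [
    [{"id": 1, "sales": 10}, {"id": 2, "sales": 5}],  # first commit
    [{"id": 1, "sales": 12}, {"id": 2, "sales": 5}],  # id 1 updated, file rewritten
    [{"id": 1, "sales": 15}, {"id": 2, "sales": 5}],  # id 1 updated again
]


def read_all_versions(versions):
    """What a reader sees when no filtering is applied: rows of every version."""
    return [row for version in versions for row in version]


def read_latest_version(versions):
    """What a reader sees when only the newest file version is picked."""
    return list(versions[-1]) if versions else []


all_rows = read_all_versions(file_versions)
latest_rows = read_latest_version(file_versions)

# Count how many rows each read returns for primary key id = 1.
print(len([r for r in all_rows if r["id"] == 1]))
print(len([r for r in latest_rows if r["id"] == 1]))
```

In this model the unfiltered read returns one row per file version for the key, while the filtered read returns only the row with the latest state.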
>

Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Kabeer Ahmed <ka...@linuxmail.org>.
Dear Sudha,

I have now added a new question and answer in the comments section (the last one on this page: https://cwiki.apache.org/confluence/display/HUDI/FAQ). Kindly review it, and let me know if there are any questions.
Thanks
Kabeer.



Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Bhavani Sudha <bh...@gmail.com>.
Thanks Kabeer. I think this would be a great FAQ entry for new users.

I think you should already be able to contribute to the FAQ as described
here:
https://hudi.incubator.apache.org/community.html#accounts
https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-ContributingtoFAQ
Please let me know if you run into any trouble.

Thanks,
Sudha

On Wed, Nov 20, 2019 at 9:20 AM Kabeer Ahmed <ka...@linuxmail.org> wrote:

> Sudha
>
> Do you think this is a good addition to FAQ? Since this might be a common
> question when a new user gets to see the hudi documentation. You could add
> it or I am happy to do it if you give me access. My apache jira id is:
> smdahmed.
> Thanks,
> Kabeer.

Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Kabeer Ahmed <ka...@linuxmail.org>.
Sudha

Do you think this would be a good addition to the FAQ? It is likely to be a common question for new users reading the Hudi documentation. You could add it, or I am happy to do it if you give me access. My Apache JIRA ID is: smdahmed.
Thanks,
Kabeer.

On Nov 20 2019, at 7:23 am, Pratyaksh Sharma <pr...@gmail.com> wrote:
> Thank you for the explanation Kabeer/Sudha.
>
> Let me go through the flow and revert back in case of any further queries.


Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Pratyaksh Sharma <pr...@gmail.com>.
Thank you for the explanation, Kabeer/Sudha.

Let me go through the flow and get back to you if I have any further questions.

On Wed, Nov 20, 2019 at 6:21 AM Kabeer Ahmed <ka...@linuxmail.org> wrote:

> Pratyaksh,
>
> +1 to what Sudha has written. Lets zoom a bit closer.
> For hive, as you said, we explicitly set input format to
> HoodieParquetInputFormat.
> - HoodieParquetInputFormat extends MapredParquetInputFormat which is
> nothing but a input format for hive. Hive and Presto depend on this file to
> retrieve dataset from Hudi.
>
> For Spark, there is no such option to set this explicitly. Rather Spark
> starts reading the paths direct from the file system (HDFS or S3). From
> Spark the calls would be as below:
> - org.apache.spark.rdd.NewHadoopRDD.getPartitions
> - org.apache.parquet.hadoop.ParquetInputFormat.getSplits
> - org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits
>
> Now it is evident that we cant stick the HoodieParquetInputFormat. Rather
> we rely on the PathFilter class that allows us to filter out the paths (and
> files). So we explicitly set this in the Spark Hadoop Configuration (note
> that Spark uses Hadoop FS S3 implementation to read from S3).
>
> If you look into the file: HoodieROTablePathFilter, you will see that
> there is logic to ensure that folders (paths) or files for Hoodie related
> files always ensures that latest path/file is selected. Thus you do not see
> duplicate entries when you set. Without this, Spark is just plainly reading
> all the parquet files and displaying the data within them.
>
> It may take sometime from you to go through these paths and digest the
> flow. But should you still have any questions, please do not hesitate to
> revert back.
>
> Hope this helps
> Kabeer.

Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Kabeer Ahmed <ka...@linuxmail.org>.
Pratyaksh, 

+1 to what Sudha has written. Let's zoom in a bit closer.
For Hive, as you said, we explicitly set the input format to HoodieParquetInputFormat.
- HoodieParquetInputFormat extends MapredParquetInputFormat, which is simply an input format for Hive. Hive and Presto depend on this class to retrieve the dataset from Hudi.

For Spark, there is no such option to set this explicitly. Instead, Spark reads the paths directly from the file system (HDFS or S3). From Spark, the calls would be as below:
- org.apache.spark.rdd.NewHadoopRDD.getPartitions
- org.apache.parquet.hadoop.ParquetInputFormat.getSplits
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits

Now it is evident that we can't plug in HoodieParquetInputFormat here. Instead, we rely on the PathFilter class, which allows us to filter out paths (and files). So we explicitly set this in the Spark Hadoop Configuration (note that Spark uses the Hadoop FS S3 implementation to read from S3).
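
For reference, here is a sketch of that setting as it would be pasted into spark-shell. It is the same call posted earlier in this thread, just with imports spelled out. It assumes the session was started with the hudi-spark-bundle jar on the classpath; the follow-up query is only an illustrative check against the `inventory.customer` table from the original post, not something from the Hudi docs.

```scala
// Assumes a spark-shell session (`spark` is predefined) started with the
// hudi-spark-bundle jar on the classpath.
import org.apache.hadoop.fs.PathFilter
import org.apache.hudi.hadoop.HoodieROTablePathFilter

// Register the Hudi path filter so file listing skips older file versions.
spark.sparkContext.hadoopConfiguration.setClass(
  "mapreduce.input.pathFilter.class",
  classOf[HoodieROTablePathFilter],
  classOf[PathFilter])

// Illustrative check: list record keys that still appear more than once
// within a partition. With the filter in place this should come back empty.
spark.sql(
  """SELECT dt, _hoodie_record_key, COUNT(*) AS versions
    |FROM inventory.customer
    |GROUP BY dt, _hoodie_record_key
    |HAVING COUNT(*) > 1""".stripMargin).show()
```

Note that the filter is set on the Hadoop configuration of the running SparkContext, so it has to be set again in every new session.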

If you look into the class HoodieROTablePathFilter, you will see logic that ensures that, for Hudi-related folders (paths) and files, only the latest path/file is selected. Thus you do not see duplicate entries when you set it. Without this, Spark simply reads all the parquet files and displays the data within them.
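
To make the idea concrete, here is a toy model in plain Scala of "keep only the latest file per file group". It is not the HoodieROTablePathFilter code: the real class consults the Hudi timeline and works on Hadoop Path objects. The object name, the helper names and the file naming scheme `<fileId>_<writeToken>_<commitTime>.parquet` are assumptions made for this sketch.

```scala
// Toy model only; names and the file naming scheme are assumptions for this sketch.
object LatestFileSketch {
  final case class DataFile(fileId: String, commitTime: String, name: String)

  // Parse "<fileId>_<writeToken>_<commitTime>.parquet"; anything else is ignored.
  def parse(name: String): Option[DataFile] =
    if (!name.endsWith(".parquet")) None
    else name.stripSuffix(".parquet").split("_") match {
      case Array(fileId, _, commitTime) => Some(DataFile(fileId, commitTime, name))
      case _                            => None
    }

  // For every file group (fileId), keep only the file from the newest commit.
  // Commit times are fixed-width digit strings, so string ordering is enough.
  def latestPerFileGroup(names: Seq[String]): Set[String] =
    names.flatMap(parse)
      .groupBy(_.fileId)
      .values
      .map(_.maxBy(_.commitTime).name)
      .toSet

  def main(args: Array[String]): Unit = {
    val listing = Seq(
      "f1_0-1-1_20191114190000.parquet", // older version of file group f1
      "f1_0-2-2_20191114192136.parquet", // newer version of file group f1
      "f2_0-1-1_20191114190000.parquet") // only version of file group f2
    latestPerFileGroup(listing).toSeq.sorted.foreach(println)
  }
}
```

A plain parquet read corresponds to using the whole listing, which is why every superseded version of a record shows up as a duplicate.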

It may take some time for you to go through these paths and digest the flow. But should you still have any questions, please do not hesitate to get back to us.

Hope this helps
Kabeer.
 
 

Sent: Monday, November 18, 2019 at 7:47 PM
From: "Bhavani Sudha" <bh...@gmail.com>
To: dev@hudi.apache.org
Subject: Re: Spark v2.3.2 : Duplicate entries found for each primary Key
Hi Pratyaksh,

Let me try to answer this. I believe spark does not natively invoke
HoodieParquetInputFormat.getSplits() like Hive and Presto does. So when
queried, spark just loads all the data files in that partition without
applying Hoodie filtering logic. Thats why we need to instruct Spark to
read in the appropriate format in one of the two ways suggested by
Vinoth/Kabeer earlier.

Thanks,
Sudha


Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Bhavani Sudha <bh...@gmail.com>.
Hi Pratyaksh,

Let me try to answer this. I believe Spark does not natively invoke
HoodieParquetInputFormat.getSplits() the way Hive and Presto do. So when
queried, Spark just loads all the data files in that partition without
applying the Hoodie filtering logic. That's why we need to instruct Spark to
read in the appropriate format in one of the two ways suggested by
Vinoth/Kabeer earlier.

Thanks,
Sudha

On Mon, Nov 18, 2019 at 12:16 AM Pratyaksh Sharma <pr...@gmail.com>
wrote:

> Hi Vinoth/Kabeer,
>
> I have one small doubt regarding what you proposed to fix the issue. Why is
> HoodieParquetInputFormat class not able to handle deduplication of records
> in case of spark while it is able to do so in case of presto and hive?

Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Pratyaksh Sharma <pr...@gmail.com>.
Hi Vinoth/Kabeer,

I have one small doubt regarding what you proposed to fix the issue. Why is
the HoodieParquetInputFormat class not able to handle deduplication of
records in the case of Spark, while it is able to do so for Presto and Hive?

On Sun, Nov 17, 2019 at 4:08 AM Vinoth Chandar <vi...@apache.org> wrote:

> Sweet!

Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Vinoth Chandar <vi...@apache.org>.
Sweet!

On Sat, Nov 16, 2019 at 10:16 AM Purushotham Pushpavanthar <
pushpavanthar@gmail.com> wrote:

> Thanks Vinoth and Kabeer. It resolved my problem.
>
> Regards,
> Purushotham Pushpavanth

Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Purushotham Pushpavanthar <pu...@gmail.com>.
Thanks Vinoth and Kabeer. It resolved my problem.

Regards,
Purushotham Pushpavanth



On Fri, 15 Nov 2019 at 20:16, Kabeer Ahmed <ka...@linuxmail.org> wrote:

> Adding to Vinoth's response, in spark-shell you just need to copy and
> paste the lines below. Let us know if it still doesn't work.
>
> spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",
> classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
> classOf[org.apache.hadoop.fs.PathFilter]);

Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Kabeer Ahmed <ka...@linuxmail.org>.
Adding to Vinoth's response, in spark-shell you just need to copy and paste the lines below. Let us know if it still doesn't work.

spark.sparkContext.hadoopConfiguration.setClass(
  "mapreduce.input.pathFilter.class",
  classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
  classOf[org.apache.hadoop.fs.PathFilter]);

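
One way to check that the filter took effect is to look for record keys that still appear more than once. The sketch below assumes it is pasted into a spark-shell started with the Hudi Spark bundle on the classpath, that `spark` is the session the shell provides, and that the table is the `inventory`.`customer` table from the original post; the `dupes` name and the query itself are illustrative, not something taken from the Hudi docs.

```scala
// Assumption: running inside spark-shell with the Hudi Spark bundle loaded.
// Register the path filter before the table is first read in this session.
spark.sparkContext.hadoopConfiguration.setClass(
  "mapreduce.input.pathFilter.class",
  classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],
  classOf[org.apache.hadoop.fs.PathFilter])

// Count rows per (partition path, record key) using the Hudi metadata columns
// that appear in the table definition; keep only keys seen more than once.
val dupes = spark.sql(
  """SELECT `_hoodie_partition_path`, `_hoodie_record_key`, COUNT(*) AS cnt
    |FROM inventory.customer
    |GROUP BY `_hoodie_partition_path`, `_hoodie_record_key`
    |HAVING COUNT(*) > 1""".stripMargin)

// Print up to 20 offending keys without truncating the column values.
dupes.show(20, false)
```

If the filter is being applied, this should come back empty; if rows still show up, the setting is probably not reaching the code path Spark uses to read the table.
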
On Nov 15 2019, at 1:37 pm, Vinoth Chandar <vi...@apache.org> wrote:
> Hi,
>
> are you setting the path filters when you query the Hudi Hive table via
> Spark
> http://hudi.apache.org/querying_data.html#spark-ro-view (or
> http://hudi.apache.org/querying_data.html#spark-rt-view alternatively)?
>
> - Vinoth


Re: Spark v2.3.2 : Duplicate entries found for each primary Key

Posted by Vinoth Chandar <vi...@apache.org>.
Hi,

Are you setting the path filters when you query the Hudi Hive table via
Spark, as described at
http://hudi.apache.org/querying_data.html#spark-ro-view (or
http://hudi.apache.org/querying_data.html#spark-rt-view alternatively)?

- Vinoth

On Fri, Nov 15, 2019 at 5:03 AM Purushotham Pushpavanthar <
pushpavanthar@gmail.com> wrote:

> Hi,
>
> Below is a create statement on my Hudi dataset.
>
> CREATE EXTERNAL TABLE `inventory`.`customer`(
>   `_hoodie_commit_time` string, `_hoodie_commit_seqno` string,
>   `_hoodie_record_key` string, `_hoodie_partition_path` string,
>   `_hoodie_file_name` string, `id` bigint, `sales` bigint,
>   `merchant` bigint, `item_status` bigint, `tem_shipment` bigint)
> PARTITIONED BY (`dt` string)
> ROW FORMAT SERDE
>   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
> WITH SERDEPROPERTIES ('serialization.format' = '1')
> STORED AS INPUTFORMAT
>   'org.apache.hudi.hadoop.HoodieParquetInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
> LOCATION 's3://<warehouse-bucket>/<path>/inventory/customer'
> TBLPROPERTIES (
>   'bucketing_version' = '2', 'transient_lastDdlTime' = '1572952974',
>   'last_commit_time_sync' = '20191114192136')
>
> I've taken care of adding *hudi-hive-bundle-0.5.1-SNAPSHOT.jar* in Hive,
> *hudi-presto-bundle-0.5.1-SNAPSHOT.jar* in Presto and
> *hudi-spark-bundle-0.5.1-SNAPSHOT.jar* in Spark (all three share a common
> Metastore).
> We are running Hudi in COW mode and we noticed that there are multiple
> versions of the .parquet files written per partition, depending on the
> number of updates coming to them over each batch execution. When queried
> from Hive and Presto for any primary key having multiple updates, it
> returns a single record with the latest state (I assume
> *HoodieParquetInputFormat* does the magic of taking care of duplicates).
> Whereas, when I tried to execute the same query in Spark SQL, I get
> duplicated records for any primary key having multiple updates.
>
> Can someone help me understand why Spark is not able to handle
> deduplication of records across multiple commits which Presto and Hive are
> able to do?
> I've taken care of providing hudi-spark-bundle-0.5.1-SNAPSHOT.jar while
> starting spark-shell. Is there something that I'm missing?
>
> Thanks in advance.
>
> Regards,
> Purushotham Pushpavanth
>