Posted to dev@hudi.apache.org by Tanuj <ta...@gmail.com> on 2020/08/11 09:32:58 UTC

Recommendation to load HUDI data across partitions

Hi,
I have a use case where I consume messages from Kafka and, depending on
those Kafka messages (about 2K records per batch), need to query a Hudi
table, build a dataset (with both updates and inserts), and push it back
to the Hudi table.

I tried the following, but it threw a NullPointerException from the
sparkSession Scala code, and rightly so, since sparkSession was being used
inside an executor.

 Dataset<HudiRecord> hudiDs = companyStatusDf.flatMap(new FlatMapFunction<KafkaRecord, HudiRecord>() {
            @Override
            public Iterator<HudiRecord> call(KafkaRecord kafkaRecord) throws Exception {
                String prop1 = kafkaRecord.getProp1();
                String prop2 = kafkaRecord.getProp2();
                // NPE thrown here: sparkSession lives on the driver and cannot
                // be used inside executor-side code such as this flatMap
                HudiRecord hudiRecord = sparkSession.read()
                        .format(HUDI_DATASOURCE)
                        .schema(<schema>)
                        .load(<path>)
                        .as(Encoders.bean(HudiRecord.class))
                        .filter(<FILTER_ON_KAFKA_RECORD> say prop1)
                        .first();
                // modification of the hudi record
                hudiRecord = transform(hudiRecord);
                return Arrays.asList(hudiRecord).iterator();
            }
        }, Encoders.bean(HudiRecord.class));

In Hudi, I have two levels of partitions (year and month). So, for
example, if I get 2K records from Kafka that span multiple partitions,
what is advisable: load the full table first, like "/*/*/*", or first read
the Kafka records, work out which partitions need to be hit, and then load
only those Hudi partitions? I believe the second option would be faster,
i.e. loading only the specific partitions, and that is what I was trying
in the snippet above. So if I have to leverage partitions, is a collect()
on the Kafka Dataset to get the list of partitions and then supplying it
to Hudi the only option, or can I do it just with Spark Datasets?
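Concretely, the second option I have in mind would look something like the
sketch below. The year/month column names, kafkaDs, and basePath are
placeholders, not my actual code:

    import static org.apache.spark.sql.functions.col;

    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Distinct (year, month) pairs touched by this Kafka batch.
    // collectAsList() is safe here: 2K records yield at most a few rows.
    List<Row> partitions = kafkaDs
            .select(col("year"), col("month"))
            .distinct()
            .collectAsList();

    // One glob path per affected partition, instead of loading "/*/*/*".
    List<String> paths = partitions.stream()
            .map(r -> basePath + "/" + r.getAs("year") + "/" + r.getAs("month") + "/*")
            .collect(Collectors.toList());

    // Load only the affected Hudi partitions.
    Dataset<Row> hudiDf = sparkSession.read()
            .format("hudi")
            .load(paths.toArray(new String[0]));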

Re: Recommendation to load HUDI data across partitions

Posted by Vinoth Chandar <vi...@apache.org>.
Great! Glad you got it working!


Re: Recommendation to load HUDI data across partitions

Posted by tanu dua <ta...@gmail.com>.
Thanks Vinoth for the detailed explanation. I was about to reply that it
worked; I followed most of the steps you mentioned below.
I used foreachBatch() on the stream to process each batch of data from
Kafka, found the affected partitions using aggregate functions on the
Kafka Dataset, and then fed those partitions to Hudi as a glob pattern to
get hudiDs (sketched below).
I then performed a join on both datasets. I had some complex logic to
deduce from both kafkaDs and hudiDs and was therefore using flatMap, but I
am now able to remove the flatMap and use Dataset joins instead.
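As a skeleton, the flow looks like this (kafkaStreamDs is a placeholder;
the steps reuse the glob idea sketched in my original mail):

    kafkaStreamDs.writeStream()
            .foreachBatch((batchDs, batchId) -> {
                // 1. aggregate batchDs to find the (year, month) partitions
                //    this micro-batch touches
                // 2. glob-load only those Hudi partitions into hudiDs
                // 3. join batchDs with hudiDs and apply the business logic
                // 4. write the result back to Hudi as an upsert
            })
            .start();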

Thanks again for all your help, as always!!

Re: Recommendation to load HUDI data across partitions

Posted by Vinoth Chandar <vi...@apache.org>.
Hi Tanuj,

From this example, it appears you are trying to use sparkSession from
within an executor. That will be problematic, since the SparkSession only
exists on the driver. Can you please open a support ticket with the full
stack trace?

I think what you are describing is a join between the Kafka and Hudi
tables. So I'd read from Kafka first, cache the 2K messages in memory,
find out which partitions they belong to, and load only those affected
partitions instead of the entire table.
At that point you will have two datasets, kafkaDF and hudiDF (or RDDs, or
Datasets; my suggestion remains valid either way).
Instead of hand-crafting the join at the record level, like you have, you
can just use RDD/Dataset-level join operations to get a resultDF.

Then you do a resultDF.write.format("hudi") and you are done.
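A minimal sketch of that, assuming "recordKey" as the key field and
placeholder partition/table names:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    // Left outer join keeps both updates (matched) and inserts (unmatched).
    Dataset<Row> resultDF = kafkaDF.join(
            hudiDF,
            kafkaDF.col("recordKey").equalTo(hudiDF.col("recordKey")),
            "left_outer");

    // Upsert the result back into the Hudi table.
    resultDF.write()
            .format("hudi")
            .option("hoodie.datasource.write.recordkey.field", "recordKey")
            .option("hoodie.datasource.write.partitionpath.field", "year,month")
            // multi-field partition paths typically need a complex key generator
            .option("hoodie.datasource.write.keygenerator.class",
                    "org.apache.hudi.keygen.ComplexKeyGenerator")
            .option("hoodie.table.name", "company_status") // placeholder name
            .mode(SaveMode.Append)
            .save(basePath);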
