Posted to user@ignite.apache.org by v-shaal <vi...@gmail.com> on 2020/01/21 09:14:32 UTC

What is best approach, for kafkaStreamer/ dataStreamer to check if key exists and update

Hi

I have a Kafka streamer that loads data into a cache. While loading, I want to
upsert the data so that more than one stream can update a value that already
exists in the cache.

For example:

KafkaStreamer<AffinityKey<Long>, UWTransactions> kafkaStreamer =
    new KafkaStreamer<>();
IgniteDataStreamer<AffinityKey<Long>, UWTransactions> stmr =
    Ignition.ignite().dataStreamer(KAFKA_CACHE);

kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);

kafkaStreamer.setMultipleTupleExtractor(
    record -> {
        Map<Long, UWTransactions> entries = new HashMap<>();
        try {
            ObjectMapper mapper = new ObjectMapper();

            Person person = mapper.readValue(record.value().toString(), Person.class);
            entries.put(person.userId, person);
        }
        catch (Exception ex) {
            fail("Unexpected error." + ex);
        }
        return entries;
    });

Now say the Person class has the following attributes:
1) name
2) salary
3) age
4) address
5) userId

There is also a second stream, fed by another KafkaStreamer, that carries only
the userId and salary, i.e. a Salary class with the attributes:
1) salary
2) userId

I want to check whether the userId key exists and, if it does, update only the
salary component, keeping name, age and address as they were.

How can I achieve this, and what is the best way?



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: What is best approach, for kafkaStreamer/ dataStreamer to check if key exists and update

Posted by Ilya Kasnacheev <il...@gmail.com>.
Hello!

You can supply a StreamReceiver to the DataStreamer; it will be used to
resolve upsert conflicts.
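
To make the idea concrete, here is a minimal sketch of the per-entry merge such
a receiver would perform: update the salary only when the key already exists,
and leave the other fields alone. It is plain Java with a java.util.Map
standing in for the cache, so it runs without Ignite. The class and method
names (SalaryMerge, applySalary, withSalary) and the field types are
assumptions made for illustration; they are not taken from the original post
or from the Ignite API. In Ignite, similar logic would presumably live inside
a StreamReceiver registered on the IgniteDataStreamer; that wiring is not
shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: a plain Map stands in for the Ignite cache.
final class SalaryMerge {

    /** Full record from the first stream (fields as listed in the question). */
    static final class Person {
        final long userId;
        final String name;
        final int age;
        final String address;
        final double salary;

        Person(long userId, String name, int age, String address, double salary) {
            this.userId = userId;
            this.name = name;
            this.age = age;
            this.address = address;
            this.salary = salary;
        }

        /** Returns a copy with only the salary replaced. */
        Person withSalary(double newSalary) {
            return new Person(userId, name, age, address, newSalary);
        }
    }

    /** Partial record from the second stream: userId and salary only. */
    static final class Salary {
        final long userId;
        final double salary;

        Salary(long userId, double salary) {
            this.userId = userId;
            this.salary = salary;
        }
    }

    /**
     * Applies a salary update only if the key is already present.
     * Returns true when an existing entry was updated, false when the
     * key was absent and the update was skipped.
     */
    static boolean applySalary(Map<Long, Person> cache, Salary update) {
        // computeIfPresent does nothing (and returns null) for absent keys.
        Person merged = cache.computeIfPresent(
            update.userId,
            (id, existing) -> existing.withSalary(update.salary));
        return merged != null;
    }

    public static void main(String[] args) {
        Map<Long, Person> cache = new HashMap<>();
        cache.put(1L, new Person(1L, "Ann", 30, "1 Main St", 1000.0));

        applySalary(cache, new Salary(1L, 1500.0)); // existing key: salary updated
        applySalary(cache, new Salary(2L, 900.0));  // unknown key: skipped

        Person p = cache.get(1L);
        System.out.println(p.name + " " + p.salary + " " + cache.containsKey(2L));
    }
}
```

Whether an update for an unknown key should be skipped, as above, or inserted
as a partial record is a design choice that depends on the ordering guarantees
between the two streams.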

However, I have no idea about KafkaStreamer.

Regards,
-- 
Ilya Kasnacheev

