Posted to user@ignite.apache.org by v-shaal <vi...@gmail.com> on 2020/01/21 09:14:32 UTC
What is the best approach for KafkaStreamer/DataStreamer to check if a
key exists and update it
Hi,
I have a Kafka streamer that loads data into a cache. While loading, I want
to upsert the data so that more than one stream can update a value that
already exists in the cache.
For example:
KafkaStreamer<AffinityKey<Long>, UWTransactions> kafkaStreamer = new KafkaStreamer<>();
IgniteDataStreamer<AffinityKey<Long>, UWTransactions> stmr =
    Ignition.ignite().dataStreamer(KAFKA_CACHE);

kafkaStreamer.setIgnite(ignite);
kafkaStreamer.setStreamer(stmr);

kafkaStreamer.setMultipleTupleExtractor(
    record -> {
        Map<Long, UWTransactions> entries = new HashMap<>();
        try {
            ObjectMapper mapper = new ObjectMapper();
            Person person = mapper.readValue(record.value().toString(), Person.class);
            entries.put(person.userId, person);
        }
        catch (Exception ex) {
            fail("Unexpected error." + ex);
        }
        return entries;
    });
Now say the Person class has the following attributes:
1) name
2) salary
3) age
4) address
5) userId
There is another stream, with another KafkaStreamer, that carries only the
userId and salary information, i.e. a Salary class with the attributes:
1) Salary
2) userId
I want to check whether the userId key exists and, if it does, update only
the salary component, keeping name, age, and address as they were.
How can I achieve this, and what is the best way?
Re: What is the best approach for KafkaStreamer/DataStreamer to check
if a key exists and update it
Posted by Ilya Kasnacheev <il...@gmail.com>.
Hello!
You can supply a StreamReceiver to the DataStreamer; it will be used to
resolve upsert conflicts.
However, I cannot say how this applies to KafkaStreamer.
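As a rough illustration of the merge step such a receiver might perform,
here is a minimal sketch in plain Java. It does not use Ignite: a
java.util.Map stands in for the cache, and the class name SalaryUpsertSketch,
the helper applySalary, the field types, and the choice to drop updates for
unknown keys are all assumptions made for the example. In Ignite, comparable
logic would presumably sit inside the receive method of a StreamReceiver
registered with IgniteDataStreamer.receiver(...).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a Map stands in for the Ignite cache.
class SalaryUpsertSketch {
    // Value type with the attributes listed in the question (field types are assumed).
    static class Person {
        long userId;
        String name;
        double salary;
        int age;
        String address;

        Person(long userId, String name, double salary, int age, String address) {
            this.userId = userId;
            this.name = name;
            this.salary = salary;
            this.age = age;
            this.address = address;
        }
    }

    // Updates only the salary of an existing entry and leaves the other fields alone.
    // Returns false when the key is absent; dropping such updates is an assumption.
    static boolean applySalary(Map<Long, Person> cache, long userId, double newSalary) {
        Person existing = cache.get(userId);
        if (existing == null)
            return false;
        existing.salary = newSalary;
        cache.put(userId, existing); // write the merged value back under the same key
        return true;
    }

    public static void main(String[] args) {
        Map<Long, Person> cache = new HashMap<>();
        cache.put(1L, new Person(1L, "Alice", 1000.0, 30, "Main St"));

        boolean updated = applySalary(cache, 1L, 1500.0); // key exists: salary changes
        boolean ignored = applySalary(cache, 2L, 900.0);  // key missing: nothing is written

        Person p = cache.get(1L);
        System.out.println(updated + " " + ignored + " " + p.name + " " + p.salary);
    }
}
```

Whether a salary update for a missing key should be dropped, buffered, or
inserted as a partial record depends on the application; the sketch simply
drops it.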
Regards,
--
Ilya Kasnacheev