You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@ignite.apache.org by Stas Girkin <st...@gmail.com> on 2019/09/27 14:58:57 UTC

Map Reduce over cache items, where values are sequences

Hello everyone,

I would like to use MapReduce over cache items representing events happened
in a process to calculate certain statistics. Could you be so kind to help
me how can I do that with apache ignite?

I have tens of millions of processes that happened in the past. The
processes look like a sequence of events [event1, event2, event3, ...
eventN], where number of events per process could vary (50-100). Every
event has certain sets of attributes like timestamp, event type, set of
metrics. I put these data to a cache as process_id => [e1, e2, e3, e4,
...]. What I would like to get is to get a histogram how often event of a
certain type happens in all the processes or processes that have certain
condition. What I managed to do is to broadcast a callable that lands on
ignite nodes and can access local cache items and counts what I want and
returns it back to the caller in K chunks which I have to aggregate on the
client.

Ignite localIgnite = Ignition.localIgnite();
IgniteCache<String, MyProcess> localCache = localIgnite.cache("processes");
MyHistogram hist = new MyHistogram()
for (Cache.Entry<String, MyProcess> e : localCache.localEntries()) {
    hist.process(e.getValue());
}
return hist;

The problem with the approach is it utilizes only a single core on the
ignite node, while I have 64. How could I do something similar in more
efficient manner?

thank you in advance.

Re: Map Reduce over cache items, where values are sequences

Posted by Evgenii Zhuravlev <e....@gmail.com>.
For example, query with partition will look like :

QueryCursor<Cache.Entry<Integer, Integer>> qry =
    cache.query(new ScanQuery<Integer, Integer>().setPartition(part));


вт, 1 окт. 2019 г. в 15:25, Evgenii Zhuravlev <e....@gmail.com>:

> Hi,
>
> To parallelise everything properly, I would recommend starting an
> affinityCallable per partition(1024 by default). Inside this compute job,
> you can collect information for the certain partition only using
> ScanQuery(or SQLQuery)
>
>
> пт, 27 сент. 2019 г. в 18:09, Stas Girkin <st...@gmail.com>:
>
>> Hello everyone,
>>
>> I would like to use MapReduce over cache items representing events
>> happened in a process to calculate certain statistics. Could you be so kind
>> to help me how can I do that with apache ignite?
>>
>> I have tens of millions of processes that happened in the past. The
>> processes look like a sequence of events [event1, event2, event3, ...
>> eventN], where number of events per process could vary (50-100). Every
>> event has certain sets of attributes like timestamp, event type, set of
>> metrics. I put these data to a cache as process_id => [e1, e2, e3, e4,
>> ...]. What I would like to get is to get a histogram how often event of a
>> certain type happens in all the processes or processes that have certain
>> condition. What I managed to do is to broadcast a callable that lands on
>> ignite nodes and can access local cache items and counts what I want and
>> returns it back to the caller in K chunks which I have to aggregate on the
>> client.
>>
>> Ignite localIgnite = Ignition.localIgnite();
>> IgniteCache<String, MyProcess> localCache =
>> localIgnite.cache("processes");
>> MyHistogram hist = new MyHistogram()
>> for (Cache.Entry<String, MyProcess> e : localCache.localEntries()) {
>>     hist.process(e.getValue());
>> }
>> return hist;
>>
>> The problem with the approach is it utilizes only a single core on the
>> ignite node, while I have 64. How could I do something similar in more
>> efficient manner?
>>
>> thank you in advance.
>>
>

Re: Map Reduce over cache items, where values are sequences

Posted by Evgenii Zhuravlev <e....@gmail.com>.
Hi,

To parallelise everything properly, I would recommend starting an
affinityCallable per partition(1024 by default). Inside this compute job,
you can collect information for the certain partition only using
ScanQuery(or SQLQuery)


пт, 27 сент. 2019 г. в 18:09, Stas Girkin <st...@gmail.com>:

> Hello everyone,
>
> I would like to use MapReduce over cache items representing events
> happened in a process to calculate certain statistics. Could you be so kind
> to help me how can I do that with apache ignite?
>
> I have tens of millions of processes that happened in the past. The
> processes look like a sequence of events [event1, event2, event3, ...
> eventN], where number of events per process could vary (50-100). Every
> event has certain sets of attributes like timestamp, event type, set of
> metrics. I put these data to a cache as process_id => [e1, e2, e3, e4,
> ...]. What I would like to get is to get a histogram how often event of a
> certain type happens in all the processes or processes that have certain
> condition. What I managed to do is to broadcast a callable that lands on
> ignite nodes and can access local cache items and counts what I want and
> returns it back to the caller in K chunks which I have to aggregate on the
> client.
>
> Ignite localIgnite = Ignition.localIgnite();
> IgniteCache<String, MyProcess> localCache = localIgnite.cache("processes");
> MyHistogram hist = new MyHistogram()
> for (Cache.Entry<String, MyProcess> e : localCache.localEntries()) {
>     hist.process(e.getValue());
> }
> return hist;
>
> The problem with the approach is it utilizes only a single core on the
> ignite node, while I have 64. How could I do something similar in more
> efficient manner?
>
> thank you in advance.
>