Posted to user@ignite.apache.org by "William.L" <wi...@gmail.com> on 2021/04/23 16:16:50 UTC

MapReduce - How to efficiently scan through subset of the caches?

Hi,

I am investigating whether the MapReduce API is the right tool for my
scenario. Here's the context of the caches:
* Multiple caches for different types of datasets
* Each cache holds multi-tenant data, and the tenant id is part of the cache
key
* Each cache entry is a complex json/binary object that I want to run a
computation on (let's just say it is hard to do in SQL), producing a complex
result for each entry (e.g. a dictionary) that I then want to
reduce/aggregate
* The cluster has persistence enabled because we have more data than memory

My scenario is to run the MapReduce operation only on the data for a specific
tenant (a small subset of the data). From reading the forum about MapReduce,
it seems the best way to do this is to use the IgniteCache.localEntries
API and iterate through each node's local cache. My concern with this
approach is that it loops through the whole cache (keys and values), which is
very inefficient. Is there a more efficient way to filter down to the relevant
keys and then access only the matching entries?

Thanks.





Re: MapReduce - How to efficiently scan through subset of the caches?

Posted by Pavel Tupitsyn <pt...@apache.org>.
> For Compute.affinityRun, I am not sure how to work with it for my scenario

affinityRun and affinityCall have partition-based overloads (taking an int
partId). Partition-based compute is the reliable way to process all data in
the cluster, even in the face of topology changes/rebalance (as opposed to
localEntries or local queries).

The whole thing can look like this:

1. From the initiator node, start processing all partitions in parallel:

   for (int part = 0; part < ignite.affinity(cacheName).partitions(); part++)
       futs.add(ignite.compute().affinityCallAsync(cacheNames, part, new MyJob(part)));

2. Inside MyJob, find the tenant's data with a partition-scoped SQL query:

   var entries = cache.query(new SqlFieldsQuery(sql).setPartitions(part)...);

3. Still inside MyJob, process the data in any way and return the result from
   the job:

   return process(entries);

4. Aggregate the job results on the initiator.


Here, Ignite guarantees that during steps 2 and 3:
- The job operates on local data (it runs on the node where the partition is
located)
- The partition is locked while the job runs, so the data won't be moved in
case of topology changes
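
Putting it together, a minimal self-contained sketch of the whole pattern
might look like this. The cache name, the MyType table with an indexed
tenantId field, the tenant id "tenant-42", and the per-entry computation are
assumptions for illustration, not a definitive implementation:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.cache.query.SqlFieldsQuery;
    import org.apache.ignite.lang.IgniteCallable;
    import org.apache.ignite.lang.IgniteFuture;
    import org.apache.ignite.resources.IgniteInstanceResource;

    public class TenantMapReduce {
        private static final String CACHE = "dataCache"; // hypothetical name

        // Runs on the node that owns the given partition; the partition
        // stays locked for the duration of the call.
        static class MyJob implements IgniteCallable<Long> {
            @IgniteInstanceResource
            private transient Ignite ignite; // injected on the remote node

            private final int part;
            private final String tenantId;

            MyJob(int part, String tenantId) {
                this.part = part;
                this.tenantId = tenantId;
            }

            @Override public Long call() {
                IgniteCache<?, ?> cache = ignite.cache(CACHE);

                // Step 2: scan only this partition, narrowed by tenantId.
                SqlFieldsQuery qry = new SqlFieldsQuery(
                        "select _val from MyType where tenantId = ?")
                    .setArgs(tenantId)
                    .setPartitions(part);

                // Step 3: arbitrary Java computation per entry (the "map" side).
                long result = 0;
                try (var cursor = cache.query(qry)) {
                    for (List<?> row : cursor)
                        result += process(row);
                }
                return result;
            }

            private long process(List<?> row) {
                return 1; // placeholder for the real per-entry computation
            }
        }

        public static void main(String[] args) {
            try (Ignite ignite = Ignition.start()) {
                // Step 1: one job per partition, all in parallel.
                int parts = ignite.affinity(CACHE).partitions();
                List<IgniteFuture<Long>> futs = new ArrayList<>(parts);

                for (int part = 0; part < parts; part++)
                    futs.add(ignite.compute().affinityCallAsync(
                        List.of(CACHE), part, new MyJob(part, "tenant-42")));

                // Step 4: aggregate per-partition results (the "reduce" side).
                long total = futs.stream().mapToLong(IgniteFuture::get).sum();
                System.out.println("Total for tenant-42: " + total);
            }
        }
    }

Note that this submits one job per partition at once; with the default of
1024 partitions you may want to batch or throttle the futures on the
initiator.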


Re: MapReduce - How to efficiently scan through subset of the caches?

Posted by "William.L" <wi...@gmail.com>.
Thanks for the pointers stephendarlington, ptupitsyn.

Looks like I can run a mapper that does a local SQL query to get the set of
keys for the tenant (the ones that reside on the local server node), and then
do Compute.affinityRun or Cache.invokeAll.

For Cache.invokeAll, it takes a map of keys to EntryProcessors, so that is
easy to understand.

For Compute.affinityRun, I am not sure how to make it work for my scenario:
* It takes an affinity key to find the server that owns the partition and runs
the IgniteRunnable there, but I don't see an interface for passing in the
specific keys. Am I expected to pass the key set as part of the IgniteRunnable
object?
* Suppose the cache uses user_id as the affinity key; then it is possible that
two user_ids map to the same partition. How do I avoid duplicate
processing/scanning?






Re: MapReduce - How to efficiently scan through subset of the caches?

Posted by Pavel Tupitsyn <pt...@apache.org>.
1. Use a separate cache as an index.
   E.g. for every tenant, store a list of entry IDs for quick retrieval,
   then use Compute.affinityRun or Cache.invokeAll to process just that
   subset of the data.

2. Use SQL with an index, but only on the tenantId field.
   Get the entry IDs for a given tenant with SQL, then again affinityRun or
   invokeAll (a rough sketch follows below).
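
For illustration, a rough sketch of the invokeAll route; the key/value types
and the per-entry computation are assumptions, and tenantKeys is assumed to
come from the index cache or a SQL lookup:

    import java.util.Map;
    import java.util.Set;
    import javax.cache.processor.EntryProcessorResult;
    import javax.cache.processor.MutableEntry;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.cache.CacheEntryProcessor;

    public class TenantInvoke {
        // Runs on the node that owns each key, with the key locked.
        static class SizeProcessor implements CacheEntryProcessor<String, byte[], Integer> {
            @Override public Integer process(MutableEntry<String, byte[]> entry, Object... args) {
                byte[] val = entry.getValue();
                return val == null ? 0 : val.length; // per-entry "map" computation
            }
        }

        static int totalSize(IgniteCache<String, byte[]> cache, Set<String> tenantKeys) {
            Map<String, EntryProcessorResult<Integer>> results =
                cache.invokeAll(tenantKeys, new SizeProcessor());

            int total = 0;
            for (EntryProcessorResult<Integer> r : results.values())
                total += r.get(); // "reduce" step on the caller
            return total;
        }
    }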

> IgniteCache.localEntries
Be careful with localEntries - when the topology changes and rebalance is in
progress, you'll miss some data and/or process some of it twice.

Prefer the Cache.invoke and Compute.affinity* APIs - they guarantee that the
given piece of data (key or partition) is locked during processing.


Re: MapReduce - How to efficiently scan through subset of the caches?

Posted by Stephen Darlington <st...@gridgain.com>.
Add an index on the tenant id and run a SQL query with setLocal(true). You might also want to look at the IgniteCompute#affinityRun overload that takes a partition as a parameter, and run the job per partition rather than per node (higher degree of parallelism, and it potentially makes failover easier).
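
For illustration, a minimal sketch of that approach; the MyType value class,
its field names, and the query are assumptions:

    import java.util.List;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.cache.query.SqlFieldsQuery;
    import org.apache.ignite.cache.query.annotations.QuerySqlField;

    public class LocalTenantQuery {
        // Hypothetical value type; the index on tenantId is what makes
        // the per-tenant lookup cheap.
        public static class MyType {
            @QuerySqlField(index = true)
            public String tenantId;

            @QuerySqlField
            public byte[] payload;
        }

        // Returns the keys of the tenant's rows stored on this node only.
        public static List<List<?>> localTenantKeys(IgniteCache<?, ?> cache, String tenantId) {
            SqlFieldsQuery qry = new SqlFieldsQuery(
                "select _key from MyType where tenantId = ?");
            qry.setArgs(tenantId);
            qry.setLocal(true); // query only this node's data

            return cache.query(qry).getAll();
        }
    }

As noted elsewhere in the thread, a plain local query can miss or double-count
data while rebalance is in progress, so pairing it with partition-based
compute is the safer variant.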
