You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@ignite.apache.org by Chris Berry <ch...@gmail.com> on 2017/11/15 19:00:19 UTC

ComputeTask is including Nodes for ComputeJobs prematurely

Hi,

We are using Ignite’s Compute Grid. And mostly, it is working beautifully. 
But there is one glitch I need to work around.

AWS, Nodes come and go.
And when a new Node joins the existing Grid, it is given ComputeJobs before
it is actually ready to handle them.

In our infrastructure, a Node announces its availability by returning a 200
from an “Alive page”.
This is used by the Load Balancers to determine if a particular Instance can
take load.
This works very nicely at that level.

Is there a way to provide a similar functionality to this in the ComputeTask
map() method??
Obviously, we cannot make an HTTP call for every map().

Perhaps there is a way to handle this “upstream” from the ComputeTask??
Perhaps using a similar “Alive page” functionality that Ignite could
employ??

Or perhaps there is a way to enable the Ignite “communications port” only
when it is ready?? 
(But would this break other things in Ignite for startup??)

Make sense??

Any help/suggestions would be greatly appreciated.

Thanks, 
-- Chris 




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Denis Mekhanikov <dm...@gmail.com>.
Continuation of this thread is here:
http://apache-ignite-users.70518.x6.nabble.com/ignite-compute-ClusterGroup-is-broken-td18655.html

пн, 27 нояб. 2017 г. в 3:30, Chris Berry <ch...@gmail.com>:

> >>>> Affinity is not affected by IgniteCompute. My mistake.
> >>>> I get a hold of an IgniteCompute _before_ the Affinity is applied
> >> I'm not sure, what you are trying to say.
>
> I mean is -- I do this;
>
>     ignite.compute().withTimeout(timeout).execute(computeTask, request);
>
> Up-front.  Before I execute the Affinity part. Like so
>
>     public Map<? extends ComputeJob, ClusterNode>
>          map(List<ClusterNode> subgrid, @Nullable TRequest request) throws
> IgniteException {
>         Map<ComputeJob, ClusterNode> jobMap = new HashMap<>();
>         try {
>             List<UUID> cacheKeys = getIgniteAffinityCacheKeys(request);
>             Map<ClusterNode, Collection&lt;UUID>> nodeToKeysMap =
>
>
> ignite.<UUID>affinity(getIgniteAffinityCacheName()).mapKeysToNodes(cacheKeys);
>
> >> Correct me, if I'm wrong, but you are trying to choose a node to run a
> >> compute task on,
> >> depending on where a needed key is stored (just like affinityRun).
> >> And at the same time you want to make sure, that the corresponding node
> >> is already initialized.
> >> And if a node, that you chose by affinity, is not ready yet, then there
> >> will be no suitable nodes.
>
> We need the existing Grid to keep computing as Nodes come and go, which is
> often the case in the Cloud.
>
> As in; NodeA somehow die, and consequently another; NodeB, soon replaces
> it.
> Meanwhile, there are 3 other, additional copies of the data that was on
> NodeA available. (1 Primary & 3 Backups)
>
> What I want is that the compute load, which is constant and high, to not be
> sent to NodeB until it is truly ready to accept load.
>
> And that is not when NodeB starts Ignite, but sometime afterwards – which
> can, possibly, be a few minutes.
>
> >> Another thing I don't understand is why you need a ComputeTask#map
> >> method.
> >> Why ClusterGroup-specific compute is not enough for you?
>
> As shown above. I execute each ComputeTask against a large batch of Keys.
> These are then mapped to many different Nodes -- those with the data for
> that Key.
> And I send the ComputeJob to the appropriate Node using the map() function
>
> Thank you for all your help Denis.
> Cheers,
> -- Chris
>
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Chris Berry <ch...@gmail.com>.
>>>> Affinity is not affected by IgniteCompute. My mistake.
>>>> I get a hold of an IgniteCompute _before_ the Affinity is applied
>> I'm not sure, what you are trying to say.

I mean is -- I do this;

    ignite.compute().withTimeout(timeout).execute(computeTask, request);

Up-front.  Before I execute the Affinity part. Like so

    public Map<? extends ComputeJob, ClusterNode> 
         map(List<ClusterNode> subgrid, @Nullable TRequest request) throws
IgniteException {
        Map<ComputeJob, ClusterNode> jobMap = new HashMap<>();
        try {
            List<UUID> cacheKeys = getIgniteAffinityCacheKeys(request);
            Map<ClusterNode, Collection&lt;UUID>> nodeToKeysMap = 
                  
ignite.<UUID>affinity(getIgniteAffinityCacheName()).mapKeysToNodes(cacheKeys);

>> Correct me, if I'm wrong, but you are trying to choose a node to run a
>> compute task on, 
>> depending on where a needed key is stored (just like affinityRun). 
>> And at the same time you want to make sure, that the corresponding node
>> is already initialized. 
>> And if a node, that you chose by affinity, is not ready yet, then there
>> will be no suitable nodes.

We need the existing Grid to keep computing as Nodes come and go, which is
often the case in the Cloud.

As in; NodeA somehow die, and consequently another; NodeB, soon replaces it.
Meanwhile, there are 3 other, additional copies of the data that was on
NodeA available. (1 Primary & 3 Backups)

What I want is that the compute load, which is constant and high, to not be
sent to NodeB until it is truly ready to accept load.

And that is not when NodeB starts Ignite, but sometime afterwards – which
can, possibly, be a few minutes.

>> Another thing I don't understand is why you need a ComputeTask#map
>> method. 
>> Why ClusterGroup-specific compute is not enough for you?

As shown above. I execute each ComputeTask against a large batch of Keys.
These are then mapped to many different Nodes -- those with the data for
that Key.
And I send the ComputeJob to the appropriate Node using the map() function

Thank you for all your help Denis.
Cheers, 
-- Chris 




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Denis Mekhanikov <dm...@gmail.com>.
> Affinity is not affected by IgniteCompute. My mistake.
> I get a hold of an IgniteCompute _before_ the Affinity is applied
I'm not sure, what you are trying to say.

Correct me, if I'm wrong, but you are trying to choose a node to run a
compute task on, depending on where a needed key is stored (just like
affinityRun
<https://apacheignite.readme.io/docs/affinity-collocation#section-collocating-compute-with-data>
).
And at the same time you want to make sure, that the corresponding node is
already initialized. And if a node, that you chose by affinity, is not
ready yet, then there will be no suitable nodes.

Another thing I don't understand is why you need a ComputeTask#map method.
Why ClusterGroup-specific compute is not enough for you?

Denis


чт, 23 нояб. 2017 г. в 2:12, Chris Berry <ch...@gmail.com>:

> Hi Denis,
>
> You are correct. Thank you!
> Affinity is not affected by IgniteCompute. My mistake.
> I get a hold of an IgniteCompute _before_ the Affinity is applied.
>
> So. It seems that a “dynamic cluster group based on a predicate” will work
> beautifully!!
> I can simply add my gating function there.
>
> I knew that Ignite most likely had some clever solution hiding in the
> wings!!
>
> These are my thoughts on implementing it.
>
> 1) Create a REPLICATED, ReadyForLoadCache
> As Nodes become “ready for load”, write a new row to the ReadyForLoadCache.
> I.e. write  a Key=NodeId  Value=”true” row
>
> 2) Use a Continuous Query that watches the ReadyForLoadCache.
> It will use a SQL Query to collect all rows that are “true”
> The results will be stored locally in a Set<NodeId> nodesReadyForLoad;
>
> 3) In the ComputeTask::map() method, use;
>
>    ClusterGroup readyNodes = cluster.forPredicate((node) ->
> readyForLoadMonitor.isTakingLoad(node));
>    IgniteCompute c = ignite.compute(readyNodes);
>
> where isTakingLoad(node) will simply check the Set<NodeId>
> nodesReadyForLoad
>
> 4) I still need to handle Nodes coming & going with some sort of “outside
> monitor”, that will monitor when Nodes leave the Grid. And thus, can remove
> the row from the ReadyForLoadCache (or make it "false")
> This will lilely just listen to Grid Events.
>
> Any comments??
>
> FWIW, I do not think synchronization can work for me.
> This is very high volume, low latency app.
> And honestly, I dont really see how I'd implement it
>
> Thanks again for all of your help.
> Cheers,
> -- Chris
>
>
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Chris Berry <ch...@gmail.com>.
Hi Denis,

You are correct. Thank you!
Affinity is not affected by IgniteCompute. My mistake.
I get a hold of an IgniteCompute _before_ the Affinity is applied.

So. It seems that a “dynamic cluster group based on a predicate” will work
beautifully!!
I can simply add my gating function there.

I knew that Ignite most likely had some clever solution hiding in the
wings!!

These are my thoughts on implementing it.

1) Create a REPLICATED, ReadyForLoadCache
As Nodes become “ready for load”, write a new row to the ReadyForLoadCache. 
I.e. write  a Key=NodeId  Value=”true” row 

2) Use a Continuous Query that watches the ReadyForLoadCache.
It will use a SQL Query to collect all rows that are “true”
The results will be stored locally in a Set<NodeId> nodesReadyForLoad;

3) In the ComputeTask::map() method, use;

   ClusterGroup readyNodes = cluster.forPredicate((node) ->
readyForLoadMonitor.isTakingLoad(node));
   IgniteCompute c = ignite.compute(readyNodes);

where isTakingLoad(node) will simply check the Set<NodeId> nodesReadyForLoad

4) I still need to handle Nodes coming & going with some sort of “outside
monitor”, that will monitor when Nodes leave the Grid. And thus, can remove
the row from the ReadyForLoadCache (or make it "false")
This will lilely just listen to Grid Events.

Any comments??

FWIW, I do not think synchronization can work for me.
This is very high volume, low latency app.
And honestly, I dont really see how I'd implement it 

Thanks again for all of your help.
Cheers,
-- Chris





--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Denis Mekhanikov <dm...@gmail.com>.
> Would you stop Ignite & start it again??
Yes, I meant that. You can't tell Ignite not to take compute tasks, you can
only configure compute tasks not to be executed on certain nodes. So, the
first run will be to read configuration, the second one - to execute tasks
and do everything else. To avoid rebalancing, you can configure node
filters for caches as well:
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/configuration/CacheConfiguration.html#setNodeFilter(org.apache.ignite.lang.IgnitePredicate
)

> Both the “start twice” and the “dynamic cluster group solution” do not appear
to work for Affinity??
Affinity doesn't have anything to do with cluster groups. I recommended you
to use cluster groups to separate nodes, that are reading configuration
from those, which execute compute tasks.

What about synchronization? This solution seems to be the easiest.

Denis

ср, 22 нояб. 2017 г. в 17:11, Chris Berry <ch...@gmail.com>:

> Hi Denis,
>
> Thank you very much for responding.
>
> I’m unclear what you meant by “start Ignite twice”??
> I don’t really understand how that scenario would work.
> Would you stop Ignite & start it again??
> And, if so, wouldn’t that cause a whole new “rebalancing” to occur??
> Is there a “soft restart” that I am missing??
>
> Unfortunately, I do not see how the “dynamic cluster group based on a
> predicate” will work for me??
> Otherwise, it _would_ be a great solution…
>
> We rely heavily on Affinity to assure that the cache data is all collocated
> for computation.
>
>     Map<ComputeJob, ClusterNode> jobMap = new HashMap<>();
>     try {
>         List<UUID> cacheKeys = getIgniteAffinityCacheKeys(request);
>        Map<ClusterNode, Collection&lt;UUID>> nodeToKeysMap = ignite.
>
> <UUID>affinity(getIgniteAffinityCacheName()).mapKeysToNodes(cacheKeys);
>
> And, unfortunately, I do NOT believe that this works with ClusterGroups.
> Correct??
>
> So. It seems that I still do not understand how to implement this…
> Both the “start twice” and the “dynamic cluster group solution” do not
> appear to work for Affinity??
>
> Thanks for your help!!
> Cheers,
> -- Chris
>
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Chris Berry <ch...@gmail.com>.
Hi Denis,

Thank you very much for responding.

I’m unclear what you meant by “start Ignite twice”??
I don’t really understand how that scenario would work.
Would you stop Ignite & start it again??
And, if so, wouldn’t that cause a whole new “rebalancing” to occur??
Is there a “soft restart” that I am missing??

Unfortunately, I do not see how the “dynamic cluster group based on a
predicate” will work for me??
Otherwise, it _would_ be a great solution…

We rely heavily on Affinity to assure that the cache data is all collocated
for computation.

    Map<ComputeJob, ClusterNode> jobMap = new HashMap<>();
    try {
        List<UUID> cacheKeys = getIgniteAffinityCacheKeys(request);
       Map<ClusterNode, Collection&lt;UUID>> nodeToKeysMap = ignite.
               
<UUID>affinity(getIgniteAffinityCacheName()).mapKeysToNodes(cacheKeys);

And, unfortunately, I do NOT believe that this works with ClusterGroups.
Correct??

So. It seems that I still do not understand how to implement this…
Both the “start twice” and the “dynamic cluster group solution” do not
appear to work for Affinity??

Thanks for your help!!
Cheers,
-- Chris




--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Denis Mekhanikov <dm...@gmail.com>.
Chris,

If it's exactly the thing that you want to achieve, you can start Ignite
twice. First time - to get contents of the "admin cache", and second time,
after all initialization is finished - to accept compute jobs.
To avoid compute jobs from being executed by the first node, you can run
them (jobs), using a cluster group filter. Something like this:

IgniteCompute compute = ignite.compute(
    ignite.cluster().forAttribute("ROLE", "worker"));

And the second node should be started with corresponding user attributes
configured. You can find here more about cluster groups and node
attributes:
https://apacheignite.readme.io/v2.3/docs/cluster-groups#section-cluster-groups-with-node-attributes

But this solution looks clumsy, I think. IMO, It's better to add some
synchronization to the beginning of your jobs. So, if a job comes to a
node, that haven't initialized yet, it will just wait until the
initialization is finished.
But if initialization is too long, and you think, that it will affect
latency too much, you can go with the first option.

Denis

вт, 21 нояб. 2017 г. в 0:06, Chris Berry <ch...@gmail.com>:

> Hi Ilya,
>
> I think you are missing the point. It is a chicken-and-egg problem.
>
> I need to start Ignite so that I can do various things – like use some
> various “admin caches” to look up things, etc – BEFORE I can accept real
> load.
>
> But the way it stands today, Ignite _assumes_ that if it is started, then
> it
> must also be ready to participate in the ComputeGrid.
> That may be true for some, or even most, use cases, but it will NOT be true
> for all.
> These are separate lifecycle events.
> In my case, Ignite starts slamming the Node with load before it is fully
> initialized and ready for real load.
>
> Thus, I’ve had to reject all ComputeJobs in my implementation (in
> ComputeJob::execute()), and force them to be retried in
> ComputeTask:::result() (returning FAILOVER).
> This works, of course.
> But is pretty silly. And hurts performance badly during the transition
> period.
>
> What is needed is a simple gate. E.g “acceptComputeJobs”
> It could default to “true” at start-up.
> But, for those that need finer control of that lifecycle, it could allow
> you
> to start-up with acceptComputeJobs=false
> And, then flip it to true manually, when the Node is ready to participate.
>
> Make sense??
> Thanks,
> -- Chris
>
>
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Chris Berry <ch...@gmail.com>.
Hi Ilya,

I think you are missing the point. It is a chicken-and-egg problem.

I need to start Ignite so that I can do various things – like use some
various “admin caches” to look up things, etc – BEFORE I can accept real
load.

But the way it stands today, Ignite _assumes_ that if it is started, then it
must also be ready to participate in the ComputeGrid.
That may be true for some, or even most, use cases, but it will NOT be true
for all.
These are separate lifecycle events.
In my case, Ignite starts slamming the Node with load before it is fully
initialized and ready for real load.

Thus, I’ve had to reject all ComputeJobs in my implementation (in
ComputeJob::execute()), and force them to be retried in
ComputeTask:::result() (returning FAILOVER).
This works, of course.
But is pretty silly. And hurts performance badly during the transition
period.

What is needed is a simple gate. E.g “acceptComputeJobs”
It could default to “true” at start-up.
But, for those that need finer control of that lifecycle, it could allow you
to start-up with acceptComputeJobs=false
And, then flip it to true manually, when the Node is ready to participate.

Make sense??
Thanks, 
-- Chris 





--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Ilya Kasnacheev <il...@gmail.com>.
Hello Chris?

Are you using ignite-spring bindings? There's a problem with initialization
lifecycle that will be fixed in the next release:
https://issues.apache.org/jira/browse/IGNITE-6555

If that is the case, as a workaround you can put Ignite in a dedicated
context and only start it at the last moment when the parent context is up
and running, as described here:
https://stackoverflow.com/questions/44696908/spring-make-serveltdispatcher-context-await-until-another-context-finish-loadin
(It will not be serviet context as in that example but a context containing
Ignite)

If it is your own code starting Ignite, it is your duty to start it at
appropriate time, using the same workaround as an example. Listen to Spring
events, run Ignite when ready.

Regards,



-- 
Ilya Kasnacheev

2017-11-16 21:24 GMT+03:00 Chris Berry <ch...@gmail.com>:

> No. There are things in the general application that are not yet
> initialized.
> It is essentially a chicken-and-egg problem.
> I need to start Ignite, and do various other things before I am ready to
> take load.
> But as soon as Ignite is started -- it starts to take ComputeJobs.
>
> We really need a way to tell Ignite that the app is ready to accept
> ComputJobs.
> Instead of it assuming that once it is started, then it must be ready, by
> default.
>
> Basically, a gate.
>
> Thanks,
> -- Chris
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Denis Mekhanikov <dm...@gmail.com>.
Hi Chris!

Can you just start Ignite after you finished all initialization that is
needed by compute jobs?

Denis

чт, 16 нояб. 2017 г. в 21:25, Chris Berry <ch...@gmail.com>:

> No. There are things in the general application that are not yet
> initialized.
> It is essentially a chicken-and-egg problem.
> I need to start Ignite, and do various other things before I am ready to
> take load.
> But as soon as Ignite is started -- it starts to take ComputeJobs.
>
> We really need a way to tell Ignite that the app is ready to accept
> ComputJobs.
> Instead of it assuming that once it is started, then it must be ready, by
> default.
>
> Basically, a gate.
>
> Thanks,
> -- Chris
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Chris Berry <ch...@gmail.com>.
No. There are things in the general application that are not yet initialized.
It is essentially a chicken-and-egg problem.
I need to start Ignite, and do various other things before I am ready to
take load.
But as soon as Ignite is started -- it starts to take ComputeJobs.

We really need a way to tell Ignite that the app is ready to accept
ComputJobs.
Instead of it assuming that once it is started, then it must be ready, by
default.

Basically, a gate.

Thanks,
-- Chris 



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by vkulichenko <va...@gmail.com>.
Chris,

Sorry, I still don't understand. What are the things that need to happen
exactly? Is there anything on Ignite level that is not initialized when job
arrives for execution?

-Val



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Chris Berry <ch...@gmail.com>.
Hi Val,

"Prematurely" in this case is a Node not fully initialized.

In a nutshell, Ignite is started, enters the "discovery set", but further
things must happen for it to be ready to do computation (take real load).
When the Grid is initially started, this is pretty easy to control -- we can
gate that at the load-balancer -- only allowing it to receive load when the
Grid is "complete", and the caches are primed. By allowing the health-check
to only return a 200 when the Grid can truly receive load.

But later, when the Grid is complete, a Node entering the Grid must still be
fully initialized before it can take load. But there is no easy way to say;
"I am ready for load" for the ComputeTask. It seems there really needs to
be. 

That said, I do not think the AdaptiveLoadBalancingSpi works in my
scenario??

Because we are using Affinity, and the docs seem to indicate that load
balancing and Affinity are mutually exclusive.


> Data Affinity
> Note that load balancing is triggered whenever your jobs are not
> collocated with data or have no real preference on which node to execute.
> If Collocation Of Compute and Data is used, then data affinity takes
> priority over load balancing.

A further question:

Could you please confirm that:  Affinity::mapKeysToNodes() returns ONLY
PRIMARY Nodes ??
The javadoc is unclear there.

And that I would need to use Affinity::mapKeyToPrimaryAndBackups() if I want
to provide alternate Nodes when I want to provide alternate Nodes.

Thanks,
-- Chris 



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by vkulichenko <va...@gmail.com>.
Hi Chris,

Can you define "prematurely"? What exactly is required for a node to be
ready to execute jobs? From Ignite perspective, it is ready as long as it's
in topology on discovery level, and that's when it starts receiving
requests.

BTW, with AdaptiveLoadBalancingSpi you can implement custom
AdaptiveLoadProbe which can use any information, there is no requirement to
rely on out of the box metrics.

-Val



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Chris Berry <ch...@gmail.com>.
Hmmm, the raw code does not appear to be visible.
Reposting....

    public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode>
subgrid, @Nullable TRequest request) throws IgniteException {
            Map<ComputeJob, ClusterNode> jobMap = new HashMap<>();
            List<UUID> cacheKeys = getIgniteAffinityCacheKeys(request);
            Map<ClusterNode, Collection&lt;UUID>> nodeToKeysMap =
ignite.<UUID>affinity(getIgniteAffinityCacheName()).mapKeysToNodes(cacheKeys);

            for (Map.Entry<ClusterNode, Collection&lt;UUID>> mapping :
nodeToKeysMap.entrySet()) {
                ClusterNode node = mapping.getKey();
                Collection<UUID> mappedKeys = mapping.getValue();
                final List<UUID> uuidList = (mappedKeys instanceof List ?
(List<UUID>) mappedKeys : new ArrayList<>(mappedKeys));
                if (node != null) {
                    TRequest reducedRequest =
requestWithPerNodeUUIDs(request, uuidList);
                    AbstractComputeJob job = createJob(reducedRequest,
context);
                    jobMap.put(job, node);
                }
            }
           return jobMap;
    }

Thanks,
-- Chris



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: ComputeTask is including Nodes for ComputeJobs prematurely

Posted by Chris Berry <ch...@gmail.com>.
I figure I should probably add my map() method.


I think this is all further complicated because we rely heavily on Affinity.

I was thinking that could maybe use the AdaptiveLoadBalancingSpi 
But I see no way to add my own metrics to ClusterMetrics

Plus, I do not see how this would work with 
affinity(getIgniteAffinityCacheName()).mapKeysToNodes(cacheKeys);

Thanks,
-- Chris 



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/