Posted to user@ignite.apache.org by Camilo Alvarez <ca...@gmail.com> on 2015/12/09 23:43:55 UTC
Computation on NodeEntries
Hi,
I want to know whether it is possible to execute a computation only on a
node's own entries. I want to add approx. 200 million objects to a
distributed cache and then execute a computation on each node; this
computation should work only with the local entries, creating a local
index. I am using the default hash algorithm.
Regards,
Camilo
Re: Computation on NodeEntries
Posted by Sergi Vladykin <se...@gmail.com>.
https://issues.apache.org/jira/browse/IGNITE-2177
Sergi
Re: Computation on NodeEntries
Posted by Yakov Zhdanov <yz...@apache.org>.
I see your point, Andrey, and I agree that having an opportunity to
process results in place, rather than bringing them back, is cool.
Sergi, can you please file a ticket and put your design suggestions in it?
--Yakov
RE: Computation on NodeEntries
Posted by Andrey Kornev <an...@hotmail.com>.
Or, alternatively, it would be nice to have a way of submitting a query from a single node. But instead of bringing the results back, the user would be able to specify a closure that would be sent along with the query to the nodes where the query will be executed. The closure would then be invoked by the query executor for each matching entry locally. With this approach the user would not have to worry about the state of partitions since Ignite would hopefully be able to take care of that (as it presumably already does while executing the SQL query).
I believe the good ol' GridGain product used to have an API like that, but then, in a misguided effort to dumb down the APIs, this useful feature was dropped.
Thanks
Andrey
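The idea of shipping a closure along with the query can be sketched in plain Java. This is only a toy model under loud assumptions: the `Node` class, `forEachMatching` and `submit` are hypothetical names invented for illustration, a predicate stands in for the SQL filter, and nothing here is an Ignite or GridGain API. It shows only the shape of the proposal: the filter and the closure run where the entries live, and just a small result (a count) travels back.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;

// Toy model only: hypothetical names, no Ignite classes.
class QueryWithClosure {
    /** Stand-in for one cluster node holding its own slice of the cache. */
    static final class Node {
        final Map<Integer, String> entries = new HashMap<>();

        /** Filters locally and invokes the closure for each match; returns only a count. */
        int forEachMatching(BiPredicate<Integer, String> filter, BiConsumer<Integer, String> closure) {
            int matched = 0;
            for (Map.Entry<Integer, String> e : entries.entrySet()) {
                if (filter.test(e.getKey(), e.getValue())) {
                    closure.accept(e.getKey(), e.getValue());
                    matched++;
                }
            }
            return matched;
        }
    }

    /** The submitting node sends filter and closure to every node and sums the counts. */
    static int submit(List<Node> nodes, BiPredicate<Integer, String> filter,
                      BiConsumer<Integer, String> closure) {
        int total = 0;
        for (Node node : nodes)
            total += node.forEachMatching(filter, closure);
        return total;
    }
}
```

In a real cluster the hard part is the one the thread is about: the query executor, not the caller, would have to guarantee that every matching entry is handed to the closure exactly once while partitions move.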
Re: Computation on NodeEntries
Posted by Alexey Goncharuk <al...@gmail.com>.
Ah, now I see your point. It looks like we need to add the ability to run an
SQL query against an individual partition - this would have worked for your
use case the same way a Scan query works.
I wonder if anybody in the community with a deeper knowledge of query
processing can estimate the complexity of this feature and create a ticket
with a proper description.
RE: Computation on NodeEntries
Posted by Andrey Kornev <an...@hotmail.com>.
Alexey,
Yakov's approach works perfectly when one needs to process all (or most) of the entries in the cache. However, in my case I may have 200 million entries in the cache and only, for example, 10 of them would be relevant for a particular run of the computation. I can't do a ScanQuery. I must use a SQL query to quickly filter the data out and process the 10 matching the query.
Hope it makes it more clear.
Andrey
Re: Computation on NodeEntries
Posted by Alexey Goncharuk <al...@gmail.com>.
Andrey,
I did not catch why Yakov's suggestion of processing data on a per-partition
basis does not work for you. I assume that during this processing the cache
is not updated concurrently, because otherwise the task does not make sense,
since there are no full cache snapshots in Ignite (yet).
To summarize what has been posted in this thread so far (Sergi, pls correct
me if I am wrong):
1) An SQL query issued from a single node is guaranteed to get consistent
results on changing topology (local should be set to false). Currently
there is no 'native' way to parallelize the processing of SQL query results,
such as limiting an SQL query to one partition.
2) Scan query issued for a single partition is guaranteed to get consistent
results on changing topology (local should be set to false). Note that
setting local=true and issuing a query locally is not guaranteed to get
consistent results.
Having said that, I see no problem with the following approach:
1) Create a compute task that will create a job for each partition
2) Map each partition's job to the partition's primary node
3) Execute a per-partition Scan query inside the job.
In a worst-case scenario, if a job arrives on the node which has already
lost the partition ownership, data will be fetched from a remote node.
You can open a ticket to fail per-partition Scan query if local flag is set
to true and partition has been moved - in this case 'wrong' jobs could be
failed over to correct nodes.
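The three steps above can be modelled in plain Java. This is a sketch under stated assumptions, not Ignite code: the partition count, the `partition` function and the node names are made up, one single-threaded executor stands in for each node, and `scanPartition` stands in for a per-partition Scan query. In a real deployment, steps 1 and 2 would use the cache's affinity mapping (Yakov mentions Affinity#mapPartitionsToNodes in this thread) and the compute API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model only: hypothetical names, no Ignite classes.
class PerPartitionJobs {
    static final int PARTS = 8; // hypothetical partition count

    /** Stand-in for the affinity function: key -> partition. */
    static int partition(int key) {
        return Math.floorMod(key, PARTS);
    }

    /** Stand-in for a Scan query restricted to one partition. */
    static List<Integer> scanPartition(Collection<Integer> cache, int part) {
        List<Integer> res = new ArrayList<>();
        for (int key : cache)
            if (partition(key) == part)
                res.add(key);
        return res;
    }

    /** One job per partition, submitted to that partition's primary "node". */
    static Map<Integer, Integer> run(Collection<Integer> cache, Map<Integer, String> primaryByPart)
            throws Exception {
        Map<String, ExecutorService> nodes = new HashMap<>();
        for (String node : primaryByPart.values())
            nodes.computeIfAbsent(node, n -> Executors.newSingleThreadExecutor());
        try {
            Map<Integer, Future<Integer>> futs = new TreeMap<>();
            for (Map.Entry<Integer, String> e : primaryByPart.entrySet()) {
                final int part = e.getKey();
                Callable<Integer> job = () -> scanPartition(cache, part).size();
                futs.put(part, nodes.get(e.getValue()).submit(job));
            }
            Map<Integer, Integer> counts = new TreeMap<>();
            for (Map.Entry<Integer, Future<Integer>> f : futs.entrySet())
                counts.put(f.getKey(), f.getValue().get()); // entries processed per partition
            return counts;
        } finally {
            for (ExecutorService ex : nodes.values())
                ex.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> cache = new ArrayList<>();
        for (int i = 0; i < 1000; i++)
            cache.add(i);
        Map<Integer, String> primary = new TreeMap<>();
        for (int p = 0; p < PARTS; p++)
            primary.put(p, p % 2 == 0 ? "A" : "B");
        System.out.println(run(cache, primary));
    }
}
```

Because every partition appears in exactly one job, each entry is counted once regardless of which node happens to run the job; that is the property the per-partition approach relies on.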
RE: Computation on NodeEntries
Posted by Andrey Kornev <an...@hotmail.com>.
So now we are back to square one. I'm all ears for suggestions on how to implement the following use case in a reliable way:
- given a partitioned cache
- process cache entries on all nodes in parallel
- only the cache entries matching a query should be processed
- cluster topology changes must not cause inconsistent results
This is a fundamental use case, a particular variation of the so-called "collocation of compute and data", and at the moment I don't see how I can reliably implement it on top of Ignite.
Thanks
Andrey
Re: Computation on NodeEntries
Posted by Yakov Zhdanov <yz...@apache.org>.
An SQL query guarantees that the entire data set will be processed. Of course,
this should be a single query, not a sequence of broadcast local
queries.
--Yakov
RE: Computation on NodeEntries
Posted by Andrey Kornev <an...@hotmail.com>.
Dmitriy, thanks!
The issue here is not related to the state of the partitions, but to the fact that the launch of the local queries on individual nodes is not atomic with respect to the cluster topology in any way, which makes the race described by Denis possible.
I'm looking for a correct way to handle this situation.
Regards
Andrey
Re: Computation on NodeEntries
Posted by Dmitriy Setrakyan <ds...@apache.org>.
On Mon, Dec 14, 2015 at 3:16 PM, Andrey Kornev <an...@hotmail.com>
wrote:
> Hey Denis,
>
> Thanks for your reply! The race you've described is definitely possible.
>
> However, it seems SqlQuery would also be vulnerable to the same race
> condition. I broadcast the tasks to all nodes and while each task is trying
> to start a local SqlQuery on the node a new node joins the cluster.
> Wouldn't I run into the same issue?
>
Andrey, SqlQuery or ScanQuery should guarantee that you are running on
locked partitions that are in a complete state. If a partition has not
been filled yet, e.g. due to a node just joining, then another copy of the
partition, which has full state on another node, will be picked for the query.
>
> I'd like to emphasize I'm not talking about a single node launching a query
> against the cluster. I'd like to process my data on each node in parallel,
> but only the data that matches a certain SQL query.
>
> Thanks
> Andrey
RE: Computation on NodeEntries
Posted by Andrey Kornev <an...@hotmail.com>.
Hey Denis,
Thanks for your reply! The race you've described is definitely possible.
However, it seems SqlQuery would also be vulnerable to the same race condition. I broadcast the tasks to all nodes and while each task is trying to start a local SqlQuery on the node a new node joins the cluster. Wouldn't I run into the same issue?
I'd like to emphasize I'm not talking about a single node launching a query against the cluster. I'd like to process my data on each node in parallel, but only the data that matches a certain SQL query.
Thanks
Andrey
Re: Computation on NodeEntries
Posted by Denis Magda <dm...@gridgain.com>.
Hi Andrey,
Please see below.
On 12/13/2015 9:08 AM, Andrey Kornev wrote:
> Yakov,
>
> If partitions do not migrate while they are being iterated over,
> wouldn't it then suffice to simply execute a single ScanQuery with its
> isLocal set to true? My reasoning here is that the scan would create
> an iterator for all affinity partitions, thus preventing their
> migration. If it's not the case, how would then a local ScanQuery
> behave in presence of topology changes?
From what I see in the code, setting ScanQuery's isLocal to 'true' lets
you iterate over all the partitions that belonged to a node at the time
the query started. None of those partitions will be moved to another
node until the query's iterator is closed.
However, I see the following issue. Imagine that your cluster has two
nodes and you decide to iterate over all local partitions of both
nodes, and the execution sequence looks like this:
1) A ScanQuery with isLocal=true starts executing on *node A*. All of
node A's partitions are blocked and won't be moved.
2) *Node B* receives the same compute job with the ScanQuery in it.
However, because of OS scheduling, the thread in charge of starting the
query is blocked for some time, so the iterator over local partitions
is not ready yet and node B's partitions are not blocked.
3) A third node, *node C*, joins the topology. Partitions owned by
*node B* may be rebalanced among *node A* and *node C*.
4) Partitions that are rebalanced from node B to *node A* won't be
visited by your code, because node A's iterator is already built while
node B's iterator is constructed after the rebalancing.
The issue can't happen when you specify partitions explicitly using
Yakov's approach below, because in the worst case, in a situation like
the one above, a just-rebalanced partition's data will be fetched over
the network by the node that was initially the partition's owner (at the
time when you calculated the partition owners).
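The four-step race above can be replayed with a toy model in plain Java. Everything here is an assumption made for illustration: eight partitions, even ones starting on node A and odd ones on node B, and a rebalance that moves partition 1 to A and partition 3 to C. No Ignite classes are used. The first method mimics "scan whatever is local when my query starts"; the second fixes the partition list per job before the topology changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model only: node names and partition numbers are made up.
class LocalScanRace {
    /** Partitions 0..7: even ones start on node A, odd ones on node B. */
    static Map<Integer, String> initialOwners() {
        Map<Integer, String> owner = new TreeMap<>();
        for (int p = 0; p < 8; p++)
            owner.put(p, p % 2 == 0 ? "A" : "B");
        return owner;
    }

    /** Node C joins: partition 1 moves from B to A, partition 3 moves from B to C. */
    static void rebalanceOnJoin(Map<Integer, String> owner) {
        owner.put(1, "A");
        owner.put(3, "C");
    }

    static List<Integer> ownedBy(Map<Integer, String> owner, String node) {
        List<Integer> parts = new ArrayList<>();
        for (Map.Entry<Integer, String> e : owner.entrySet())
            if (e.getValue().equals(node))
                parts.add(e.getKey());
        return parts;
    }

    /** Each node scans whatever is local at the moment its own query starts. */
    static Set<Integer> localScans() {
        Map<Integer, String> owner = initialOwners();
        Set<Integer> visited = new TreeSet<>(ownedBy(owner, "A")); // step 1: A's iterator is built
        rebalanceOnJoin(owner);                                    // step 3: C joins while B is stalled
        visited.addAll(ownedBy(owner, "B"));                       // step 2 finishes late on B
        return visited; // C received no job in this model, so its partitions are not scanned
    }

    /** The partition list of every job is fixed before any topology change. */
    static Set<Integer> explicitPartitions() {
        Map<Integer, String> owner = initialOwners();
        Map<String, List<Integer>> plan = new TreeMap<>();
        for (String node : new TreeSet<>(owner.values()))
            plan.put(node, ownedBy(owner, node));
        rebalanceOnJoin(owner); // ownership changes, but the plan still names every partition
        Set<Integer> visited = new TreeSet<>();
        for (List<Integer> parts : plan.values())
            visited.addAll(parts); // a moved partition is read remotely rather than skipped
        return visited;
    }

    public static void main(String[] args) {
        System.out.println("local scans visit:         " + localScans());
        System.out.println("explicit partitions visit: " + explicitPartitions());
    }
}
```

In this model the local-scan variant never visits partitions 1 and 3, while the explicit plan covers all eight; the price of the explicit plan is that a moved partition may have to be read from a remote node.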
>
> Also, what's the best way to handle topology changes while using the
> SqlQuery rather than ScanQuery? Basically, it's the same use case,
> only instead of scanning the entire partition I'd like to first filter
> the cache entries using a query.
>
SqlQueries will work transparently for you and are guaranteed to return a
full and consistent result set even if the topology changes while a
query is in progress.
--
Denis
> Thanks
> Andrey
> _____________________________
> From: Yakov Zhdanov <yzhdanov@apache.org>
> Sent: Friday, December 11, 2015 10:55 AM
> Subject: RE: Computation on NodeEntries
> To: <user@ignite.apache.org>
>
>
> Partition will not migrate if local or remote iterator is not
> finished/closed.
>
> On Dec 11, 2015 21:05, "Andrey Kornev" <andrewkornev@hotmail.com> wrote:
>
> Great suggestion! Thank you, Yakov!
>
> Just one more question. :) Let's say the scan job is running on node
> A and processing partition 42. At the same time, a new node B
> joins and partition 42 needs to be moved to this node. What will
> happen to my scan query that is still running on node A and
> iterating over the partition's entries? Would it complete
> processing the entire partition despite the change of ownership?
> Or, would the query terminate at some arbitrary point once the
> partition ownership transfer has completed?
>
> Thanks a lot!
> Andrey
>
> ------------------------------------------------------------------------
> Date: Fri, 11 Dec 2015 16:06:16 +0300
> Subject: Re: Computation on NodeEntries
> From: yzhdanov@apache.org
> To: user@ignite.apache.org
>
> Guys, I would do the following:
>
> 1. Map all my partitions to
> nodes: org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
> 2. Send a job (with its list of partitions) to each node, using the map
> returned in step 1
> 3. Job may be like:
>
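> new Runnable() {
>     @Override public void run() {
>         for (Integer part : parts) {
>             // Scan query restricted to a single partition.
>             Iterator<Cache.Entry<Object, Object>> it = cache.query(new ScanQuery<>(part)).iterator();
>
>             while (it.hasNext()) {
>                 Cache.Entry<Object, Object> e = it.next();
>                 // do the stuff...
>             }
>         }
>     }
> };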
>
> This may result in network calls for some worst cases when topology changes under your feet, but even in this case this should work.
>
>
> --Yakov
>
> 2015-12-11 2:13 GMT+03:00 Andrey Kornev <andrewkornev@hotmail.com>:
>
> Dmitriy,
>
> Given the approach you suggested below, what would be your
> recommendation for dealing with cluster topology changes while
> the iteration is in progress? An obvious one I can think of is to
> - somehow detect the change,
> - cancel the tasks on all the nodes
> - wait until the rebalancing is finished and
> - restart the computation.
>
> Are there any other ways? Ideally, I'd like to have the
> "exactly-once" execution semantics.
>
> Thanks
> Andrey
>
>
>
RE: Computation on NodeEntries
Posted by Andrey Kornev <an...@hotmail.com>.
Yakov,
If partitions do not migrate while they are being iterated over, wouldn't it suffice to execute a single ScanQuery with its isLocal set to true? My reasoning is that the scan would create an iterator over all affinity partitions, thus preventing their migration. If that's not the case, how would a local ScanQuery behave in the presence of topology changes?
Also, what's the best way to handle topology changes when using SqlQuery rather than ScanQuery? Basically, it's the same use case, only instead of scanning the entire partition I'd like to first filter the cache entries using a query.
Thanks
Andrey
_____________________________
From: Yakov Zhdanov <yz...@apache.org>
Sent: Friday, December 11, 2015 10:55 AM
Subject: RE: Computation on NodeEntries
To: <us...@ignite.apache.org>
Partition will not migrate if local or remote iterator is not finished/closed.
On Dec 11, 2015 21:05, "Andrey Kornev" <andrewkornev@hotmail.com> wrote:
Great suggestion! Thank you, Yakov!
Just one more question. :) Let's say the scan job is running node A and processing partition 42. At the same time, a new node B joins and partition 42 needs to be moved to this node. What will happen to my scan query that is still running on node A and iterating over the partition's entries? Would it complete processing the entire partition despite the change of ownership? Or, would the query terminate at some arbitrary point once the partition ownership transfer has completed?
Thanks a lot!
Andrey
Date: Fri, 11 Dec 2015 16:06:16 +0300
Subject: Re: Computation on NodeEntries
From: yzhdanov@apache.org
To: user@ignite.apache.org
Guys, I would do the following:
1. Map all my partitions to nodes: org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
2. Send jobs (with its list of partitions) to each node using map returned on step1
3. Job may be like:
new Runnable() {
    @Override public void run() {
        for (Integer part : parts) {
            Iterator<Cache.Entry<Object, Object>> it = cache.query(new ScanQuery<>(part)).iterator();
            // do the stuff...
        }
    }
};
This may result in network calls for some worst cases when topology changes under your feet, but even in this case this should work.
--Yakov
2015-12-11 2:13 GMT+03:00 Andrey Kornev <an...@hotmail.com>:
Dmitriy,
Given the approach you suggested below, what would be your recommendation for dealing with cluster topology changes while the iteration is in progress? An obvious one I can think of is to
- somehow detect the change,
- cancel the tasks on all the nodes
- wait until the rebalancing is finished and
- restart the computation.
Are there any other ways? Ideally, I'd like to have the "exactly-once" execution semantics.
Thanks
Andrey
RE: Computation on NodeEntries
Posted by Yakov Zhdanov <yz...@apache.org>.
A partition will not migrate while a local or remote iterator over it
is still open, i.e. not yet finished or closed.
On Dec 11, 2015 21:05, "Andrey Kornev" <an...@hotmail.com> wrote:
> Great suggestion! Thank you, Yakov!
>
> Just one more question. :) Let's say the scan job is running node A and
> processing partition 42. At the same time, a new node B joins and partition
> 42 needs to be moved to this node. What will happen to my scan query that
> is still running on node A and iterating over the partition's entries?
> Would it complete processing the entire partition despite the change of
> ownership? Or, would the query terminate at some arbitrary point once the
> partition ownership transfer has completed?
>
> Thanks a lot!
> Andrey
>
> ------------------------------
> Date: Fri, 11 Dec 2015 16:06:16 +0300
> Subject: Re: Computation on NodeEntries
> From: yzhdanov@apache.org
> To: user@ignite.apache.org
>
> Guys, I would do the following:
>
> 1. Map all my partitions to
> nodes: org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
> 2. Send jobs (with its list of partitions) to each node using map returned
> on step1
> 3. Job may be like:
>
> new Runnable() {
>     @Override public void run() {
>         for (Integer part : parts) {
>             Iterator<Cache.Entry<Object, Object>> it = cache.query(new ScanQuery<>(part)).iterator();
>
>             // do the stuff...
>         }
>     }
> };
>
> This may result in network calls for some worst cases when topology changes under your feet, but even in this case this should work.
>
>
> --Yakov
>
> 2015-12-11 2:13 GMT+03:00 Andrey Kornev <an...@hotmail.com>:
>
> Dmitriy,
>
> Given the approach you suggested below, what would be your recommendation
> for dealing with cluster topology changes while the iteration is in
> progress? An obvious one I can think of is to
> - somehow detect the change,
> - cancel the tasks on all the nodes
> - wait until the rebalancing is finished and
> - restart the computation.
>
> Are there any other ways? Ideally, I'd like to have the "exactly-once"
> execution semantics.
>
> Thanks
> Andrey
>
>
RE: Computation on NodeEntries
Posted by Andrey Kornev <an...@hotmail.com>.
Great suggestion! Thank you, Yakov!
Just one more question. :) Let's say the scan job is running node A and processing partition 42. At the same time, a new node B joins and partition 42 needs to be moved to this node. What will happen to my scan query that is still running on node A and iterating over the partition's entries? Would it complete processing the entire partition despite the change of ownership? Or, would the query terminate at some arbitrary point once the partition ownership transfer has completed?
Thanks a lot!
Andrey
Date: Fri, 11 Dec 2015 16:06:16 +0300
Subject: Re: Computation on NodeEntries
From: yzhdanov@apache.org
To: user@ignite.apache.org
Guys, I would do the following:
1. Map all my partitions to nodes: org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
2. Send jobs (with its list of partitions) to each node using map returned on step1
3. Job may be like:
new Runnable() {
    @Override public void run() {
        for (Integer part : parts) {
            Iterator<Cache.Entry<Object, Object>> it = cache.query(new ScanQuery<>(part)).iterator();
            // do the stuff...
        }
    }
};
This may result in network calls for some worst cases when topology changes under your feet, but even in this case this should work.
--Yakov
2015-12-11 2:13 GMT+03:00 Andrey Kornev <an...@hotmail.com>:
Dmitriy,
Given the approach you suggested below, what would be your recommendation for dealing with cluster topology changes while the iteration is in progress? An obvious one I can think of is to
- somehow detect the change,
- cancel the tasks on all the nodes
- wait until the rebalancing is finished and
- restart the computation.
Are there any other ways? Ideally, I'd like to have the "exactly-once" execution semantics.
Thanks
Andrey
Re: Computation on NodeEntries
Posted by Yakov Zhdanov <yz...@apache.org>.
Guys, I would do the following:
1. Map all my partitions to nodes:
org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
2. Send a job (with its list of partitions) to each node using the map
returned in step 1
3. The job may look like this:
new Runnable() {
    @Override public void run() {
        // parts: the partitions assigned to this node in step 2.
        for (Integer part : parts) {
            // Scan a single partition of the cache.
            Iterator<Cache.Entry<Object, Object>> it =
                cache.query(new ScanQuery<>(part)).iterator();
            // do the stuff...
        }
    }
};
In the worst case, when the topology changes under your feet, this may
result in network calls, but it should still work.
--Yakov
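Step 2 above needs the partition-to-node map from step 1 inverted into one partition list per node. A minimal sketch in plain Java, assuming nodes are identified by strings as a stand-in for the ClusterNode values that Affinity#mapPartitionsToNodes returns; PartitionGrouping and groupByNode are hypothetical names, not part of Ignite:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PartitionGrouping {
    /** Inverts a partition -> node map into node -> sorted list of partitions. */
    static Map<String, List<Integer>> groupByNode(Map<Integer, String> partToNode) {
        Map<String, List<Integer>> byNode = new TreeMap<>();

        // Walk the partitions in ascending order so every node's list comes out sorted.
        for (Map.Entry<Integer, String> e : new TreeMap<>(partToNode).entrySet())
            byNode.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());

        return byNode;
    }

    public static void main(String[] args) {
        // Hypothetical mapping: partitions 0 and 2 live on nodeA, partition 1 on nodeB.
        Map<Integer, String> partToNode = new TreeMap<>();
        partToNode.put(0, "nodeA");
        partToNode.put(1, "nodeB");
        partToNode.put(2, "nodeA");

        System.out.println(groupByNode(partToNode)); // prints {nodeA=[0, 2], nodeB=[1]}
    }
}
```

Each resulting list would then be handed to the job that is sent to the corresponding node.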
2015-12-11 2:13 GMT+03:00 Andrey Kornev <an...@hotmail.com>:
> Dmitriy,
>
> Given the approach you suggested below, what would be your recommendation
> for dealing with cluster topology changes while the iteration is in
> progress? An obvious one I can think of is to
> - somehow detect the change,
> - cancel the tasks on all the nodes
> - wait until the rebalancing is finished and
> - restart the computation.
>
> Are there any other ways? Ideally, I'd like to have the "exactly-once"
> execution semantics.
>
> Thanks
> Andrey
>
RE: Computation on NodeEntries
Posted by Andrey Kornev <an...@hotmail.com>.
Dmitriy,
Given the approach you suggested below, what would be your recommendation for dealing with cluster topology changes while the iteration is in progress? An obvious one I can think of is to
- somehow detect the change,
- cancel the tasks on all the nodes
- wait until the rebalancing is finished and
- restart the computation.
Are there any other ways? Ideally, I'd like to have the "exactly-once" execution semantics.
Thanks
Andrey
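The detect-and-restart part of the steps above could be sketched roughly as follows. This is plain Java under stated assumptions: the topology version is read through a caller-supplied LongSupplier (in Ignite it might be backed by IgniteCluster#topologyVersion), the job has no side effects so a discarded run is harmless, and cancelling in-flight tasks and waiting for rebalancing are left out. RestartOnTopologyChange and runStable are hypothetical names.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

class RestartOnTopologyChange {
    /**
     * Runs the job until one run completes with the topology version unchanged,
     * or gives up after maxAttempts runs.
     */
    static <T> T runStable(LongSupplier topologyVersion, Supplier<T> job, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            long before = topologyVersion.getAsLong();

            T result = job.get();

            // Accept the result only if no node joined or left while the job ran.
            if (topologyVersion.getAsLong() == before)
                return result;

            // Otherwise the result is discarded and the computation restarts.
        }

        throw new IllegalStateException(
            "Topology kept changing after " + maxAttempts + " attempts");
    }
}
```

This only approximates "exactly-once": a run that overlaps a topology change is thrown away as a whole, so it fits jobs whose partial work can be safely discarded.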
From: dsetrakyan@apache.org
Date: Wed, 9 Dec 2015 23:47:08 -0800
Subject: Re: Computation on NodeEntries
To: user@ignite.apache.org
To get an iterator over the local entries, you can use this method:
IgniteCache.localEntries(CachePeekMode.PRIMARY)
The CachePeekMode.PRIMARY allows to iterate only over the primary copies, ignoring the backups.
You can broadcast a computation to the cache nodes using standard IgniteCompute API and iterate through local entries inside your computation logic.
D.
On Wed, Dec 9, 2015 at 2:43 PM, Camilo Alvarez <ca...@gmail.com> wrote:
Hi ,
I want to know if it is possible to execute a computation only on the entries of the node?. I want to add apox. 200 million objects in a distributed cache and then execute a computation in each node, this computation should work only whith the local entries creating a local index. I am using the default hash algorithm.
Regards,
Camilo
Re: Computation on NodeEntries
Posted by Dmitriy Setrakyan <ds...@apache.org>.
To get an iterator over the local entries, you can use this method:
IgniteCache.localEntries(CachePeekMode.PRIMARY)
CachePeekMode.PRIMARY restricts the iteration to the primary copies,
ignoring the backups.
You can broadcast a computation to the cache nodes using the standard
IgniteCompute API and iterate through the local entries inside your
computation logic.
D.
On Wed, Dec 9, 2015 at 2:43 PM, Camilo Alvarez <ca...@gmail.com> wrote:
> Hi ,
>
> I want to know if it is possible to execute a computation only on the
> entries of the node?. I want to add apox. 200 million objects in a
> distributed cache and then execute a computation in each node, this
> computation should work only whith the local entries creating a local
> index. I am using the default hash algorithm.
>
>
> Regards,
>
> Camilo
>