You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@ignite.apache.org by Camilo Alvarez <ca...@gmail.com> on 2015/12/09 23:43:55 UTC

Computation on NodeEntries

Hi ,

I want to know if it is possible to execute a computation only on the
entries of the node?. I want to add apox. 200 million objects in a
distributed cache and then execute a computation in each node, this
computation should work only whith the local entries creating a local
index. I am using the default hash algorithm.


Regards,

Camilo

Re: Computation on NodeEntries

Posted by Sergi Vladykin <se...@gmail.com>.
https://issues.apache.org/jira/browse/IGNITE-2177

Sergi

2015-12-16 12:16 GMT+03:00 Yakov Zhdanov <yz...@apache.org>:

> I see your point Andrey, and I agree that having an opportunity not to
> bring results back, but process them in place is cool.
>
> Sergi, can you please file a ticket and put your design suggestions to it?
>
> --Yakov
>
> 2015-12-15 20:51 GMT+03:00 Andrey Kornev <an...@hotmail.com>:
>
>> Or, alternatively it would be nice to have a way submitting a query from
>> a single node. But instead of bringing the results back, the user would be
>> able to specify a closure that would be sent along with the query to the
>> nodes where the query will be executed. The closure would then be invoked
>> by the query executor for each matching entry locally. With this approach
>> the user would not have to worry about the state of partitions since Ignite
>> would hopefully be able to take care of that (as it presumably already does
>> while executing the SQL query).
>>
>> I believe the good ol' GridGain product used to have an API like that,
>> but then, in a misguided effort to dumb down the APIs, this useful feature
>> was dropped.
>>
>> Thanks
>> Andrey
>>
>> ------------------------------
>> Date: Tue, 15 Dec 2015 20:26:39 +0300
>> Subject: Re: Computation on NodeEntries
>> From: alexey.goncharuk@gmail.com
>> To: user@ignite.apache.org
>>
>> Ah, now I see your point. It looks like we need to add an ability to run
>> an SQL query against an individual partition - this would have worked for
>> your use-case the same way a Scan query works.
>>
>> I wonder of anybody in the community with a deeper knowledge of query
>> processing can estimate the complexity of this feature and create a ticket
>> with a proper description.
>>
>>
>

Re: Computation on NodeEntries

Posted by Yakov Zhdanov <yz...@apache.org>.
I see your point Andrey, and I agree that having an opportunity not to
bring results back, but process them in place is cool.

Sergi, can you please file a ticket and put your design suggestions to it?

--Yakov

2015-12-15 20:51 GMT+03:00 Andrey Kornev <an...@hotmail.com>:

> Or, alternatively it would be nice to have a way submitting a query from a
> single node. But instead of bringing the results back, the user would be
> able to specify a closure that would be sent along with the query to the
> nodes where the query will be executed. The closure would then be invoked
> by the query executor for each matching entry locally. With this approach
> the user would not have to worry about the state of partitions since Ignite
> would hopefully be able to take care of that (as it presumably already does
> while executing the SQL query).
>
> I believe the good ol' GridGain product used to have an API like that, but
> then, in a misguided effort to dumb down the APIs, this useful feature was
> dropped.
>
> Thanks
> Andrey
>
> ------------------------------
> Date: Tue, 15 Dec 2015 20:26:39 +0300
> Subject: Re: Computation on NodeEntries
> From: alexey.goncharuk@gmail.com
> To: user@ignite.apache.org
>
> Ah, now I see your point. It looks like we need to add an ability to run
> an SQL query against an individual partition - this would have worked for
> your use-case the same way a Scan query works.
>
> I wonder of anybody in the community with a deeper knowledge of query
> processing can estimate the complexity of this feature and create a ticket
> with a proper description.
>
>

RE: Computation on NodeEntries

Posted by Andrey Kornev <an...@hotmail.com>.
Or, alternatively it would be nice to have a way submitting a query from a single node. But instead of bringing the results back, the user would be able to specify a closure that would be sent along with the query to the nodes where the query will be executed. The closure would then be invoked by the query executor for each matching entry locally. With this approach the user would not have to worry about the state of partitions since Ignite would hopefully be able to take care of that (as it presumably already does while executing the SQL query).

I believe the good ol' GridGain product used to have an API like that, but then, in a misguided effort to dumb down the APIs, this useful feature was dropped.

Thanks
Andrey

Date: Tue, 15 Dec 2015 20:26:39 +0300
Subject: Re: Computation on NodeEntries
From: alexey.goncharuk@gmail.com
To: user@ignite.apache.org

Ah, now I see your point. It looks like we need to add an ability to run an SQL query against an individual partition - this would have worked for your use-case the same way a Scan query works.
I wonder of anybody in the community with a deeper knowledge of query processing can estimate the complexity of this feature and create a ticket with a proper description.
 		 	   		  

Re: Computation on NodeEntries

Posted by Alexey Goncharuk <al...@gmail.com>.
Ah, now I see your point. It looks like we need to add an ability to run an
SQL query against an individual partition - this would have worked for your
use-case the same way a Scan query works.

I wonder of anybody in the community with a deeper knowledge of query
processing can estimate the complexity of this feature and create a ticket
with a proper description.

RE: Computation on NodeEntries

Posted by Andrey Kornev <an...@hotmail.com>.
Alexey,

Yakov's approach works perfectly when one needs to process all (or most) of the entries in the cache. However, in my case I may have 200 million entries in the cache and only, for example, 10 of them would be relevant for a particular run of the computation. I can't do a ScanQuery. I must use a SQL query to quickly filter the data out and process the 10 matching the query.

Hope it makes it more clear.
Andrey

Date: Tue, 15 Dec 2015 19:11:01 +0300
Subject: Re: Computation on NodeEntries
From: alexey.goncharuk@gmail.com
To: user@ignite.apache.org

Andrey,
I did not catch why Yakov's suggestion of data processing on per-partition basis does not work for you? I assume that during this processing the cache is not updated concurrently because otherwise the task does not make sense since there are no full cache snapshots in Ignite (yet).
To summarize what has been posted in this thread so far (Sergi, pls correct me if I am wrong):1) SQL query issued from a single node is guaranteed to get consistent results on changing topology (local should be set to false). Currently there is no 'native' way to parallelize SQL query results processing, such as limit SQL query to one partition.
2) Scan query issued for a single partition is guaranteed to get consistent results on changing topology (local should be set to false). Note that setting local=true and issuing a query locally is not guaranteed to get consistent results. 
Having said that, I see no problem with the following approach:1) Create a compute task that will create a job for each partition2) Map each partition's job to the partition's primary node3) Execute a per-partition Scan query inside the job.
In a worst-case scenario, if a job arrives on the node which has already lost the partition ownership, data will be fetched from a remote node.
You can open a ticket to fail per-partition Scan query if local flag is set to true and partition has been moved - in this case 'wrong' jobs could be failed over to correct nodes. 		 	   		  

Re: Computation on NodeEntries

Posted by Alexey Goncharuk <al...@gmail.com>.
Andrey,

I did not catch why Yakov's suggestion of data processing on per-partition
basis does not work for you? I assume that during this processing the cache
is not updated concurrently because otherwise the task does not make sense
since there are no full cache snapshots in Ignite (yet).

To summarize what has been posted in this thread so far (Sergi, pls correct
me if I am wrong):
1) SQL query issued from a single node is guaranteed to get consistent
results on changing topology (local should be set to false). Currently
there is no 'native' way to parallelize SQL query results processing, such
as limit SQL query to one partition.
2) Scan query issued for a single partition is guaranteed to get consistent
results on changing topology (local should be set to false). Note that
setting local=true and issuing a query locally is not guaranteed to get
consistent results.

Having said that, I see no problem with the following approach:
1) Create a compute task that will create a job for each partition
2) Map each partition's job to the partition's primary node
3) Execute a per-partition Scan query inside the job.

In a worst-case scenario, if a job arrives on the node which has already
lost the partition ownership, data will be fetched from a remote node.

You can open a ticket to fail per-partition Scan query if local flag is set
to true and partition has been moved - in this case 'wrong' jobs could be
failed over to correct nodes.

RE: Computation on NodeEntries

Posted by Andrey Kornev <an...@hotmail.com>.
So now we are back to square one. I'm all ears to suggestions about how does one implement the following use case in a reliable way:
- given a partitioned cache
- process cache entries on all nodes in parallel
- only the cache entries matching a query should be processed
- cluster topology changes must not cause inconsistent results

This is a fundamental use case, a particular variation of the so-called "collocation of compute and data" and at the moment I don't see how I can reliably implement it on top Ignite.

Thanks
Andrey

Date: Tue, 15 Dec 2015 11:40:48 +0300
Subject: Re: Computation on NodeEntries
From: yzhdanov@apache.org
To: user@ignite.apache.org

SQL query guarantees that the entire data set will be processed. Of course, this should be a single query, but not a sequence of broadcasted local queries.--Yakov 		 	   		  

Re: Computation on NodeEntries

Posted by Yakov Zhdanov <yz...@apache.org>.
SQL query guarantees that the entire data set will be processed. Of course,
this should be a single query, but not a sequence of broadcasted local
queries.

--Yakov

RE: Computation on NodeEntries

Posted by Andrey Kornev <an...@hotmail.com>.
Dmitriy, thanks!

The issue here is not related to the state of the partitions, but to the fact that the launch of the local queries on individual nodes is not atomic with respect to the cluster topology in any way, which makes the race described by Denis possible.

I'm looking for a correct way to handle this situation.

Regards
Andrey

From: dsetrakyan@apache.org
Date: Mon, 14 Dec 2015 16:03:50 -0800
Subject: Re: Computation on NodeEntries
To: user@ignite.apache.org



On Mon, Dec 14, 2015 at 3:16 PM, Andrey Kornev <an...@hotmail.com> wrote:



Hey Denis,

Thanks for your reply! The race you've described is definitely possible.

However, it seems SqlQuery would also be vulnerable to the same race condition. I broadcast the tasks to all nodes and while each task is trying to start a local SqlQuery on the node a new node joins the cluster. Wouldn't I run into the same issue? 

Andrey, SqlQuery or ScanQuery should guarantee that you are running on a locked partitions which are in a complete state. If a partition has not been filled yet, e.g. due to a node just joining, then another partition, which has full state on another node, will be picked for a query. 
I'd like emphasize I'm not talking about a single node launching a query against the cluster. I'd like to process my data on each node in parallel, but only the data that matches a certain SQL query.

Thanks
Andrey

Subject: Re: Computation on NodeEntries
To: user@ignite.apache.org
From: dmagda@gridgain.com
Date: Mon, 14 Dec 2015 10:53:04 +0300


  
    
  
  
    Hi Andrey,

    

    Please see below.

    

    On 12/13/2015 9:08 AM, Andrey Kornev
      wrote:

    
    
      Yakov,

        

        
        If partions do not migrate while they are being iterated
          over, wouldn't it then suffice to simply execute a single
          ScanQuery with its isLocal set to true? My reasoning here is
          that the scan would create an iterator for all affinity
          partitions, thus preventing their migration. If it's not the
          case, how would then a local ScanQuery behave in presence of
          topology changes?
      
    
    From what I see in the code setting ScanQuery's isLocal to 'true'
    gives an ability to iterate over all the partitions that belonged to
    a node at the time the query is started. All the partitions won't be
    moved to another node until the querys's iterator is closed.

    

    However, here I see the following issue. Imagine that your cluster
    has two nodes and you decided to iterate over all local partitions
    of two nodes and the execution sequence looks like this:

    

    1) ScanQuery with isLocal=true started executing on node A.
    All the partitions are blocked and won't be moved.

    2) Node B receives the same compute job with the ScanQuery
    in it. However because of an OS scheduler a Thread that is in charge
    of starting the query is blocked for some time. So the iterator over
    local partitions is not ready yet and the partitions are not
    blocked;

    3) Third node C joins the topology. Partitions that are
    owned by Node B may be rebalanced among node A and
    node C. 

    4) Partitions that are rebalanced from node B to node A
    won't be visited by your code because node's A iterator is already
    built while node's B iterator is constructed after the rebalancing.

    

    The issue can't happen when you specify partitions explicitly using
    Yakov's approach below. Because in the worst case in the situation
    like above a just rebalanced partition's data will be uploaded to a
    node that was initially an owner of the partition (at the time when
    you calculated partitions owners).

    

    

    
      
        

        
        Also, what's the best way to handle topology changes while
          using the SqlQuery rather than ScanQuery? Basically, it's the
          same use case, only instead of scanning the entire partition
          I'd like to first filter the cache entries using a query.
        

        
      
    
    SqlQueries will work transparently for you and guarantee to return a
    full and consistent result set even if a topology is changed while a
    query is in progress.

    

    --

    Denis

    
      
        Thanks
        Andrey
      
      _____________________________

        From: Yakov Zhdanov <yz...@apache.org>

        Sent: Friday, December 11, 2015 10:55 AM

        Subject: RE: Computation on NodeEntries

        To: <us...@ignite.apache.org>

        

        

        Partition will not migrate if local or remote
          iterator is not finished/closed.
         On Dec 11, 2015 21:05, "Andrey Kornev"
          < andrewkornev@hotmail.com>
          wrote: 

          
            
               Great suggestion! Thank you, Yakov! 

                

                Just one more question. :) Let's say the scan job is
                running node A and processing partition 42. At the same
                time, a new node B joins and partition 42 needs to be
                moved to this node. What will happen to my scan query
                that is still running on node A and iterating over the
                partition's entries? Would it complete processing the
                entire partition despite the change of ownership? Or,
                would the query terminate at some arbitrary point once
                the partition ownership transfer has completed? 

                

                Thanks a lot! 

                Andrey 

                

                
                  Date: Fri, 11 Dec 2015 16:06:16 +0300 

                  Subject: Re: Computation on NodeEntries 

                  From: yzhdanov@apache.org
                  

                  To: user@ignite.apache.org
                  

                  

                   Guys, I would do the following:
                     

                    
                     1. Map all my partitions to
                      nodes: org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
                    
                     2. Send jobs (with its list of partitions) to
                      each node using map returned on step1 
                     3. Job may be like: 
                    
                      new Runnable() {
    @Override public void run() {
        for (Integer part : parts) {
            Iterator<Cache.Entry<Object, Object>> it = cache.query(new ScanQuery<>(part)).iterator();
            
            // do the stuff...
        }
        
    }
};

                      This may result in network calls for some worst cases when topology changes under your feet, but even in this case this should work.
                    
                     

                      
                        
                           --Yakov 
                        
                      
                      

                       2015-12-11 2:13 GMT+03:00 Andrey Kornev <an...@hotmail.com>:
                        

                        
                          
                             Dmitriy, 

                              

                              Given the approach you suggested below,
                              what would be your recommendation for
                              dealing with cluster topology changes
                              while the iteration is in progress? An
                              obvious one I can think of is to 

                              - somehow detect the change, 

                              - cancel the tasks on all the nodes 

                              - wait until the rebalancing is finished
                              and 

                              - restart the computation. 

                              

                              Are there any other ways? Ideally, I'd
                              like to have the "exactly-once" execution
                              semantics. 

                              

                              Thanks 

                              Andrey 

                            
                          
                        
                      
                    
                  
                
              
            
          
        
        

        

      
    
    
 		 	   		  

 		 	   		  

Re: Computation on NodeEntries

Posted by Dmitriy Setrakyan <ds...@apache.org>.
On Mon, Dec 14, 2015 at 3:16 PM, Andrey Kornev <an...@hotmail.com>
wrote:

> Hey Denis,
>
> Thanks for your reply! The race you've described is definitely possible.
>
> However, it seems SqlQuery would also be vulnerable to the same race
> condition. I broadcast the tasks to all nodes and while each task is trying
> to start a local SqlQuery on the node a new node joins the cluster.
> Wouldn't I run into the same issue?
>

Andrey, SqlQuery or ScanQuery should guarantee that you are running on a
locked partitions which are in a complete state. If a partition has not
been filled yet, e.g. due to a node just joining, then another partition,
which has full state on another node, will be picked for a query.


>
> I'd like emphasize I'm not talking about a single node launching a query
> against the cluster. I'd like to process my data on each node in parallel,
> but only the data that matches a certain SQL query.
>
> Thanks
> Andrey
>
> ------------------------------
> Subject: Re: Computation on NodeEntries
> To: user@ignite.apache.org
> From: dmagda@gridgain.com
> Date: Mon, 14 Dec 2015 10:53:04 +0300
>
>
> Hi Andrey,
>
> Please see below.
>
> On 12/13/2015 9:08 AM, Andrey Kornev wrote:
>
> Yakov,
>
> If partions do not migrate while they are being iterated over, wouldn't it
> then suffice to simply execute a single ScanQuery with its isLocal set to
> true? My reasoning here is that the scan would create an iterator for all
> affinity partitions, thus preventing their migration. If it's not the case,
> how would then a local ScanQuery behave in presence of topology changes?
>
> From what I see in the code setting ScanQuery's isLocal to 'true' gives an
> ability to iterate over all the partitions that belonged to a node at the
> time the query is started. All the partitions won't be moved to another
> node until the querys's iterator is closed.
>
> However, here I see the following issue. Imagine that your cluster has two
> nodes and you decided to iterate over all local partitions of two nodes and
> the execution sequence looks like this:
>
> 1) ScanQuery with isLocal=true started executing on *node A*. All the
> partitions are blocked and won't be moved.
> 2) *Node B* receives the same compute job with the ScanQuery in it.
> However because of an OS scheduler a Thread that is in charge of starting
> the query is blocked for some time. So the iterator over local partitions
> is not ready yet and the partitions are not blocked;
> 3) Third *node C* joins the topology. Partitions that are owned by *Node
> B *may be rebalanced among *node A* and *node C*.
> 4) Partitions that are rebalanced from node B to *node A* won't be
> visited by your code because node's A iterator is already built while
> node's B iterator is constructed after the rebalancing.
>
> The issue can't happen when you specify partitions explicitly using
> Yakov's approach below. Because in the worst case in the situation like
> above a just rebalanced partition's data will be uploaded to a node that
> was initially an owner of the partition (at the time when you calculated
> partitions owners).
>
>
>
> Also, what's the best way to handle topology changes while using the
> SqlQuery rather than ScanQuery? Basically, it's the same use case, only
> instead of scanning the entire partition I'd like to first filter the cache
> entries using a query.
>
> SqlQueries will work transparently for you and guarantee to return a full
> and consistent result set even if a topology is changed while a query is in
> progress.
>
> --
> Denis
>
> Thanks
> Andrey
> _____________________________
> From: Yakov Zhdanov <yz...@apache.org>
> Sent: Friday, December 11, 2015 10:55 AM
> Subject: RE: Computation on NodeEntries
> To: <us...@ignite.apache.org>
>
>
> Partition will not migrate if local or remote iterator is not
> finished/closed.
> On Dec 11, 2015 21:05, "Andrey Kornev" < andrewkornev@hotmail.com> wrote:
>
> Great suggestion! Thank you, Yakov!
>
> Just one more question. :) Let's say the scan job is running node A and
> processing partition 42. At the same time, a new node B joins and partition
> 42 needs to be moved to this node. What will happen to my scan query that
> is still running on node A and iterating over the partition's entries?
> Would it complete processing the entire partition despite the change of
> ownership? Or, would the query terminate at some arbitrary point once the
> partition ownership transfer has completed?
>
> Thanks a lot!
> Andrey
>
> ------------------------------
> Date: Fri, 11 Dec 2015 16:06:16 +0300
> Subject: Re: Computation on NodeEntries
> From: yzhdanov@apache.org
> To: user@ignite.apache.org
>
> Guys, I would do the following:
>
> 1. Map all my partitions to
> nodes: org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
> 2. Send jobs (with its list of partitions) to each node using map returned
> on step1
> 3. Job may be like:
>
> new Runnable() {
>     @Override public void run() {
>         for (Integer part : parts) {
>             Iterator<Cache.Entry<Object, Object>> it = cache.query(new ScanQuery<>(part)).iterator();
>
>             // do the stuff...        }
>
>     }
> };
>
> This may result in network calls for some worst cases when topology changes under your feet, but even in this case this should work.
>
>
> --Yakov
>
> 2015-12-11 2:13 GMT+03:00 Andrey Kornev <an...@hotmail.com>:
>
> Dmitriy,
>
> Given the approach you suggested below, what would be your recommendation
> for dealing with cluster topology changes while the iteration is in
> progress? An obvious one I can think of is to
> - somehow detect the change,
> - cancel the tasks on all the nodes
> - wait until the rebalancing is finished and
> - restart the computation.
>
> Are there any other ways? Ideally, I'd like to have the "exactly-once"
> execution semantics.
>
> Thanks
> Andrey
>
>
>
>
>

RE: Computation on NodeEntries

Posted by Andrey Kornev <an...@hotmail.com>.
Hey Denis,

Thanks for your reply! The race you've described is definitely possible.

However, it seems SqlQuery would also be vulnerable to the same race condition. I broadcast the tasks to all nodes and while each task is trying to start a local SqlQuery on the node a new node joins the cluster. Wouldn't I run into the same issue? 

I'd like emphasize I'm not talking about a single node launching a query against the cluster. I'd like to process my data on each node in parallel, but only the data that matches a certain SQL query.

Thanks
Andrey

Subject: Re: Computation on NodeEntries
To: user@ignite.apache.org
From: dmagda@gridgain.com
Date: Mon, 14 Dec 2015 10:53:04 +0300


  
    
  
  
    Hi Andrey,

    

    Please see below.

    

    On 12/13/2015 9:08 AM, Andrey Kornev
      wrote:

    
    
      Yakov,

        

        
        If partions do not migrate while they are being iterated
          over, wouldn't it then suffice to simply execute a single
          ScanQuery with its isLocal set to true? My reasoning here is
          that the scan would create an iterator for all affinity
          partitions, thus preventing their migration. If it's not the
          case, how would then a local ScanQuery behave in presence of
          topology changes?
      
    
    From what I see in the code setting ScanQuery's isLocal to 'true'
    gives an ability to iterate over all the partitions that belonged to
    a node at the time the query is started. All the partitions won't be
    moved to another node until the querys's iterator is closed.

    

    However, here I see the following issue. Imagine that your cluster
    has two nodes and you decided to iterate over all local partitions
    of two nodes and the execution sequence looks like this:

    

    1) ScanQuery with isLocal=true started executing on node A.
    All the partitions are blocked and won't be moved.

    2) Node B receives the same compute job with the ScanQuery
    in it. However because of an OS scheduler a Thread that is in charge
    of starting the query is blocked for some time. So the iterator over
    local partitions is not ready yet and the partitions are not
    blocked;

    3) Third node C joins the topology. Partitions that are
    owned by Node B may be rebalanced among node A and
    node C. 

    4) Partitions that are rebalanced from node B to node A
    won't be visited by your code because node's A iterator is already
    built while node's B iterator is constructed after the rebalancing.

    

    The issue can't happen when you specify partitions explicitly using
    Yakov's approach below. Because in the worst case in the situation
    like above a just rebalanced partition's data will be uploaded to a
    node that was initially an owner of the partition (at the time when
    you calculated partitions owners).

    

    

    
      
        

        
        Also, what's the best way to handle topology changes while
          using the SqlQuery rather than ScanQuery? Basically, it's the
          same use case, only instead of scanning the entire partition
          I'd like to first filter the cache entries using a query.
        

        
      
    
    SqlQueries will work transparently for you and guarantee to return a
    full and consistent result set even if a topology is changed while a
    query is in progress.

    

    --

    Denis

    
      
        Thanks
        Andrey
      
      _____________________________

        From: Yakov Zhdanov <yz...@apache.org>

        Sent: Friday, December 11, 2015 10:55 AM

        Subject: RE: Computation on NodeEntries

        To: <us...@ignite.apache.org>

        

        

        Partition will not migrate if local or remote
          iterator is not finished/closed.
         On Dec 11, 2015 21:05, "Andrey Kornev"
          < andrewkornev@hotmail.com>
          wrote: 

          
            
               Great suggestion! Thank you, Yakov! 

                

                Just one more question. :) Let's say the scan job is
                running node A and processing partition 42. At the same
                time, a new node B joins and partition 42 needs to be
                moved to this node. What will happen to my scan query
                that is still running on node A and iterating over the
                partition's entries? Would it complete processing the
                entire partition despite the change of ownership? Or,
                would the query terminate at some arbitrary point once
                the partition ownership transfer has completed? 

                

                Thanks a lot! 

                Andrey 

                

                
                  Date: Fri, 11 Dec 2015 16:06:16 +0300 

                  Subject: Re: Computation on NodeEntries 

                  From: yzhdanov@apache.org
                  

                  To: user@ignite.apache.org
                  

                  

                   Guys, I would do the following:
                     

                    
                     1. Map all my partitions to
                      nodes: org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
                    
                     2. Send jobs (with its list of partitions) to
                      each node using map returned on step1 
                     3. Job may be like: 
                    
                      new Runnable() {
    @Override public void run() {
        for (Integer part : parts) {
            Iterator<Cache.Entry<Object, Object>> it = cache.query(new ScanQuery<>(part)).iterator();
            
            // do the stuff...
        }
        
    }
};

                      This may result in network calls for some worst cases when topology changes under your feet, but even in this case this should work.
                    
                     

                      
                        
                           --Yakov 
                        
                      
                      

                       2015-12-11 2:13 GMT+03:00 Andrey Kornev <an...@hotmail.com>:
                        

                        
                          
                             Dmitriy, 

                              

                              Given the approach you suggested below,
                              what would be your recommendation for
                              dealing with cluster topology changes
                              while the iteration is in progress? An
                              obvious one I can think of is to 

                              - somehow detect the change, 

                              - cancel the tasks on all the nodes 

                              - wait until the rebalancing is finished
                              and 

                              - restart the computation. 

                              

                              Are there any other ways? Ideally, I'd
                              like to have the "exactly-once" execution
                              semantics. 

                              

                              Thanks 

                              Andrey 

                            
                          
                        
                      
                    
                  
                
              
            
          
        
        

        

      
    
    
 		 	   		  

Re: Computation on NodeEntries

Posted by Denis Magda <dm...@gridgain.com>.
Hi Andrey,

Please see below.

On 12/13/2015 9:08 AM, Andrey Kornev wrote:
> Yakov,
>
> If partions do not migrate while they are being iterated over, 
> wouldn't it then suffice to simply execute a single ScanQuery with its 
> isLocal set to true? My reasoning here is that the scan would create 
> an iterator for all affinity partitions, thus preventing their 
> migration. If it's not the case, how would then a local ScanQuery 
> behave in presence of topology changes?
 From what I see in the code setting ScanQuery's isLocal to 'true' gives 
an ability to iterate over all the partitions that belonged to a node at 
the time the query is started. All the partitions won't be moved to 
another node until the querys's iterator is closed.

However, here I see the following issue. Imagine that your cluster has 
two nodes and you decided to iterate over all local partitions of two 
nodes and the execution sequence looks like this:

1) ScanQuery with isLocal=true started executing on *node A*. All the 
partitions are blocked and won't be moved.
2) *Node B* receives the same compute job with the ScanQuery in it. 
However because of an OS scheduler a Thread that is in charge of 
starting the query is blocked for some time. So the iterator over local 
partitions is not ready yet and the partitions are not blocked;
3) Third *node C* joins the topology. Partitions that are owned by *Node 
B *may be rebalanced among *node A* and *node C*.
4) Partitions that are rebalanced from node B to *node A* won't be 
visited by your code because node's A iterator is already built while 
node's B iterator is constructed after the rebalancing.

The issue can't happen when you specify partitions explicitly using 
Yakov's approach below. Because in the worst case in the situation like 
above a just rebalanced partition's data will be uploaded to a node that 
was initially an owner of the partition (at the time when you calculated 
partitions owners).


>
> Also, what's the best way to handle topology changes while using the 
> SqlQuery rather than ScanQuery? Basically, it's the same use case, 
> only instead of scanning the entire partition I'd like to first filter 
> the cache entries using a query.
>
SqlQueries will work transparently for you and guarantee to return a 
full and consistent result set even if a topology is changed while a 
query is in progress.

--
Denis
> Thanks
> Andrey
> _____________________________
> From: Yakov Zhdanov <yzhdanov@apache.org <ma...@apache.org>>
> Sent: Friday, December 11, 2015 10:55 AM
> Subject: RE: Computation on NodeEntries
> To: <user@ignite.apache.org <ma...@ignite.apache.org>>
>
>
> Partition will not migrate if local or remote iterator is not 
> finished/closed.
>
> On Dec 11, 2015 21:05, "Andrey Kornev" < andrewkornev@hotmail.com 
> <ma...@hotmail.com>> wrote:
>
>     Great suggestion! Thank you, Yakov!
>
>     Just one more question. :) Let's say the scan job is running node
>     A and processing partition 42. At the same time, a new node B
>     joins and partition 42 needs to be moved to this node. What will
>     happen to my scan query that is still running on node A and
>     iterating over the partition's entries? Would it complete
>     processing the entire partition despite the change of ownership?
>     Or, would the query terminate at some arbitrary point once the
>     partition ownership transfer has completed?
>
>     Thanks a lot!
>     Andrey
>
>     ------------------------------------------------------------------------
>     Date: Fri, 11 Dec 2015 16:06:16 +0300
>     Subject: Re: Computation on NodeEntries
>     From: yzhdanov@apache.org <ma...@apache.org>
>     To: user@ignite.apache.org <ma...@ignite.apache.org>
>
>     Guys, I would do the following:
>
>     1. Map all my partitions to
>     nodes: org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
>     2. Send jobs (with its list of partitions) to each node using map
>     returned on step1
>     3. Job may be like:
>
>     new Runnable() {
>          @Override public void run() {
>              for (Integer part : parts) {
>                  Iterator<Cache.Entry<Object, Object>> it =cache.query(new ScanQuery<>(part)).iterator();
>                  
>                  // do the stuff... }
>              
>          }
>     };
>
>     This may result in network calls for some worst cases when topology changes under your feet, but even in this case this should work.
>
>
>     --Yakov
>
>     2015-12-11 2:13 GMT+03:00 Andrey Kornev <andrewkornev@hotmail.com
>     <ma...@hotmail.com>>:
>
>         Dmitriy,
>
>         Given the approach you suggested below, what would be your
>         recommendation for dealing with cluster topology changes while
>         the iteration is in progress? An obvious one I can think of is to
>         - somehow detect the change,
>         - cancel the tasks on all the nodes
>         - wait until the rebalancing is finished and
>         - restart the computation.
>
>         Are there any other ways? Ideally, I'd like to have the
>         "exactly-once" execution semantics.
>
>         Thanks
>         Andrey
>
>
>


RE: Computation on NodeEntries

Posted by Andrey Kornev <an...@hotmail.com>.
Yakov,

If partions do not migrate while they are being iterated over, wouldn't it then suffice to simply execute a single ScanQuery with its isLocal set to true? My reasoning here is that the scan would create an iterator for all affinity partitions, thus preventing their migration. If it's not the case, how would then a local ScanQuery behave in presence of topology changes?
Also, what's the best way to handle topology changes while using the SqlQuery rather than ScanQuery? Basically, it's the same use case, only instead of scanning the entire partition I'd like to first filter the cache entries using a query.
ThanksAndrey
    _____________________________
From: Yakov Zhdanov <yz...@apache.org>
Sent: Friday, December 11, 2015 10:55 AM
Subject: RE: Computation on NodeEntries
To:  <us...@ignite.apache.org>


    

Partition will not migrate if local or remote iterator is not finished/closed.      On Dec 11, 2015 21:05, "Andrey Kornev" <   andrewkornev@hotmail.com> wrote:   
                   Great suggestion! Thank you, Yakov!      
      
Just one more question. :) Let's say the scan job is running node A and processing partition 42. At the same time, a new node B joins and partition 42 needs to be moved to this node. What will happen to my scan query that is still running on node A and iterating over the partition's entries? Would it complete processing the entire partition despite the change of ownership? Or, would the query terminate at some arbitrary point once the partition ownership transfer has completed?      
      
Thanks a lot!      
Andrey      
      
             Date: Fri, 11 Dec 2015 16:06:16 +0300       
Subject: Re: Computation on NodeEntries       
From:        yzhdanov@apache.org       
To:        user@ignite.apache.org       
       
               Guys, I would do the following:                 
                         1. Map all my partitions to nodes: org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes                         2. Send jobs (with its list of partitions) to each node using map returned on step1                         3. Job may be like:                         new Runnable() {
    @Override public void run() {
        for (Integer part : parts) {
            Iterator<Cache.Entry<Object, Object>> it = cache.query(new ScanQuery<>(part)).iterator();
            
            // do the stuff...
        }
        
    }
};
         This may result in network calls for some worst cases when topology changes under your feet, but even in this case this should work.                         
                                          --Yakov                                        
                   2015-12-11 2:13 GMT+03:00 Andrey Kornev           <an...@hotmail.com>:          
                                               Dmitriy,             
             
Given the approach you suggested below, what would be your recommendation for dealing with cluster topology changes while the iteration is in progress? An obvious one I can think of is to              
- somehow detect the change,              
- cancel the tasks on all the nodes              
- wait until the rebalancing is finished and             
- restart the computation.             
             
Are there any other ways? Ideally, I'd like to have the "exactly-once" execution semantics.             
             
Thanks             
Andrey             
                                                                                 


  

RE: Computation on NodeEntries

Posted by Yakov Zhdanov <yz...@apache.org>.
Partition will not migrate if local or remote iterator is not
finished/closed.
On Dec 11, 2015 21:05, "Andrey Kornev" <an...@hotmail.com> wrote:

> Great suggestion! Thank you, Yakov!
>
> Just one more question. :) Let's say the scan job is running node A and
> processing partition 42. At the same time, a new node B joins and partition
> 42 needs to be moved to this node. What will happen to my scan query that
> is still running on node A and iterating over the partition's entries?
> Would it complete processing the entire partition despite the change of
> ownership? Or, would the query terminate at some arbitrary point once the
> partition ownership transfer has completed?
>
> Thanks a lot!
> Andrey
>
> ------------------------------
> Date: Fri, 11 Dec 2015 16:06:16 +0300
> Subject: Re: Computation on NodeEntries
> From: yzhdanov@apache.org
> To: user@ignite.apache.org
>
> Guys, I would do the following:
>
> 1. Map all my partitions to
> nodes: org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
> 2. Send jobs (with its list of partitions) to each node using map returned
> on step1
> 3. Job may be like:
>
> new Runnable() {
>     @Override public void run() {
>         for (Integer part : parts) {
>             Iterator<Cache.Entry<Object, Object>> it = cache.query(new ScanQuery<>(part)).iterator();
>
>             // do the stuff...
>         }
>
>     }
> };
>
> This may result in network calls for some worst cases when topology changes under your feet, but even in this case this should work.
>
>
> --Yakov
>
> 2015-12-11 2:13 GMT+03:00 Andrey Kornev <an...@hotmail.com>:
>
> Dmitriy,
>
> Given the approach you suggested below, what would be your recommendation
> for dealing with cluster topology changes while the iteration is in
> progress? An obvious one I can think of is to
> - somehow detect the change,
> - cancel the tasks on all the nodes
> - wait until the rebalancing is finished and
> - restart the computation.
>
> Are there any other ways? Ideally, I'd like to have the "exactly-once"
> execution semantics.
>
> Thanks
> Andrey
>
>

RE: Computation on NodeEntries

Posted by Andrey Kornev <an...@hotmail.com>.
Great suggestion! Thank you, Yakov!

Just one more question. :) Let's say the scan job is running node A and processing partition 42. At the same time, a new node B joins and partition 42 needs to be moved to this node. What will happen to my scan query that is still running on node A and iterating over the partition's entries? Would it complete processing the entire partition despite the change of ownership? Or, would the query terminate at some arbitrary point once the partition ownership transfer has completed?

Thanks a lot!
Andrey

Date: Fri, 11 Dec 2015 16:06:16 +0300
Subject: Re: Computation on NodeEntries
From: yzhdanov@apache.org
To: user@ignite.apache.org

Guys, I would do the following:
1. Map all my partitions to nodes: org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes2. Send jobs (with its list of partitions) to each node using map returned on step13. Job may be like:new Runnable() {
    @Override public void run() {
        for (Integer part : parts) {
            Iterator<Cache.Entry<Object, Object>> it = cache.query(new ScanQuery<>(part)).iterator();
            
            // do the stuff...
        }
        
    }
};
This may result in network calls for some worst cases when topology changes under your feet, but even in this case this should work.--Yakov

2015-12-11 2:13 GMT+03:00 Andrey Kornev <an...@hotmail.com>:



Dmitriy,

Given the approach you suggested below, what would be your recommendation for dealing with cluster topology changes while the iteration is in progress? An obvious one I can think of is to 
- somehow detect the change, 
- cancel the tasks on all the nodes 
- wait until the rebalancing is finished and
- restart the computation.

Are there any other ways? Ideally, I'd like to have the "exactly-once" execution semantics.

Thanks
Andrey
 		 	   		  

Re: Computation on NodeEntries

Posted by Yakov Zhdanov <yz...@apache.org>.
Guys, I would do the following:

1. Map all my partitions to
nodes: org.apache.ignite.cache.affinity.Affinity#mapPartitionsToNodes
2. Send jobs (with its list of partitions) to each node using map returned
on step1
3. Job may be like:

new Runnable() {
    @Override public void run() {
        for (Integer part : parts) {
            Iterator<Cache.Entry<Object, Object>> it = cache.query(new
ScanQuery<>(part)).iterator();

            // do the stuff...
        }

    }
};

This may result in network calls for some worst cases when topology
changes under your feet, but even in this case this should work.


--Yakov

2015-12-11 2:13 GMT+03:00 Andrey Kornev <an...@hotmail.com>:

> Dmitriy,
>
> Given the approach you suggested below, what would be your recommendation
> for dealing with cluster topology changes while the iteration is in
> progress? An obvious one I can think of is to
> - somehow detect the change,
> - cancel the tasks on all the nodes
> - wait until the rebalancing is finished and
> - restart the computation.
>
> Are there any other ways? Ideally, I'd like to have the "exactly-once"
> execution semantics.
>
> Thanks
> Andrey
>

RE: Computation on NodeEntries

Posted by Andrey Kornev <an...@hotmail.com>.
Dmitriy,

Given the approach you suggested below, what would be your recommendation for dealing with cluster topology changes while the iteration is in progress? An obvious one I can think of is to 
- somehow detect the change, 
- cancel the tasks on all the nodes 
- wait until the rebalancing is finished and
- restart the computation.

Are there any other ways? Ideally, I'd like to have the "exactly-once" execution semantics.

Thanks
Andrey

From: dsetrakyan@apache.org
Date: Wed, 9 Dec 2015 23:47:08 -0800
Subject: Re: Computation on NodeEntries
To: user@ignite.apache.org

To get an iterator over the local entries, you can use this method:
IgniteCache.localEntries(CachePeekMode.PRIMARY)
The CachePeekMode.PRIMARY allows to iterate only over the primary copies, ignoring the backups.
You can broadcast a computation to the cache nodes using standard IgniteCompute API and iterate through local entries inside your computation logic.
D.
On Wed, Dec 9, 2015 at 2:43 PM, Camilo Alvarez <ca...@gmail.com> wrote:
Hi ,
I want to know if it is possible to execute a computation only on the entries of the node?. I want to add apox. 200 million objects in a distributed cache and then execute a computation in each node, this computation should work only whith the local entries creating a local index. I am using the default hash algorithm.

Regards,
Camilo

 		 	   		  

Re: Computation on NodeEntries

Posted by Dmitriy Setrakyan <ds...@apache.org>.
To get an iterator over the local entries, you can use this method:

IgniteCache.localEntries(CachePeekMode.PRIMARY)

The CachePeekMode.PRIMARY allows to iterate only over the primary copies,
ignoring the backups.

You can broadcast a computation to the cache nodes using standard
IgniteCompute API and iterate through local entries inside your computation
logic.

D.

On Wed, Dec 9, 2015 at 2:43 PM, Camilo Alvarez <ca...@gmail.com> wrote:

> Hi ,
>
> I want to know if it is possible to execute a computation only on the
> entries of the node?. I want to add apox. 200 million objects in a
> distributed cache and then execute a computation in each node, this
> computation should work only whith the local entries creating a local
> index. I am using the default hash algorithm.
>
>
> Regards,
>
> Camilo
>