Posted to user@ignite.apache.org by Raymond Wilson <ra...@trimble.com> on 2018/04/23 01:10:40 UTC

Using a cache as an affinity co-located processing buffer in Ignite.Net

All,



I have been thinking about how to use Ignite.Net to support an affinity
co-located ingest pipeline that uses a queue to provide buffering and fault
tolerance for a flow of ingest packages.



At a high level, it looks like this:



Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache, affinity
co-located with PackageProcessor]

Processing pipeline: [PackageCache] -> [PackageProcessor] ->
[ProcessedDataCache affinity co-located with PackageProcessor]



Essentially, I want a cache that looks like this:



public class CacheItem
{
    public DateTime Date;

    [AffinityKeyMapped]
    public Guid AffinityKey;

    public byte[] Package;
}



ICache<string, CacheItem> BufferQueue;



BufferQueue = ignite.GetOrCreateCache<string, CacheItem>(
    new CacheConfiguration
    {
        Name = "BufferQueue",

        KeepBinaryInStore = true,

        // Partition the entries across the server nodes
        CacheMode = CacheMode.Partitioned,
    });



This queue will target a data region that is configured for persistence.



Inbound packages will arrive and be injected into the BufferQueue cache
from some client node context, like this:



public void HandleANewPackage(string key, Guid affinityKey, byte[] package)
{
    BufferQueue.Put(key, new CacheItem { Date = DateTime.Now, AffinityKey = affinityKey, Package = package });
}



There will be a collection of server nodes that are responsible for the
cache.



This is all straightforward. The tricky bit is then processing the elements
in the BufferQueue cache.



The data is already on the server nodes, nicely co-located according to its
affinity. I want to have parallel processing logic that runs on the server
nodes that pulls elements from the buffer queue and processes them into
some other cache(s).



At this point I know I have a cache that may contain entries needing to be
processed, but I don't know their keys. I know it's possible to have
logic running on each server node that does this (either as a Grid Service
or a Compute.Broadcast() lambda):



var cache = ignite.GetCache<string, CacheItem>("BufferQueue");

// QueryFilter is a user-defined ICacheEntryFilter implementation
var cursor = cache.Query(new ScanQuery<string, CacheItem>(new QueryFilter()));

foreach (var cacheEntry in cursor)
    ProcessItem(cacheEntry);



…but I am not sure how to restrict the entries returned by the query to
only those affinity co-located with the server node asking for them.



Is this so obvious that it just works and does not need documentation? Or
is it not possible, in which case should I run the processing from a client
node context (as above) and pay the penalty of extracting the packages from
the cache with cache.Query() and then resubmitting them using an
affinity-aware method like AffinityRun()?
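
For reference, the closest thing I have found so far is the Local flag
inherited by the query types; a minimal sketch of what I am hoping works
(assuming ScanQuery honours its inherited Local property, and reusing the
user-defined QueryFilter class from above):

```csharp
// Sketch only: assumes ScanQuery's Local flag restricts the scan to
// entries whose primary copy resides on the node running the query.
var cache = ignite.GetCache<string, CacheItem>("BufferQueue");

var localScan = new ScanQuery<string, CacheItem>(new QueryFilter())
{
    Local = true  // only scan entries held locally on this server node
};

// Run on each server node (e.g. inside a broadcast compute job or service)
foreach (var entry in cache.Query(localScan))
    ProcessItem(entry);
```

If Local behaves this way, each server node would process only its own
partitions and no redundant network transfer of packages would occur.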



Thanks,

Raymond.

RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
Are cache interceptors an internal Ignite construct, available only via the
Java API?



*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Friday, May 18, 2018 1:53 AM
*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



Cache interceptors are not available in Ignite.NET and there are no plans
to add them.



On Tue, May 15, 2018 at 10:37 PM, Raymond Wilson <ra...@trimble.com>
wrote:

Hi Dmitriy,



My question regarding cache interceptor availability in .Net is still
outstanding.



I have working code using a continuous query, but am interested in cache
interceptor if this is a better solution.



Thanks.

Raymond.



*From:* Dmitry Pavlov [mailto:dpavlov.spb@gmail.com]
*Sent:* Wednesday, May 16, 2018 3:45 AM
*To:* user@ignite.apache.org
*Cc:* dev@ignite.apache.org


*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



Hi Raymond,



Was this question answered?



Sincerely,

Dmitriy Pavlov



Tue, 1 May 2018 at 0:20, Raymond Wilson <ra...@trimble.com>:

Cross posting to dev list for comment on cache interceptor availability on
Ignite .Net client.

-----Original Message-----
From: Raymond Wilson [mailto:raymond_wilson@trimble.com]
Sent: Saturday, April 28, 2018 10:35 PM
To: 'user@ignite.apache.org' <us...@ignite.apache.org>
Subject: RE: Using a cache as an affinity co-located processing buffer in
Ignite.Net

Further investigation shows CacheInterceptor is not a part of the
Ignite.NET API.

Is there a plan/ticket for this to be done?

-----Original Message-----
From: Raymond Wilson [mailto:raymond_wilson@trimble.com]
Sent: Saturday, April 28, 2018 1:08 PM
To: user@ignite.apache.org
Subject: Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net

Val,

Are the interceptors invoked in the affinity co-located context of the
item? The help is a little unclear on that.

Thanks,
Raymond.

Sent from my iPhone

> On 28/04/2018, at 12:12 PM, vkulichenko <va...@gmail.com>
wrote:
>
> Raymond,
>
> If you go with approach I described above, I would actually recommend to
use
> interceptors:
>
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/cache/CacheInterceptor.html
>
> Continuous query seems to be a bit cumbersome for this.
>
> -Val
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Pavel Tupitsyn <pt...@apache.org>.
Cache interceptors are not available in Ignite.NET and there are no plans
to add them.

On Tue, May 15, 2018 at 10:37 PM, Raymond Wilson <raymond_wilson@trimble.com
> wrote:

> Hi Dmitriy,
>
>
>
> My question regarding cache interceptor availability in .Net is still
> outstanding.
>
>
>
> I have working code using a continuous query, but am interested in cache
> interceptor if this is a better solution.
>
>
>
> Thanks.
>
> Raymond.
>
>
>
> *From:* Dmitry Pavlov [mailto:dpavlov.spb@gmail.com]
> *Sent:* Wednesday, May 16, 2018 3:45 AM
> *To:* user@ignite.apache.org
> *Cc:* dev@ignite.apache.org
>
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> Hi Raymond,
>
>
>
> Was this question answered?
>
>
>
> Sincerely,
>
> Dmitriy Pavlov
>
>
>
> Tue, 1 May 2018 at 0:20, Raymond Wilson <ra...@trimble.com>:
>
> Cross posting to dev list for comment on cache interceptor availability on
> Ignite .Net client.
>
> -----Original Message-----
> From: Raymond Wilson [mailto:raymond_wilson@trimble.com]
> Sent: Saturday, April 28, 2018 10:35 PM
> To: 'user@ignite.apache.org' <us...@ignite.apache.org>
> Subject: RE: Using a cache as an affinity co-located processing buffer in
> Ignite.Net
>
> Further investigation shows CacheInterceptor is not a part of the
> Ignite.NET API.
>
> Is there a plan/ticket for this to be done?
>
> -----Original Message-----
> From: Raymond Wilson [mailto:raymond_wilson@trimble.com]
> Sent: Saturday, April 28, 2018 1:08 PM
> To: user@ignite.apache.org
> Subject: Re: Using a cache as an affinity co-located processing buffer in
> Ignite.Net
>
> Val,
>
> Are the interceptors invoked in the affinity co-located context of the
> item? The help is a little unclear on that.
>
> Thanks,
> Raymond.
>
> Sent from my iPhone
>
> > On 28/04/2018, at 12:12 PM, vkulichenko <va...@gmail.com>
> wrote:
> >
> > Raymond,
> >
> > If you go with approach I described above, I would actually recommend to
> use
> > interceptors:
> >
> https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/cache/CacheInterceptor.html
> >
> > Continuous query seems to be a bit cumbersome for this.
> >
> > -Val
> >
> >
> >
> > --
> > Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>
>

RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
Hi Dmitriy,



My question regarding cache interceptor availability in .Net is still
outstanding.



I have working code using a continuous query, but am interested in cache
interceptor if this is a better solution.



Thanks.

Raymond.



*From:* Dmitry Pavlov [mailto:dpavlov.spb@gmail.com]
*Sent:* Wednesday, May 16, 2018 3:45 AM
*To:* user@ignite.apache.org
*Cc:* dev@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



Hi Raymond,



Was this question answered?



Sincerely,

Dmitriy Pavlov



Tue, 1 May 2018 at 0:20, Raymond Wilson <ra...@trimble.com>:

Cross posting to dev list for comment on cache interceptor availability on
Ignite .Net client.

-----Original Message-----
From: Raymond Wilson [mailto:raymond_wilson@trimble.com]
Sent: Saturday, April 28, 2018 10:35 PM
To: 'user@ignite.apache.org' <us...@ignite.apache.org>
Subject: RE: Using a cache as an affinity co-located processing buffer in
Ignite.Net

Further investigation shows CacheInterceptor is not a part of the
Ignite.NET API.

Is there a plan/ticket for this to be done?

-----Original Message-----
From: Raymond Wilson [mailto:raymond_wilson@trimble.com]
Sent: Saturday, April 28, 2018 1:08 PM
To: user@ignite.apache.org
Subject: Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net

Val,

Are the interceptors invoked in the affinity co-located context of the
item? The help is a little unclear on that.

Thanks,
Raymond.

Sent from my iPhone

> On 28/04/2018, at 12:12 PM, vkulichenko <va...@gmail.com>
wrote:
>
> Raymond,
>
> If you go with approach I described above, I would actually recommend to
use
> interceptors:
>
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/cache/CacheInterceptor.html
>
> Continuous query seems to be a bit cumbersome for this.
>
> -Val
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/


Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Dmitry Pavlov <dp...@gmail.com>.
Hi Raymond,

Was this question answered?

Sincerely,
Dmitriy Pavlov

Tue, 1 May 2018 at 0:20, Raymond Wilson <ra...@trimble.com>:

> Cross posting to dev list for comment on cache interceptor availability on
> Ignite .Net client.
>
> -----Original Message-----
> From: Raymond Wilson [mailto:raymond_wilson@trimble.com]
> Sent: Saturday, April 28, 2018 10:35 PM
> To: 'user@ignite.apache.org' <us...@ignite.apache.org>
> Subject: RE: Using a cache as an affinity co-located processing buffer in
> Ignite.Net
>
> Further investigation shows CacheInterceptor is not a part of the
> Ignite.NET API.
>
> Is there a plan/ticket for this to be done?
>
> -----Original Message-----
> From: Raymond Wilson [mailto:raymond_wilson@trimble.com]
> Sent: Saturday, April 28, 2018 1:08 PM
> To: user@ignite.apache.org
> Subject: Re: Using a cache as an affinity co-located processing buffer in
> Ignite.Net
>
> Val,
>
> Are the interceptors invoked in the affinity co-located context of the
> item? The help is a little unclear on that.
>
> Thanks,
> Raymond.
>
> Sent from my iPhone
>
> > On 28/04/2018, at 12:12 PM, vkulichenko <va...@gmail.com>
> wrote:
> >
> > Raymond,
> >
> > If you go with approach I described above, I would actually recommend to
> use
> > interceptors:
> >
> https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/cache/CacheInterceptor.html
> >
> > Continuous query seems to be a bit cumbersome for this.
> >
> > -Val
> >
> >
> >
> > --
> > Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>


RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
Cross posting to dev list for comment on cache interceptor availability on
Ignite .Net client.

-----Original Message-----
From: Raymond Wilson [mailto:raymond_wilson@trimble.com]
Sent: Saturday, April 28, 2018 10:35 PM
To: 'user@ignite.apache.org' <us...@ignite.apache.org>
Subject: RE: Using a cache as an affinity co-located processing buffer in
Ignite.Net

Further investigation shows CacheInterceptor is not a part of the
Ignite.NET API.

Is there a plan/ticket for this to be done?

-----Original Message-----
From: Raymond Wilson [mailto:raymond_wilson@trimble.com]
Sent: Saturday, April 28, 2018 1:08 PM
To: user@ignite.apache.org
Subject: Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net

Val,

Are the interceptors invoked in the affinity co-located context of the
item? The help is a little unclear on that.

Thanks,
Raymond.

Sent from my iPhone

> On 28/04/2018, at 12:12 PM, vkulichenko <va...@gmail.com>
wrote:
>
> Raymond,
>
> If you go with approach I described above, I would actually recommend to
use
> interceptors:
>
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/cache/CacheInterceptor.html
>
> Continuous query seems to be a bit cumbersome for this.
>
> -Val
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/


RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
Further investigation shows CacheInterceptor is not a part of the
Ignite.NET API.

Is there a plan/ticket for this to be done?

-----Original Message-----
From: Raymond Wilson [mailto:raymond_wilson@trimble.com]
Sent: Saturday, April 28, 2018 1:08 PM
To: user@ignite.apache.org
Subject: Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net

Val,

Are the interceptors invoked in the affinity co-located context of the
item? The help is a little unclear on that.

Thanks,
Raymond.

Sent from my iPhone

> On 28/04/2018, at 12:12 PM, vkulichenko <va...@gmail.com>
wrote:
>
> Raymond,
>
> If you go with approach I described above, I would actually recommend to
use
> interceptors:
>
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/cache/CacheInterceptor.html
>
> Continuous query seems to be a bit cumbersome for this.
>
> -Val
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
Val,

Are the interceptors invoked in the affinity co-located context of the item? The help is a little unclear on that. 

Thanks,
Raymond.

Sent from my iPhone

> On 28/04/2018, at 12:12 PM, vkulichenko <va...@gmail.com> wrote:
> 
> Raymond,
> 
> If you go with approach I described above, I would actually recommend to use
> interceptors:
> https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/cache/CacheInterceptor.html
> 
> Continuous query seems to be a bit cumbersome for this.
> 
> -Val
> 
> 
> 
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by vkulichenko <va...@gmail.com>.
Raymond,

If you go with approach I described above, I would actually recommend to use
interceptors:
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/cache/CacheInterceptor.html

Continuous query seems to be a bit cumbersome for this.

-Val



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
If I use local continuous queries, can't I omit the remote filter and just use the local delivery handler for the continuous query?

Sent from my iPhone

> On 28/04/2018, at 9:43 AM, vkulichenko <va...@gmail.com> wrote:
> 
> Raymond,
> 
> It sounds like you want to run certain computation on every data update in
> the cache, is that right?
> 
> To achieve that you can use local continuous queries, but:
> - Remote filter would be executed on both primary and backup, so computation
> will be executed more than once.
> - You can filter out by primary flag so that execution happens once, but in
> this case there is a chance some computation would NEVER execute in case of
> failure.
> 
> One of the ways to go around that is to have a 'status' field in the cache
> object that indicates whether computation for this object was completed or
> not. Then, if one of the nodes dies, you can run a query to select all
> unfinished jobs and resubmit them. This way duplicated computation would
> happen only when topology changes, which on stable topology executions will
> happen only on primary nodes.
> 
> -Val
> 
> 
> 
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/

RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by vkulichenko <va...@gmail.com>.
Raymond,

It sounds like you want to run certain computation on every data update in
the cache, is that right?

To achieve that you can use local continuous queries, but:
- Remote filter would be executed on both primary and backup, so computation
will be executed more than once.
- You can filter out by primary flag so that execution happens once, but in
this case there is a chance some computation would NEVER execute in case of
failure.

One of the ways to go around that is to have a 'status' field in the cache
object that indicates whether computation for this object was completed or
not. Then, if one of the nodes dies, you can run a query to select all
unfinished jobs and resubmit them. This way duplicated computation would
happen only when topology changes, which on stable topology executions will
happen only on primary nodes.

-Val



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
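
The 'status' field pattern described above might look roughly like this in
Ignite.Net (a sketch only; the Processed flag, the UnprocessedFilter class
and the ResubmitForProcessing method are illustrative names, not Ignite
APIs):

```csharp
public class CacheItem
{
    public DateTime Date;

    [AffinityKeyMapped]
    public Guid AffinityKey;

    public byte[] Package;

    // Status flag: set to true once computation for this entry has
    // completed, so unfinished work can be found after a node failure.
    public bool Processed;
}

// On a topology change, select all unfinished entries and resubmit them.
// Duplicated computation is possible here, so processing must be
// idempotent (as noted above).
var unfinished = cache.Query(new ScanQuery<string, CacheItem>(
    new UnprocessedFilter()));  // filter: entry => !entry.Value.Processed

foreach (var entry in unfinished)
    ResubmitForProcessing(entry.Key, entry.Value);
```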

Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
Val,

What I really want is a .Net implementation of the Ignite distributed queue, assuming co-located server processing can pull members off the queue that are co-located with that node (yeah, that's probably mangling queue semantics a bit, but not that badly :) )

I have previously made a POC code implementation where a service is deployed to every node and runs a continuous query against the local members of the cache (this one does not abuse remote filters). Ie: one CQ per node. This approach requires a single context to issue the service deployment (which goes back to the best practices question)

I have a second implementation where a single context issues the CQ and relies on remote filters to deal with the cache entries. To be fair, the remote filter just shunts the item into another class which asynchronously processes them (so I don't think there is a significant locking issue), though this introduces the need for a static singleton to handle the dual remote filter contexts for the scan query and continuous query aspects of the CQ.

The exactly once guarantee differences are food for thought. The processing is idempotent, but still not desirable to process things more than once if not needed. 

Sent from my iPhone

> On 27/04/2018, at 5:49 AM, vkulichenko <va...@gmail.com> wrote:
> 
> Raymond,
> 
> If you want to have a single continuous query on one of the server nodes,
> then I think singleton service is the best option to achieve that. It will
> not only guarantee that there is one query at a time, but will also make
> sure to redeploy it in case of failure.
> 
> Also I would be accurate with doing processing in the remote filter. Filter
> is not designed for this, it's designed to do the filtering of updates
> before sending them to listening node. Major pitfall here is that it's
> invoked within entry lock, so any heavy processing would block the entry.
> Moreover, synchronous cache or other operations can cause deadlocks and/or
> thread starvation. Finally, keep in mind that filter can be invoked multiple
> times (at least for primary and backups, sometimes even more in case of
> failures). There is no exactly-once guarantee for remote filter, it's only
> applied to local listener.
> 
> -Val
> 
> 
> 
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/

RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
Val,

Thanks for the feedback.

Essentially, what I really want is an Ignite.Net version of the Ignite
distributed queue, with local consumers on each server reading and
processing just the elements that server is responsible for (which may be
a slight mangling of queue semantics...)

I have written a POC that creates a CQ on each server node and restricts
itself to reading local elements of the cache. This doesn't abuse the
remote filters so much, but I have been pondering the best practice for
establishing the service initially, from a practical perspective.

With the single CQ approach, the remote filters are really just shunting
the cache items into another class that performs asynchronous processing
of them, so locking should not be a significant problem. However, the
different remote filter aspects for the initial scan and continuous query
elements mean there needs to be some sleight of hand to ensure the
asynchronous processing context gets fed correctly from both.

I was not aware that the 'deliver once' guarantee is only relevant to the
local listener. While my processing is idempotent, it's still not a good
idea to reprocess things unnecessarily.

Given a service deployment seems appropriate either way I might look again
at per-node grid service deployment of a CQ operating locally on cache
elements so as to remove potential abuse of the remote filter.

Thanks,
Raymond.

-----Original Message-----
From: vkulichenko [mailto:valentin.kulichenko@gmail.com]
Sent: Friday, April 27, 2018 5:49 AM
To: user@ignite.apache.org
Subject: RE: Using a cache as an affinity co-located processing buffer in
Ignite.Net

Raymond,

If you want to have a single continuous query on one of the server nodes,
then I think singleton service is the best option to achieve that. It will
not only guarantee that there is one query at a time, but will also make
sure to redeploy it in case of failure.

Also I would be accurate with doing processing in the remote filter.
Filter is not designed for this, it's designed to do the filtering of
updates before sending them to listening node. Major pitfall here is that
it's invoked within entry lock, so any heavy processing would block the
entry.
Moreover, synchronous cache or other operations can cause deadlocks and/or
thread starvation. Finally, keep in mind that filter can be invoked
multiple times (at least for primary and backups, sometimes even more in
case of failures). There is no exactly-once guarantee for remote filter,
it's only applied to local listener.

-Val



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by vkulichenko <va...@gmail.com>.
Raymond,

If you want to have a single continuous query on one of the server nodes,
then I think singleton service is the best option to achieve that. It will
not only guarantee that there is one query at a time, but will also make
sure to redeploy it in case of failure.

Also I would be accurate with doing processing in the remote filter. Filter
is not designed for this, it's designed to do the filtering of updates
before sending them to listening node. Major pitfall here is that it's
invoked within entry lock, so any heavy processing would block the entry.
Moreover, synchronous cache or other operations can cause deadlocks and/or
thread starvation. Finally, keep in mind that filter can be invoked multiple
times (at least for primary and backups, sometimes even more in case of
failures). There is no exactly-once guarantee for remote filter, it's only
applied to local listener.

-Val



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
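
The singleton service hosting the continuous query might be sketched like
this (assuming the IService interface and IServices.DeployClusterSingleton
from the Ignite.NET services API; the QueueProcessorService internals and
LocalListener class are illustrative):

```csharp
[Serializable]
public class QueueProcessorService : IService
{
    // Injected by Ignite when the service is deployed on a node.
    [InstanceResource]
    private IIgnite _ignite;

    [NonSerialized]
    private IDisposable _queryHandle;

    public void Init(IServiceContext context)
    {
        // Called on the node the singleton lands on, before Execute.
    }

    public void Execute(IServiceContext context)
    {
        var cache = _ignite.GetCache<string, CacheItem>("BufferQueue");

        // Start the continuous query; LocalListener (user-defined
        // ICacheEntryEventListener) receives updates here. Because this
        // is a cluster singleton, exactly one query is active at a time.
        _queryHandle = cache.QueryContinuous(
            new ContinuousQuery<string, CacheItem>(new LocalListener()));
    }

    public void Cancel(IServiceContext context)
    {
        // On failure Ignite redeploys the service elsewhere, calling
        // Cancel here and Execute on the new node.
        _queryHandle?.Dispose();
    }
}

// Deployment from any node:
// ignite.GetServices().DeployClusterSingleton("queueProcessor",
//     new QueueProcessorService());
```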

RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
Getting back to best practices…



I now have some code that can create and establish the continuous query
that will inject the filters into the remote nodes to perform the
processing work on the new items arriving in the cache.



What is the usual pattern for managing the lifecycle of such a singleton
element when using Ignite? If I instantiate it on the server nodes then I
will have more than one of them (probably undesirable), yet it seems
overkill to create an entirely separate client node just to create and host
the continuous query.


I know I could create a grid singleton service to house it, but I’m curious
if there are other common patterns.



Suggestions?



Thanks,

Raymond.



*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Wednesday, April 25, 2018 6:02 PM
*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



There will be two instances, and these instances will have different
lifetimes.



>  it seems weird that there are two different interfaces for essentially
the same purpose

They are a bit different, one operates on ICacheEntry, another on
ICacheEntryEvent (which carries additional data)



On Wed, Apr 25, 2018 at 8:40 AM, Raymond Wilson <ra...@trimble.com>
wrote:

Let's say I do this, where I define a single class RemoteFileFilter that
implements both remote filter interfaces (BTW, it seems weird that there
are two different interfaces for essentially the same purpose):



public class RemoteFileFilter :
    ICacheEntryFilter<BufferQueueKey, BufferQueueItem>,
    ICacheEntryEventFilter<BufferQueueKey, BufferQueueItem>
{
    // ...
}

RemoteFileFilter FileFilter = new RemoteFileFilter();

// Construct the continuous query machinery.
// Set the initial query to return all elements in the cache.
// Instantiate the queryHandle and start the continuous query on the remote nodes.
// Note: Only cache items held on this local node will be handled here.
IContinuousQueryHandle<ICacheEntry<BufferQueueKey, BufferQueueItem>> queryHandle =
    queueCache.QueryContinuous(
        qry: new ContinuousQuery<BufferQueueKey, BufferQueueItem>(new LocalFileListener())
        {
            Filter = FileFilter
        },
        initialQry: new ScanQuery<BufferQueueKey, BufferQueueItem>
        {
            Filter = FileFilter
        });



Here, the same filter is supplied to both the continuous query and initial
scan query aspects. When this continuous query is serialized, sent to the
remote nodes, then deserialized, will the continuous query on the remote
node retain two references to the same remote filter, or two instances of
the remote filter?



Thanks,

Raymond.



*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Wednesday, April 25, 2018 5:19 PM


*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



What do you mean by "instance"? In terms of CLR that would be a different
instance on every node.



On Wed, Apr 25, 2018 at 2:50 AM, Raymond Wilson <ra...@trimble.com>
wrote:

I’m using a Continuous Query in both options (grid deployed service using a
CQ versus an independent context using a CQ). I was curious which context
using a CQ would be seen as desirable.



In the case where a filter is provided to a CQ for both the initial query
and for new items arriving in the cache, I would need to supply the same
filter instance for both because the processing logic has state that must
be shared between the two. Once the CQ has been serialized to the remote
nodes, will that filter be two separate instances, or will it retain the
same single instance?



*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Wednesday, April 25, 2018 6:08 AM


*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



ContinuousQuery is the best practice for most kinds of streaming use cases.
I think it fits your use case as well.



On Tue, Apr 24, 2018 at 10:08 AM, Raymond Wilson <ra...@trimble.com>
wrote:

Thanks, that makes sense.



From a best practices perspective, is it better to have a grid-deployed
service on each node executing local continuous queries against the cache
and orchestrating the processing from within the service, or to have some
singular context in the grid that uses the continuous query by placing
processing orchestration logic in the filter sent to the remote nodes?

Sent from my iPhone


On 24/04/2018, at 6:53 PM, Pavel Tupitsyn <pt...@apache.org> wrote:

Sorry, looks like I have misunderstood you.



If you need initial scan, of course you can have it by using ScanQuery as
initialQuery.

Place all the processing logic into the ScanQuery filter, and return false
from there.

This way you can process all existing entries in a co-located fashion
without sending them to the initiator node.



Thanks,

Pavel



On Mon, Apr 23, 2018 at 11:50 PM, Raymond Wilson <ra...@trimble.com>
wrote:

Not being able to do an initial scan of elements on the remote nodes is a
bit of a problem (possibly a bug?)



Something that’s occurred to me is to wrap this behaviour into an Ignite
service deployed onto all of the server nodes, and use a local mode
continuous query from within each service to perform an initial scan of
elements and then steady state handling as new elements arrive.
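A hedged sketch of that service shape follows; it assumes the thread's BufferQueueKey/BufferQueueItem types, and the LocalFileListener body and the processing hook are illustrative placeholders:

```csharp
using System.Collections.Generic;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Cache.Query.Continuous;
using Apache.Ignite.Core.Resource;
using Apache.Ignite.Core.Services;

public class LocalFileListener : ICacheEntryEventListener<BufferQueueKey, BufferQueueItem>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<BufferQueueKey, BufferQueueItem>> evts)
    {
        // Steady-state handling of new local entries goes here.
    }
}

public class QueueProcessorService : IService
{
    [InstanceResource] private IIgnite _ignite;
    private IContinuousQueryHandle<ICacheEntry<BufferQueueKey, BufferQueueItem>> _handle;

    public void Init(IServiceContext context) { }

    public void Execute(IServiceContext context)
    {
        var cache = _ignite.GetCache<BufferQueueKey, BufferQueueItem>("BufferQueue");

        // Local = true restricts both the events and the initial scan to
        // entries for which this node is primary.
        var qry = new ContinuousQuery<BufferQueueKey, BufferQueueItem>(new LocalFileListener())
        {
            Local = true
        };

        _handle = cache.QueryContinuous(
            qry, new ScanQuery<BufferQueueKey, BufferQueueItem> { Local = true });

        // Initial scan: catches up on entries queued before the service started
        // (e.g. after a grid restart or a rebalance).
        foreach (var entry in _handle.GetInitialQueryCursor())
        {
            // Process(entry) - hypothetical processing hook.
        }
    }

    public void Cancel(IServiceContext context)
    {
        _handle?.Dispose();
    }
}
```

Deployment would be something like ignite.GetServices().DeployNodeSingleton("queueProcessor", new QueueProcessorService()); so that each server node runs one instance.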



The reason the initial scan is important is I need to handle cases where
there may be a non-trivial queue of items waiting for processing and there
is either a shutdown/restart of the grid, or there is a topology change
event that triggers rebalancing.



*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Tuesday, April 24, 2018 5:54 AM


*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



>  Is the initial query also run in the context of the remote node and the
remote filter?

No, it is just a query (can be SQL or Scan) which allows you to get a "full
picture" on the calling node:

all existing data and all future data.



So in your scenario it is not very useful.



>   return false from the filter so the element is not sent to the local
listener

Yes, exactly



On Mon, Apr 23, 2018 at 11:18 AM, Raymond Wilson <ra...@trimble.com>
wrote:

OK – I see how that works.



In the page https://apacheignite-net.readme.io/docs/continuous-queries ,
there is this code:



using (var queryHandle = cache.QueryContinuous(qry, initialQry))

{

    // Iterate through existing data stored in cache.

    foreach (var entry in queryHandle.GetInitialQueryCursor())

        Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);



    // Add a few more keys and watch a few more query notifications.

    for (int i = 5; i < 15; i++)

        cache.Put(i, i.ToString());

}



Is the initial query also run in the context of the remote node and the
remote filter?



Construction of the ContinuousQuery also requires provision of a
LocalListener to receive the cache update items. Is the approach here to
process the element in the remote filter context and then return false from
the filter so the element is not sent to the local listener?





*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Monday, April 23, 2018 7:50 PM


*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



Remote Listener is deployed on every cache node and is invoked only on a
primary node for that key.

In other words, for each key there is only one invocation of the remote
filter, and that invocation is local to that key.



So you can place your processing logic into the Remote Filter.
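A minimal sketch of that pattern, assuming the thread's BufferQueueKey/BufferQueueItem types (ProcessPackage is a hypothetical hook, not an Ignite API):

```csharp
using System;
using Apache.Ignite.Core.Cache.Event;

// Sketch only: the filter does the work and suppresses delivery to the
// initiator's local listener.
[Serializable]
public class ProcessingFilter : ICacheEntryEventFilter<BufferQueueKey, BufferQueueItem>
{
    public bool Evaluate(ICacheEntryEvent<BufferQueueKey, BufferQueueItem> evt)
    {
        // Runs on the primary node for evt.Key, so processing is co-located
        // with the data.
        ProcessPackage(evt.Key, evt.Value);

        // Returning false means the package bytes never leave this node.
        return false;
    }

    private static void ProcessPackage(BufferQueueKey key, BufferQueueItem item)
    {
        // e.g. write results into ProcessedDataCache and remove the entry
        // from BufferQueue.
    }
}
```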



On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <ra...@trimble.com>
wrote:

Hi Pavel,



Yes, I looked at continuous queries. They appear to be oriented toward a
single context being sent the newly arrived elements in the cache from all
primary nodes hosting the cache involved in the query.



In the use case I outlined below, I would like to have the items processed
in co-located contexts (ie: the data does not move and is processed in situ
on the primary node). How do you do that with a continuous query?



Thanks,

Raymond.



*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Monday, April 23, 2018 7:18 PM
*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



Hi Raymond,



To process incoming data in a co-located fashion there is a Continuous
Query feature [1].

Looks like it fits your use case quite well.





[1] https://apacheignite-net.readme.io/docs/continuous-queries



On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <ra...@trimble.com>
wrote:

I did find the ICache.GetLocalEntries() method and have written the
following as a proof of concept (yet to exercise it, though):



            IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>>
localItems = QueueCache.GetLocalEntries(new [] {CachePeekMode.Primary});



            ICacheEntry<BufferQueueKey, BufferQueueItem> first =
localItems.FirstOrDefault();



            if (first != null)

            {

                // Get the list of all items in the buffer matching the
affinity key of the first item

                // in the list, limiting the result set to 100 TAG files.

                List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
candidates = localItems

                    .Where(x => x.Value.AffinityKey ==
first.Value.AffinityKey)

                    .Take(100)

                    .ToList();



                if (candidates?.Count > 0)

                {

                    // Submit the list of items to the processor

                    // ...

                }

            }



This seems like it should do what I want, but I’m a little suspicious that
it may evaluate the entire content of the cache against the Where()
condition before taking the first 100 results.



I think I can constrain it by modifying the LINQ expression like this:



                List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
candidates = localItems

                    .Take(100)

                    .Where(x => x.Value.AffinityKey ==
first.Value.AffinityKey)

                    .ToList();



This at least limits the overall number of entries examined to 100, though
it may not capture the first 100 entries that do match.



I could further modify it to a ‘double-take’, which still constrains the
overall query but improves the chances of filling the maximum take of 100
matching items:



                List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
candidates = localItems

                    .Take(1000)

                    .Where(x => x.Value.AffinityKey ==
first.Value.AffinityKey)

                    .Take(100)

                    .ToList();



Or is there a better way?
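As a side note on the LINQ semantics in question: with LINQ to Objects, Where and Take both stream lazily, so Where(...).Take(100) stops enumerating the source as soon as 100 matches are found rather than evaluating the entire cache first (though it can still scan everything when fewer than 100 entries match). A minimal self-contained demonstration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class LazyLinqDemo
{
    static void Main()
    {
        int examined = 0;

        // 1..1000, counting how many elements the pipeline actually pulls.
        IEnumerable<int> source = Enumerable.Range(1, 1000)
            .Select(x => { examined++; return x; });

        // Where/Take stream lazily: enumeration stops at the 100th match.
        var firstHundredEven = source.Where(x => x % 2 == 0).Take(100).ToList();

        Console.WriteLine(examined);               // 200: stopped at the 100th even number
        Console.WriteLine(firstHundredEven.Count); // 100
    }
}
```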



Thanks,

Raymond.



*From:* Raymond Wilson [mailto:raymond_wilson@trimble.com]
*Sent:* Monday, April 23, 2018 1:11 PM
*To:* user@ignite.apache.org
*Subject:* Using a cache as an affinity co-located processing buffer in
Ignite.Net



All,



I have been thinking about how to use Ignite.Net to support an affinity
co-located ingest pipeline that uses queue buffering to provide fault
tolerance and buffering for a flow of ingest packages.



At a high level, it looks like this:



Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache, affinity
co-located with PackageProcessor]

Processing pipeline: [PackageCache] -> [PackageProcessor] ->
[ProcessedDataCache affinity co-located with PackageProcessor]



Essentially, I want a cache that looks like this:



public class CacheItem
{
    public DateTime date;

    [AffinityKeyMapped]
    public Guid AffinityKey;

    public byte[] Package;
}



   ICache<string, CacheItem> BufferQueue;



BufferQueue = ignite.GetOrCreateCache<string, CacheItem>(
    new CacheConfiguration
    {
        Name = "BufferQueue",

        KeepBinaryInStore = true,

        // Partition the cache across the server nodes
        CacheMode = CacheMode.Partitioned,
    });



This queue will target a data region that is configured for persistency.



Inbound packages will arrive and be injected into the BufferQueue cache
from some client node context, like this:



public void HandleANewPackage(string key, Guid affinityKey, byte[] package)
{
    BufferQueue.Put(key, new CacheItem { date = DateTime.Now, AffinityKey = affinityKey, Package = package });
}



There will be a collection of server nodes that are responsible for the
cache.



This is all straightforward. The tricky bit is then processing the elements
in the BufferQueue cache.



The data is already on the server nodes, nicely co-located according to its
affinity. I want to have parallel processing logic that runs on the server
nodes that pulls elements from the buffer queue and processes them into
some other cache(s).



At this point I know I have a cache that may contain items needing to be
processed, but I don’t know their keys. I know it’s possible to have
logic running on each server node that does this (either as a Grid Service
or a Compute::Broadcast() lambda):



var cache = ignite.GetCache<string, CacheItem>("BufferQueue");

var cursor = cache.Query(new ScanQuery<string, CacheItem>(new QueryFilter()));

foreach (var cacheEntry in cursor)
    ProcessItem(cacheEntry);



…but I am not sure how to restrict the elements in the cache returned to
the query to be only those entries affinity co-located with the server
asking for them.
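For what it's worth, ScanQuery exposes a Local flag, so a broadcast closure could restrict the scan to the node it runs on; a hedged sketch (QueryFilter, ProcessItem and CacheItem are the thread's own assumed types):

```csharp
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache.Query;

// Sketch: run inside a Compute.Broadcast closure on each server node.
public static class LocalScan
{
    public static void ProcessLocalEntries(IIgnite ignite)
    {
        var cache = ignite.GetCache<string, CacheItem>("BufferQueue");

        // Local = true limits the scan to entries held on this node.
        var query = new ScanQuery<string, CacheItem>(new QueryFilter()) { Local = true };

        foreach (var cacheEntry in cache.Query(query))
            ProcessItem(cacheEntry);
    }
}
```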



Is this so obvious that it just works and does not need documentation, or
is this not possible and I should run the processing context from a client
node context (as above) and pay the penalty of extracting the packages from
the cache with cache.Query() and then resubmitting them using an affinity
aware method like AffinityRun()?



Thanks,

Raymond.

Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Pavel Tupitsyn <pt...@apache.org>.
There will be two instances, and these instances will have different
lifetimes.

>  it seems weird that there are two different interfaces for essentially
the same purpose
They are a bit different, one operates on ICacheEntry, another on
ICacheEntryEvent (which carries additional data)

On Wed, Apr 25, 2018 at 8:40 AM, Raymond Wilson <ra...@trimble.com>
wrote:

> Let’s say I do this, where I define a single class FileFilter that
> implements both remote filter interfaces (BTW, it seems weird that there
> are two different interfaces for essentially the same purpose):
>
>
>
>     public class RemoteFileFilter :
>
>         ICacheEntryFilter<BufferQueueKey, BufferQueueItem>,
>
>         ICacheEntryEventFilter<BufferQueueKey, BufferQueueItem>,
>
>
>
>
>
>             RemoteFileFilter FileFilter = new RemoteFileFilter();
>
>
>
>             // Construct the continuous query machinery
>
>             // Set the initial query to return all elements in the cache
>
>             // Instantiate the queryHandle and start the continous query
> on the remote nodes
>
>             // Note: Only cache items held on this local node will be
> handled here
>
>             IContinuousQueryHandle<ICacheEntry<BufferQueueKey,
> BufferQueueItem>> queryHandle = queueCache.QueryContinuous
>
>                 (qry: new ContinuousQuery<BufferQueueKey,
> BufferQueueItem>(new LocalFileListener())
>
>                 {
>
>                     Filter = FileFilter
>
>                 },
>
>                 initialQry: new ScanQuery<BufferQueueKey, BufferQueueItem>
>
>                 {
>
>                     Filter = FileFilter
>
>                 }))
>
>
>
> Here, the same filter is supplied to both the continuous query and initial
> scan query aspects. When this continuous query is serialized, send to the
> remote node, then deserialised, will the continuous query on the remote
> node retain two references to the same remote filter, or two instances of
> the remote filter?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Wednesday, April 25, 2018 5:19 PM
>
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> What do you mean by "instance"? In terms of CLR that would be a different
> instance on every node.
>
>
>
> On Wed, Apr 25, 2018 at 2:50 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> I’m using a Continuous Query in both options (grid deployed service using
> a CQ versus an independent context using a CQ). I was curious which context
> using a CQ would be seen as desirable.
>
>
>
> In the case where a filter is provided to a CQ for both the initial query
> and for newly items arriving in the cache I would need to supply the same
> filter instance for both as the processing logic has state that will need
> to be shared between the two. Once the CQ has been serialized to the remote
> nodes, will that filter be two separate instances or will is retain the
> same singular instance?
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Wednesday, April 25, 2018 6:08 AM
>
>
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> ContinuousQuery is the best practice for most kinds of streaming use
> cases. I think it fits your use case as well.
>
>
>
> On Tue, Apr 24, 2018 at 10:08 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> Thanks, that makes sense.
>
>
>
> From a best practices perspective, is better to have a grid deployed
> service on each node executing local continuous queries against the cache
> and orchestrating the processing from within the service, versus having
> some singular context in the grid that uses the continuous query by placing
> processing orchestration logic in the filter sent to the remote nodes?
>
> Sent from my iPhone
>
>
> On 24/04/2018, at 6:53 PM, Pavel Tupitsyn <pt...@apache.org> wrote:
>
> Sorry, looks like I have misunderstood you.
>
>
>
> If you need initial scan, of course you can have it by using ScanQuery as
> initialQuery.
>
> Place all the processing logic into the ScanQuery filter, and return false
> from there.
>
> This way you can process all existing entries in a co-located fashion
> without sending them to the initiator node.
>
>
>
> Thanks,
>
> Pavel
>
>
>
> On Mon, Apr 23, 2018 at 11:50 PM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> Not being able to do an initial scan of elements on the remote nodes is a
> bit of a problem (possibly a bug?)
>
>
>
> Something that’s occurred to me is to wrap this behaviour into an Ignite
> service deployed onto all of the server nodes, and use a local mode
> continuous query from within each service to perform an initial scan of
> elements and then steady state handling as new elements arrive.
>
>
>
> The reason the initial scan is important is I need to handle cases where
> there may be a non-trivial queue of items waiting for processing and there
> is either a shutdown/restart of the grid, or there is a topology change
> event that triggers rebalancing
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Tuesday, April 24, 2018 5:54 AM
>
>
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> >  Is the initial query also run in the context of the remote node and
> the remote filter?
>
> No, it is just a query (can be SQL or Scan) which allows you to get a
> "full picture" on the calling node:
>
> all existing data and all future data.
>
>
>
> So in your scenario it is not very useful.
>
>
>
> >   return false from the filter so the element is not sent to the local
> listener
>
> Yes, exactly
>
>
>
> On Mon, Apr 23, 2018 at 11:18 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> OK – I see how that works.
>
>
>
> In the page https://apacheignite-net.readme.io/docs/continuous-queries ,
> there is this code:
>
>
>
> using (var queryHandle = cache.QueryContinuous(qry, initialQry))
>
> {
>
>     // Iterate through existing data stored in cache.
>
>     foreach (var entry in queryHandle.GetInitialQueryCursor())
>
>         Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);
>
>
>
>     // Add a few more keys and watch a few more query notifications.
>
>     for (int i = 5; i < 15; i++)
>
>         cache.Put(i, i.ToString());
>
> }
>
>
>
> Is the initial query also run in the context of the remote node and the
> remote filter?
>
>
>
> Construction of the ContinuousQuery also requires provision of
> LocalListener to receive the cache update items. Is the approach here to
> processing the element in the remote filter context then return false from
> the filter so the element is not sent to the local listener?
>
>
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Monday, April 23, 2018 7:50 PM
>
>
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> Remote Listener is deployed on every cache node and is invoked only on a
> primary node for that key.
>
> In other words, for each key there is only one invocation of the remote
> filter, and that invocation is local to that key.
>
>
>
> So you can place your processing logic into the Remote Filter.
>
>
>
> On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> Hi Pavel,
>
>
>
> Yes, I looked at continuous queries. They appear to be oriented toward a
> single context being sent the newly arrived elements in the cache from all
> primary nodes hosting the cache involved in the query.
>
>
>
> In the use case I outlined below, I would like to have the items processed
> in co-located contexts (ie: the data does not move and is processed in situ
> on the primary node). How do you do that with a continuous query?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Monday, April 23, 2018 7:18 PM
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> Hi Raymond,
>
>
>
> To process incoming data in a co-located fashion there is a Continuous
> Query feature [1].
>
> Looks like it fits your use case quite well.
>
>
>
>
>
> [1] https://apacheignite-net.readme.io/docs/continuous-queries
>
>
>
> On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> I did find ICache.GetLocalEntries() method and have written the following
> as a proof of concept (yet to exercise it though):
>
>
>
>             IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> localItems = QueueCache.GetLocalEntries(new [] {CachePeekMode.Primary});
>
>
>
>             ICacheEntry<BufferQueueKey, BufferQueueItem> first =
> localItems.FirstOrDefault();
>
>
>
>             if (first != null)
>
>             {
>
>                 // Get the list of all items in the buffer matching the
> affinity key of the first item
>
>                 // in the list, limiting the result set to 100 TAG files.
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .Take(100)
>
>                     .ToList();
>
>
>
>                 if (candidates?.Count > 0)
>
>                 {
>
>                     // Submit the list of items to the processor
>
>                     // ...
>
>                 }
>
>             }
>
>
>
> This seems like it should do what I want, but I’m a little suspicious that
> it may evaluate the entire content of the cache against the Where()
> condition before taking the first 100 results.
>
>
>
> I think I can constrain it by modifying the LINQ expression like this:
>
>
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Take(100)
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .ToList();
>
>
>
> Which will at least limit the overall number examined to be 100, while not
> capturing the first 100 that do match.
>
>
>
> I could further modify it to a ‘double-take’ which still constrains the
> overall query but improves the chances of filling the maximum take of 100
> matching items
>
>
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Take(1000)
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .Take(100)
>
>                     .ToList();
>
>
>
> Or is there a better way?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
> *From:* Raymond Wilson [mailto:raymond_wilson@trimble.com]
> *Sent:* Monday, April 23, 2018 1:11 PM
> *To:* user@ignite.apache.org
> *Subject:* Using a cache as an affinity co-located processing buffer in
> Ignite.Net
>
>
>
> All,
>
>
>
> I have been thinking about how to use Ignite.Net to support an affinity
> co-located ingest pipeline that uses queue buffering to provide fault
> tolerance and buffering for a flow of ingest packages.
>
>
>
> At a high level, it looks like this:
>
>
>
> Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache,
> affinity co-located with PackageProcessor]
>
> Processing pipeline: [PackageCache] -> [PackageProcessor] ->
> [ProcessedDataCache affinity co-located with PackageProcessor]
>
>
>
> Essentially, I want a cache that look like this:
>
>
>
> Public class CacheItem
>
> {
>
>     Public DateTime date;
>
>
>
>   [AffinityKeyMapped]
>
>      public Guid AffinityKey;
>
>
>
>      public byte [] Package;
>
> }
>
>
>
>    ICache<string, CacheTime> BufferQueue.
>
>
>
> BufferQueue =  ignite.GetOrCreateCache <string, CacheItem > (
>
>                     new CacheConfiguration
>
>                     {
>
>                         Name = “BufferQueue”,
>
>
>
>                         KeepBinaryInStore = true,
>
>
>
>                         // Replicate the maps across nodes
>
>                         CacheMode = CacheMode.Partitioned,
>
>                     });
>
>             }
>
>
>
> This queue will target a data region that is configured for persistency.
>
>
>
> Inbound packages will arrive and be injected into the BufferQueue cache
> from some client node context, like this:
>
>
>
> public void HandleANewPackage(string key, Guid affinityKey, byte []
> package)
>
> {
>
> BufferQueue.Put(key, new CacheItem() {data = DateTime.Now(), AffinityKey =
> affinityKey, Package = package});
>
> }
>
>
>
> There will be a collection of server nodes that are responsible for the
> cache.
>
>
>
> This is all straightforward. The tricky bit is then processing the
> elements in the BufferQueue cache.
>
>
>
> The data is already on the server nodes, nicely co-located according to
> its affinity. I want to have parallel processing logic that runs on the
> server nodes that pulls elements from the buffer queue and processes them
> into some other cache(s).
>
>
>
> At this point I know I have a cache that may contain something needing to
> be processed, but I don’t know their keys. I know it’s possible to have
> logic running on each server node that does this (either as a Grid Service
> or a Compute::Broadcast() lambda):
>
>
>
> var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
>
> var cursor = cache.Query(new ScanQuery<string, CacheItem >(new QueryFilter
> ()));
>
>
>
> foreach (var cacheEntry in cursor)
>
>     ProcessItem(CacheEntry);
>
>
>
> …but I am not sure how to restrict the elements in the cache returned to
> the query to be only those entries affinity co-located with the server
> asking for them.
>
>
>
> Is this so obvious that it just works and does not need documentation, or
> is this not possible and I should run the processing context from a client
> node context (as above) and pay the penalty of extracting the packages from
> the cache with cache.Query() and then resubmitting them using an affinity
> aware method like AffinityRun()?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
Let’s say I do this, where I define a single class FileFilter that
implements both remote filter interfaces (BTW, it seems weird that there
are two different interfaces for essentially the same purpose):



    public class RemoteFileFilter :

        ICacheEntryFilter<BufferQueueKey, BufferQueueItem>,

        ICacheEntryEventFilter<BufferQueueKey, BufferQueueItem>,





            RemoteFileFilter FileFilter = new RemoteFileFilter();



            // Construct the continuous query machinery

            // Set the initial query to return all elements in the cache

            // Instantiate the queryHandle and start the continous query on
the remote nodes

            // Note: Only cache items held on this local node will be
handled here

            IContinuousQueryHandle<ICacheEntry<BufferQueueKey,
BufferQueueItem>> queryHandle = queueCache.QueryContinuous

                (qry: new ContinuousQuery<BufferQueueKey, BufferQueueItem>(
new LocalFileListener())

                {

                    Filter = FileFilter

                },

                initialQry: new ScanQuery<BufferQueueKey, BufferQueueItem>

                {

                    Filter = FileFilter

                }))



Here, the same filter is supplied to both the continuous query and initial
scan query aspects. When this continuous query is serialized, send to the
remote node, then deserialised, will the continuous query on the remote
node retain two references to the same remote filter, or two instances of
the remote filter?



Thanks,

Raymond.



*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Wednesday, April 25, 2018 5:19 PM
*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



What do you mean by "instance"? In terms of CLR that would be a different
instance on every node.



On Wed, Apr 25, 2018 at 2:50 AM, Raymond Wilson <ra...@trimble.com>
wrote:

I’m using a Continuous Query in both options (grid deployed service using a
CQ versus an independent context using a CQ). I was curious which context
using a CQ would be seen as desirable.



In the case where a filter is provided to a CQ for both the initial query
and for newly items arriving in the cache I would need to supply the same
filter instance for both as the processing logic has state that will need
to be shared between the two. Once the CQ has been serialized to the remote
nodes, will that filter be two separate instances or will is retain the
same singular instance?



*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Wednesday, April 25, 2018 6:08 AM


*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



ContinuousQuery is the best practice for most kinds of streaming use cases.
I think it fits your use case as well.



On Tue, Apr 24, 2018 at 10:08 AM, Raymond Wilson <ra...@trimble.com>
wrote:

Thanks, that makes sense.



From a best practices perspective, is better to have a grid deployed
service on each node executing local continuous queries against the cache
and orchestrating the processing from within the service, versus having
some singular context in the grid that uses the continuous query by placing
processing orchestration logic in the filter sent to the remote nodes?

Sent from my iPhone


On 24/04/2018, at 6:53 PM, Pavel Tupitsyn <pt...@apache.org> wrote:

Sorry, looks like I have misunderstood you.



If you need initial scan, of course you can have it by using ScanQuery as
initialQuery.

Place all the processing logic into the ScanQuery filter, and return false
from there.

This way you can process all existing entries in a co-located fashion
without sending them to the initiator node.



Thanks,

Pavel



On Mon, Apr 23, 2018 at 11:50 PM, Raymond Wilson <ra...@trimble.com>
wrote:

Not being able to do an initial scan of elements on the remote nodes is a
bit of a problem (possibly a bug?)



Something that’s occurred to me is to wrap this behaviour into an Ignite
service deployed onto all of the server nodes, and use a local mode
continuous query from within each service to perform an initial scan of
elements and then steady state handling as new elements arrive.



The reason the initial scan is important is I need to handle cases where
there may be a non-trivial queue of items waiting for processing and there
is either a shutdown/restart of the grid, or there is a topology change
event that triggers rebalancing



*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Tuesday, April 24, 2018 5:54 AM


*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



>  Is the initial query also run in the context of the remote node and the
remote filter?

No, it is just a query (can be SQL or Scan) which allows you to get a "full
picture" on the calling node:

all existing data and all future data.



So in your scenario it is not very useful.



>   return false from the filter so the element is not sent to the local
listener

Yes, exactly



On Mon, Apr 23, 2018 at 11:18 AM, Raymond Wilson <ra...@trimble.com>
wrote:

OK – I see how that works.



In the page https://apacheignite-net.readme.io/docs/continuous-queries ,
there is this code:



using (var queryHandle = cache.QueryContinuous(qry, initialQry))

{

    // Iterate through existing data stored in cache.

    foreach (var entry in queryHandle.GetInitialQueryCursor())

        Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);



    // Add a few more keys and watch a few more query notifications.

    for (int i = 5; i < 15; i++)

        cache.Put(i, i.ToString());

}



Is the initial query also run in the context of the remote node and the
remote filter?



Construction of the ContinuousQuery also requires provision of a
LocalListener to receive the cache update items. Is the approach here to
process the element in the remote filter context and then return false from
the filter so the element is not sent to the local listener?





*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Monday, April 23, 2018 7:50 PM


*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



Remote Listener is deployed on every cache node and is invoked only on a
primary node for that key.

In other words, for each key there is only one invocation of the remote
filter, and that invocation is local to that key.



So you can place your processing logic into the Remote Filter.



On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <ra...@trimble.com>
wrote:

Hi Pavel,



Yes, I looked at continuous queries. They appear to be oriented toward a
single context being sent the newly arrived elements in the cache from all
primary nodes hosting the cache involved in the query.



In the use case I outlined below, I would like to have the items processed
in co-located contexts (ie: the data does not move and is processed in situ
on the primary node). How do you do that with a continuous query?



Thanks,

Raymond.



*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Monday, April 23, 2018 7:18 PM
*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



Hi Raymond,



To process incoming data in a co-located fashion there is a Continuous
Query feature [1].

Looks like it fits your use case quite well.





[1] https://apacheignite-net.readme.io/docs/continuous-queries



On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <ra...@trimble.com>
wrote:

I did find ICache.GetLocalEntries() method and have written the following
as a proof of concept (yet to exercise it though):



            IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>> localItems =
                QueueCache.GetLocalEntries(new[] { CachePeekMode.Primary });

            ICacheEntry<BufferQueueKey, BufferQueueItem> first = localItems.FirstOrDefault();

            if (first != null)
            {
                // Get the list of all items in the buffer matching the affinity key
                // of the first item in the list, limiting the result set to 100 TAG files.
                List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
                    .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
                    .Take(100)
                    .ToList();

                if (candidates.Count > 0)
                {
                    // Submit the list of items to the processor
                    // ...
                }
            }



This seems like it should do what I want, but I’m a little suspicious that
it may evaluate the entire content of the cache against the Where()
condition before taking the first 100 results.



I think I can constrain it by modifying the LINQ expression like this:



                List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
                    .Take(100)
                    .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
                    .ToList();



This at least limits the overall number of entries examined to 100, though
it will no longer capture the first 100 entries that do match.



I could further modify it to a ‘double-take’, which still constrains the
overall query but improves the chances of filling the maximum take of 100
matching items:



                List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
                    .Take(1000)
                    .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
                    .Take(100)
                    .ToList();



Or is there a better way?
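
For what it’s worth, the suspicion about Where() can be checked in
isolation: LINQ-to-objects operators are lazy, so `.Where(...).Take(100)`
stops pulling from the source enumerable as soon as 100 matches are found
(though it may still enumerate every element when matches are scarce). A
minimal stand-alone check:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class LazyLinqCheck
{
    static void Main()
    {
        int examined = 0;

        // Count how many source elements are actually pulled.
        IEnumerable<int> source = Enumerable.Range(0, 1000000)
            .Select(i => { examined++; return i; });

        var matches = source
            .Where(i => i % 2 == 0)
            .Take(100)
            .ToList();

        // 100 even numbers are found after examining elements 0..198.
        Console.WriteLine(examined);      // 199
        Console.WriteLine(matches.Count); // 100
    }
}
```

So the original `.Where(...).Take(100)` form does not evaluate the whole
cache when matches are plentiful, and it preserves the intended semantics.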



Thanks,

Raymond.



*From:* Raymond Wilson [mailto:raymond_wilson@trimble.com]
*Sent:* Monday, April 23, 2018 1:11 PM
*To:* user@ignite.apache.org
*Subject:* Using a cache as an affinity co-located processing buffer in
Ignite.Net



All,



I have been thinking about how to use Ignite.Net to support an affinity
co-located ingest pipeline that uses queue buffering to provide fault
tolerance and buffering for a flow of ingest packages.



At a high level, it looks like this:



Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache, affinity
co-located with PackageProcessor]

Processing pipeline: [PackageCache] -> [PackageProcessor] ->
[ProcessedDataCache affinity co-located with PackageProcessor]



Essentially, I want a cache that looks like this:



public class CacheItem
{
    public DateTime Date;

    [AffinityKeyMapped]
    public Guid AffinityKey;

    public byte[] Package;
}

ICache<string, CacheItem> BufferQueue;

BufferQueue = ignite.GetOrCreateCache<string, CacheItem>(
    new CacheConfiguration
    {
        Name = "BufferQueue",

        KeepBinaryInStore = true,

        // Partition the cache across nodes
        CacheMode = CacheMode.Partitioned,
    });



This queue will target a data region that is configured for persistence.



Inbound packages will arrive and be injected into the BufferQueue cache
from some client node context, like this:



public void HandleANewPackage(string key, Guid affinityKey, byte[] package)
{
    BufferQueue.Put(key, new CacheItem
    {
        Date = DateTime.Now,
        AffinityKey = affinityKey,
        Package = package
    });
}



There will be a collection of server nodes that are responsible for the
cache.



This is all straightforward. The tricky bit is then processing the elements
in the BufferQueue cache.



The data is already on the server nodes, nicely co-located according to its
affinity. I want to have parallel processing logic that runs on the server
nodes that pulls elements from the buffer queue and processes them into
some other cache(s).



At this point I know the cache may contain entries needing to be processed,
but I don’t know their keys. I know it’s possible to have logic running on
each server node that does this (either as a Grid Service or a
Compute::Broadcast() lambda):



var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
var cursor = cache.Query(new ScanQuery<string, CacheItem>(new QueryFilter()));

foreach (var cacheEntry in cursor)
    ProcessItem(cacheEntry);



…but I am not sure how to restrict the elements in the cache returned to
the query to be only those entries affinity co-located with the server
asking for them.
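
One possibility worth checking against the Ignite.Net API docs for your
version: queries derived from QueryBase carry a Local flag, so a broadcast
action can run a scan restricted to the node it executes on. A sketch,
with illustrative type names:

```csharp
using System;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Compute;

[Serializable]
public class LocalScanAction : IComputeAction
{
    public void Invoke()
    {
        var cache = Ignition.GetIgnite()
            .GetCache<string, CacheItem>("BufferQueue");

        // Local = true limits the scan to entries held on this node,
        // avoiding a grid-wide query from each server.
        var cursor = cache.Query(
            new ScanQuery<string, CacheItem> { Local = true });

        foreach (var cacheEntry in cursor)
            ProcessItem(cacheEntry.Key, cacheEntry.Value);
    }

    private static void ProcessItem(string key, CacheItem item)
    {
        // Processing logic goes here.
    }
}

// Usage: ignite.GetCompute().Broadcast(new LocalScanAction());
```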



Is this so obvious that it just works and does not need documentation, or
is this not possible and I should run the processing context from a client
node context (as above) and pay the penalty of extracting the packages from
the cache with cache.Query() and then resubmitting them using an affinity
aware method like AffinityRun()?



Thanks,

Raymond.

Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Pavel Tupitsyn <pt...@apache.org>.
What do you mean by "instance"? In terms of CLR that would be a different
instance on every node.

On Wed, Apr 25, 2018 at 2:50 AM, Raymond Wilson <ra...@trimble.com>
wrote:

> I’m using a Continuous Query in both options (grid deployed service using
> a CQ versus an independent context using a CQ). I was curious which context
> using a CQ would be seen as desirable.
>
>
>
> In the case where a filter is provided to a CQ for both the initial query
> and for new items arriving in the cache, I would need to supply the same
> filter instance for both, as the processing logic has state that will need
> to be shared between the two. Once the CQ has been serialized to the remote
> nodes, will that filter be two separate instances or will it retain the
> same singular instance?
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Wednesday, April 25, 2018 6:08 AM
>
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> ContinuousQuery is the best practice for most kinds of streaming use
> cases. I think it fits your use case as well.
>
>
>
> On Tue, Apr 24, 2018 at 10:08 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> Thanks, that makes sense.
>
>
>
> From a best practices perspective, is it better to have a grid deployed
> service on each node executing local continuous queries against the cache
> and orchestrating the processing from within the service, versus having
> some singular context in the grid that uses the continuous query by placing
> processing orchestration logic in the filter sent to the remote nodes?
>
> Sent from my iPhone
>
>
> On 24/04/2018, at 6:53 PM, Pavel Tupitsyn <pt...@apache.org> wrote:
>
> Sorry, looks like I have misunderstood you.
>
>
>
> If you need initial scan, of course you can have it by using ScanQuery as
> initialQuery.
>
> Place all the processing logic into the ScanQuery filter, and return false
> from there.
>
> This way you can process all existing entries in a co-located fashion
> without sending them to the initiator node.
>
>
>
> Thanks,
>
> Pavel
>
>

RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
I’m using a Continuous Query in both options (grid deployed service using a
CQ versus an independent context using a CQ). I was curious which context
using a CQ would be seen as desirable.



In the case where a filter is provided to a CQ for both the initial query
and for new items arriving in the cache, I would need to supply the same
filter instance for both, as the processing logic has state that will need
to be shared between the two. Once the CQ has been serialized to the remote
nodes, will that filter be two separate instances or will it retain the
same singular instance?




Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Pavel Tupitsyn <pt...@apache.org>.
ContinuousQuery is the best practice for most kinds of streaming use cases.
I think it fits your use case as well.

On Tue, Apr 24, 2018 at 10:08 AM, Raymond Wilson <raymond_wilson@trimble.com
> wrote:

> Thanks, that makes sense.
>
> From a best practices perspective, is better to have a grid deployed
> service on each node executing local continuous queries against the cache
> and orchestrating the processing from within the service, versus having
> some singular context in the grid that uses the continuous query by placing
> processing orchestration logic in the filter sent to the remote nodes?
>
> Sent from my iPhone
>
> On 24/04/2018, at 6:53 PM, Pavel Tupitsyn <pt...@apache.org> wrote:
>
> Sorry, looks like I have misunderstood you.
>
> If you need initial scan, of course you can have it by using ScanQuery as
> initialQuery.
> Place all the processing logic into the ScanQuery filter, and return false
> from there.
> This way you can process all existing entries in a co-located fashion
> without sending them to the initiator node.
>
> Thanks,
> Pavel
>
> On Mon, Apr 23, 2018 at 11:50 PM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
>> Not being able to do an initial scan of elements on the remote nodes is a
>> bit of a problem (possibly a bug?)
>>
>>
>>
>> Something that’s occurred to me is to wrap this behaviour into an Ignite
>> service deployed onto all of the server nodes, and use a local mode
>> continuous query from within each service to perform an initial scan of
>> elements and then steady state handling as new elements arrive.
>>
>>
>>
>> The reason the initial scan is important is that I need to handle cases
>> where there may be a non-trivial queue of items waiting for processing and
>> there is either a shutdown/restart of the grid, or there is a topology
>> change event that triggers rebalancing.
>>
>>
>>
>> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
>> *Sent:* Tuesday, April 24, 2018 5:54 AM
>>
>> *To:* user@ignite.apache.org
>> *Subject:* Re: Using a cache as an affinity co-located processing buffer
>> in Ignite.Net
>>
>>
>>
>> >  Is the initial query also run in the context of the remote node and
>> the remote filter?
>>
>> No, it is just a query (can be SQL or Scan) which allows you to get a
>> "full picture" on the calling node:
>>
>> all existing data and all future data.
>>
>>
>>
>> So in your scenario it is not very useful.
>>
>>
>>
>> >   return false from the filter so the element is not sent to the local
>> listener
>>
>> Yes, exactly
>>
>>
>>
>> On Mon, Apr 23, 2018 at 11:18 AM, Raymond Wilson <
>> raymond_wilson@trimble.com> wrote:
>>
>> OK – I see how that works.
>>
>>
>>
>> In the page https://apacheignite-net.readme.io/docs/continuous-queries ,
>> there is this code:
>>
>>
>>
>> using (var queryHandle = cache.QueryContinuous(qry, initialQry))
>>
>> {
>>
>>     // Iterate through existing data stored in cache.
>>
>>     foreach (var entry in queryHandle.GetInitialQueryCursor())
>>
>>         Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);
>>
>>
>>
>>     // Add a few more keys and watch a few more query notifications.
>>
>>     for (int i = 5; i < 15; i++)
>>
>>         cache.Put(i, i.ToString());
>>
>> }
>>
>>
>>
>> Is the initial query also run in the context of the remote node and the
>> remote filter?
>>
>>
>>
>> Construction of the ContinuousQuery also requires provision of
>> LocalListener to receive the cache update items. Is the approach here to
>> process the element in the remote filter context and then return false from
>> the filter so the element is not sent to the local listener?
>>
>>
>>
>>
>>
>> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
>> *Sent:* Monday, April 23, 2018 7:50 PM
>>
>>
>> *To:* user@ignite.apache.org
>> *Subject:* Re: Using a cache as an affinity co-located processing buffer
>> in Ignite.Net
>>
>>
>>
>> Remote Listener is deployed on every cache node and is invoked only on a
>> primary node for that key.
>>
>> In other words, for each key there is only one invocation of the remote
>> filter, and that invocation is local to that key.
>>
>>
>>
>> So you can place your processing logic into the Remote Filter.
>>
>>
>>
>> On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <
>> raymond_wilson@trimble.com> wrote:
>>
>> Hi Pavel,
>>
>>
>>
>> Yes, I looked at continuous queries. They appear to be oriented toward a
>> single context being sent the newly arrived elements in the cache from all
>> primary nodes hosting the cache involved in the query.
>>
>>
>>
>> In the use case I outlined below, I would like to have the items
>> processed in co-located contexts (ie: the data does not move and is
>> processed in situ on the primary node). How do you do that with a
>> continuous query?
>>
>>
>>
>> Thanks,
>>
>> Raymond.
>>
>>
>>
>> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
>> *Sent:* Monday, April 23, 2018 7:18 PM
>> *To:* user@ignite.apache.org
>> *Subject:* Re: Using a cache as an affinity co-located processing buffer
>> in Ignite.Net
>>
>>
>>
>> Hi Raymond,
>>
>>
>>
>> To process incoming data in a co-located fashion there is a Continuous
>> Query feature [1].
>>
>> Looks like it fits your use case quite well.
>>
>>
>>
>>
>>
>> [1] https://apacheignite-net.readme.io/docs/continuous-queries
>>
>>
>>
>> On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <
>> raymond_wilson@trimble.com> wrote:
>>
>> I did find ICache.GetLocalEntries() method and have written the following
>> as a proof of concept (yet to exercise it though):
>>
>>
>>
>>             IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>>
>> localItems = QueueCache.GetLocalEntries(new [] {CachePeekMode.Primary});
>>
>>
>>
>>             ICacheEntry<BufferQueueKey, BufferQueueItem> first =
>> localItems.FirstOrDefault();
>>
>>
>>
>>             if (first != null)
>>
>>             {
>>
>>                 // Get the list of all items in the buffer matching the
>> affinity key of the first item
>>
>>                 // in the list, limiting the result set to 100 TAG files.
>>
>>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
>> candidates = localItems
>>
>>                     .Where(x => x.Value.AffinityKey ==
>> first.Value.AffinityKey)
>>
>>                     .Take(100)
>>
>>                     .ToList();
>>
>>
>>
>>                 if (candidates?.Count > 0)
>>
>>                 {
>>
>>                     // Submit the list of items to the processor
>>
>>                     // ...
>>
>>                 }
>>
>>             }
>>
>>
>>
>> This seems like it should do what I want, but I’m a little suspicious
>> that it may evaluate the entire content of the cache against the Where()
>> condition before taking the first 100 results.
>>
>>
>>
>> I think I can constrain it by modifying the LINQ expression like this:
>>
>>
>>
>>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
>> candidates = localItems
>>
>>                     .Take(100)
>>
>>                     .Where(x => x.Value.AffinityKey ==
>> first.Value.AffinityKey)
>>
>>                     .ToList();
>>
>>
>>
>> Which will at least limit the number of entries examined to 100, although
>> it may miss matching entries beyond that point.
>>
>>
>>
>> I could further modify it to a ‘double-take’ which still constrains the
>> overall query but improves the chances of filling the maximum take of 100
>> matching items
>>
>>
>>
>>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
>> candidates = localItems
>>
>>                     .Take(1000)
>>
>>                     .Where(x => x.Value.AffinityKey ==
>> first.Value.AffinityKey)
>>
>>                     .Take(100)
>>
>>                     .ToList();
>>
>>
>>
>> Or is there a better way?
>>
>>
>>
>> Thanks,
>>
>> Raymond.
>>
>>
>>
>> *From:* Raymond Wilson [mailto:raymond_wilson@trimble.com]
>> *Sent:* Monday, April 23, 2018 1:11 PM
>> *To:* user@ignite.apache.org
>> *Subject:* Using a cache as an affinity co-located processing buffer in
>> Ignite.Net
>>
>>
>>
>> All,
>>
>>
>>
>> I have been thinking about how to use Ignite.Net to support an affinity
>> co-located ingest pipeline that uses queue buffering to provide fault
>> tolerance and buffering for a flow of ingest packages.
>>
>>
>>
>> At a high level, it looks like this:
>>
>>
>>
>> Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache,
>> affinity co-located with PackageProcessor]
>>
>> Processing pipeline: [PackageCache] -> [PackageProcessor] ->
>> [ProcessedDataCache affinity co-located with PackageProcessor]
>>
>>
>>
>> Essentially, I want a cache that looks like this:
>>
>>
>>
>> public class CacheItem
>>
>> {
>>
>>     public DateTime Date;
>>
>>     [AffinityKeyMapped]
>>
>>     public Guid AffinityKey;
>>
>>     public byte[] Package;
>>
>> }
>>
>>
>>
>> ICache<string, CacheItem> BufferQueue;
>>
>>
>>
>> BufferQueue = ignite.GetOrCreateCache<string, CacheItem>(
>>
>>                     new CacheConfiguration
>>
>>                     {
>>
>>                         Name = "BufferQueue",
>>
>>
>>
>>                         KeepBinaryInStore = true,
>>
>>
>>
>>                         // Partition the cache across the server nodes
>>
>>                         CacheMode = CacheMode.Partitioned,
>>
>>                     });
>>
>>
>>
>> This queue will target a data region that is configured for persistence.
>>
>>
>>
>> Inbound packages will arrive and be injected into the BufferQueue cache
>> from some client node context, like this:
>>
>>
>>
>> public void HandleANewPackage(string key, Guid affinityKey, byte []
>> package)
>>
>> {
>>
>> BufferQueue.Put(key, new CacheItem { Date = DateTime.Now, AffinityKey =
>> affinityKey, Package = package });
>>
>> }
>>
>>
>>
>> There will be a collection of server nodes that are responsible for the
>> cache.
>>
>>
>>
>> This is all straightforward. The tricky bit is then processing the
>> elements in the BufferQueue cache.
>>
>>
>>
>> The data is already on the server nodes, nicely co-located according to
>> its affinity. I want to have parallel processing logic that runs on the
>> server nodes that pulls elements from the buffer queue and processes them
>> into some other cache(s).
>>
>>
>>
>> At this point I know I have a cache that may contain entries needing to
>> be processed, but I don’t know their keys. I know it’s possible to have
>> logic running on each server node that does this (either as a Grid Service
>> or a Compute::Broadcast() lambda):
>>
>>
>>
>> var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
>>
>> var cursor = cache.Query(new ScanQuery<string, CacheItem >(new
>> QueryFilter()));
>>
>>
>>
>> foreach (var cacheEntry in cursor)
>>
>>     ProcessItem(CacheEntry);
>>
>>
>>
>> …but I am not sure how to restrict the elements in the cache returned to
>> the query to be only those entries affinity co-located with the server
>> asking for them.
>>
>>
>>
>> Is this so obvious that it just works and does not need documentation, or
>> is this not possible and I should run the processing context from a client
>> node context (as above) and pay the penalty of extracting the packages from
>> the cache with cache.Query() and then resubmitting them using an affinity
>> aware method like AffinityRun()?
>>
>>
>>
>> Thanks,
>>
>> Raymond.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>
>

Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
Thanks, that makes sense. 

From a best practices perspective, is it better to have a grid-deployed service on each node executing local continuous queries against the cache and orchestrating the processing from within the service, versus having some singular context in the grid that uses the continuous query by placing processing orchestration logic in the filter sent to the remote nodes?
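For concreteness, the service-based option I am picturing looks roughly like this (a sketch only; the service name and the processing plumbing are placeholders):

```csharp
using System;
using Apache.Ignite.Core;
using Apache.Ignite.Core.Services;

// Deployed as a node singleton, so each server node runs one instance
// that watches only its own local share of the BufferQueue cache.
[Serializable]
public class QueueProcessorService : IService
{
    public void Init(IServiceContext context) { }

    public void Execute(IServiceContext context)
    {
        var ignite = Ignition.GetIgnite();
        var cache = ignite.GetCache<string, CacheItem>("BufferQueue");

        // Start a Local continuous query against 'cache' here and block
        // until the service is cancelled; orchestration stays in-process.
    }

    public void Cancel(IServiceContext context) { }
}

// Deployment, from any node:
// ignite.GetServices().DeployNodeSingleton("queueProcessor", new QueueProcessorService());
```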

Sent from my iPhone

> On 24/04/2018, at 6:53 PM, Pavel Tupitsyn <pt...@apache.org> wrote:
> 
> Sorry, looks like I have misunderstood you.
> 
> If you need initial scan, of course you can have it by using ScanQuery as initialQuery.
> Place all the processing logic into the ScanQuery filter, and return false from there.
> This way you can process all existing entries in a co-located fashion without sending them to the initiator node.
> 
> Thanks,
> Pavel
> 
>> On Mon, Apr 23, 2018 at 11:50 PM, Raymond Wilson <ra...@trimble.com> wrote:
>> Not being able to do an initial scan of elements on the remote nodes is a bit of a problem (possibly a bug?)
>> 
>>  
>> 
>> Something that’s occurred to me is to wrap this behaviour into an Ignite service deployed onto all of the server nodes, and use a local mode continuous query from within each service to perform an initial scan of elements and then steady state handling as new elements arrive.
>> 
>>  
>> 
>> The reason the initial scan is important is that I need to handle cases where there may be a non-trivial queue of items waiting for processing and there is either a shutdown/restart of the grid, or there is a topology change event that triggers rebalancing.
>> 
>>  
>> 
>> From: Pavel Tupitsyn [mailto:ptupitsyn@apache.org] 
>> Sent: Tuesday, April 24, 2018 5:54 AM
>> 
>> 
>> To: user@ignite.apache.org
>> Subject: Re: Using a cache as an affinity co-located processing buffer in Ignite.Net
>>  
>> 
>> >  Is the initial query also run in the context of the remote node and the remote filter?
>> 
>> No, it is just a query (can be SQL or Scan) which allows you to get a "full picture" on the calling node:
>> 
>> all existing data and all future data.
>> 
>>  
>> 
>> So in your scenario it is not very useful.
>> 
>>  
>> 
>> >   return false from the filter so the element is not sent to the local listener
>> 
>> Yes, exactly
>> 
>>  
>> 
>> On Mon, Apr 23, 2018 at 11:18 AM, Raymond Wilson <ra...@trimble.com> wrote:
>> 
>> OK – I see how that works.
>> 
>>  
>> 
>> In the page https://apacheignite-net.readme.io/docs/continuous-queries , there is this code:
>> 
>>  
>> 
>> using (var queryHandle = cache.QueryContinuous(qry, initialQry))
>> 
>> {
>> 
>>     // Iterate through existing data stored in cache.
>> 
>>     foreach (var entry in queryHandle.GetInitialQueryCursor())
>> 
>>         Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);
>> 
>>  
>> 
>>     // Add a few more keys and watch a few more query notifications.
>> 
>>     for (int i = 5; i < 15; i++)
>> 
>>         cache.Put(i, i.ToString());
>> 
>> }
>> 
>>  
>> 
>> Is the initial query also run in the context of the remote node and the remote filter?
>> 
>>  
>> 
>> Construction of the ContinuousQuery also requires provision of LocalListener to receive the cache update items. Is the approach here to process the element in the remote filter context and then return false from the filter so the element is not sent to the local listener?
>> 
>>  
>> 
>>  
>> 
>> From: Pavel Tupitsyn [mailto:ptupitsyn@apache.org] 
>> Sent: Monday, April 23, 2018 7:50 PM
>> 
>> 
>> To: user@ignite.apache.org
>> Subject: Re: Using a cache as an affinity co-located processing buffer in Ignite.Net
>> 
>>  
>> 
>> Remote Listener is deployed on every cache node and is invoked only on a primary node for that key.
>> 
>> In other words, for each key there is only one invocation of the remote filter, and that invocation is local to that key.
>> 
>>  
>> 
>> So you can place your processing logic into the Remote Filter.
>> 
>>  
>> 
>> On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <ra...@trimble.com> wrote:
>> 
>> Hi Pavel,
>> 
>>  
>> 
>> Yes, I looked at continuous queries. They appear to be oriented toward a single context being sent the newly arrived elements in the cache from all primary nodes hosting the cache involved in the query.
>> 
>>  
>> 
>> In the use case I outlined below, I would like to have the items processed in co-located contexts (ie: the data does not move and is processed in situ on the primary node). How do you do that with a continuous query?
>> 
>>  
>> 
>> Thanks,
>> 
>> Raymond.
>> 
>>  
>> 
>> From: Pavel Tupitsyn [mailto:ptupitsyn@apache.org] 
>> Sent: Monday, April 23, 2018 7:18 PM
>> To: user@ignite.apache.org
>> Subject: Re: Using a cache as an affinity co-located processing buffer in Ignite.Net
>> 
>>  
>> 
>> Hi Raymond,
>> 
>>  
>> 
>> To process incoming data in a co-located fashion there is a Continuous Query feature [1].
>> 
>> Looks like it fits your use case quite well.
>> 
>>  
>> 
>>  
>> 
>> [1] https://apacheignite-net.readme.io/docs/continuous-queries
>> 
>>  
>> 
>> On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <ra...@trimble.com> wrote:
>> 
>> I did find ICache.GetLocalEntries() method and have written the following as a proof of concept (yet to exercise it though):
>> 
>>  
>> 
>>             IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>> localItems = QueueCache.GetLocalEntries(new [] {CachePeekMode.Primary});
>> 
>>  
>> 
>>             ICacheEntry<BufferQueueKey, BufferQueueItem> first = localItems.FirstOrDefault();
>> 
>>  
>> 
>>             if (first != null)
>> 
>>             {
>> 
>>                 // Get the list of all items in the buffer matching the affinity key of the first item
>> 
>>                 // in the list, limiting the result set to 100 TAG files.
>> 
>>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
>> 
>>                     .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
>> 
>>                     .Take(100)
>> 
>>                     .ToList();
>> 
>>  
>> 
>>                 if (candidates?.Count > 0)
>> 
>>                 {
>> 
>>                     // Submit the list of items to the processor
>> 
>>                     // ...
>> 
>>                 }
>> 
>>             }
>> 
>>  
>> 
>> This seems like it should do what I want, but I’m a little suspicious that it may evaluate the entire content of the cache against the Where() condition before taking the first 100 results.
>> 
>>  
>> 
>> I think I can constrain it by modifying the LINQ expression like this:
>> 
>>  
>> 
>>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
>> 
>>                     .Take(100)
>> 
>>                     .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
>> 
>>                     .ToList();
>> 
>>  
>> 
>> Which will at least limit the number of entries examined to 100, although it may miss matching entries beyond that point.
>> 
>>  
>> 
>> I could further modify it to a ‘double-take’ which still constrains the overall query but improves the chances of filling the maximum take of 100 matching items
>> 
>>  
>> 
>>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
>> 
>>                     .Take(1000)
>> 
>>                     .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
>> 
>>                     .Take(100)
>> 
>>                     .ToList();
>> 
>>  
>> 
>> Or is there a better way?
>> 
>>  
>> 
>> Thanks,
>> 
>> Raymond.
>> 
>>  
>> 
>> From: Raymond Wilson [mailto:raymond_wilson@trimble.com] 
>> Sent: Monday, April 23, 2018 1:11 PM
>> To: user@ignite.apache.org
>> Subject: Using a cache as an affinity co-located processing buffer in Ignite.Net
>> 
>>  
>> 
>> All,
>> 
>>  
>> 
>> I have been thinking about how to use Ignite.Net to support an affinity co-located ingest pipeline that uses queue buffering to provide fault tolerance and buffering for a flow of ingest packages.
>> 
>>  
>> 
>> At a high level, it looks like this:
>> 
>>  
>> 
>> Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache, affinity co-located with PackageProcessor]
>> 
>> Processing pipeline: [PackageCache] -> [PackageProcessor] -> [ProcessedDataCache affinity co-located with PackageProcessor]
>> 
>>  
>> 
>> Essentially, I want a cache that looks like this:
>> 
>>  
>> 
>> public class CacheItem
>>
>> {
>>
>>     public DateTime Date;
>>
>>     [AffinityKeyMapped]
>>
>>     public Guid AffinityKey;
>>
>>     public byte[] Package;
>>
>> }
>>
>>
>>
>> ICache<string, CacheItem> BufferQueue;
>>
>>
>>
>> BufferQueue = ignite.GetOrCreateCache<string, CacheItem>(
>>
>>                     new CacheConfiguration
>>
>>                     {
>>
>>                         Name = "BufferQueue",
>>
>>                         KeepBinaryInStore = true,
>>
>>                         // Partition the cache across the server nodes
>>
>>                         CacheMode = CacheMode.Partitioned,
>>
>>                     });
>> 
>>  
>> 
>> This queue will target a data region that is configured for persistence.
>> 
>>  
>> 
>> Inbound packages will arrive and be injected into the BufferQueue cache from some client node context, like this:
>> 
>>  
>> 
>> public void HandleANewPackage(string key, Guid affinityKey, byte [] package)
>> 
>> {
>> 
>> BufferQueue.Put(key, new CacheItem { Date = DateTime.Now, AffinityKey = affinityKey, Package = package });
>> 
>> }
>> 
>>  
>> 
>> There will be a collection of server nodes that are responsible for the cache.
>> 
>>  
>> 
>> This is all straightforward. The tricky bit is then processing the elements in the BufferQueue cache.
>> 
>>  
>> 
>> The data is already on the server nodes, nicely co-located according to its affinity. I want to have parallel processing logic that runs on the server nodes that pulls elements from the buffer queue and processes them into some other cache(s).
>> 
>>  
>> 
>> At this point I know I have a cache that may contain entries needing to be processed, but I don’t know their keys. I know it’s possible to have logic running on each server node that does this (either as a Grid Service or a Compute::Broadcast() lambda):
>> 
>>  
>> 
>> var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
>> 
>> var cursor = cache.Query(new ScanQuery<string, CacheItem >(new QueryFilter()));
>> 
>>  
>> 
>> foreach (var cacheEntry in cursor)
>> 
>>     ProcessItem(CacheEntry);
>> 
>>  
>> 
>> …but I am not sure how to restrict the elements in the cache returned to the query to be only those entries affinity co-located with the server asking for them.
>> 
>>  
>> 
>> Is this so obvious that it just works and does not need documentation, or is this not possible and I should run the processing context from a client node context (as above) and pay the penalty of extracting the packages from the cache with cache.Query() and then resubmitting them using an affinity aware method like AffinityRun()?
>> 
>>  
>> 
>> Thanks,
>> 
>> Raymond.
>> 
>>  
>> 
>>  
>> 
>>  
>> 
>>  
>> 
>>  
>> 
> 

Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Pavel Tupitsyn <pt...@apache.org>.
Sorry, looks like I have misunderstood you.

If you need initial scan, of course you can have it by using ScanQuery as
initialQuery.
Place all the processing logic into the ScanQuery filter, and return false
from there.
This way you can process all existing entries in a co-located fashion
without sending them to the initiator node.
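A sketch of that combination (illustrative only; the processing call is a placeholder):

```csharp
using System;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Query;

// Scan filter: evaluated on each node against its local partitions, so the
// processing below is co-located. Returning false keeps every entry out of
// the result set, so nothing travels back to the initiator.
[Serializable]
public class InitialScanProcessor : ICacheEntryFilter<string, CacheItem>
{
    public bool Invoke(ICacheEntry<string, CacheItem> entry)
    {
        // ... process entry.Value in situ ...
        return false;
    }
}

// Usage: pass the ScanQuery as the initial query of the continuous query,
// then drain the (empty) initial cursor to force the scan to execute:
//
//   var initialQry = new ScanQuery<string, CacheItem>(new InitialScanProcessor());
//   using (var handle = cache.QueryContinuous(qry, initialQry))
//   {
//       handle.GetInitialQueryCursor().GetAll();
//       // ... new entries keep being handled by the query's remote filter ...
//   }
```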

Thanks,
Pavel

On Mon, Apr 23, 2018 at 11:50 PM, Raymond Wilson <raymond_wilson@trimble.com
> wrote:

> Not being able to do an initial scan of elements on the remote nodes is a
> bit of a problem (possibly a bug?)
>
>
>
> Something that’s occurred to me is to wrap this behaviour into an Ignite
> service deployed onto all of the server nodes, and use a local mode
> continuous query from within each service to perform an initial scan of
> elements and then steady state handling as new elements arrive.
>
>
>
> The reason the initial scan is important is that I need to handle cases
> where there may be a non-trivial queue of items waiting for processing and
> there is either a shutdown/restart of the grid, or there is a topology
> change event that triggers rebalancing.
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Tuesday, April 24, 2018 5:54 AM
>
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> >  Is the initial query also run in the context of the remote node and
> the remote filter?
>
> No, it is just a query (can be SQL or Scan) which allows you to get a
> "full picture" on the calling node:
>
> all existing data and all future data.
>
>
>
> So in your scenario it is not very useful.
>
>
>
> >   return false from the filter so the element is not sent to the local
> listener
>
> Yes, exactly
>
>
>
> On Mon, Apr 23, 2018 at 11:18 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> OK – I see how that works.
>
>
>
> In the page https://apacheignite-net.readme.io/docs/continuous-queries ,
> there is this code:
>
>
>
> using (var queryHandle = cache.QueryContinuous(qry, initialQry))
>
> {
>
>     // Iterate through existing data stored in cache.
>
>     foreach (var entry in queryHandle.GetInitialQueryCursor())
>
>         Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);
>
>
>
>     // Add a few more keys and watch a few more query notifications.
>
>     for (int i = 5; i < 15; i++)
>
>         cache.Put(i, i.ToString());
>
> }
>
>
>
> Is the initial query also run in the context of the remote node and the
> remote filter?
>
>
>
> Construction of the ContinuousQuery also requires provision of
> LocalListener to receive the cache update items. Is the approach here to
> process the element in the remote filter context and then return false from
> the filter so the element is not sent to the local listener?
>
>
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Monday, April 23, 2018 7:50 PM
>
>
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> Remote Listener is deployed on every cache node and is invoked only on a
> primary node for that key.
>
> In other words, for each key there is only one invocation of the remote
> filter, and that invocation is local to that key.
>
>
>
> So you can place your processing logic into the Remote Filter.
>
>
>
> On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> Hi Pavel,
>
>
>
> Yes, I looked at continuous queries. They appear to be oriented toward a
> single context being sent the newly arrived elements in the cache from all
> primary nodes hosting the cache involved in the query.
>
>
>
> In the use case I outlined below, I would like to have the items processed
> in co-located contexts (ie: the data does not move and is processed in situ
> on the primary node). How do you do that with a continuous query?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Monday, April 23, 2018 7:18 PM
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> Hi Raymond,
>
>
>
> To process incoming data in a co-located fashion there is a Continuous
> Query feature [1].
>
> Looks like it fits your use case quite well.
>
>
>
>
>
> [1] https://apacheignite-net.readme.io/docs/continuous-queries
>
>
>
> On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> I did find ICache.GetLocalEntries() method and have written the following
> as a proof of concept (yet to exercise it though):
>
>
>
>             IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> localItems = QueueCache.GetLocalEntries(new [] {CachePeekMode.Primary});
>
>
>
>             ICacheEntry<BufferQueueKey, BufferQueueItem> first =
> localItems.FirstOrDefault();
>
>
>
>             if (first != null)
>
>             {
>
>                 // Get the list of all items in the buffer matching the
> affinity key of the first item
>
>                 // in the list, limiting the result set to 100 TAG files.
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .Take(100)
>
>                     .ToList();
>
>
>
>                 if (candidates?.Count > 0)
>
>                 {
>
>                     // Submit the list of items to the processor
>
>                     // ...
>
>                 }
>
>             }
>
>
>
> This seems like it should do what I want, but I’m a little suspicious that
> it may evaluate the entire content of the cache against the Where()
> condition before taking the first 100 results.
>
>
>
> I think I can constrain it by modifying the LINQ expression like this:
>
>
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Take(100)
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .ToList();
>
>
>
> Which will at least limit the number of entries examined to 100, although
> it may miss matching entries beyond that point.
>
>
>
> I could further modify it to a ‘double-take’ which still constrains the
> overall query but improves the chances of filling the maximum take of 100
> matching items
>
>
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Take(1000)
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .Take(100)
>
>                     .ToList();
>
>
>
> Or is there a better way?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
> *From:* Raymond Wilson [mailto:raymond_wilson@trimble.com]
> *Sent:* Monday, April 23, 2018 1:11 PM
> *To:* user@ignite.apache.org
> *Subject:* Using a cache as an affinity co-located processing buffer in
> Ignite.Net
>
>
>
> All,
>
>
>
> I have been thinking about how to use Ignite.Net to support an affinity
> co-located ingest pipeline that uses queue buffering to provide fault
> tolerance and buffering for a flow of ingest packages.
>
>
>
> At a high level, it looks like this:
>
>
>
> Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache,
> affinity co-located with PackageProcessor]
>
> Processing pipeline: [PackageCache] -> [PackageProcessor] ->
> [ProcessedDataCache affinity co-located with PackageProcessor]
>
>
>
> Essentially, I want a cache that looks like this:
>
>
>
> public class CacheItem
>
> {
>
>     public DateTime Date;
>
>     [AffinityKeyMapped]
>
>     public Guid AffinityKey;
>
>     public byte[] Package;
>
> }
>
>
>
> ICache<string, CacheItem> BufferQueue;
>
>
>
> BufferQueue = ignite.GetOrCreateCache<string, CacheItem>(
>
>                     new CacheConfiguration
>
>                     {
>
>                         Name = "BufferQueue",
>
>                         KeepBinaryInStore = true,
>
>                         // Partition the cache across the server nodes
>
>                         CacheMode = CacheMode.Partitioned,
>
>                     });
>
>
>
> This queue will target a data region that is configured for persistence.
>
>
>
> Inbound packages will arrive and be injected into the BufferQueue cache
> from some client node context, like this:
>
>
>
> public void HandleANewPackage(string key, Guid affinityKey, byte []
> package)
>
> {
>
> BufferQueue.Put(key, new CacheItem { Date = DateTime.Now, AffinityKey =
> affinityKey, Package = package });
>
> }
>
>
>
> There will be a collection of server nodes that are responsible for the
> cache.
>
>
>
> This is all straightforward. The tricky bit is then processing the
> elements in the BufferQueue cache.
>
>
>
> The data is already on the server nodes, nicely co-located according to
> its affinity. I want to have parallel processing logic that runs on the
> server nodes that pulls elements from the buffer queue and processes them
> into some other cache(s).
>
>
>
> At this point I know I have a cache that may contain entries needing to
> be processed, but I don’t know their keys. I know it’s possible to have
> logic running on each server node that does this (either as a Grid Service
> or a Compute::Broadcast() lambda):
>
>
>
> var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
>
> var cursor = cache.Query(new ScanQuery<string, CacheItem >(new QueryFilter
> ()));
>
>
>
> foreach (var cacheEntry in cursor)
>
>     ProcessItem(CacheEntry);
>
>
>
> …but I am not sure how to restrict the elements in the cache returned to
> the query to be only those entries affinity co-located with the server
> asking for them.
>
>
>
> Is this so obvious that it just works and does not need documentation, or
> is this not possible and I should run the processing context from a client
> node context (as above) and pay the penalty of extracting the packages from
> the cache with cache.Query() and then resubmitting them using an affinity
> aware method like AffinityRun()?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
>
>
>
>
>
>
>
>

RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
Not being able to do an initial scan of elements on the remote nodes is a
bit of a problem (possibly a bug?)



Something that’s occurred to me is to wrap this behaviour into an Ignite
service deployed onto all of the server nodes, and use a local mode
continuous query from within each service to perform an initial scan of
elements and then steady state handling as new elements arrive.



The reason the initial scan is important is that I need to handle cases
where there may be a non-trivial queue of items waiting for processing and
there is either a shutdown/restart of the grid, or a topology change event
that triggers rebalancing.
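To make the idea concrete, here is an untested sketch of such a service, assuming Ignite.NET's IService and continuous query APIs. The BufferQueueKey/BufferQueueItem types are the ones used elsewhere in this thread, and ProcessEntry is a hypothetical placeholder for the real processing step:

```csharp
// Untested sketch: a service deployed on every server node that runs a
// *local* continuous query over the BufferQueue cache. The local initial
// scan covers entries already queued before a restart or rebalance; the
// continuous part covers new arrivals on this node.
public class BufferQueueProcessor : IService
{
    [InstanceResource] private IIgnite _ignite;

    private IContinuousQueryHandle<ICacheEntry<BufferQueueKey, BufferQueueItem>> _handle;

    public void Init(IServiceContext context) { }

    public void Execute(IServiceContext context)
    {
        var cache = _ignite.GetCache<BufferQueueKey, BufferQueueItem>("BufferQueue");

        var qry = new ContinuousQuery<BufferQueueKey, BufferQueueItem>(new Listener())
        {
            Local = true // only react to entries primary on this node
        };

        // Local = true on the initial scan restricts it to this node's data.
        _handle = cache.QueryContinuous(
            qry, new ScanQuery<BufferQueueKey, BufferQueueItem> { Local = true });

        foreach (var entry in _handle.GetInitialQueryCursor())
            ProcessEntry(entry.Key, entry.Value);
    }

    public void Cancel(IServiceContext context)
    {
        _handle?.Dispose();
    }

    private class Listener : ICacheEntryEventListener<BufferQueueKey, BufferQueueItem>
    {
        public void OnEvent(IEnumerable<ICacheEntryEvent<BufferQueueKey, BufferQueueItem>> events)
        {
            foreach (var e in events)
                ProcessEntry(e.Key, e.Value);
        }
    }

    private static void ProcessEntry(BufferQueueKey key, BufferQueueItem item)
    {
        // co-located processing into the target cache(s) goes here
    }
}
```

Deploying this as a node-singleton on the server nodes should give one processing loop per node, each seeing only its own primary partitions.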



*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Tuesday, April 24, 2018 5:54 AM
*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



>  Is the initial query also run in the context of the remote node and the
remote filter?

No, it is just a query (can be SQL or Scan) which allows you to get a "full
picture" on the calling node:

all existing data and all future data.



So in your scenario it is not very useful.



>   return false from the filter so the element is not sent to the local
listener

Yes, exactly




Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Pavel Tupitsyn <pt...@apache.org>.
>  Is the initial query also run in the context of the remote node and the
remote filter?
No, it is just a query (can be SQL or Scan) which allows you to get a "full
picture" on the calling node:
all existing data and all future data.

So in your scenario it is not very useful.

>   return false from the filter so the element is not sent to the local
listener
Yes, exactly


RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
OK – I see how that works.



In the page https://apacheignite-net.readme.io/docs/continuous-queries ,
there is this code:



using (var queryHandle = cache.QueryContinuous(qry, initialQry))
{
    // Iterate through existing data stored in cache.
    foreach (var entry in queryHandle.GetInitialQueryCursor())
        Console.WriteLine("key={0}, val={1}", entry.Key, entry.Value);

    // Add a few more keys and watch a few more query notifications.
    for (int i = 5; i < 15; i++)
        cache.Put(i, i.ToString());
}



Is the initial query also run in the context of the remote node and the
remote filter?



Construction of the ContinuousQuery also requires provision of a
LocalListener to receive the cache update items. Is the approach here to
process the element in the remote filter context and then return false from
the filter so the element is not sent to the local listener?





*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Monday, April 23, 2018 7:50 PM
*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



Remote Listener is deployed on every cache node and is invoked only on a
primary node for that key.

In other words, for each key there is only one invocation of the remote
filter, and that invocation is local to that key.



So you can place your processing logic into the Remote Filter.




Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Pavel Tupitsyn <pt...@apache.org>.
Remote Listener is deployed on every cache node and is invoked only on a
primary node for that key.
In other words, for each key there is only one invocation of the remote
filter, and that invocation is local to that key.

So you can place your processing logic into the Remote Filter.
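A minimal untested sketch of that pattern, using the BufferQueueKey/BufferQueueItem types from this thread (ProcessItem is a hypothetical hook for the real processing step):

```csharp
// Untested sketch: the remote filter does the co-located processing and
// returns false so nothing is shipped to the local listener.
[Serializable]
public class ProcessingFilter : ICacheEntryEventFilter<BufferQueueKey, BufferQueueItem>
{
    public bool Evaluate(ICacheEntryEvent<BufferQueueKey, BufferQueueItem> evt)
    {
        // Runs on the primary node for evt.Key, i.e. where the data lives.
        ProcessItem(evt.Key, evt.Value); // hypothetical processing hook

        return false; // suppress delivery to the local listener
    }

    private static void ProcessItem(BufferQueueKey key, BufferQueueItem item)
    {
        // write results into the co-located output cache(s) here
    }
}
```

The ContinuousQuery still needs a local listener to be constructed, but with the filter always returning false that listener should effectively be a no-op; the filter is attached via the query's Filter property.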

On Mon, Apr 23, 2018 at 10:42 AM, Raymond Wilson <raymond_wilson@trimble.com
> wrote:

> Hi Pavel,
>
>
>
> Yes, I looked at continuous queries. They appear to be oriented toward a
> single context being sent the newly arrived elements in the cache from all
> primary nodes hosting the cache involved in the query.
>
>
>
> In the use case I outlined below, I would like to have the items processed
> in co-located contexts (ie: the data does not move and is processed in situ
> on the primary node). How do you do that with a continuous query?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
> *From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
> *Sent:* Monday, April 23, 2018 7:18 PM
> *To:* user@ignite.apache.org
> *Subject:* Re: Using a cache as an affinity co-located processing buffer
> in Ignite.Net
>
>
>
> Hi Raymond,
>
>
>
> To process incoming data in a co-located fashion there is a Continuous
> Query feature [1].
>
> Looks like it fits your use case quite well.
>
>
>
>
>
> [1] https://apacheignite-net.readme.io/docs/continuous-queries
>
>
>
> On Mon, Apr 23, 2018 at 7:32 AM, Raymond Wilson <
> raymond_wilson@trimble.com> wrote:
>
> I did find ICache.GetLocalEntries() method and have written the following
> as a proof of concept (yet to exercise it though):
>
>
>
>             IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> localItems = QueueCache.GetLocalEntries(new [] {CachePeekMode.Primary});
>
>
>
>             ICacheEntry<BufferQueueKey, BufferQueueItem> first =
> localItems.FirstOrDefault();
>
>
>
>             if (first != null)
>
>             {
>
>                 // Get the list of all items in the buffer matching the
> affinity key of the first item
>
>                 // in the list, limiting the result set to 100 TAG files.
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .Take(100)
>
>                     .ToList();
>
>
>
>                 if (candidates?.Count > 0)
>
>                 {
>
>                     // Submit the list of items to the processor
>
>                     // ...
>
>                 }
>
>             }
>
>
>
> This seems like it should do what I want, but I’m a little suspicious that
> it may evaluate the entire content of the cache against the Where()
> condition before taking the first 100 results.
>
>
>
> I think I can constrain it by modifying the LINQ expression like this:
>
>
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Take(100)
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .ToList();
>
>
>
> Which will at least limit the overall number examined to be 100, while not
> capturing the first 100 that do match.
>
>
>
> I could further modify it to a ‘double-take’ which still constrains the
> overall query but improves the chances of filling the maximum take of 100
> matching items
>
>
>
>                 List<ICacheEntry<BufferQueueKey, BufferQueueItem>>
> candidates = localItems
>
>                     .Take(1000)
>
>                     .Where(x => x.Value.AffinityKey ==
> first.Value.AffinityKey)
>
>                     .Take(100)
>
>                     .ToList();
>
>
>
> Or is there a better way?
>
>
>
> Thanks,
>
> Raymond.
>
>
>
> *From:* Raymond Wilson [mailto:raymond_wilson@trimble.com]
> *Sent:* Monday, April 23, 2018 1:11 PM
> *To:* user@ignite.apache.org
> *Subject:* Using a cache as an affinity co-located processing buffer in
> Ignite.Net
>
>
>
> All,
>
>
>
> I have been thinking about how to use Ignite.Net to support an affinity
> co-located ingest pipeline that uses queue buffering to provide fault
> tolerance and buffering for a flow of ingest packages.
>
>
>
> At a high level, it looks like this:
>
>
>
> Arrival pipeline: [Gateway] -> [PackageReceiver] -> [PackageCache,
> affinity co-located with PackageProcessor]
>
> Processing pipeline: [PackageCache] -> [PackageProcessor] ->
> [ProcessedDataCache affinity co-located with PackageProcessor]
>
>
>
> Essentially, I want a cache that look like this:
>
>
>
> Public class CacheItem
>
> {
>
>     Public DateTime date;
>
>
>
>   [AffinityKeyMapped]
>
>      public Guid AffinityKey;
>
>
>
>      public byte [] Package;
>
> }
>
>
>
>    ICache<string, CacheTime> BufferQueue.
>
>
>
> BufferQueue =  ignite.GetOrCreateCache <string, CacheItem > (
>
>                     new CacheConfiguration
>
>                     {
>
>                         Name = “BufferQueue”,
>
>
>
>                         KeepBinaryInStore = true,
>
>
>
>                         // Replicate the maps across nodes
>
>                         CacheMode = CacheMode.Partitioned,
>
>                     });
>
>             }
>
>
>
> This queue will target a data region that is configured for persistency.
>
>
>
> Inbound packages will arrive and be injected into the BufferQueue cache
> from some client node context, like this:
>
>
>
> public void HandleANewPackage(string key, Guid affinityKey, byte []
> package)
>
> {
>
> BufferQueue.Put(key, new CacheItem() {data = DateTime.Now(), AffinityKey =
> affinityKey, Package = package});
>
> }
>
>
>
> There will be a collection of server nodes that are responsible for the
> cache.
>
>
>
> This is all straightforward. The tricky bit is then processing the
> elements in the BufferQueue cache.
>
>
>
> The data is already on the server nodes, nicely co-located according to
> its affinity. I want to have parallel processing logic that runs on the
> server nodes that pulls elements from the buffer queue and processes them
> into some other cache(s).
>
>
>
> At this point I know I have a cache that may contain something needing to
> be processed, but I don’t know their keys. I know it’s possible to have
> logic running on each server node that does this (either as a Grid Service
> or a Compute::Broadcast() lambda):
>
>
>
> var cache = ignite.GetCache<string, CacheItem>("BufferQueue");
>
> var cursor = cache.Query(new ScanQuery<string, CacheItem >(new QueryFilter
> ()));
>
>
>
> foreach (var cacheEntry in cursor)
>
>     ProcessItem(CacheEntry);
>
>
>
> …but I am not sure how to restrict the entries returned by the query to
> only those affinity co-located with the server node asking for them.
>
>
>
> Is this so obvious that it just works and does not need documentation, or
> is this not possible and I should run the processing context from a client
> node context (as above) and pay the penalty of extracting the packages from
> the cache with cache.Query() and then resubmitting them using an affinity
> aware method like AffinityRun()?
>
>
>
> Thanks,
>
> Raymond.
>

RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
Hi Pavel,



Yes, I looked at continuous queries. They appear to be oriented toward a
single listener context that is sent the newly arrived cache elements from
all of the primary nodes hosting the cache involved in the query.



In the use case I outlined in my earlier message, I would like to have the
items processed in co-located contexts (i.e. the data does not move and is
processed in situ on the primary node). How do you do that with a continuous
query?



Thanks,

Raymond.



*From:* Pavel Tupitsyn [mailto:ptupitsyn@apache.org]
*Sent:* Monday, April 23, 2018 7:18 PM
*To:* user@ignite.apache.org
*Subject:* Re: Using a cache as an affinity co-located processing buffer in
Ignite.Net



Hi Raymond,



To process incoming data in a co-located fashion there is a Continuous
Query feature [1].

Looks like it fits your use case quite well.





[1] https://apacheignite-net.readme.io/docs/continuous-queries




Re: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Pavel Tupitsyn <pt...@apache.org>.
Hi Raymond,

To process incoming data in a co-located fashion there is a Continuous
Query feature [1].
Looks like it fits your use case quite well.


[1] https://apacheignite-net.readme.io/docs/continuous-queries
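A rough sketch of the idea (the filter and item types below are illustrative, not a tested implementation): a continuous query's remote filter is deployed to the server nodes and invoked on the primary node owning each created entry, so the processing can stay co-located with the data and nothing needs to be shipped back to the listener.

```csharp
using System;
using System.Collections.Generic;
using Apache.Ignite.Core.Cache.Event;
using Apache.Ignite.Core.Cache.Query.Continuous;

// Illustrative value type standing in for the CacheItem from the original post.
[Serializable]
public class CacheItem
{
    public DateTime date;
    public Guid AffinityKey;
    public byte[] Package;
}

// Remote filter: Ignite invokes Evaluate() on the node that owns the entry,
// i.e. where the package is already stored.
[Serializable]
public class PackageRemoteFilter : ICacheEntryEventFilter<string, CacheItem>
{
    public bool Evaluate(ICacheEntryEvent<string, CacheItem> evt)
    {
        if (evt.EventType == CacheEntryEventType.Created)
            ProcessItem(evt.Key, evt.Value);   // in-situ processing on the primary node

        // Returning false suppresses the callback to the single local listener,
        // so no package data travels back to the node that started the query.
        return false;
    }

    private static void ProcessItem(string key, CacheItem item)
    {
        // ... process the package into the co-located ProcessedDataCache ...
    }
}

// The local listener is then effectively a no-op:
public class NoOpListener : ICacheEntryEventListener<string, CacheItem>
{
    public void OnEvent(IEnumerable<ICacheEntryEvent<string, CacheItem>> evts)
    {
        // Nothing arrives here while the remote filter returns false.
    }
}

// Starting the query (keep the handle alive for the lifetime of the pipeline):
//   var qry = new ContinuousQuery<string, CacheItem>(new NoOpListener())
//   {
//       Filter = new PackageRemoteFilter()
//   };
//   var handle = bufferQueue.QueryContinuous(qry);
```

Note the filter class must be serializable (or binarizable) since it is deployed to the remote nodes.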

>

RE: Using a cache as an affinity co-located processing buffer in Ignite.Net

Posted by Raymond Wilson <ra...@trimble.com>.
I did find the ICache.GetLocalEntries() method and have written the
following as a proof of concept (not yet exercised, though):



            IEnumerable<ICacheEntry<BufferQueueKey, BufferQueueItem>> localItems =
                QueueCache.GetLocalEntries(new[] { CachePeekMode.Primary });

            ICacheEntry<BufferQueueKey, BufferQueueItem> first = localItems.FirstOrDefault();

            if (first != null)
            {
                // Get the list of items in the buffer matching the affinity key of
                // the first item in the list, limiting the result set to 100 TAG files.
                List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
                    .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
                    .Take(100)
                    .ToList();

                if (candidates.Count > 0)
                {
                    // Submit the list of items to the processor
                    // ...
                }
            }



This seems like it should do what I want, but I’m a little suspicious that
it may evaluate the entire content of the cache against the Where()
condition before taking the first 100 results.
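As a quick sanity check on the deferred-execution behaviour (plain LINQ over in-memory data, no Ignite involved), Where()/Take() does stop pulling items once 100 matches have been found; the full scan only happens when fewer than 100 entries match:

```csharp
using System;
using System.Linq;

static class LinqLazinessCheck
{
    // Returns (number of matches taken, number of source items examined).
    public static (int Matches, int Examined) Run()
    {
        int examined = 0;

        var matches = Enumerable.Range(0, 10_000)
            .Select(i => { examined++; return i; })  // instrument the pipeline
            .Where(i => i % 2 == 0)                  // every second item "matches"
            .Take(100)                               // stop after 100 matches
            .ToList();

        return (matches.Count, examined);
    }

    static void Main()
    {
        var (m, e) = Run();
        // The 100th even number is 198, so only 199 of the 10,000
        // source items are examined before Take(100) halts the pipeline.
        Console.WriteLine($"matches={m}, examined={e}");  // matches=100, examined=199
    }
}
```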



I think I can constrain it by modifying the LINQ expression like this:



                List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
                    .Take(100)
                    .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
                    .ToList();



This will at least limit the overall number examined to 100, though it may
not capture the first 100 entries that do match.



I could further modify it to a ‘double-take’, which still constrains the
overall query but improves the chances of filling the maximum take of 100
matching items:



                List<ICacheEntry<BufferQueueKey, BufferQueueItem>> candidates = localItems
                    .Take(1000)
                    .Where(x => x.Value.AffinityKey == first.Value.AffinityKey)
                    .Take(100)
                    .ToList();



Or is there a better way?



Thanks,

Raymond.


