Posted to user@ignite.apache.org by begineer <re...@gmail.com> on 2016/12/14 11:02:15 UTC

Detecting terminal condition for group of items in Ignite cache.

Hi,
My sample application processes trades for different companies, stored in an
Ignite cache. When all trades for a particular company reach the SUCCESS stage,
an automatic notification should be triggered so that some other
system/application can react to it. To do this, I use a continuous query to
detect whenever a trade reaches the SUCCESS stage. I then compare the total
number of trades for that company in the cache with the number of its trades
that are in the SUCCESS stage. If they are equal, I trigger a notification;
otherwise I wait.
There are two drawbacks to this approach:
1. I have to query through all the trades for that company twice every time
a trade reaches the SUCCESS stage, which is really bad.
2. The notification can be triggered multiple times if multiple threads
move trades to the SUCCESS stage in parallel (e.g. if the last 5 items
reach the SUCCESS stage together, 5 emails will be sent, since the continuous
query will run the local listener 5 times).

Is there a better way to do the same task? I hope there is. My current
approach cannot work in a real-time application.

Below is my code.



// Assumption: factoryOf is javax.cache.configuration.FactoryBuilder.factoryOf
// (the original post did not include its imports).
import static javax.cache.configuration.FactoryBuilder.factoryOf;

import java.util.List;
import javax.cache.Cache.Entry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;

public class TerminalEventsUsingContQuery {
	IgniteCache<Integer, Trade> cache;

	public static void main(String[] args) {
		new TerminalEventsUsingContQuery().test();
	}

	private void test() {
		Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
		CacheConfiguration<Integer, Trade> config = new CacheConfiguration<>("TradesCache");

		cache = ignite.getOrCreateCache(config);
		ContinuousQuery<Integer, Trade> query = new ContinuousQuery<>();
		// Runs on this node for every update that passes the remote filter.
		query.setLocalListener(events -> events.forEach(e -> process(e.getValue())));

		// Only updates whose new value is in SUCCESS status reach the listener.
		query.setRemoteFilterFactory(factoryOf(e -> TradeStatus.SUCCESS.equals(e.getValue().getStatus())));
		// Picks up trades that are already in SUCCESS status when the query starts.
		query.setInitialQuery(new ScanQuery<Integer, Trade>((k, v) -> TradeStatus.SUCCESS.equals(v.getStatus())));
		buildData();
		QueryCursor<Entry<Integer, Trade>> cursor = cache.query(query);
		cursor.forEach(entry -> process(entry.getValue()));
		Trade t9 = new Trade(9, TradeStatus.SUCCESS, "type1", 100);
		cache.put(t9.getId(), t9);
	}

	private void process(Trade trade) {
		// First full scan: all trades that share this trade's ref.
		List<Entry<Integer, Trade>> totalperRef = cache
				.query(new ScanQuery<Integer, Trade>((k, v) -> v.getRef() == trade.getRef())).getAll();

		// Second full scan: trades with the same ref that are in SUCCESS status.
		List<Entry<Integer, Trade>> totalSuccessForRef = cache.query(new ScanQuery<Integer, Trade>(
				(k, v) -> v.getRef() == trade.getRef() && TradeStatus.SUCCESS.equals(v.getStatus()))).getAll();

		if (totalperRef.size() == totalSuccessForRef.size()) {
			System.out.println("Terminal condition reached. Notify the handler for : " + trade.getRef());
		} else {
			System.out.println("Terminal condition not reached yet. Current processing Trade : " + trade.getId());
		}
	}

	private void buildData() {
		Trade t1 = new Trade(1, TradeStatus.SUCCESS, "type1", 100);
		Trade t2 = new Trade(2, TradeStatus.FAILED, "type1", 101);
		Trade t3 = new Trade(3, TradeStatus.EXPIRED, "type1", 102);
		Trade t4 = new Trade(4, TradeStatus.SUCCESS, "type1", 100);
		Trade t5 = new Trade(5, TradeStatus.CHANGED, "type1", 103);
		Trade t6 = new Trade(6, TradeStatus.SUCCESS, "type1", 100);
		Trade t7 = new Trade(7, TradeStatus.CHANGED, "type1", 103);
		Trade t8 = new Trade(8, TradeStatus.SUCCESS, "type1", 101);
		cache.put(t1.getId(), t1);
		cache.put(t2.getId(), t2);
		cache.put(t3.getId(), t3);
		cache.put(t4.getId(), t4);
		cache.put(t5.getId(), t5);
		cache.put(t6.getId(), t6);
		cache.put(t7.getId(), t7);
		cache.put(t8.getId(), t8);
	}
}

public class Trade {
        private int id;
        private TradeStatus status;
        private String tradeType;
        // Group reference returned by getRef(); the int type is an assumption
        // based on the constructor calls above (100, 101, ...).
        private int ref;

        public Trade(int id, TradeStatus status, String tradeType, int ref) {
                this.id = id;
                this.status = status;
                this.tradeType = tradeType;
                this.ref = ref;
        }

//setter getter, equals, hashcode methods
}

public enum TradeStatus {
        NEW, CHANGED, EXPIRED, FAILED, UNCHANGED, SUCCESS
}



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Detecting-terminal-condition-for-group-of-items-in-Ignite-cache-tp9526.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Detecting terminal condition for group of items in Ignite cache.

Posted by Taras Ledkov <tl...@gridgain.com>.
If the performance of a fixed-rate check is acceptable for you, I think your
solution makes sense, because it is very simple and robust.

Also, you can use IgniteScheduler (
https://ignite.apache.org/releases/mobile/org/apache/ignite/IgniteScheduler.html)
instead of the Spring scheduler.

On Wed, Dec 14, 2016 at 9:22 PM, begineer <re...@gmail.com> wrote:

> Thanks for the reply, first of all. I had this in the back of my mind, but
> it would mean duplicating data that we already have in another cache (the
> trades cache that I am currently querying).
>
> Another way I can think of is to use the Spring scheduler with a 1-minute
> fixed rate and check whether any item has moved to the SUCCESS state; if so,
> postpone execution by one minute (something like fixed delay, except that
> fixed delay applies after the job has executed, whereas my requirement is to
> delay the execution itself).
>
> If no more items move to the final state within that time, we run the
> scheduled job.
>
> Any suggestions on this approach, or a better one?

Re: Detecting terminal condition for group of items in Ignite cache.

Posted by begineer <re...@gmail.com>.
Thanks for the reply, first of all. I had this in the back of my mind, but it
would mean duplicating data that we already have in another cache (the trades
cache that I am currently querying).

Another way I can think of is to use the Spring scheduler with a 1-minute fixed
rate and check whether any item has moved to the SUCCESS state; if so, postpone
execution by one minute (something like fixed delay, except that fixed delay
applies after the job has executed, whereas my requirement is to delay the
execution itself).

If no more items move to the final state within that time, we run the
scheduled job.
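
The "postpone while items are still moving" logic can be sketched roughly as below. This is only an illustration in plain Java, not Spring or Ignite code: the class QuietPeriodGate and its methods onSuccess and tryFire are hypothetical names, and timestamps are passed in explicitly so that the logic does not depend on a particular scheduler. The continuous query listener would call onSuccess for each trade that reaches SUCCESS, and the fixed-rate job would call tryFire and run the comparison queries only when it returns true, so the scans happen once per burst of updates rather than once per trade.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Fires once after no SUCCESS event has been seen for a full quiet period (hypothetical names). */
final class QuietPeriodGate {
    private static final long NONE = Long.MIN_VALUE;

    private final long quietMillis;
    // Time of the most recent SUCCESS event, or NONE if nothing is waiting to be checked.
    private final AtomicLong lastSuccess = new AtomicLong(NONE);

    QuietPeriodGate(long quietMillis) {
        this.quietMillis = quietMillis;
    }

    /** Call from the continuous query listener; each call restarts the quiet period. */
    void onSuccess(long nowMillis) {
        lastSuccess.set(nowMillis);
    }

    /** Call from the fixed-rate job; returns true at most once per burst of SUCCESS events. */
    boolean tryFire(long nowMillis) {
        long last = lastSuccess.get();
        if (last == NONE || nowMillis - last < quietMillis) {
            return false; // nothing pending, or an item moved too recently: postpone
        }
        // Fails if lastSuccess changed in the meantime (new SUCCESS event or another poller).
        return lastSuccess.compareAndSet(last, NONE);
    }
}
```

With a gate created as new QuietPeriodGate(60_000), a job polling once a minute would skip any run in which a trade reached SUCCESS during the previous 60 seconds.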

Any suggestions on this approach, or a better one?



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Detecting-terminal-condition-for-group-of-items-in-Ignite-cache-tp9526p9539.html

Re: Detecting terminal condition for group of items in Ignite cache.

Posted by Taras Ledkov <tl...@gridgain.com>.
Hi,

What do you think about an atomic counter of trades that are not yet
successful, e.g. an atomic cache with the company name as the key and an
Integer counter as the value? You can increment the counter when you add a
trade and decrement it in the listener of the continuous query, and do
something when the counter reaches zero.
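
A minimal sketch of the counter idea, assuming plain Java rather than Ignite: a ConcurrentHashMap stands in for the atomic cache, and the names PendingTradeCounter, tradeAdded and tradeSucceeded are hypothetical. In Ignite the same read-modify-write would need to be performed atomically on the cache entry. What the sketch shows is that only the call whose decrement brings the counter to zero returns true, so the notification is sent once even if several trades reach SUCCESS at the same time, and no scan of the trades cache is needed.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory stand-in for the per-company counter cache (hypothetical names). */
final class PendingTradeCounter {
    // company name -> number of trades that have not reached SUCCESS yet
    private final ConcurrentMap<String, Integer> pending = new ConcurrentHashMap<>();

    /** Call when a trade is added for the company. */
    void tradeAdded(String company) {
        pending.merge(company, 1, Integer::sum);
    }

    /**
     * Call from the continuous query listener when a trade reaches SUCCESS.
     * Returns true only for the call that brings the counter to zero.
     */
    boolean tradeSucceeded(String company) {
        boolean[] reachedZero = {false};
        // computeIfPresent runs atomically for the key on a ConcurrentHashMap.
        pending.computeIfPresent(company, (key, count) -> {
            if (count > 1) {
                return count - 1;
            }
            reachedZero[0] = true;
            return null; // returning null removes the entry
        });
        return reachedZero[0];
    }
}
```

The sketch assumes that every trade is counted once when it is added, reaches SUCCESS at most once, and that all of a company's trades are registered before the last one succeeds; otherwise the counter can reach zero too early.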

-- 

Taras Ledkov
Mail-To: tledkov@gridgain.com

