Posted to user@ignite.apache.org by begineer <re...@gmail.com> on 2017/06/07 14:51:12 UTC

Re: Continuous Query remote listener misses some events or respond really late

Hi, sorry for the late reply. The CQ is set up in the service's execute()
method, not in init(), but we do have an initialQuery in the CQ to scan
existing entries that match the filter. Below is a snapshot of one of the
many Ignite services, each set to process a trade when it moves to a
particular status.

As you can see, I have added logging to the remote filter predicate. But
these logs are not printed when a trade gets stuck at a particular status,
so I assume the remote filter does not pick up the events it is supposed to
track.

public enum TradeStatus {
	NEW, CHANGED, EXPIRED, FAILED, UNCHANGED, SUCCESS
}


/**
 * Ignite Service which picks up CHANGED trade delivery items
 */
public class ChangedTradeService implements Service {

	@IgniteInstanceResource
	private transient Ignite ignite;
	private transient IgniteCache<Long, Trade> tradeCache;
	private transient QueryCursor<Cache.Entry<Long, Trade>> cursor;

	@Override
	public void init(ServiceContext serviceContext) throws Exception {
		tradeCache = ignite.cache("tradeCache");
	}

	@Override
	public void execute(ServiceContext serviceContext) throws Exception {
		ContinuousQuery<Long, Trade> query = new ContinuousQuery<>();
		query.setLocalListener((CacheEntryUpdatedListenerAsync<Long, Trade>) events ->
				events.forEach(event -> process(event.getValue())));
		query.setRemoteFilterFactory(factoryOf(checkStatus(status)));
		query.setInitialQuery(new ScanQuery<>(checkStatusPredicate(status)));
		// Assign the field (not a local variable) so that cancel() can close the cursor.
		cursor = tradeCache.query(query);
		cursor.forEach(entry -> process(entry.getValue()));
	}

	private void process(Trade item) {
		log.info("transition started for trade id :" + item.getPkey());
		// move the trade to next state (e.g. SUCCESS) and next Service (contains CQ,
		// which is looking for SUCCESS status) will pick this up for processing
		// further and so on
		log.info("transition finished for trade id :" + item.getPkey());
	}

	@Override
	public void cancel(ServiceContext serviceContext) {
		cursor.close();
	}

	static CacheEntryEventFilterAsync<Long, Trade> checkStatus(TradeStatus status) {
		return event -> event.getValue() != null
				&& checkStatusPredicate(status).apply(event.getKey(), event.getValue());
	}

	// The predicate receives the Trade value, so it is typed on Trade, not TradeStatus.
	static IgniteBiPredicate<Long, Trade> checkStatusPredicate(TradeStatus status) {
		return (k, v) -> {
			LOG.debug("Status checking for: {} Event value: {} isStatus: {}", status,
					v, v.getStatus() == status);
			return v.getStatus() == status;
		};
	}
}




--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Continuous-Query-remote-listener-misses-some-events-or-respond-really-late-tp12338p13476.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Continuous Query remote listener misses some events or respond really late

Posted by begineer <re...@gmail.com>.
Hi,
I know this is quite a late reply, but I am seeing this issue
intermittently, almost every day, and I can't reproduce it locally on my dev
machine. As suggested, I moved the logging before the null check to see
whether a null event is logged. However, I didn't see it printed in the
logs. It was also suggested to check whether the events in question reach
the remote listener (in which case the log should print). No log is printed
in such a scenario, so I assume the event does not reach the remote listener
immediately.

The same event is processed several hours later: sometimes after about 4
hours, sometimes even after a day.

I tried adding the same event to the cache manually, and it is processed
immediately (only if the original event is stuck).

Also, the host logs are clean; I couldn't find anything suspicious.
Please let me know if you need any more information and I will try to fetch it.
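
To put a number on "several hours later", one option is to compare the time a trade was updated with the time the listener actually receives the event. The following is only a sketch in plain Java, not Ignite code: the TradeEvent class, its updatedAt field, and the helper names are hypothetical, and it assumes the value stored in the cache carries an update timestamp.

```java
import java.time.Duration;
import java.time.Instant;

public class DeliveryLagSketch {
    // Hypothetical event record: which trade changed and when it was updated.
    static final class TradeEvent {
        final long pkey;
        final Instant updatedAt;

        TradeEvent(long pkey, Instant updatedAt) {
            this.pkey = pkey;
            this.updatedAt = updatedAt;
        }
    }

    // Delay between the update and the moment the listener saw the event.
    static Duration deliveryLag(TradeEvent event, Instant receivedAt) {
        return Duration.between(event.updatedAt, receivedAt);
    }

    // True when the delay exceeds the given threshold.
    static boolean isLate(TradeEvent event, Instant receivedAt, Duration threshold) {
        return deliveryLag(event, receivedAt).compareTo(threshold) > 0;
    }

    public static void main(String[] args) {
        Instant updated = Instant.parse("2017-06-07T10:00:00Z");
        Instant received = Instant.parse("2017-06-07T14:00:00Z");
        TradeEvent event = new TradeEvent(42L, updated);
        System.out.println("lag = " + deliveryLag(event, received));
        System.out.println("late = " + isLate(event, received, Duration.ofMinutes(1)));
    }
}
```

In a real cluster the two timestamps may come from different hosts, so clock skew between nodes would have to be taken into account when reading such a figure.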




Re: Continuous Query remote listener misses some events or respond really late

Posted by begineer <re...@gmail.com>.
Hi,
Thanks, I will move the logging as suggested. And that is correct: we don't
store null in caches.




Re: Continuous Query remote listener misses some events or respond really late

Posted by Sasha Belyak <rt...@gmail.com>.
Thanks for your reply. From the code I see that you log only entries with
non-null values. If you are absolutely sure that you never put null in the
cache, I will create a load test to reproduce it and file an issue for you.
But it would be great if you moved the logging before the
event.getValue() != null check.
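
A minimal sketch of what that could look like, written with plain Java stand-ins rather than Ignite types so it runs on its own. The Trade class, the LOG_LINES list, and the use of java.util.function.BiPredicate in place of the remote filter are assumptions for illustration only; the point is just that the log statement runs before the null check, so an event with a null value is still recorded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

public class FilterLoggingSketch {
    enum TradeStatus { NEW, CHANGED, EXPIRED, FAILED, UNCHANGED, SUCCESS }

    // Hypothetical stand-in for the Trade value class from the thread.
    static final class Trade {
        final long pkey;
        final TradeStatus status;

        Trade(long pkey, TradeStatus status) {
            this.pkey = pkey;
            this.status = status;
        }

        @Override
        public String toString() {
            return "Trade{" + pkey + ", " + status + "}";
        }
    }

    // Collects log lines to keep the sketch stdlib-only; a real service would use its logger.
    static final List<String> LOG_LINES = new ArrayList<>();

    // Stand-in for the remote filter: logs every invocation, then applies the checks.
    static BiPredicate<Long, Trade> checkStatus(TradeStatus expected) {
        return (key, value) -> {
            // Log first, so that events carrying a null value are still visible.
            LOG_LINES.add("filter invoked: key=" + key + " value=" + value
                    + " expected=" + expected);
            return value != null && value.status == expected;
        };
    }

    public static void main(String[] args) {
        BiPredicate<Long, Trade> filter = checkStatus(TradeStatus.CHANGED);
        System.out.println(filter.test(1L, new Trade(1L, TradeStatus.CHANGED)));
        System.out.println(filter.test(2L, null));
        System.out.println(filter.test(3L, new Trade(3L, TradeStatus.NEW)));
        LOG_LINES.forEach(System.out::println);
    }
}
```

If a stuck trade produces no log line at all with this ordering, the filter was never invoked for it, which is different from the filter rejecting a null value.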
