Posted to user@ignite.apache.org by ght230 <gh...@163.com> on 2017/02/20 08:38:34 UTC

Ignite cluster unstable when doing continuous query

I found that the Ignite cluster becomes unstable when running a continuous query.
My test code follows.

ContinuousQuery.java
public class ContinuousQuery {
	/** Cache name. */
	private static final String CACHE_NAME_INPUTDATA = "inputdata";
	private static final String CACHE_NAME_UPDATEDATA = "updatedata";
	private static IgniteCache inputCache = null;
	private static IgniteCache updateCache = null;
    static Random rand=new Random();

	public static void main(String[] args) throws Exception {
		Ignition.setClientMode(true);
		Ignite ignite = Ignition.start();
		
		CacheConfiguration cacheCfg = new CacheConfiguration();
		cacheCfg.setCacheMode(CacheMode.PARTITIONED);
		cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
		cacheCfg.setName(CACHE_NAME_INPUTDATA);
		inputCache = ignite.getOrCreateCache(cacheCfg);
		
		CacheConfiguration cacheCfg1 = new CacheConfiguration();
		cacheCfg1.setCacheMode(CacheMode.REPLICATED);
		cacheCfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
		cacheCfg1.setName(CACHE_NAME_UPDATEDATA);
		updateCache = ignite.getOrCreateCache(cacheCfg1);
		
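		// Fully qualified: this test class is itself named ContinuousQuery and would shadow Ignite's class.
		org.apache.ignite.cache.query.ContinuousQuery<Integer, String> qry =
			new org.apache.ignite.cache.query.ContinuousQuery<>();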

		qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
			@Override
			public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
				// Intentionally empty: the test only exercises the remote filter.
			}
		});

		qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
			@Override
			public CacheEntryEventFilter<Integer, String> create() {
				return new CacheEntryFilter();
			}
		});
		inputCache.query(qry);
	}	

	private static class CacheEntryFilter implements CacheEntryEventFilter<Integer, String> {
		/** Ignite instance. */
		@IgniteInstanceResource
		private Ignite ignite;

		/** {@inheritDoc} */
		@Override
		public boolean evaluate(final CacheEntryEvent<? extends Integer, ? extends String> e)
			throws CacheEntryListenerException {
			IgniteCache<Integer, String> syncCache = ignite.cache(CACHE_NAME_UPDATEDATA);
			if (e.getEventType() == EventType.CREATED && e.getKey() < 100000 && e.getKey() % 5000 != 0) {
				System.out.println(Thread.currentThread().getName() + "***--->><key,value>=<" + e.getKey() + "," + e.getValue() + ">");
				syncCache.put(e.getKey(), e.getValue() + "_______" + System.currentTimeMillis());
				syncCache.remove(e.getKey());
				return false;
			}
			return true;
		}
	}
}

InputData.java
public class InputData {
	/** Cache name. */
	private static final String CACHE_NAME_INPUTDATA = "inputdata";
	private static IgniteCache inputCache;

	public static void main(String[] args) throws Exception {
		Ignition.setClientMode(true);
		Ignite ignite = Ignition.start();

		inputCache = ignite.getOrCreateCache(CACHE_NAME_INPUTDATA);

		// Number of keys to insert.
		int keyCnt = 1000000;

		// Each put creates an entry, which fires an event for the continuous query registered on this cache.
		for (int i = 0; i < keyCnt; i++){
			inputCache.put(i, Integer.toString(10000 + i));
		}
	}
}

My test steps are as follows:
Step 1: Start an Ignite server.
Step 2: Start an Ignite client running ContinuousQuery.jar.
Step 3: Start an Ignite client running InputData.jar.

Then, if I start another Ignite server, or even just ignitevisorcmd, the
Ignite cluster becomes jammed.

If I add @IgniteAsyncCallback to CacheEntryFilter, the cluster does not jam,
but it cannot guarantee that events are processed in order.

What solution would meet the following needs?
1. Events should be triggered in order.
2. The processing inside "evaluate" should happen in order (in the example,
the "put" and "remove" operations should be in order).



--
View this message in context: http://apache-ignite-users.70518.x6.nabble.com/Ignite-cluster-unstable-when-doing-continuous-query-tp10726.html
Sent from the Apache Ignite Users mailing list archive at Nabble.com.

Re: Ignite cluster unstable when doing continuous query

Posted by Nikolai Tikhonov <nt...@apache.org>.
Hi!

You perform cache operations from within the filter. The callback is invoked
from a sensitive part of the code, and in synchronous mode this can lead to
thread starvation and deadlock (see the ContinuousQuery#setRemoteFilterFactory
javadoc). @IgniteAsyncCallback was designed precisely to resolve this
issue.
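
To illustrate the starvation described above, here is a minimal sketch that uses only plain JDK executors. It is an analogy, not Ignite's internals: the names systemPool, callbackPool and filterCompletes are hypothetical, and a single-threaded executor stands in for whichever internal thread invokes the filter. The "filter" waits for a nested "cache operation" that needs the system thread; when the filter itself occupies that thread, the wait can never finish.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Analogy only: JDK executors stand in for Ignite's internal threads. */
class CallbackStarvationSketch {
    /**
     * Runs a "filter" that waits for a nested operation which needs the system thread.
     *
     * @param asyncCallback if true, the filter runs on a separate callback pool
     *                      (the idea behind an asynchronous callback); if false,
     *                      it runs on the system thread itself.
     * @return true if the filter finished, false if the nested operation starved.
     */
    static boolean filterCompletes(boolean asyncCallback) {
        ExecutorService systemPool = Executors.newSingleThreadExecutor();
        ExecutorService callbackPool = Executors.newSingleThreadExecutor();
        ExecutorService filterPool = asyncCallback ? callbackPool : systemPool;
        try {
            Future<Boolean> filter = filterPool.submit(() -> {
                // The nested "cache operation" always needs the system thread.
                Future<?> cacheOp = systemPool.submit(() -> { });
                try {
                    cacheOp.get(500, TimeUnit.MILLISECONDS);
                    return true;
                } catch (TimeoutException e) {
                    // The only system thread is busy running this filter.
                    return false;
                }
            });
            return filter.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            return false;
        } finally {
            systemPool.shutdownNow();
            callbackPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("synchronous callback completes:  " + filterCompletes(false));
        System.out.println("asynchronous callback completes: " + filterCompletes(true));
    }
}
```

In this sketch the synchronous variant gives up after the 500 ms timeout, while the variant that moves the filter to its own pool completes. A real cluster has no such timeout inside the filter, which is why the symptom there is a hang.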

> but it can not guarantee event to be processed in order
@IgniteAsyncCallback provides the same guarantee: updates for a given key are
processed sequentially.
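
One common way to get asynchronous processing while keeping per-key order is to route every update for a given key to the same single-threaded worker ("stripe"). The sketch below shows that general technique with plain JDK classes. It is an assumption made for illustration and does not claim to be how Ignite dispatches callbacks internally; PerKeyOrderSketch, dispatch and inOrder are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustration only: striped dispatch that preserves per-key order. */
class PerKeyOrderSketch {
    /** Submits updatesPerKey numbered updates for each key and records the order in which they ran. */
    static Map<Integer, List<Integer>> dispatch(int stripes, int keys, int updatesPerKey) {
        ExecutorService[] pool = new ExecutorService[stripes];
        for (int i = 0; i < stripes; i++)
            pool[i] = Executors.newSingleThreadExecutor();

        Map<Integer, List<Integer>> seen = new ConcurrentHashMap<>();

        for (int seq = 0; seq < updatesPerKey; seq++) {
            for (int key = 0; key < keys; key++) {
                final int k = key;
                final int s = seq;
                // The same key always maps to the same single-threaded stripe,
                // so its updates run one after another in submission order.
                pool[Math.floorMod(k, stripes)].execute(
                    () -> seen.computeIfAbsent(k, x -> new CopyOnWriteArrayList<>()).add(s));
            }
        }

        for (ExecutorService e : pool) {
            e.shutdown();
            try {
                e.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        return seen;
    }

    /** Returns true if every key saw updates 0..updatesPerKey-1 in ascending order. */
    static boolean inOrder(Map<Integer, List<Integer>> seen, int updatesPerKey) {
        for (List<Integer> seqs : seen.values()) {
            if (seqs.size() != updatesPerKey)
                return false;
            for (int i = 0; i < seqs.size(); i++)
                if (seqs.get(i) != i)
                    return false;
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> seen = dispatch(4, 50, 20);
        System.out.println("per-key order preserved: " + inOrder(seen, 20));
    }
}
```

Updates for different keys may interleave across stripes in this sketch, but the updates of any single key are observed in the order they were submitted, which is the kind of guarantee described above.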

Thanks,
Nikolay
