You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2016/02/16 16:35:10 UTC

ignite git commit: Fixed "IGNITE-2515 Make 'updateCntr' available through CacheContinuousQueryEvent public API"

Repository: ignite
Updated Branches:
  refs/heads/ignite-1.5.7 dd6681046 -> 4de124c41


Fixed "IGNITE-2515 Make 'updateCntr' available through CacheContinuousQueryEvent public API"


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4de124c4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4de124c4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4de124c4

Branch: refs/heads/ignite-1.5.7
Commit: 4de124c41a30e8b4ee825c355686e16228a4390d
Parents: dd66810
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Feb 16 18:28:51 2016 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue Feb 16 18:30:05 2016 +0300

----------------------------------------------------------------------
 .../cache/query/CacheQueryEntryEvent.java       |   48 +
 .../continuous/CacheContinuousQueryEvent.java   |   17 +-
 .../continuous/CacheContinuousQueryHandler.java |   96 +-
 .../CacheContinuousQueryListener.java           |    3 +-
 .../continuous/CacheContinuousQueryManager.java |   27 +-
 ...CacheContinuousQueryCounterAbstractTest.java |  613 ++++++++++
 ...inuousQueryCounterPartitionedAtomicTest.java |   41 +
 ...ContinuousQueryCounterPartitionedTxTest.java |   41 +
 ...tinuousQueryCounterReplicatedAtomicTest.java |   41 +
 ...eContinuousQueryCounterReplicatedTxTest.java |   41 +
 ...acheContinuousQueryRandomOperationsTest.java | 1124 ++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   10 +
 12 files changed, 2019 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
new file mode 100644
index 0000000..2c1c5e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/CacheQueryEntryEvent.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.query;
+
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.EventType;
+
+/**
+ * A Cache continuous query entry event.
+ *
+ * @param <K> the type of key
+ * @param <V> the type of value
+ */
+public abstract class CacheQueryEntryEvent<K, V> extends CacheEntryEvent<K, V> {
+    /**
+     * Constructs a cache entry event from a given cache as source.
+     *
+     * @param source the cache that originated the event
+     * @param eventType Event type.
+     */
+    public CacheQueryEntryEvent(Cache source, EventType eventType) {
+        super(source, eventType);
+    }
+
+    /**
+     * Each cache update increases partition counter. The same cache updates have on the same value of counter
+     * on primary and backup nodes. This value can be useful to communicate with external applications.
+     *
+     * @return Value of counter for this event.
+     */
+    public abstract long getPartitionUpdateCounter();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index d26e666..eab5dbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
 import javax.cache.Cache;
-import javax.cache.event.CacheEntryEvent;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 /**
  * Continuous query event.
  */
-class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
+class CacheContinuousQueryEvent<K, V> extends CacheQueryEntryEvent<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -57,8 +57,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override
-    public K getKey() {
+    @Override public K getKey() {
         return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.key(), e.isKeepBinary(), false);
     }
 
@@ -68,8 +67,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
     }
 
     /** {@inheritDoc} */
-    @Override
-    public V getOldValue() {
+    @Override public V getOldValue() {
         return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(e.oldValue(), e.isKeepBinary(), false);
     }
 
@@ -79,8 +77,13 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
     }
 
     /** {@inheritDoc} */
+    @Override public long getPartitionUpdateCounter() {
+        return e.updateCounter();
+    }
+
+    /** {@inheritDoc} */
     @Override public <T> T unwrap(Class<T> cls) {
-        if(cls.isAssignableFrom(getClass()))
+        if (cls.isAssignableFrom(getClass()))
             return cls.cast(this);
 
         throw new IllegalArgumentException("Unwrapping to class is not supported: " + cls);

http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 115b67d..08fe62a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -325,9 +325,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 // skipPrimaryCheck is set only when listen locally for replicated cache events.
                 assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
 
-                boolean notify = true;
+                boolean notify = !evt.entry().isFiltered();
 
-                if (rmtFilter != null) {
+                if (notify && rmtFilter != null) {
                     try {
                         notify = rmtFilter.evaluate(evt);
                     }
@@ -472,38 +472,15 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                 sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
             }
 
-            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer) {
-                try {
-                    assert evt != null;
-
-                    CacheContinuousQueryEntry e = evt.entry();
-
-                    EntryBuffer buf = entryBufs.get(e.partition());
+            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer,
+                boolean primary) {
+                assert evt != null;
 
-                    if (buf == null) {
-                        buf = new EntryBuffer();
-
-                        EntryBuffer oldRec = entryBufs.putIfAbsent(e.partition(), buf);
-
-                        if (oldRec != null)
-                            buf = oldRec;
-                    }
+                CacheContinuousQueryEntry e = evt.entry();
 
-                    e = buf.skipEntry(e);
-
-                    if (e != null && !ctx.localNodeId().equals(nodeId))
-                        ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true);
-                }
-                catch (ClusterTopologyCheckedException ex) {
-                    IgniteLogger log = ctx.log(getClass());
+                e.markFiltered();
 
-                    if (log.isDebugEnabled())
-                        log.debug("Failed to send event notification to node, node left cluster " +
-                                "[node=" + nodeId + ", err=" + ex + ']');
-                }
-                catch (IgniteCheckedException ex) {
-                    U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
-                }
+                onEntryUpdated(evt, primary, false);
             }
 
             @Override public void onPartitionEvicted(int part) {
@@ -618,20 +595,22 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         for (CacheContinuousQueryEntry e : entries)
             entries0.addAll(handleEvent(ctx, e));
 
-        Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
-            new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
-                @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
-                    return new CacheContinuousQueryEvent<>(cache, cctx, e);
-                }
-            },
-            new IgnitePredicate<CacheContinuousQueryEntry>() {
-                @Override public boolean apply(CacheContinuousQueryEntry entry) {
-                    return !entry.isFiltered();
+        if (!entries0.isEmpty()) {
+            Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
+                new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
+                    @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
+                        return new CacheContinuousQueryEvent<>(cache, cctx, e);
+                    }
+                },
+                new IgnitePredicate<CacheContinuousQueryEntry>() {
+                    @Override public boolean apply(CacheContinuousQueryEntry entry) {
+                        return !entry.isFiltered();
+                    }
                 }
-            }
-        );
+            );
 
-        locLsnr.onUpdated(evts);
+            locLsnr.onUpdated(evts);
+        }
     }
 
     /**
@@ -731,11 +710,11 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
          * @param initCntr Update counters.
          */
         public PartitionRecovery(IgniteLogger log, AffinityTopologyVersion topVer, @Nullable Long initCntr) {
-            assert topVer.topologyVersion() > 0 : topVer;
-
             this.log = log;
 
             if (initCntr != null) {
+                assert topVer.topologyVersion() > 0 : topVer;
+
                 this.lastFiredEvt = initCntr;
 
                 curTop = topVer;
@@ -878,33 +857,6 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         }
 
         /**
-         * @param e Entry.
-         * @return Continuous query entry.
-         */
-        private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {
-            if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) {
-
-                e.markFiltered();
-
-                return e;
-            }
-            else {
-                buf.add(e.updateCounter());
-
-                // Double check. If another thread sent a event with counter higher than this event.
-                if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter())) {
-                    buf.remove(e.updateCounter());
-
-                    e.markFiltered();
-
-                    return e;
-                }
-                else
-                    return null;
-            }
-        }
-
-        /**
          * Add continuous entry.
          *
          * @param e Cache continuous query entry.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 86abbef..5246fac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -67,8 +67,9 @@ interface CacheContinuousQueryListener<K, V> {
     /**
      * @param evt Event
      * @param topVer Topology version.
+     * @param primary Primary
      */
-    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer);
+    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion topVer, boolean primary);
 
     /**
      * @param part Partition.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 8e4ff09..bb3c479 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -161,7 +161,25 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
      * @param updCntr Updated counter.
      * @param topVer Topology version.
      */
-    public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
+    public void skipUpdateEvent(KeyCacheObject key,
+        int partId,
+        long updCntr,
+        AffinityTopologyVersion topVer) {
+        skipUpdateEvent(key, partId, updCntr, true, topVer);
+    }
+
+    /**
+     * @param key Entry key.
+     * @param partId Partition id.
+     * @param updCntr Updated counter.
+     * @param topVer Topology version.
+     * @param primary Primary.
+     */
+    public void skipUpdateEvent(KeyCacheObject key,
+        int partId,
+        long updCntr,
+        boolean primary,
+        AffinityTopologyVersion topVer) {
         if (lsnrCnt.get() > 0) {
             for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
                 CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
@@ -178,7 +196,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                     cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-                lsnr.skipUpdateEvent(evt, topVer);
+                lsnr.skipUpdateEvent(evt, topVer, primary);
             }
         }
     }
@@ -224,8 +242,11 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         boolean hasNewVal = newVal != null;
         boolean hasOldVal = oldVal != null;
 
-        if (!hasNewVal && !hasOldVal)
+        if (!hasNewVal && !hasOldVal) {
+            skipUpdateEvent(key, partId, updateCntr, primary, topVer);
+
             return;
+        }
 
         EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
new file mode 100644
index 0000000..2ca9988
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterAbstractTest.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Continuous queries counter tests.
+ */
+public abstract class CacheContinuousQueryCounterAbstractTest extends GridCommonAbstractTest
+    implements Serializable {
+    /** */
+    protected static final String CACHE_NAME = "test_cache";
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Latch timeout. */
+    protected static final long LATCH_TIMEOUT = 5000;
+
+    /** */
+    private static final String NO_CACHE_GRID_NAME = "noCacheGrid";
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setPeerClassLoadingEnabled(peerClassLoadingEnabled());
+
+        if (gridName.equals(NO_CACHE_GRID_NAME))
+            cfg.setClientMode(true);
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    @NotNull private CacheConfiguration cacheConfiguration() {
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setName(CACHE_NAME);
+        cacheCfg.setCacheMode(cacheMode());
+        cacheCfg.setAtomicityMode(atomicityMode());
+        cacheCfg.setNearConfiguration(nearConfiguration());
+        cacheCfg.setRebalanceMode(ASYNC);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        cacheCfg.setCacheStoreFactory(new StoreFactory());
+        cacheCfg.setReadThrough(true);
+        cacheCfg.setWriteThrough(true);
+        cacheCfg.setLoadPreviousValue(true);
+
+        return cacheCfg;
+    }
+
+    /**
+     * @return Peer class loading enabled flag.
+     */
+    protected boolean peerClassLoadingEnabled() {
+        return true;
+    }
+
+    /**
+     * @return Distribution.
+     */
+    protected NearCacheConfiguration nearConfiguration() {
+        return new NearCacheConfiguration();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGridsMultiThreaded(gridCount());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                for (int i = 0; i < gridCount(); i++) {
+                    if (grid(i).cluster().nodes().size() != gridCount())
+                        return false;
+                }
+
+                return true;
+            }
+        }, 3000);
+
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).destroyCache(CACHE_NAME);
+
+        for (int i = 0; i < gridCount(); i++)
+            grid(i).getOrCreateCache(cacheConfiguration());
+    }
+
+    /**
+     * @return Cache mode.
+     */
+    protected abstract CacheMode cacheMode();
+
+    /**
+     * @return Atomicity mode.
+     */
+    protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /**
+     * @return Grids count.
+     */
+    protected abstract int gridCount();
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAllEntries() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        final Map<Integer, List<T2<Integer, Long>>> map = new HashMap<>();
+        final CountDownLatch latch = new CountDownLatch(5);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    synchronized (map) {
+                        List<T2<Integer, Long>> vals = map.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map.put(e.getKey(), vals);
+                        }
+
+                        vals.add(new T2<>(e.getValue(), e
+                            .unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+                    }
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.put(1, 1);
+            cache.put(2, 2);
+            cache.put(3, 3);
+
+            cache.remove(2);
+
+            cache.put(1, 10);
+
+            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+            assertEquals(3, map.size());
+
+            List<T2<Integer, Long>> vals = map.get(1);
+
+            assertNotNull(vals);
+            assertEquals(2, vals.size());
+            assertEquals(1, (int)vals.get(0).get1());
+            assertEquals(1L, (long)vals.get(0).get2());
+            assertEquals(10, (int)vals.get(1).get1());
+            assertEquals(2L, (long)vals.get(1).get2());
+
+            vals = map.get(2);
+
+            assertNotNull(vals);
+            assertEquals(2, vals.size());
+            assertEquals(2, (int)vals.get(0).get1());
+            assertEquals(1L, (long)vals.get(0).get2());
+            assertNull(vals.get(1).get1());
+
+            vals = map.get(3);
+
+            assertNotNull(vals);
+            assertEquals(1, vals.size());
+            assertEquals(3, (int)vals.get(0).get1());
+            assertEquals(1L, (long)vals.get(0).get2());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTwoQueryListener() throws Exception {
+        if (cacheMode() == LOCAL)
+            return;
+
+        final IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+        final IgniteCache<Integer, Integer> cache1 = grid(1).cache(CACHE_NAME);
+
+        final AtomicInteger cntr = new AtomicInteger(0);
+        final AtomicInteger cntr1 = new AtomicInteger(0);
+
+        final ContinuousQuery<Integer, Integer> qry1 = new ContinuousQuery<>();
+        final ContinuousQuery<Integer, Integer> qry2 = new ContinuousQuery<>();
+
+        final Map<Integer, List<T2<Integer, Long>>> map1 = new HashMap<>();
+        final Map<Integer, List<T2<Integer, Long>>> map2 = new HashMap<>();
+
+        qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    cntr.incrementAndGet();
+
+                    synchronized (map1) {
+                        List<T2<Integer, Long>> vals = map1.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map1.put(e.getKey(), vals);
+                        }
+
+                        vals.add(new T2<>(e.getValue(),
+                            e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+                    }
+                }
+            }
+        });
+
+        qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    cntr1.incrementAndGet();
+
+                    synchronized (map2) {
+                        List<T2<Integer, Long>> vals = map2.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map2.put(e.getKey(), vals);
+                        }
+
+                        vals.add(new T2<>(e.getValue(),
+                            e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+                    }
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2);
+            QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) {
+            for (int i = 0; i < gridCount(); i++) {
+                IgniteCache<Object, Object> cache0 = grid(i).cache(CACHE_NAME);
+
+                cache0.put(1, 1);
+                cache0.put(2, 2);
+                cache0.put(3, 3);
+
+                cache0.remove(1);
+                cache0.remove(2);
+                cache0.remove(3);
+
+                final int iter = i + 1;
+
+                assert GridTestUtils.waitForCondition(new PA() {
+                    @Override public boolean apply() {
+                        return iter * 6 /* count operation */ * 2 /* count continues queries*/
+                            == (cntr.get() + cntr1.get());
+                    }
+                }, 5000L);
+
+                checkEvents(map1, i);
+
+                map1.clear();
+
+                checkEvents(map2, i);
+
+                map2.clear();
+            }
+        }
+    }
+
+    /**
+     * @param evnts Events.
+     * @param iter Iteration.
+     */
+    private void checkEvents(Map<Integer, List<T2<Integer, Long>>> evnts, long iter) {
+        List<T2<Integer, Long>> val = evnts.get(1);
+
+        assertEquals(val.size(), 2);
+
+        // Check put 1
+        assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+        assertEquals(1L, (long)val.get(0).get1());
+
+        // Check remove 1
+        assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+        assertNull(val.get(1).get1());
+
+        val = evnts.get(2);
+
+        assertEquals(val.size(), 2);
+
+        // Check put 2
+        assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+        assertEquals(2L, (long)val.get(0).get1());
+
+        // Check remove 2
+        assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+        assertNull(val.get(1).get1());
+
+        val = evnts.get(3);
+
+        assertEquals(val.size(), 2);
+
+        // Check put 3
+        assertEquals(iter * 2 + 1, (long)val.get(0).get2());
+        assertEquals(3L, (long)val.get(0).get1());
+
+        // Check remove 3
+        assertEquals(iter * 2 + 2, (long)val.get(1).get2());
+        assertNull(val.get(1).get1());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestartQuery() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        final int keyCnt = 300;
+
+        final int updateKey = 1;
+
+        for (int i = 0; i < keyCnt; i++)
+            cache.put(updateKey, i);
+
+        for (int i = 0; i < 10; i++) {
+            if (i % 2 == 0) {
+                final AtomicInteger cntr = new AtomicInteger(0);
+
+                ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+                final List<T2<Integer, Long>> vals = new ArrayList<>();
+
+                qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+                    @Override public void onUpdated(
+                        Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                        for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                            synchronized (vals) {
+                                cntr.incrementAndGet();
+
+                                vals.add(new T2<>(e.getValue(),
+                                    e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+                            }
+                        }
+                    }
+                });
+
+                try (QueryCursor<Cache.Entry<Integer, Integer>> ignore = cache.query(qry)) {
+                    for (int key = 0; key < keyCnt; key++)
+                        cache.put(updateKey, cache.get(updateKey) + 1);
+
+                    assert GridTestUtils.waitForCondition(new PA() {
+                        @Override public boolean apply() {
+                            return cntr.get() == keyCnt;
+                        }
+                    }, 2000L);
+
+                    synchronized (vals) {
+                        for (T2<Integer, Long> val : vals)
+                            assertEquals((long)val.get1() + 1, (long)val.get2());
+                    }
+                }
+            }
+            else {
+                for (int key = 0; key < keyCnt; key++)
+                    cache.put(updateKey, cache.get(updateKey) + 1);
+
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testEntriesByFilter() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        final Map<Integer, List<T2<Integer, Long>>> map = new HashMap<>();
+        final CountDownLatch latch = new CountDownLatch(8);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    synchronized (map) {
+                        List<T2<Integer, Long>> vals = map.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map.put(e.getKey(), vals);
+                        }
+
+                        vals.add(new T2<>(e.getValue(),
+                            e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+                    }
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer,Integer>() {
+            @Override public boolean evaluate(CacheEntryEvent<? extends Integer,? extends Integer> evt) {
+                return evt.getValue() % 2 == 0;
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.put(1, 1);
+            cache.put(1, 2);
+            cache.put(1, 3);
+            cache.put(1, 4);
+
+            cache.put(2, 1);
+            cache.put(2, 2);
+            cache.put(2, 3);
+            cache.put(2, 4);
+
+            cache.remove(1);
+            cache.remove(2);
+
+            cache.put(1, 10);
+            cache.put(2, 40);
+
+            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+            assertEquals(2, map.size());
+
+            List<T2<Integer, Long>> vals = map.get(1);
+
+            assertNotNull(vals);
+            assertEquals(4, vals.size());
+
+            assertEquals((int)vals.get(0).get1(), 2);
+            assertEquals((long)vals.get(0).get1(), (long)vals.get(0).get2());
+
+            assertEquals((int)vals.get(1).get1(), 4);
+            assertEquals((long)vals.get(1).get1(), (long)vals.get(1).get2());
+
+            assertNull(vals.get(2).get1());
+            assertEquals(5, (long)vals.get(2).get2());
+
+            assertEquals((int)vals.get(3).get1(), 10);
+            assertEquals(6, (long)vals.get(3).get2());
+
+            vals = map.get(2);
+
+            assertNotNull(vals);
+            assertEquals(4, vals.size());
+
+            assertEquals((int)vals.get(0).get1(), 2);
+            assertEquals((long)vals.get(0).get1(), (long)vals.get(0).get2());
+
+            assertEquals((int)vals.get(1).get1(), 4);
+            assertEquals((long)vals.get(1).get1(), (long)vals.get(1).get2());
+
+            assertNull(vals.get(2).get1());
+            assertEquals(5, (long)vals.get(2).get2());
+
+            assertEquals((int)vals.get(3).get1(), 40);
+            assertEquals(6, (long)vals.get(3).get2());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCache() throws Exception {
+        IgniteCache<Integer, Integer> cache = grid(0).cache(CACHE_NAME);
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        final Map<Integer, T2<Integer, Long>> map = new ConcurrentHashMap8<>();
+        final CountDownLatch latch = new CountDownLatch(10);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    map.put(e.getKey(), new T2<>(e.getValue(),
+                        e.unwrap(CacheQueryEntryEvent.class).getPartitionUpdateCounter()));
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+            cache.loadCache(null, 0);
+
+            assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : "Count: " + latch.getCount();
+
+            assertEquals(10, map.size());
+
+            for (int i = 0; i < 10; i++) {
+                assertEquals(i, (int)map.get(i).get1());
+                assertEquals((long)1, (long)map.get(i).get2());
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private static class StoreFactory implements Factory<CacheStore> {
+        @Override public CacheStore create() {
+            return new TestStore();
+        }
+    }
+
+    /**
+     * Store.
+     */
+    private static class TestStore extends CacheStoreAdapter<Object, Object> {
+        /** {@inheritDoc} */
+        @Override public void loadCache(IgniteBiInClosure<Object, Object> clo, Object... args) {
+            for (int i = 0; i < 10; i++)
+                clo.apply(i, i);
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object load(Object key) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) throws CacheWriterException {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
new file mode 100644
index 0000000..7b97928
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedAtomicTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterPartitionedAtomicTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
new file mode 100644
index 0000000..aa42832
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterPartitionedTxTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterPartitionedTxTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
new file mode 100644
index 0000000..afa7a22
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedAtomicTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterReplicatedAtomicTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
new file mode 100644
index 0000000..4ee12de
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryCounterReplicatedTxTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ *
+ */
+public class CacheContinuousQueryCounterReplicatedTxTest extends CacheContinuousQueryCounterAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
new file mode 100644
index 0000000..dc86f4e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java
@@ -0,0 +1,1124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.ALL;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT;
+import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ *
+ */
+public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 5;
+
+    /** */
+    private static final int KEYS = 50;
+
+    /** */
+    private static final int VALS = 10;
+
+    /** */
+    public static final int ITERATION_CNT = 100;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicReplicatedClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicNoBackups() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicNoBackupsAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicNoBackupsClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            ATOMIC,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTx() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxClientExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            1,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicated() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxReplicatedClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackups() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackupsAllNodes() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackupsExplicit() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, SERVER);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxNoBackupsClient() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            0,
+            TRANSACTIONAL,
+            ONHEAP_TIERED,
+            false);
+
+        testContinuousQuery(ccfg, CLIENT);
+    }
+
+    /**
+     * @param ccfg Cache configuration.
+     * @param deploy The place where continuous query will be started.
+     * @throws Exception If failed.
+     */
+    private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy)
+        throws Exception {
+        ignite(0).createCache(ccfg);
+
+        try {
+            long seed = System.currentTimeMillis();
+
+            Random rnd = new Random(seed);
+
+            log.info("Random seed: " + seed);
+
+            List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>();
+
+            Collection<QueryCursor<?>> curs = new ArrayList<>();
+
+            if (deploy == CLIENT) {
+                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+                qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                        for (CacheEntryEvent<?, ?> evt : evts)
+                            evtsQueue.add(evt);
+                    }
+                });
+
+                evtsQueues.add(evtsQueue);
+
+                QueryCursor<?> cur = grid(NODES - 1).cache(ccfg.getName()).query(qry);
+
+                curs.add(cur);
+            }
+            else if (deploy == SERVER) {
+                ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+                qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                    @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                        for (CacheEntryEvent<?, ?> evt : evts)
+                            evtsQueue.add(evt);
+                    }
+                });
+
+                evtsQueues.add(evtsQueue);
+
+                QueryCursor<?> cur = grid(rnd.nextInt(NODES - 1)).cache(ccfg.getName()).query(qry);
+
+                curs.add(cur);
+            }
+            else {
+                for (int i = 0; i < NODES - 1; i++) {
+                    ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+                    final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000);
+
+                    qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
+                        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+                            for (CacheEntryEvent<?, ?> evt : evts)
+                                evtsQueue.add(evt);
+                        }
+                    });
+
+                    evtsQueues.add(evtsQueue);
+
+                    QueryCursor<?> cur = ignite(i).cache(ccfg.getName()).query(qry);
+
+                    curs.add(cur);
+                }
+            }
+
+            ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>();
+
+            Map<Integer, Long> partCntr = new ConcurrentHashMap<>();
+
+            try {
+                for (int i = 0; i < ITERATION_CNT; i++) {
+                    if (i % 20 == 0)
+                        log.info("Iteration: " + i);
+
+                    for (int idx = 0; idx < NODES; idx++)
+                        randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName()));
+                }
+            }
+            finally {
+                for (QueryCursor<?> cur : curs)
+                    cur.close();
+            }
+        }
+        finally {
+            ignite(0).destroyCache(ccfg.getName());
+        }
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @param evtsQueues Events queue.
+     * @param expData Expected cache data.
+     * @param partCntr Partition counter.
+     * @param cache Cache.
+     * @throws Exception If failed.
+     */
+    private void randomUpdate(
+        Random rnd,
+        List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        ConcurrentMap<Object, Object> expData,
+        Map<Integer, Long> partCntr,
+        IgniteCache<Object, Object> cache)
+        throws Exception {
+        Object key = new QueryTestKey(rnd.nextInt(KEYS));
+        Object newVal = value(rnd);
+        Object oldVal = expData.get(key);
+
+        int op = rnd.nextInt(13);
+
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        Transaction tx = null;
+
+        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean())
+            tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd));
+
+        try {
+            // log.info("Random operation [key=" + key + ", op=" + op + ']');
+
+            switch (op) {
+                case 0: {
+                    cache.put(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 1: {
+                    cache.getAndPut(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 2: {
+                    cache.remove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 3: {
+                    cache.getAndRemove(key);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 4: {
+                    cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                    expData.put(key, newVal);
+
+                    break;
+                }
+
+                case 5: {
+                    cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    updatePartitionCounter(cache, key, partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal);
+
+                    expData.remove(key);
+
+                    break;
+                }
+
+                case 6: {
+                    cache.putIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 7: {
+                    cache.getAndPutIfAbsent(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal == null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 8: {
+                    cache.replace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 9: {
+                    cache.getAndReplace(key, newVal);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    if (oldVal != null) {
+                        updatePartitionCounter(cache, key, partCntr);
+
+                        waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                        expData.put(key, newVal);
+                    }
+                    else
+                        checkNoEvent(evtsQueues);
+
+                    break;
+                }
+
+                case 10: {
+                    if (oldVal != null) {
+                        Object replaceVal = value(rnd);
+
+                        boolean success = replaceVal.equals(oldVal);
+
+                        if (success) {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            updatePartitionCounter(cache, key, partCntr);
+
+                            waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal);
+
+                            expData.put(key, newVal);
+                        }
+                        else {
+                            cache.replace(key, replaceVal, newVal);
+
+                            if (tx != null)
+                                tx.commit();
+
+                            checkNoEvent(evtsQueues);
+                        }
+                    }
+                    else {
+                        cache.replace(key, value(rnd), newVal);
+
+                        if (tx != null)
+                            tx.commit();
+
+                        checkNoEvent(evtsQueues);
+                    }
+
+                    break;
+                }
+
+                case 11: {
+                    SortedMap<Object, Object> vals = new TreeMap<>();
+
+                    while (vals.size() < KEYS / 5)
+                        vals.put(new QueryTestKey(rnd.nextInt(KEYS)), value(rnd));
+
+                    cache.putAll(vals);
+
+                    if (tx != null)
+                        tx.commit();
+
+                    for (Map.Entry<Object, Object> e : vals.entrySet())
+                        updatePartitionCounter(cache, e.getKey(), partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData);
+
+                    expData.putAll(vals);
+
+                    break;
+                }
+
+                case 12: {
+                    SortedMap<Object, Object> vals = new TreeMap<>();
+
+                    while (vals.size() < KEYS / 5)
+                        vals.put(new QueryTestKey(rnd.nextInt(KEYS)), newVal);
+
+                    cache.invokeAll(vals.keySet(), new EntrySetValueProcessor(newVal, rnd.nextBoolean()));
+
+                    if (tx != null)
+                        tx.commit();
+
+                    for (Map.Entry<Object, Object> e : vals.entrySet())
+                        updatePartitionCounter(cache, e.getKey(), partCntr);
+
+                    waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), vals, expData);
+
+                    for (Object o : vals.keySet())
+                        expData.put(o, newVal);
+
+                    break;
+                }
+
+                default:
+                    fail("Op:" + op);
+            }
+        } finally {
+            if (tx != null)
+                tx.close();
+        }
+    }
+
+    /**
+     *  @param evtsQueues Queue.
+     * @param partCntrs Counters.
+     * @param aff Affinity.
+     * @param vals Values.
+     * @param expData Expected data.
+     */
+    private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        SortedMap<Object, Object> vals,
+        Map<Object, Object> expData)
+        throws Exception {
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            Map<Object, CacheEntryEvent> rcvEvts = new HashMap<>();
+
+            for (int i = 0; i < vals.size(); i++) {
+                CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+                rcvEvts.put(evt.getKey(), evt);
+            }
+
+            assertEquals(vals.size(), rcvEvts.size());
+
+            for (Map.Entry<Object, Object> e : vals.entrySet()) {
+                Object key = e.getKey();
+                Object val = e.getValue();
+                Object oldVal = expData.get(key);
+
+                if (val == null && oldVal == null) {
+                    checkNoEvent(evtsQueues);
+
+                    continue;
+                }
+
+                CacheEntryEvent evt = rcvEvts.get(key);
+
+                assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']',
+                    evt);
+                assertEquals(key, evt.getKey());
+                assertEquals(val, evt.getValue());
+                assertEquals(oldVal, evt.getOldValue());
+
+                long cntr = partCntrs.get(aff.partition(key));
+                CacheQueryEntryEvent qryEntryEvt = (CacheQueryEntryEvent)evt.unwrap(CacheQueryEntryEvent.class);
+
+                assertNotNull(cntr);
+                assertNotNull(qryEntryEvt);
+
+                assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
+            }
+        }
+    }
+
+    /**
+     * @param rnd {@link Random}.
+     * @return {@link TransactionIsolation}.
+     */
+    private TransactionIsolation txRandomIsolation(Random rnd) {
+        int val = rnd.nextInt(3);
+
+        if (val == 0)
+            return READ_COMMITTED;
+        else if (val == 1)
+            return REPEATABLE_READ;
+        else
+            return SERIALIZABLE;
+    }
+
+    /**
+     * @param rnd {@link Random}.
+     * @return {@link TransactionConcurrency}.
+     */
+    private TransactionConcurrency txRandomConcurrency(Random rnd) {
+        return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC;
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key
+     * @param cntrs Partition counters.
+     */
+    private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) {
+        Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName());
+
+        int part = aff.partition(key);
+
+        Long partCntr = cntrs.get(part);
+
+        if (partCntr == null)
+            partCntr = 0L;
+
+        cntrs.put(part, ++partCntr);
+    }
+
+    /**
+     * @param rnd Random generator.
+     * @return Cache value.
+     */
+    private static Object value(Random rnd) {
+        return new QueryTestValue(rnd.nextInt(VALS));
+    }
+
+    /**
+     * @param evtsQueues Event queue.
+     * @param partCntrs Partition counters.
+     * @param aff Affinity function.
+     * @param key Key.
+     * @param val Value.
+     * @param oldVal Old value.
+     * @throws Exception If failed.
+     */
+    private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues,
+        Map<Integer, Long> partCntrs,
+        Affinity<Object> aff,
+        Object key,
+        Object val,
+        Object oldVal)
+        throws Exception {
+        if (val == null && oldVal == null) {
+            checkNoEvent(evtsQueues);
+
+            return;
+        }
+
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS);
+
+            assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt);
+            assertEquals(key, evt.getKey());
+            assertEquals(val, evt.getValue());
+            assertEquals(oldVal, evt.getOldValue());
+
+            long cntr = partCntrs.get(aff.partition(key));
+            CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class);
+
+            assertNotNull(cntr);
+            assertNotNull(qryEntryEvt);
+
+            assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter());
+        }
+    }
+
+    /**
+     * @param evtsQueues Event queue.
+     * @throws Exception If failed.
+     */
+    private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception {
+        for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) {
+            CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS);
+
+            assertNull(evt);
+        }
+    }
+
+    /**
+     *
+     * @param cacheMode Cache mode.
+     * @param backups Number of backups.
+     * @param atomicityMode Cache atomicity mode.
+     * @param memoryMode Cache memory mode.
+     * @param store If {@code true} configures dummy cache store.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        int backups,
+        CacheAtomicityMode atomicityMode,
+        CacheMemoryMode memoryMode,
+        boolean store) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setMemoryMode(memoryMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicWriteOrderMode(PRIMARY);
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        if (store) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+            ccfg.setReadThrough(true);
+            ccfg.setWriteThrough(true);
+        }
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+
+    /**
+     *
+     */
+    static class QueryTestKey implements Serializable, Comparable {
+        /** */
+        private final Integer key;
+
+        /**
+         * @param key Key.
+         */
+        public QueryTestKey(Integer key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestKey that = (QueryTestKey)o;
+
+            return key.equals(that.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestKey.class, this);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(Object o) {
+            return key - ((QueryTestKey)o).key;
+        }
+    }
+
+    /**
+     *
+     */
+    static class QueryTestValue implements Serializable {
+        /** */
+        private final Integer val1;
+
+        /** */
+        private final String val2;
+
+        /**
+         * @param val Value.
+         */
+        public QueryTestValue(Integer val) {
+            this.val1 = val;
+            this.val2 = String.valueOf(val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            QueryTestValue that = (QueryTestValue) o;
+
+            return val1.equals(that.val1) && val2.equals(that.val2);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = val1.hashCode();
+
+            res = 31 * res + val2.hashCode();
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(QueryTestValue.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> {
+        /** */
+        private Object val;
+
+        /** */
+        private boolean retOld;
+
+        /**
+         * @param val Value to set.
+         * @param retOld Return old value flag.
+         */
+        public EntrySetValueProcessor(Object val, boolean retOld) {
+            this.val = val;
+            this.retOld = retOld;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry<Object, Object> e, Object... args) {
+            Object old = retOld ? e.getValue() : null;
+
+            if (val != null)
+                e.setValue(val);
+            else
+                e.remove();
+
+            return old;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(EntrySetValueProcessor.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    protected enum ContinuousDeploy {
+        CLIENT, SERVER, ALL
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4de124c4/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 4dfe741..4136d21 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -71,12 +71,17 @@ import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQ
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQuerySelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedAtomicTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterPartitionedTxTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedAtomicTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryCounterReplicatedTxTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchAckTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousBatchForceServerModeAckTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxSelfTest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicNearEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicP2PDisabledSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAtomicSelfTest;
@@ -201,6 +206,11 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class);
+        suite.addTestSuite(CacheContinuousQueryCounterPartitionedAtomicTest.class);
+        suite.addTestSuite(CacheContinuousQueryCounterPartitionedTxTest.class);
+        suite.addTestSuite(CacheContinuousQueryCounterReplicatedAtomicTest.class);
+        suite.addTestSuite(CacheContinuousQueryCounterReplicatedTxTest.class);
+        suite.addTestSuite(CacheContinuousQueryRandomOperationsTest.class);
         suite.addTestSuite(CacheContinuousBatchAckTest.class);
         suite.addTestSuite(CacheContinuousBatchForceServerModeAckTest.class);