You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/10/15 14:42:54 UTC

[2/5] ignite git commit: IGNITE-7953: MVCC: Continuous queries support. This closes #4767.

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java
index ce45570..842b6f4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheKeepBinaryIterationTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
@@ -104,6 +105,18 @@ public class CacheKeepBinaryIterationTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testMvccTxOnHeap() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,1, TRANSACTIONAL_SNAPSHOT);
+
+        doTestScanQuery(ccfg, true, true);
+        doTestScanQuery(ccfg, true, false);
+        doTestScanQuery(ccfg, false, true);
+        doTestScanQuery(ccfg, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAtomicOnHeapLocalEntries() throws Exception {
         CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, ATOMIC);
 
@@ -126,6 +139,19 @@ public class CacheKeepBinaryIterationTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testMvccTxOnHeapLocalEntries() throws Exception {
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 1, TRANSACTIONAL_SNAPSHOT);
+
+        doTestLocalEntries(ccfg, true, true);
+        doTestLocalEntries(ccfg, true, false);
+        doTestLocalEntries(ccfg, false, true);
+        doTestLocalEntries(ccfg, false, false);
+    }
+
+
+    /**
      * @param ccfg Cache configuration.
      */
     private void doTestScanQuery(final CacheConfiguration<Object, Object> ccfg, boolean keepBinary,

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
index 9b531c6..9ec25d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -77,6 +78,12 @@ public class ClientReconnectContinuousQueryTest extends GridCommonAbstractTest {
         else {
             CacheConfiguration ccfg = defaultCacheConfiguration();
 
+            ccfg.setAtomicityMode(atomicityMode());
+
+            // TODO IGNITE-9530 Remove this clause.
+            if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
+                ccfg.setNearConfiguration(null);
+
             cfg.setCacheConfiguration(ccfg);
         }
 
@@ -84,6 +91,13 @@ public class ClientReconnectContinuousQueryTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @return Transaction snapshot.
+     */
+    protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /**
      * Test client reconnect to alive grid.
      *
      * @throws Exception If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 0ace0ba..707ef4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -78,6 +78,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -112,13 +113,18 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
             cacheCfg.setCacheMode(cacheMode());
             cacheCfg.setAtomicityMode(atomicityMode());
-            cacheCfg.setNearConfiguration(nearConfiguration());
+            cacheCfg.setLoadPreviousValue(true);
             cacheCfg.setRebalanceMode(ASYNC);
             cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
-            cacheCfg.setCacheStoreFactory(new StoreFactory());
-            cacheCfg.setReadThrough(true);
-            cacheCfg.setWriteThrough(true);
-            cacheCfg.setLoadPreviousValue(true);
+            cacheCfg.setNearConfiguration(nearConfiguration());
+
+            if (atomicityMode() != TRANSACTIONAL_SNAPSHOT) {
+                cacheCfg.setCacheStoreFactory(new StoreFactory()); // TODO IGNITE-8582 enable for tx snapshot.
+                cacheCfg.setReadThrough(true); // TODO IGNITE-8582 enable for tx snapshot.
+                cacheCfg.setWriteThrough(true); // TODO IGNITE-8582 enable for tx snapshot.
+            }
+            else
+                cacheCfg.setIndexedTypes(Integer.class, Integer.class);
 
             cfg.setCacheConfiguration(cacheCfg);
         }
@@ -240,6 +246,23 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
     protected abstract int gridCount();
 
     /**
+     * @param cache Cache.
+     * @param key Key.
+     * @param val Value.
+     */
+    protected void cachePut(IgniteCache cache, Integer key, Integer val) {
+        cache.put(key, val);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     */
+    protected void cacheRemove(IgniteCache cache, Integer key) {
+        cache.remove(key);
+    }
+
+    /**
      * @throws Exception If failed.
      */
     @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@@ -314,13 +337,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         });
 
         try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
-            cache.put(1, 1);
-            cache.put(2, 2);
-            cache.put(3, 3);
+            cachePut(cache,1, 1);
+            cachePut(cache,2, 2);
+            cachePut(cache,3, 3);
 
-            cache.remove(2);
+            cacheRemove(cache, 2);
 
-            cache.put(1, 10);
+            cachePut(cache, 1, 10);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -370,7 +393,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
         try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
             for (int i = 0; i < 100; i++)
-                cache.put(i, i);
+                cachePut(cache, i, i);
         }
     }
 
@@ -409,13 +432,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             for (int i = 0; i < gridCount(); i++) {
                 IgniteCache<Object, Object> cache0 = grid(i).cache(DEFAULT_CACHE_NAME);
 
-                cache0.put(1, 1);
-                cache0.put(2, 2);
-                cache0.put(3, 3);
+                cachePut(cache0, 1, 1);
+                cachePut(cache0, 2, 2);
+                cachePut(cache0, 3, 3);
 
-                cache0.remove(1);
-                cache0.remove(2);
-                cache0.remove(3);
+                cacheRemove(cache0, 1);
+                cacheRemove(cache0, 2);
+                cacheRemove(cache0, 3);
 
                 final int iter = i + 1;
 
@@ -467,7 +490,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         final int keyCnt = parts * 2;
 
         for (int i = 0; i < parts / 2; i++)
-            cache.put(i, i);
+            cachePut(cache, i, i);
 
         for (int i = 0; i < 10; i++) {
             if (i % 2 == 0) {
@@ -486,7 +509,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                 QueryCursor<Cache.Entry<Integer, Integer>> qryCur = cache.query(qry);
 
                 for (int key = 0; key < keyCnt; key++)
-                    cache.put(key, key);
+                    cachePut(cache, key, key);
 
                 try {
                     assert GridTestUtils.waitForCondition(new PA() {
@@ -501,7 +524,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             }
             else {
                 for (int key = 0; key < keyCnt; key++)
-                    cache.put(key, key);
+                    cachePut(cache, key, key);
             }
         }
     }
@@ -544,16 +567,16 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         });
 
         try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
-            cache.put(1, 1);
-            cache.put(2, 2);
-            cache.put(3, 3);
-            cache.put(4, 4);
+            cachePut(cache, 1, 1);
+            cachePut(cache, 2, 2);
+            cachePut(cache, 3, 3);
+            cachePut(cache, 4, 4);
 
-            cache.remove(2);
-            cache.remove(3);
+            cacheRemove(cache, 2);
+            cacheRemove(cache, 3);
 
-            cache.put(1, 10);
-            cache.put(4, 40);
+            cachePut(cache, 1, 10);
+            cachePut(cache, 4, 40);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -631,8 +654,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                     break;
             }
 
-            cache.put(locKey, 1);
-            cache.put(rmtKey, 2);
+            cachePut(cache, locKey, 1);
+            cachePut(cache, rmtKey, 2);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -706,12 +729,12 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             Iterator<Integer> it = keys.iterator();
 
             for (int i = 0; i < 4; i++)
-                cache.put(it.next(), 0);
+                cachePut(cache, it.next(), 0);
 
             assert !latch.await(2, SECONDS);
 
             for (int i = 0; i < 2; i++)
-                cache.put(it.next(), 0);
+                cachePut(cache, it.next(), 0);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -790,7 +813,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
             }
 
             for (Integer k : keys)
-                cache.put(k, 0);
+                cachePut(cache, k, 0);
 
             assert !latch.await(2, SECONDS);
             assert latch.await(1000 + LATCH_TIMEOUT, MILLISECONDS);
@@ -832,7 +855,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         });
 
         for (int i = 0; i < 10; i++)
-            cache.put(i, i);
+            cachePut(cache, i, i);
 
         try (QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(qry)) {
             List<Cache.Entry<Integer, Integer>> res = cur.getAll();
@@ -884,7 +907,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         });
 
         for (int i = 0; i < 10; i++)
-            cache.put(i, i);
+            cachePut(cache, i, i);
 
         try (QueryCursor<Cache.Entry<Integer, Integer>> cur = cache.query(qry)) {
             List<Cache.Entry<Integer, Integer>> res = cur.getAll();
@@ -906,8 +929,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                 exp++;
             }
 
-            cache.put(10, 10);
-            cache.put(11, 11);
+            cachePut(cache, 10, 10);
+            cachePut(cache, 11, 11);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS) : latch.getCount();
 
@@ -978,8 +1001,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         try (QueryCursor<Cache.Entry<Object, Object>> ignored = cache.query(qry)) {
             cache.put(new GridCacheInternalKeyImpl("test", "test"), 1);
 
-            cache.put(1, 1);
-            cache.put(2, 2);
+            cachePut(cache, 1, 1);
+            cachePut(cache, 2, 2);
 
             assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
 
@@ -1014,7 +1037,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
                 log.info("Started node without cache: " + ignite);
             }
 
-            cache.put(1, 1);
+            cachePut(cache, 1, 1);
 
             assertTrue(latch.await(5000, MILLISECONDS));
         }
@@ -1102,7 +1125,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
 
             try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
                 for (int i = 0; i < 100; i++)
-                    cache.put(i, i);
+                    cachePut(cache, i, i);
 
                 assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
                 assert execLatch.await(LATCH_TIMEOUT, MILLISECONDS);
@@ -1147,8 +1170,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
         });
 
         try (QueryCursor<Cache.Entry<Object, Object>> ignored = cache.query(qry)) {
-            cache.put(1, 1);
-            cache.put(2, 2);
+            cachePut(cache, 1, 1);
+            cachePut(cache, 2, 2);
 
             // Wait for expiration.
             Thread.sleep(2000);

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
index 0241a69..9dca5ea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryConcurrentTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
@@ -106,6 +107,13 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
     /**
      * @throws Exception If failed.
      */
+    public void testReplicatedMvccTx() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, 1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testRestartReplicated() throws Exception {
         testRestartRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2));
     }
@@ -127,6 +135,13 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
     /**
      * @throws Exception If failed.
      */
+    public void testRestartPartitionMvccTx() throws Exception {
+        testRestartRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testReplicatedAtomic() throws Exception {
         testRegistration(cacheConfiguration(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC, 2));
     }
@@ -141,6 +156,13 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
     /**
      * @throws Exception If failed.
      */
+    public void testPartitionMvccTx() throws Exception {
+        testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT, 2));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testPartitionAtomic() throws Exception {
         testRegistration(cacheConfiguration(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC, 2));
     }
@@ -342,17 +364,44 @@ public class GridCacheContinuousQueryConcurrentTest extends GridCommonAbstractTe
         // were busy setting up the cache listener.
         // Check asynchronously.
         // Complete the promise if the key was inserted concurrently.
-        cache.getAsync(key).listen(new IgniteInClosure<IgniteFuture<String>>() {
-            @Override public void apply(IgniteFuture<String> f) {
-                String val = f.get();
+        if (!((IgniteCacheProxy)cache).context().mvccEnabled()) {
+            cache.getAsync(key).listen(new IgniteInClosure<IgniteFuture<String>>() {
+                @Override public void apply(IgniteFuture<String> f) {
+                    String val = f.get();
 
-                if (val != null) {
-                    log.info("Completed by get: " + id);
+                    if (val != null) {
+                        log.info("Completed by get: " + id);
 
-                    (((GridFutureAdapter)((IgniteFutureImpl)promise).internalFuture())).onDone("by get");
+                        (((GridFutureAdapter)((IgniteFutureImpl)promise).internalFuture())).onDone("by async get");
+                    }
                 }
-            }
-        });
+            });
+        }
+        else {
+            // For MVCC caches we need to wait until updated value becomes visible for consequent readers.
+            // When MVCC transaction completes, it's updates are not visible immediately for the new transactions.
+            // This is caused by the lag between transaction completes on the node and mvcc coordinator
+            // removes this transaction from the active list.
+            GridTestUtils.runAsync(new Runnable() {
+                @Override public void run() {
+                    String v;
+
+                    while (!Thread.currentThread().isInterrupted()) {
+                        v = cache.get(key);
+
+                        if (v == null)
+                            doSleep(100);
+                        else {
+                            log.info("Completed by async mvcc get: " + id);
+
+                            (((GridFutureAdapter)((IgniteFutureImpl)promise).internalFuture())).onDone("by get");
+
+                            break;
+                        }
+                    }
+                }
+            });
+        }
 
         return promise;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
index b316042..0eb2e87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryMultiNodesFilteringTest.java
@@ -40,6 +40,7 @@ import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cluster.ClusterNode;
@@ -318,11 +319,18 @@ public class GridCacheContinuousQueryMultiNodesFilteringTest extends GridCommonA
         return new CacheConfiguration("test-cache-cq")
             .setBackups(1)
             .setNodeFilter(filter)
-            .setAtomicityMode(ATOMIC)
+            .setAtomicityMode(atomicityMode())
             .setWriteSynchronizationMode(FULL_SYNC)
             .setCacheMode(PARTITIONED);
     }
 
+    /**
+     * @return Atomicity mode.
+     */
+    protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
     /** */
     private final static class ListenerConfiguration extends MutableCacheEntryListenerConfiguration {
         /** Operation. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java
index 6474df5..d02c6ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedTxOneNodeTest.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.processors.cache.query.continuous;
 
+import java.util.Iterator;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.Cache;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
@@ -55,6 +57,10 @@ public class GridCacheContinuousQueryReplicatedTxOneNodeTest extends GridCommonA
         cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
         cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
 
+        // TODO IGNITE-9530 Remove this clause.
+        if (atomicMode() == CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
+            cacheCfg.setNearConfiguration(null);
+
         cfg.setCacheConfiguration(cacheCfg);
 
         TcpDiscoverySpi disco = new TcpDiscoverySpi();
@@ -164,7 +170,14 @@ public class GridCacheContinuousQueryReplicatedTxOneNodeTest extends GridCommonA
             for (int i = 0; i < 10; i++)
                 cache.put("key" + i, i);
 
-            cache.clear();
+            if (atomicMode() != CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT)
+                cache.clear();
+            else { // TODO IGNITE-7952. Remove "else" clause - do cache.clear() instead of iteration.
+                for (Iterator it = cache.iterator(); it.hasNext();) {
+                    it.next();
+                    it.remove();
+                }
+            }
 
             qry.setLocalListener(new CacheEntryUpdatedListener<String, Integer>() {
                 @Override public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> evts)

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
index 5baa3a7..a8f0d71 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryBackupQueueTest.java
@@ -30,6 +30,7 @@ import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -98,6 +99,13 @@ public class IgniteCacheContinuousQueryBackupQueueTest extends GridCommonAbstrac
         return cfg;
     }
 
+    /**
+     * @return Atomicity mode.
+     */
+    protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
index 1e40170..ab0cb50 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryClientTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -61,7 +62,7 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
         CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
 
         ccfg.setCacheMode(PARTITIONED);
-        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setAtomicityMode(atomicityMode());
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
         cfg.setCacheConfiguration(ccfg);
@@ -71,6 +72,13 @@ public class IgniteCacheContinuousQueryClientTest extends GridCommonAbstractTest
         return cfg;
     }
 
+    /**
+     * @return Atomicity mode.
+     */
+    protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index 81a7515..4e50cb9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -25,6 +25,7 @@ import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryEventFilter;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -66,7 +67,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
 
         CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
         ccfg.setCacheMode(PARTITIONED);
-        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setAtomicityMode(atomicityMode());
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
         cfg.setCacheConfiguration(ccfg);
@@ -76,6 +77,13 @@ public class IgniteCacheContinuousQueryImmutableEntryTest extends GridCommonAbst
         return cfg;
     }
 
+    /**
+     * @return Atomicity mode.
+     */
+    protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java
index d9804bf..74f80df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListImplSelfTest.java
@@ -480,11 +480,6 @@ public class CacheFreeListImplSelfTest extends GridCommonAbstractTest {
         @Override public byte newMvccTxState() {
             return 0;
         }
-
-        /** {@inheritDoc} */
-        @Override public boolean isKeyAbsentBefore() {
-            return false;
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
index 344a1cc..c8cf9aa 100644
--- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
@@ -81,6 +81,7 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> {
                 new GridCacheSharedTtlCleanupManager(),
                 new PartitionsEvictManager(),
                 new CacheNoopJtaManager(),
+                null,
                 null
             ),
             defaultCacheConfiguration(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 6246aa5..7fb896f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -127,11 +127,6 @@ public abstract class GridH2Row extends GridH2SearchRowAdapter implements CacheD
     }
 
     /** {@inheritDoc} */
-    @Override public boolean isKeyAbsentBefore() {
-        return row.isKeyAbsentBefore();
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean indexSearchRow() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractContinuousQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractContinuousQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractContinuousQuerySelfTest.java
new file mode 100644
index 0000000..1418d47
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractContinuousQuerySelfTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAbstractSelfTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+
+/**
+ *
+ */
+public abstract class CacheMvccAbstractContinuousQuerySelfTest extends GridCacheContinuousQueryAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL_SNAPSHOT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testInternalKey() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    public void testExpired() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-7311");
+    }
+
+    /** {@inheritDoc} */
+    public void testLoadCache() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-7954");
+    }
+
+    /** {@inheritDoc} */
+    public void testEvents() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-9321");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlContinuousQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlContinuousQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlContinuousQuerySelfTest.java
new file mode 100644
index 0000000..96fdf06
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlContinuousQuerySelfTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryAbstractSelfTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+
+/**
+ * Base class for MVCC continuous queries.
+ */
+public abstract class CacheMvccAbstractSqlContinuousQuerySelfTest extends CacheMvccAbstractContinuousQuerySelfTest {
+    /** {@inheritDoc} */
+    @Override protected void cachePut(IgniteCache cache, Integer key, Integer val) {
+        cache.query(new SqlFieldsQuery("MERGE INTO Integer (_key, _val) values (" + key + ',' + val + ')')).getAll();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void cacheRemove(IgniteCache  cache, Integer key) {
+        cache.query(new SqlFieldsQuery("DELETE FROM Integer WHERE _key=" + key)).getAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
new file mode 100644
index 0000000..ed97b1b
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBasicContinuousQueryTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager.TX_SIZE_THRESHOLD;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Basic continuous queries test with enabled mvcc.
+ */
+public class CacheMvccBasicContinuousQueryTest extends CacheMvccAbstractTest  {
+    /** */
+    private static final long LATCH_TIMEOUT = 5000;
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Wait for all routines are unregistered
+        GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                for (Ignite node : G.allGrids()) {
+                    GridContinuousProcessor proc = ((IgniteEx)node).context().continuous();
+
+                    if(((Map)U.field(proc, "rmtInfos")).size() > 0)
+                        return false;
+                }
+
+                return true;
+            }
+        }, 3000);
+
+        for (Ignite node : G.allGrids()) {
+            GridContinuousProcessor proc = ((IgniteEx)node).context().continuous();
+
+            assertEquals(1, ((Map)U.field(proc, "locInfos")).size());
+            assertEquals(0, ((Map)U.field(proc, "rmtInfos")).size());
+            assertEquals(0, ((Map)U.field(proc, "startFuts")).size());
+            assertEquals(0, ((Map)U.field(proc, "stopFuts")).size());
+            assertEquals(0, ((Map)U.field(proc, "bufCheckThreads")).size());
+
+            CacheContinuousQueryManager mgr = ((IgniteEx)node).context().cache().internalCache(DEFAULT_CACHE_NAME).context().continuousQueries();
+
+            assertEquals(0, ((Map)U.field(mgr, "lsnrs")).size());
+
+            MvccCachingManager cachingMgr = ((IgniteEx)node).context().cache().context().mvccCaching();
+
+            assertEquals(0, ((Map)U.field(cachingMgr, "enlistCache")).size());
+            assertEquals(0, ((Map)U.field(cachingMgr, "cntrs")).size());
+        }
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAllEntries() throws Exception {
+        Ignite node = startGrids(3);
+
+        final IgniteCache cache = node.createCache(
+            cacheConfiguration(cacheMode(), FULL_SYNC, 1, 2)
+                .setCacheMode(CacheMode.REPLICATED)
+                .setIndexedTypes(Integer.class, Integer.class));
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        final Map<Integer, List<Integer>> map = new HashMap<>();
+        final CountDownLatch latch = new CountDownLatch(5);
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
+                    synchronized (map) {
+                        List<Integer> vals = map.get(e.getKey());
+
+                        if (vals == null) {
+                            vals = new ArrayList<>();
+
+                            map.put(e.getKey(), vals);
+                        }
+
+                        vals.add(e.getValue());
+                    }
+
+                    latch.countDown();
+                }
+            }
+        });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                String dml = "INSERT INTO Integer (_key, _val) values (1,1),(2,2)";
+
+                cache.query(new SqlFieldsQuery(dml)).getAll();
+
+                tx.commit();
+            }
+
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                String dml1 = "MERGE INTO Integer (_key, _val) values (3,3)";
+
+                cache.query(new SqlFieldsQuery(dml1)).getAll();
+
+                String dml2 = "DELETE FROM Integer WHERE _key = 2";
+
+                cache.query(new SqlFieldsQuery(dml2)).getAll();
+
+                String dml3 = "UPDATE Integer SET _val = 10 WHERE _key = 1";
+
+                cache.query(new SqlFieldsQuery(dml3)).getAll();
+
+                tx.commit();
+            }
+
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                String dml = "INSERT INTO Integer (_key, _val) values (4,4),(5,5)";
+
+                cache.query(new SqlFieldsQuery(dml)).getAll();
+
+                tx.rollback();
+            }
+
+            assert latch.await(LATCH_TIMEOUT, MILLISECONDS);
+
+            assertEquals(3, map.size());
+
+            List<Integer> vals = map.get(1);
+
+            assertNotNull(vals);
+            assertEquals(2, vals.size());
+            assertEquals(1, (int)vals.get(0));
+            assertEquals(10, (int)vals.get(1));
+
+            vals = map.get(2);
+
+            assertNotNull(vals);
+            assertEquals(2, vals.size());
+            assertEquals(2, (int)vals.get(0));
+            assertEquals(2, (int)vals.get(1));
+
+            vals = map.get(3);
+
+            assertNotNull(vals);
+            assertEquals(1, vals.size());
+            assertEquals(3, (int)vals.get(0));
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCachingMaxSize() throws Exception {
+        Ignite node = startGrids(1);
+
+        final IgniteCache cache = node.createCache(
+            cacheConfiguration(cacheMode(), FULL_SYNC, 1, 2)
+                .setCacheMode(CacheMode.PARTITIONED)
+                .setIndexedTypes(Integer.class, Integer.class));
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
+            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
+                // No-op.
+            }
+        });
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
+                    try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                        for (int i = 0; i < TX_SIZE_THRESHOLD + 1; i++)
+                            cache.query(new SqlFieldsQuery("INSERT INTO Integer (_key, _val) values (" + i + ", 1)")).getAll();
+
+                        tx.commit();
+                    }
+                }
+
+                return null;
+            }
+        },  CacheException.class, "Failed to run update. Transaction is too large. Consider reducing transaction size");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClientReconnectContinuousQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClientReconnectContinuousQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClientReconnectContinuousQueryTest.java
new file mode 100644
index 0000000..33e0960
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClientReconnectContinuousQueryTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.ClientReconnectContinuousQueryTest;
+
+/**
+ *
+ */
+public class CacheMvccClientReconnectContinuousQueryTest extends ClientReconnectContinuousQueryTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryBackupQueueTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryBackupQueueTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryBackupQueueTest.java
new file mode 100644
index 0000000..3a598a2
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryBackupQueueTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryBackupQueueTest;
+
+/**
+ *
+ */
+public class CacheMvccContinuousQueryBackupQueueTest extends IgniteCacheContinuousQueryBackupQueueTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientReconnectTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientReconnectTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientReconnectTest.java
new file mode 100644
index 0000000..4c4c95b
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientReconnectTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+
+/**
+ * Mvcc CQ client reconnect test.
+ */
+public class CacheMvccContinuousQueryClientReconnectTest  extends IgniteCacheContinuousQueryClientReconnectTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicMode() {
+        return TRANSACTIONAL_SNAPSHOT;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientTest.java
new file mode 100644
index 0000000..5c6c7a8
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryClientTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
+
+/**
+ * Mvcc CQ client test.
+ */
+public class CacheMvccContinuousQueryClientTest extends IgniteCacheContinuousQueryClientTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryImmutableEntryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryImmutableEntryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryImmutableEntryTest.java
new file mode 100644
index 0000000..bef9c70
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryImmutableEntryTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryImmutableEntryTest;
+
+/**
+ *
+ */
+public class CacheMvccContinuousQueryImmutableEntryTest extends IgniteCacheContinuousQueryImmutableEntryTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryMultiNodesFilteringTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryMultiNodesFilteringTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryMultiNodesFilteringTest.java
new file mode 100644
index 0000000..714e834
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryMultiNodesFilteringTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryMultiNodesFilteringTest;
+
+/**
+ *
+ */
+public class CacheMvccContinuousQueryMultiNodesFilteringTest extends GridCacheContinuousQueryMultiNodesFilteringTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedSelfTest.java
new file mode 100644
index 0000000..80b039d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedSelfTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Mvcc continuous query test for partitioned cache.
+ */
+public class CacheMvccContinuousQueryPartitionedSelfTest extends CacheMvccAbstractContinuousQuerySelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedTxOneNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedTxOneNodeTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedTxOneNodeTest.java
new file mode 100644
index 0000000..795932e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryPartitionedTxOneNodeTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedTxOneNodeTest;
+
+/**
+ * Mvcc continuous query test for one node.
+ */
+public class CacheMvccContinuousQueryPartitionedTxOneNodeTest extends GridCacheContinuousQueryReplicatedTxOneNodeTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicMode() {
+        return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedSelfTest.java
new file mode 100644
index 0000000..c9adbf9
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedSelfTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Mvcc continuous query test for replicated cache.
+ */
+public class CacheMvccContinuousQueryReplicatedSelfTest extends CacheMvccAbstractContinuousQuerySelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedTxOneNodeTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedTxOneNodeTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedTxOneNodeTest.java
new file mode 100644
index 0000000..d522ee9
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousQueryReplicatedTxOneNodeTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedTxOneNodeTest;
+
+/**
+ * Mvcc continuous query test for one node.
+ */
+public class CacheMvccContinuousQueryReplicatedTxOneNodeTest extends GridCacheContinuousQueryReplicatedTxOneNodeTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicMode() {
+        return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerClientSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerClientSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerClientSelfTest.java
new file mode 100644
index 0000000..a3ea0e8
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerClientSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerClientSelfTest;
+
+/**
+ *
+ */
+public class CacheMvccContinuousWithTransformerClientSelfTest extends CacheContinuousWithTransformerClientSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testExpired() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-7311");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerPartitionedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerPartitionedSelfTest.java
new file mode 100644
index 0000000..d029143
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerPartitionedSelfTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest;
+
+/**
+ *
+ */
+public class CacheMvccContinuousWithTransformerPartitionedSelfTest extends CacheContinuousWithTransformerReplicatedSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testExpired() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-7311");
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerReplicatedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerReplicatedSelfTest.java
new file mode 100644
index 0000000..a294e17
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccContinuousWithTransformerReplicatedSelfTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousWithTransformerReplicatedSelfTest;
+
+/**
+ *
+ */
+public class CacheMvccContinuousWithTransformerReplicatedSelfTest
+    extends CacheContinuousWithTransformerReplicatedSelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void testExpired() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-7311");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryPartitionedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryPartitionedSelfTest.java
new file mode 100644
index 0000000..cef553e
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryPartitionedSelfTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Mvcc continuous query test for partitioned SQL cache.
+ */
+public class CacheMvccSqlContinuousQueryPartitionedSelfTest extends CacheMvccAbstractSqlContinuousQuerySelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryReplicatedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryReplicatedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryReplicatedSelfTest.java
new file mode 100644
index 0000000..948e6e1
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlContinuousQueryReplicatedSelfTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * Mvcc continuous query test for replicated SQL cache.
+ */
+public class CacheMvccSqlContinuousQueryReplicatedSelfTest extends CacheMvccAbstractSqlContinuousQuerySelfTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/51a202a4/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
index b5cb3e0..ce2a130 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
@@ -20,7 +20,21 @@ package org.apache.ignite.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.index.MvccEmptyTransactionSelfTest;
 import org.apache.ignite.internal.processors.cache.index.SqlTransactionsCommandsWithMvccEnabledSelfTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccBasicContinuousQueryTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccBulkLoadTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccClientReconnectContinuousQueryTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryBackupQueueTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryClientReconnectTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryClientTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryImmutableEntryTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryMultiNodesFilteringTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryPartitionedSelfTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryPartitionedTxOneNodeTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryReplicatedSelfTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousQueryReplicatedTxOneNodeTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousWithTransformerClientSelfTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousWithTransformerPartitionedSelfTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccContinuousWithTransformerReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccDmlSimpleTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccIteratorWithConcurrentJdbcTransactionTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest;
@@ -40,8 +54,10 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccScanQueryWithCo
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSizeWithConcurrentJdbcTransactionTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlConfigurationValidationTest;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlUpdateCountersTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlContinuousQueryPartitionedSelfTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlContinuousQueryReplicatedSelfTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlLockTimeoutTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlUpdateCountersTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxNodeMappingTest;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadBulkOpsTest;
@@ -102,6 +118,28 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite {
         suite.addTestSuite(CacheMvccPartitionedSqlCoordinatorFailoverTest.class);
         suite.addTestSuite(CacheMvccReplicatedSqlCoordinatorFailoverTest.class);
 
+        // Continuous queries.
+        suite.addTestSuite(CacheMvccBasicContinuousQueryTest.class);
+        suite.addTestSuite(CacheMvccContinuousQueryPartitionedSelfTest.class);
+        suite.addTestSuite(CacheMvccContinuousQueryReplicatedSelfTest.class);
+        suite.addTestSuite(CacheMvccSqlContinuousQueryPartitionedSelfTest.class);
+        suite.addTestSuite(CacheMvccSqlContinuousQueryReplicatedSelfTest.class);
+
+        suite.addTestSuite(CacheMvccContinuousQueryPartitionedTxOneNodeTest.class);
+        suite.addTestSuite(CacheMvccContinuousQueryReplicatedTxOneNodeTest.class);
+
+        suite.addTestSuite(CacheMvccContinuousQueryClientReconnectTest.class);
+        suite.addTestSuite(CacheMvccContinuousQueryClientTest.class);
+
+        suite.addTestSuite(CacheMvccContinuousQueryMultiNodesFilteringTest.class);
+        suite.addTestSuite(CacheMvccContinuousQueryBackupQueueTest.class);
+        suite.addTestSuite(CacheMvccContinuousQueryImmutableEntryTest.class);
+        suite.addTestSuite(CacheMvccClientReconnectContinuousQueryTest.class);
+
+        suite.addTestSuite(CacheMvccContinuousWithTransformerClientSelfTest.class);
+        suite.addTestSuite(CacheMvccContinuousWithTransformerPartitionedSelfTest.class);
+        suite.addTestSuite(CacheMvccContinuousWithTransformerReplicatedSelfTest.class);
+
         return suite;
     }
 }