You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/13 14:48:49 UTC

incubator-ignite git commit: # ignite-426 add tests

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-426 1e8d8aed4 -> eee8ea416


# ignite-426 add tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/eee8ea41
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/eee8ea41
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/eee8ea41

Branch: refs/heads/ignite-426
Commit: eee8ea4169047a2788aa7674aa9ff65a063b3897
Parents: 1e8d8ae
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 13 11:57:36 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 13 15:46:08 2015 +0300

----------------------------------------------------------------------
 .../distributed/dht/GridDhtLocalPartition.java  |   6 +-
 .../continuous/CacheContinuousQueryHandler.java | 147 +++---
 .../continuous/CacheContinuousQueryManager.java |  36 +-
 ...acheContinuousQueryFailoverAbstractTest.java | 457 +++++++++++++++++--
 ...ueryFailoverAtomicPrimaryWriteOrderTest.java |  32 ++
 ...inuousQueryFailoverAtomicReplicatedTest.java |  32 ++
 ...ContinuousQueryFailoverTxReplicatedTest.java |  32 ++
 .../IgniteCacheQuerySelfTestSuite.java          |   3 +
 8 files changed, 617 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index b7d4375..a0a75c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -592,11 +592,7 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
      * @return Next update index.
      */
     public long nextContinuousQueryUpdateIndex() {
-        long res = contQryUpdIdx.incrementAndGet();
-
-        log.info("Next update index [node=" + cctx.gridName() + ", part=" + id + ", idx=" + res + ']');
-
-        return res;
+        return contQryUpdIdx.incrementAndGet();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 4bd2e1c..98e857b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.query.*;
 import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -39,6 +40,7 @@ import javax.cache.event.*;
 import javax.cache.event.EventType;
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.*;
 
 import static org.apache.ignite.events.EventType.*;
 
@@ -50,7 +52,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     private static final long serialVersionUID = 0L;
 
     /** */
-    private static final int BACKUP_ACK_THRESHOLD = 1;
+    private static final int BACKUP_ACK_THRESHOLD = 100;
 
     /** Cache name. */
     private String cacheName;
@@ -95,10 +97,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     private transient Map<Integer, Long> rcvCntrs;
 
     /** */
-    private transient DuplicateEventFilter dupEvtFilter = new DuplicateEventFilter();
+    private transient IgnitePredicate<CacheContinuousQueryEntry> dupEvtFilter;
 
     /** */
-    private transient AcknowledgeData ackData = new AcknowledgeData();
+    private transient AcknowledgeBuffer ackBuf;
 
     /**
      * Required by {@link Externalizable}.
@@ -121,6 +123,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
      * @param ignoreExpired Ignore expired events flag.
      * @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
      * @param taskHash Task name hash code.
+     * @param locCache {@code True} if local cache.
      */
     public CacheContinuousQueryHandler(
         String cacheName,
@@ -133,7 +136,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         boolean sync,
         boolean ignoreExpired,
         int taskHash,
-        boolean skipPrimaryCheck) {
+        boolean skipPrimaryCheck,
+        boolean locCache) {
         assert topic != null;
         assert locLsnr != null;
 
@@ -149,7 +153,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         this.taskHash = taskHash;
         this.skipPrimaryCheck = skipPrimaryCheck;
 
-        this.rcvCntrs = new HashMap<>();
+        if (locCache)
+            dupEvtFilter = F.alwaysTrue();
+        else {
+            rcvCntrs = new ConcurrentHashMap<>();
+
+            dupEvtFilter = new DuplicateEventFilter();
+        }
     }
 
     /** {@inheritDoc} */
@@ -187,8 +197,12 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
         backupQueue = new ConcurrentLinkedDeque8<>();
 
+        ackBuf = new AcknowledgeBuffer();
+
         final boolean loc = nodeId.equals(ctx.localNodeId());
 
+        assert !skipPrimaryCheck || loc;
+
         CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
             @Override public void onExecution() {
                 if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -217,8 +231,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
                 GridCacheContext<K, V> cctx = cacheContext(ctx);
 
-                if (cctx.isReplicated() && !skipPrimaryCheck && !primary)
-                    return;
+                // skipPrimaryCheck is set only when listen locally for replicated cache events.
+                assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
 
                 boolean notify = true;
 
@@ -232,30 +246,36 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                 }
 
                 if (notify) {
-                    if (loc && dupEvtFilter.apply(evt.entry()))
-                        locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
-                    else {
-                        try {
-                            final CacheContinuousQueryEntry entry = evt.entry();
+                    try {
+                        final CacheContinuousQueryEntry entry = evt.entry();
+
+                        if (primary || skipPrimaryCheck) {
+                            if (loc) {
+                                if (dupEvtFilter.apply(entry)) {
+                                    locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
 
-                            if (primary) {
+                                    if (!skipPrimaryCheck)
+                                        sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+                                }
+                            }
+                            else {
                                 prepareEntry(cctx, nodeId, entry);
 
                                 ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
                             }
-                            else
-                                backupQueue.add(entry);
                         }
-                        catch (ClusterTopologyCheckedException ex) {
-                            IgniteLogger log = ctx.log(getClass());
+                        else
+                            backupQueue.add(entry);
+                    }
+                    catch (ClusterTopologyCheckedException ex) {
+                        IgniteLogger log = ctx.log(getClass());
 
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to send event notification to node, node left cluster " +
-                                    "[node=" + nodeId + ", err=" + ex + ']');
-                        }
-                        catch (IgniteCheckedException ex) {
-                            U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
-                        }
+                        if (log.isDebugEnabled())
+                            log.debug("Failed to send event notification to node, node left cluster " +
+                                "[node=" + nodeId + ", err=" + ex + ']');
+                    }
+                    catch (IgniteCheckedException ex) {
+                        U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
                     }
 
                     if (recordIgniteEvt) {
@@ -292,28 +312,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
                 while (it.hasNext()) {
                     CacheContinuousQueryEntry backupEntry = it.next();
 
-                    assert backupEntry != null;
-
                     Long updateIdx = updateIdxs.get(backupEntry.partition());
 
-                    if (updateIdx != null) {
-                        assert backupEntry.updateIndex() <= updateIdx;
-
+                    if (updateIdx != null && backupEntry.updateIndex() <= updateIdx)
                         it.remove();
-
-                        if (backupEntry.updateIndex() == updateIdx) {
-                            updateIdxs.remove(backupEntry.partition());
-
-                            if (updateIdxs.isEmpty())
-                                break;
-                        }
-                    }
                 }
             }
 
             @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) {
-                ctx.log(getClass()).info("Flush backup queue [topVer=" + topVer + ", queue=" + backupQueue + ']');
-
                 if (backupQueue.isEmpty())
                     return;
 
@@ -333,7 +339,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
             }
 
             @Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) {
-                sendBackupAcknowledge(ackData.acknowledgeOnTimeout(), routineId, ctx);
+                sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
             }
 
             @Override public void onPartitionEvicted(int part) {
@@ -469,30 +475,15 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
             if (e.updateIndex() > cntr0) {
                 // TODO IGNITE-426: remove assert.
-                if (e.updateIndex() != cntr0 + 1)
-                    System.out.println(Thread.currentThread().getName() +
-                        "Invalid entry [part=" + e.partition() + ", cntr=" + cntr + ", e=" + e + ']');
-
                 assert e.updateIndex() == cntr0 + 1 : "Invalid entry [cntr=" + cntr + ", e=" + e + ']';
 
-                System.out.println(Thread.currentThread().getName() +
-                    " update cntr [part=" + part + ", idx=" + e.updateIndex() + ", e=" + e + ']');
-
                 rcvCntrs.put(part, e.updateIndex());
             }
-            else {
-                System.out.println(Thread.currentThread().getName() +
-                    " ignore entry [cntr=" + cntr0 + ", idx=" + e.updateIndex() + ", e=" + e + ']');
-
+            else
                 return false;
-            }
         }
-        else {
-            System.out.println(Thread.currentThread().getName() +
-                " update cntr [part=" + part + ", idx=" + e.updateIndex() + ", e=" + e + ']');
-
+        else
             rcvCntrs.put(part, e.updateIndex());
-        }
 
         return true;
     }
@@ -525,7 +516,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     @Override public void onBatchAcknowledged(final UUID routineId,
         GridContinuousBatch batch,
         final GridKernalContext ctx) {
-        sendBackupAcknowledge(ackData.onAcknowledged(batch), routineId, ctx);
+        sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx);
     }
 
     /**
@@ -651,14 +642,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     }
 
     /** */
-    private static class AcknowledgeData {
+    private static class AcknowledgeBuffer {
         /** */
         private int size;
 
         /** */
+        @GridToStringInclude
         private Map<Integer, Long> updateIdxs = new HashMap<>();
 
         /** */
+        @GridToStringInclude
         private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
 
         /**
@@ -668,24 +661,42 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
         @SuppressWarnings("unchecked")
         @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
         onAcknowledged(GridContinuousBatch batch) {
-            // TODO: IGNITE-426: check if it is called from nio thread in correct order.
             size += batch.size();
 
             Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
 
-            for (CacheContinuousQueryEntry e : entries) {
-                topVers.add(e.topologyVersion());
+            for (CacheContinuousQueryEntry e : entries)
+                addEntry(e);
 
-                Long cntr0 = updateIdxs.get(e.partition());
+            return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+        }
 
-                if (cntr0 == null || e.updateIndex() > cntr0)
-                    updateIdxs.put(e.partition(), e.updateIndex());
-            }
+        /**
+         * @param e Entry.
+         * @return Non-null tuple if acknowledge should be sent to backups.
+         */
+        @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+        onAcknowledged(CacheContinuousQueryEntry e) {
+            size++;
+
+            addEntry(e);
 
             return size >= BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
         }
 
         /**
+         * @param e Entry.
+         */
+        private void addEntry(CacheContinuousQueryEntry e) {
+            topVers.add(e.topologyVersion());
+
+            Long cntr0 = updateIdxs.get(e.partition());
+
+            if (cntr0 == null || e.updateIndex() > cntr0)
+                updateIdxs.put(e.partition(), e.updateIndex());
+        }
+
+        /**
          * @return Non-null tuple if acknowledge should be sent to backups.
          */
         @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
@@ -697,6 +708,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
          * @return Tuple with acknowledge information.
          */
         private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
+            assert size > 0;
+
             Map<Integer, Long> idxs = new HashMap<>(updateIdxs);
 
             IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
@@ -711,7 +724,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(AcknowledgeData.class, this);
+            return S.toString(AcknowledgeBuffer.class, this);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 9eae419..cc8f77b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -58,6 +58,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
     /** */
     private static final byte EXPIRED_FLAG = 0b1000;
 
+    /** */
+    private static final long BACKUP_ACK_FREQ = 5000;
+
     /** Listeners. */
     private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>();
 
@@ -94,6 +97,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                         lsnr.cleanupBackupQueue(msg.updateIndexes());
                 }
             });
+
+        cctx.time().schedule(new Runnable() {
+            @Override public void run() {
+                for (CacheContinuousQueryListener lsnr : lsnrs.values())
+                    lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext());
+
+                for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+                    lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext());
+            }
+        }, BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
     }
 
     /** {@inheritDoc} */
@@ -166,11 +179,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         if (!hasNewVal && !hasOldVal)
             return;
 
-        GridDhtLocalPartition locPart = cctx.topology().localPartition(e.partition(), topVer, false);
+        long updateIdx;
 
-        assert locPart != null;
+        if (!cctx.isLocal()) {
+            GridDhtLocalPartition locPart = cctx.topology().localPartition(e.partition(), topVer, false);
 
-        long updateIdx = locPart.nextContinuousQueryUpdateIndex();
+            assert locPart != null;
+
+            updateIdx = locPart.nextContinuousQueryUpdateIndex();
+        }
+        else
+            updateIdx = 0;
 
         EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED;
 
@@ -206,12 +225,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
                 updateIdx,
                 topVer);
 
-            log.info("Created entry [node=" + cctx.gridName() +
-                ", primary=" + primary +
-                ", preload=" + preload +
-                ", part=" + e.partition() +
-                ", idx=" + updateIdx + ']');
-
             CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                 cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
@@ -405,7 +418,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             lsnr.onPartitionEvicted(part);
     }
 
-
     /**
      * @param locLsnr Local listener.
      * @param rmtFilter Remote filter.
@@ -480,7 +492,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
             sync,
             ignoreExpired,
             taskNameHash,
-            skipPrimaryCheck);
+            skipPrimaryCheck,
+            cctx.isLocal());
 
         UUID id = cctx.kernalContext().continuous().startRoutine(hnd, bufSize, timeInterval,
             autoUnsubscribe, grp.predicate()).get();
@@ -702,6 +715,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
 
         /**
          * @param impl Listener.
+         * @param log Logger.
          */
         JCacheQueryLocalListener(CacheEntryListener<K, V> impl, IgniteLogger log) {
             assert impl != null;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 1eb3ae7..3afb01c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -23,8 +23,10 @@ import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.cache.query.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.extensions.communication.*;
 import org.apache.ignite.resources.*;
@@ -33,13 +35,17 @@ import org.apache.ignite.spi.communication.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 
 import javax.cache.event.*;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
 
 import static java.util.concurrent.TimeUnit.*;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
 
 /**
@@ -47,7 +53,10 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
  */
 public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommonAbstractTest {
     /** */
-    protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int BACKUP_ACK_THRESHOLD = 100;
 
     /** */
     private static volatile boolean err;
@@ -62,6 +71,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
         TestCommunicationSpi commSpi = new TestCommunicationSpi();
 
         commSpi.setIdleConnectionTimeout(100);
@@ -72,6 +84,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         ccfg.setCacheMode(cacheMode());
         ccfg.setAtomicityMode(atomicityMode());
+        ccfg.setAtomicWriteOrderMode(writeOrderMode());
         ccfg.setBackups(backups);
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
 
@@ -83,6 +96,11 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     }
 
     /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 5 * 60_000;
+    }
+
+    /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
@@ -107,6 +125,13 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     protected abstract CacheAtomicityMode atomicityMode();
 
     /**
+     * @return Write order mode for atomic cache.
+     */
+    protected CacheAtomicWriteOrderMode writeOrderMode() {
+        return CLOCK;
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testOneBackup() throws Exception {
@@ -117,6 +142,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
      * @throws Exception If failed.
      */
     public void testThreeBackups() throws Exception {
+        if (cacheMode() == REPLICATED)
+            return;
+
         checkBackupQueue(3);
     }
 
@@ -129,7 +157,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         final int SRV_NODES = 4;
 
-        startGrids(SRV_NODES);
+        startGridsMultiThreaded(SRV_NODES);
 
         client = true;
 
@@ -139,16 +167,15 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
 
-        assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups());
+        if (cacheMode() != REPLICATED)
+            assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups());
 
         Affinity<Object> aff = qryClient.affinity(null);
 
-        CacheEventListener lsnr = new CacheEventListener();
+        CacheEventListener1 lsnr = new CacheEventListener1();
 
         ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
-        qry.setAutoUnsubscribe(true);
-
         qry.setLocalListener(lsnr);
 
         QueryCursor<?> cur = qryClientCache.query(qry);
@@ -188,8 +215,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
             if (!latch.await(5, SECONDS))
                 fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
-
-            awaitPartitionMapExchange();
         }
 
         for (int i = 0; i < SRV_NODES - 1; i++) {
@@ -197,8 +222,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
             Ignite ignite = startGrid(i);
 
-            awaitPartitionMapExchange();
-
             IgniteCache<Object, Object> cache = ignite.cache(null);
 
             List<Integer> keys = testKeys(cache, PARTS);
@@ -219,7 +242,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         cur.close();
 
-        assertFalse(err);
+        assertFalse("Unexpected error during test, see log for details.", err);
     }
 
     /**
@@ -265,83 +288,427 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
     /**
      * @throws Exception If failed.
      */
-    public void testBackupQueueCleanup() throws Exception {
-        startGrids(2);
+    public void testBackupQueueCleanupClientQuery() throws Exception {
+        startGridsMultiThreaded(2);
 
-        Ignite ignite0 = ignite(0);
-        Ignite ignite1 = ignite(1);
+        client = true;
 
-        CacheEventListener lsnr = new CacheEventListener();
+        Ignite qryClient = startGrid(2);
 
-        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+        CacheEventListener1 lsnr = new CacheEventListener1();
 
-        qry.setAutoUnsubscribe(true);
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
 
         qry.setLocalListener(lsnr);
 
-        QueryCursor<?> cur = ignite1.cache(null).query(qry);
+        QueryCursor<?> cur = qryClient.cache(null).query(qry);
+
+        final Collection<Object> backupQueue = backupQueue(ignite(1));
 
-        IgniteCache<Object, Object> cache0 = ignite0.cache(null);
+        assertEquals(0, backupQueue.size());
 
-        final int KEYS = 1;
+        IgniteCache<Object, Object> cache0 = ignite(0).cache(null);
 
-        List<Integer> keys = primaryKeys(cache0, KEYS);
+        List<Integer> keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD);
 
         CountDownLatch latch = new CountDownLatch(keys.size());
 
         lsnr.latch = latch;
 
+        for (Integer key : keys) {
+            log.info("Put: " + key);
+
+            cache0.put(key, key);
+        }
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return backupQueue.isEmpty();
+            }
+        }, 2000);
+
+        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
+
+        if (!latch.await(5, SECONDS))
+            fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+
+        keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD / 2);
+
+        latch = new CountDownLatch(keys.size());
+
+        lsnr.latch = latch;
+
         for (Integer key : keys)
             cache0.put(key, key);
 
+        final long ACK_FREQ = 5000;
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return backupQueue.isEmpty();
+            }
+        }, ACK_FREQ + 2000);
+
+        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
+
         if (!latch.await(5, SECONDS))
             fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
 
-        Thread.sleep(10_000);
+        cur.close();
+
+        assertFalse("Unexpected error during test, see log for details.", err);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBackupQueueCleanupServerQuery() throws Exception {
+        Ignite qryClient = startGridsMultiThreaded(2);
+
+        CacheEventListener1 lsnr = new CacheEventListener1();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        IgniteCache<Object, Object> cache = qryClient.cache(null);
+
+        QueryCursor<?> cur = cache.query(qry);
+
+        final Collection<Object> backupQueue = backupQueue(ignite(1));
+
+        assertEquals(0, backupQueue.size());
+
+        List<Integer> keys = primaryKeys(cache, BACKUP_ACK_THRESHOLD);
+
+        CountDownLatch latch = new CountDownLatch(keys.size());
+
+        lsnr.latch = latch;
+
+        for (Integer key : keys) {
+            log.info("Put: " + key);
+
+            cache.put(key, key);
+        }
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return backupQueue.isEmpty();
+            }
+        }, 2000);
+
+        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
+
+        if (!latch.await(5, SECONDS))
+            fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+
+        cur.close();
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @return Backup queue for test query.
+     */
+    private Collection<Object> backupQueue(Ignite ignite) {
+        GridContinuousProcessor proc = ((IgniteKernal)ignite).context().continuous();
+
+        ConcurrentMap<Object, Object> infos = GridTestUtils.getFieldValue(proc, "rmtInfos");
+
+        Collection<Object> backupQueue = null;
+
+        for (Object info : infos.values()) {
+            GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
+
+            if (hnd.isForQuery() && hnd.cacheName() == null) {
+                backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue");
+
+                break;
+            }
+        }
+
+        assertNotNull(backupQueue);
+
+        return backupQueue;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void _testFailover() throws Exception {
+        final int SRV_NODES = 4;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+        final CacheEventListener2 lsnr = new CacheEventListener2();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = qryClientCache.query(qry);
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>();
+
+        IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                final int idx = SRV_NODES + 1;
+
+                while (!stop.get() && !err) {
+                    log.info("Start node: " + idx);
+
+                    startGrid(idx);
+
+                    Thread.sleep(2000);
+
+                    log.info("Stop node: " + idx);
+
+                    stopGrid(idx);
+
+                    Thread.sleep(1000);
+
+                    CountDownLatch latch = new CountDownLatch(1);
+
+                    assertTrue(checkLatch.compareAndSet(null, latch));
+
+                    if (!stop.get()) {
+                        log.info("Wait for event check.");
+
+                        assertTrue(latch.await(1, MINUTES));
+                    }
+                }
+
+                return null;
+            }
+        });
+
+        final Map<Integer, Integer> vals = new HashMap<>();
+
+        try {
+            long stopTime = System.currentTimeMillis() + 3 * 60_000;
+
+            final int PARTS = qryClient.affinity(null).partitions();
+
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            while (System.currentTimeMillis() < stopTime) {
+                Integer key = rnd.nextInt(PARTS);
+
+                Integer val = vals.get(key);
+
+                if (val == null)
+                    val = 0;
+                else
+                    val = val + 1;
+
+                qryClientCache.put(key, val);
+
+                vals.put(key, val);
+
+                CountDownLatch latch = checkLatch.get();
+
+                if (latch != null) {
+                    log.info("Check events.");
+
+                    checkLatch.set(null);
+
+                    boolean success = false;
+
+                    try {
+                        if (err)
+                            break;
+
+                        boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                            @Override public boolean apply() {
+                                return checkEvents(false, vals, lsnr);
+                            }
+                        }, 10_000);
+
+                        if (!check)
+                            assertTrue(checkEvents(true, vals, lsnr));
+
+                        success = true;
+
+                        log.info("Events checked.");
+                    }
+                    finally {
+                        if (!success)
+                            err = true;
+
+                        latch.countDown();
+                    }
+                }
+            }
+        }
+        finally {
+            stop.set(true);
+        }
+
+        CountDownLatch latch = checkLatch.get();
+
+        if (latch != null)
+            latch.countDown();
+
+        restartFut.get();
+
+        boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return checkEvents(false, vals, lsnr);
+            }
+        }, 10_000);
+
+        if (!check)
+            assertTrue(checkEvents(true, vals, lsnr));
 
         cur.close();
+
+        assertFalse("Unexpected error during test, see log for details.", err);
+    }
+
+    /**
+     * @param logAll If {@code true} logs all unexpected values.
+     * @param vals Expected values.
+     * @param lsnr Listener.
+     * @return Check status.
+     */
+    private boolean checkEvents(boolean logAll, Map<Integer, Integer> vals, CacheEventListener2 lsnr) {
+        assertTrue(!vals.isEmpty());
+
+        ConcurrentHashMap<Integer, Integer> lsnrVals = lsnr.vals;
+
+        ConcurrentHashMap<Integer, Integer> lsnrCntrs = lsnr.cntrs;
+
+        boolean pass = true;
+
+        for (Map.Entry<Integer, Integer> e : vals.entrySet()) {
+            Integer key = e.getKey();
+
+            Integer lsnrVal = lsnrVals.get(key);
+            Integer expVal = e.getValue();
+
+            if (!expVal.equals(lsnrVal)) {
+                pass = false;
+
+                log.info("Unexpected value [key=" + key + ", val=" + lsnrVal + ", expVal=" + expVal + ']');
+
+                if (!logAll)
+                    return false;
+            }
+
+            Integer lsnrCntr = lsnrCntrs.get(key);
+            Integer expCntr = expVal + 1;
+
+            if (!expCntr.equals(lsnrCntr)) {
+                pass = false;
+
+                log.info("Unexpected events count [key=" + key + ", val=" + lsnrCntr + ", expVal=" + expCntr + ']');
+
+                if (!logAll)
+                    return false;
+            }
+        }
+
+        return pass;
     }
 
     /**
      *
      */
-    private static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
+    private static class CacheEventListener1 implements CacheEntryUpdatedListener<Object, Object> {
         /** */
         private volatile CountDownLatch latch;
 
         /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
+        @LoggerResource
+        private IgniteLogger log;
 
         /** {@inheritDoc} */
         @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
-            for (CacheEntryEvent<?, ?> evt : evts) {
-                ignite.log().info("Received cache event: " + evt);
+            try {
+                for (CacheEntryEvent<?, ?> evt : evts) {
+                    CountDownLatch latch = this.latch;
 
-                CountDownLatch latch = this.latch;
+                    log.info("Received cache event: " + evt + " " + (latch != null ? latch.getCount() : null));
 
-                testAssert(evt, latch != null);
-                testAssert(evt, latch.getCount() > 0);
+                    assertTrue(latch != null);
+                    assertTrue(latch.getCount() > 0);
 
-                latch.countDown();
+                    latch.countDown();
 
-                if (latch.getCount() == 0)
-                    this.latch = null;
+                    if (latch.getCount() == 0)
+                        this.latch = null;
+                }
+            }
+            catch (Throwable e) {
+                err = true;
+
+                log.error("Unexpected error", e);
             }
         }
+    }
+
+    /**
+     *
+     */
+    private static class CacheEventListener2 implements CacheEntryUpdatedListener<Object, Object> {
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
 
-        /**
-         * @param evt Event.
-         * @param cond Condition to check.
-         */
-        private void testAssert(CacheEntryEvent evt, boolean cond) {
-            if (!cond) {
-                ignite.log().info("Unexpected event: " + evt);
+        /** */
+        private final ConcurrentHashMap<Integer, Integer> vals = new ConcurrentHashMap<>();
 
-                err = true;
+        /** */
+        private final ConcurrentHashMap<Integer, Integer> cntrs = new ConcurrentHashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
+            throws CacheEntryListenerException  {
+            try {
+                for (CacheEntryEvent<?, ?> evt : evts) {
+                    Integer key = (Integer)evt.getKey();
+                    Integer val = (Integer)evt.getValue();
+
+                    assertNotNull(key);
+                    assertNotNull(val);
+
+                    Integer prevVal = vals.get(key);
+
+                    if (prevVal != null) {
+                        assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val);
+                        assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue());
+                    }
+                    else {
+                        assertEquals("Unexpected event: " + evt, (Object)0, val);
+                        assertNull("Unexpected event: " + evt, evt.getOldValue());
+                    }
+
+                    vals.put(key, val);
+
+                    Integer cntr = cntrs.get(key);
+
+                    if (cntr == null)
+                        cntr = 1;
+                    else
+                        cntr = cntr + 1;
+
+                    cntrs.put(key, cntr);
+                }
             }
+            catch (Throwable e) {
+                err = true;
 
-            assertTrue(cond);
+                log.error("Unexpected error", e);
+            }
         }
     }
 
@@ -357,7 +724,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
         private volatile boolean skipMsg;
 
         /** {@inheritDoc} */
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
             throws IgniteSpiException {
             if (skipMsg && msg instanceof GridIoMessage) {
                 Object msg0 = ((GridIoMessage)msg).message();
@@ -369,7 +736,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
                 }
             }
 
-            super.sendMessage(node, msg, ackClosure);
+            super.sendMessage(node, msg, ackC);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
new file mode 100644
index 0000000..6515b21
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest extends CacheContinuousQueryFailoverAtomicTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
+        return PRIMARY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
new file mode 100644
index 0000000..c8209d9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverAtomicReplicatedTest extends CacheContinuousQueryFailoverAtomicTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java
new file mode 100644
index 0000000..3ac0c64
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ *
+ */
+public class CacheContinuousQueryFailoverTxReplicatedTest extends CacheContinuousQueryFailoverTxTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/eee8ea41/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 2240226..8a7039e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -99,7 +99,10 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class);
         suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverAtomicTest.class);
+        suite.addTestSuite(CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.class);
+        suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedTest.class);
         suite.addTestSuite(CacheContinuousQueryFailoverTxTest.class);
+        suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedTest.class);
 
         // Reduce fields queries.
         suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);