Posted to commits@ignite.apache.org by nt...@apache.org on 2015/10/23 13:52:09 UTC

[07/19] ignite git commit: IGNITE-426 temp commit.

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
new file mode 100644
index 0000000..ed856a5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -0,0 +1,1104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.T3;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Base class for continuous query failover tests: verifies that the local listener receives correct
+ * notifications when server nodes are stopped and started, that backup event queues are cleaned up,
+ * and that events are neither lost nor duplicated under concurrent updates.
+ */
+public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Number of updates after which the backup event queue is expected to be acknowledged and cleaned. */
+    private static final int BACKUP_ACK_THRESHOLD = 100;
+
+    /** */
+    private static volatile boolean err;
+
+    /** */
+    private boolean client;
+
+    /** */
+    private int backups = 1;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        TestCommunicationSpi commSpi = new TestCommunicationSpi();
+
+        commSpi.setIdleConnectionTimeout(100);
+
+        cfg.setCommunicationSpi(commSpi);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(cacheMode());
+        ccfg.setAtomicityMode(atomicityMode());
+        ccfg.setAtomicWriteOrderMode(writeOrderMode());
+        ccfg.setBackups(backups);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 5 * 60_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        err = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @return Cache mode.
+     */
+    protected abstract CacheMode cacheMode();
+
+    /**
+     * @return Atomicity mode.
+     */
+    protected abstract CacheAtomicityMode atomicityMode();
+
+    /**
+     * @return Write order mode for atomic cache.
+     */
+    protected CacheAtomicWriteOrderMode writeOrderMode() {
+        return CLOCK;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRebalanceVersion() throws Exception {
+        Ignite ignite0 = startGrid(0);
+        GridDhtPartitionTopology top0 = ((IgniteKernal)ignite0).context().cache().context().cacheContext(1).topology();
+
+        assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(1)));
+        assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(2)));
+
+        Ignite ignite1 = startGrid(1);
+        GridDhtPartitionTopology top1 = ((IgniteKernal)ignite1).context().cache().context().cacheContext(1).topology();
+
+        waitRebalanceFinished(ignite0, 2);
+        waitRebalanceFinished(ignite1, 2);
+
+        assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(3)));
+        assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(3)));
+
+        Ignite ignite2 = startGrid(2);
+        GridDhtPartitionTopology top2 = ((IgniteKernal)ignite2).context().cache().context().cacheContext(1).topology();
+
+        waitRebalanceFinished(ignite0, 3);
+        waitRebalanceFinished(ignite1, 3);
+        waitRebalanceFinished(ignite2, 3);
+
+        assertFalse(top0.rebalanceFinished(new AffinityTopologyVersion(4)));
+        assertFalse(top1.rebalanceFinished(new AffinityTopologyVersion(4)));
+        assertFalse(top2.rebalanceFinished(new AffinityTopologyVersion(4)));
+
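+        // Client node join does not move any partitions, so the new topology version
+        // is immediately considered rebalanced on all nodes.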
+        client = true;
+
+        Ignite ignite3 = startGrid(3);
+        GridDhtPartitionTopology top3 = ((IgniteKernal)ignite3).context().cache().context().cacheContext(1).topology();
+
+        assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(4)));
+        assertTrue(top1.rebalanceFinished(new AffinityTopologyVersion(4)));
+        assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(4)));
+        assertTrue(top3.rebalanceFinished(new AffinityTopologyVersion(4)));
+
+        stopGrid(1);
+
+        waitRebalanceFinished(ignite0, 5);
+        waitRebalanceFinished(ignite2, 5);
+        waitRebalanceFinished(ignite3, 5);
+
+        stopGrid(3);
+
+        assertTrue(top0.rebalanceFinished(new AffinityTopologyVersion(6)));
+        assertTrue(top2.rebalanceFinished(new AffinityTopologyVersion(6)));
+
+        stopGrid(0);
+
+        waitRebalanceFinished(ignite2, 7);
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @param topVer Topology version.
+     * @throws Exception If failed.
+     */
+    private void waitRebalanceFinished(Ignite ignite, long topVer) throws Exception {
+        final AffinityTopologyVersion topVer0 = new AffinityTopologyVersion(topVer);
+
+        final GridDhtPartitionTopology top =
+            ((IgniteKernal)ignite).context().cache().context().cacheContext(1).topology();
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return top.rebalanceFinished(topVer0);
+            }
+        }, 5000);
+
+        assertTrue(top.rebalanceFinished(topVer0));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOneBackup() throws Exception {
+        checkBackupQueue(1, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testOneBackupClientUpdate() throws Exception {
+        checkBackupQueue(1, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testThreeBackups() throws Exception {
+        if (cacheMode() == REPLICATED)
+            return;
+
+        checkBackupQueue(3, false);
+    }
+
+    /**
+     * @param backups Number of backups.
+     * @param updateFromClient If {@code true} executes cache update from client node.
+     * @throws Exception If failed.
+     */
+    private void checkBackupQueue(int backups, boolean updateFromClient) throws Exception {
+        this.backups = backups;
+
+        final int SRV_NODES = 4;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+        if (cacheMode() != REPLICATED)
+            assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups());
+
+        Affinity<Object> aff = qryClient.affinity(null);
+
+        CacheEventListener1 lsnr = new CacheEventListener1(false);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = qryClientCache.query(qry);
+
+        int PARTS = 10;
+
+        Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+        List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+        for (int i = 0; i < SRV_NODES - 1; i++) {
+            log.info("Stop iteration: " + i);
+
+            TestCommunicationSpi spi = (TestCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
+
+            Ignite ignite = ignite(i);
+
+            IgniteCache<Object, Object> cache = ignite.cache(null);
+
+            List<Integer> keys = testKeys(cache, PARTS);
+
+            CountDownLatch latch = new CountDownLatch(keys.size());
+
+            lsnr.latch = latch;
+
+            boolean first = true;
+
+            for (Integer key : keys) {
+                log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']');
+
+                T2<Object, Object> t = updates.get(key);
+
+                if (t == null) {
+                    updates.put(key, new T2<>((Object)key, null));
+
+                    expEvts.add(new T3<>((Object)key, (Object)key, null));
+                }
+                else {
+                    updates.put(key, new T2<>((Object)key, (Object)key));
+
+                    expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
+                }
+
+                if (updateFromClient)
+                    qryClientCache.put(key, key);
+                else
+                    cache.put(key, key);
+
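+                // After the first update, start dropping continuous query messages from this node:
+                // remaining events must still be delivered from backups once the node is stopped.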
+                if (first) {
+                    spi.skipMsg = true;
+
+                    first = false;
+                }
+            }
+
+            stopGrid(i);
+
+            if (!latch.await(5, SECONDS)) {
+                Set<Integer> keys0 = new HashSet<>(keys);
+
+                keys0.removeAll(lsnr.keys);
+
+                log.info("Missed events for keys: " + keys0);
+
+                fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+            }
+
+            checkEvents(expEvts, lsnr);
+        }
+
+        for (int i = 0; i < SRV_NODES - 1; i++) {
+            log.info("Start iteration: " + i);
+
+            Ignite ignite = startGrid(i);
+
+            IgniteCache<Object, Object> cache = ignite.cache(null);
+
+            List<Integer> keys = testKeys(cache, PARTS);
+
+            CountDownLatch latch = new CountDownLatch(keys.size());
+
+            lsnr.latch = latch;
+
+            for (Integer key : keys) {
+                log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key) + ']');
+
+                T2<Object, Object> t = updates.get(key);
+
+                if (t == null) {
+                    updates.put(key, new T2<>((Object)key, null));
+
+                    expEvts.add(new T3<>((Object)key, (Object)key, null));
+                }
+                else {
+                    updates.put(key, new T2<>((Object)key, (Object)key));
+
+                    expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
+                }
+
+                if (updateFromClient)
+                    qryClientCache.put(key, key);
+                else
+                    cache.put(key, key);
+            }
+
+            if (!latch.await(5, SECONDS)) {
+                Set<Integer> keys0 = new HashSet<>(keys);
+
+                keys0.removeAll(lsnr.keys);
+
+                log.info("Missed events for keys: " + keys0);
+
+                fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+            }
+
+            checkEvents(expEvts, lsnr);
+        }
+
+        cur.close();
+
+        assertFalse("Unexpected error during test, see log for details.", err);
+    }
+
+    /**
+     * @param expEvts Expected events.
+     * @param lsnr Listener.
+     */
+    private void checkEvents(List<T3<Object, Object, Object>> expEvts, CacheEventListener1 lsnr) {
+        for (T3<Object, Object, Object> exp : expEvts) {
+            CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());
+
+            assertNotNull("No event for key: " + exp.get1(), e);
+            assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
+            assertEquals("Unexpected old value: " + e, exp.get3(), e.getOldValue());
+        }
+
+        expEvts.clear();
+
+        lsnr.evts.clear();
+    }
+
+    /**
+     * @param cache Cache.
+     * @param parts Number of partitions.
+     * @return Keys.
+     */
+    private List<Integer> testKeys(IgniteCache<Object, Object> cache, int parts) {
+        Ignite ignite = cache.unwrap(Ignite.class);
+
+        List<Integer> res = new ArrayList<>();
+
+        Affinity<Object> aff = ignite.affinity(cache.getName());
+
+        ClusterNode node = ignite.cluster().localNode();
+
+        int[] nodeParts = aff.primaryPartitions(node);
+
+        final int KEYS_PER_PART = 3;
+
+        for (int i = 0; i < parts; i++) {
+            int part = nodeParts[i];
+
+            int cnt = 0;
+
+            for (int key = 0; key < 100_000; key++) {
+                if (aff.partition(key) == part && aff.isPrimary(node, key)) {
+                    res.add(key);
+
+                    if (++cnt == KEYS_PER_PART)
+                        break;
+                }
+            }
+
+            assertEquals(KEYS_PER_PART, cnt);
+        }
+
+        assertEquals(parts * KEYS_PER_PART, res.size());
+
+        return res;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBackupQueueCleanupClientQuery() throws Exception {
+        startGridsMultiThreaded(2);
+
+        client = true;
+
+        Ignite qryClient = startGrid(2);
+
+        CacheEventListener1 lsnr = new CacheEventListener1(false);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = qryClient.cache(null).query(qry);
+
+        final Collection<Object> backupQueue = backupQueue(ignite(1));
+
+        assertEquals(0, backupQueue.size());
+
+        IgniteCache<Object, Object> cache0 = ignite(0).cache(null);
+
+        List<Integer> keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD);
+
+        CountDownLatch latch = new CountDownLatch(keys.size());
+
+        lsnr.latch = latch;
+
+        for (Integer key : keys) {
+            log.info("Put: " + key);
+
+            cache0.put(key, key);
+        }
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return backupQueue.isEmpty();
+            }
+        }, 2000);
+
+        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
+
+        if (!latch.await(5, SECONDS))
+            fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+
+        keys = primaryKeys(cache0, BACKUP_ACK_THRESHOLD / 2);
+
+        latch = new CountDownLatch(keys.size());
+
+        lsnr.latch = latch;
+
+        for (Integer key : keys)
+            cache0.put(key, key);
+
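+        // Assumed acknowledgement send frequency (ms): with fewer updates than BACKUP_ACK_THRESHOLD
+        // the backup queue should still be cleaned within roughly this period.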
+        final long ACK_FREQ = 5000;
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return backupQueue.isEmpty();
+            }
+        }, ACK_FREQ + 2000);
+
+        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.isEmpty());
+
+        if (!latch.await(5, SECONDS))
+            fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+
+        cur.close();
+
+        assertFalse("Unexpected error during test, see log for details.", err);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testBackupQueueCleanupServerQuery() throws Exception {
+        Ignite qryClient = startGridsMultiThreaded(2);
+
+        CacheEventListener1 lsnr = new CacheEventListener1(false);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        IgniteCache<Object, Object> cache = qryClient.cache(null);
+
+        QueryCursor<?> cur = cache.query(qry);
+
+        final Collection<Object> backupQueue = backupQueue(ignite(1));
+
+        assertEquals(0, backupQueue.size());
+
+        List<Integer> keys = primaryKeys(cache, BACKUP_ACK_THRESHOLD);
+
+        CountDownLatch latch = new CountDownLatch(keys.size());
+
+        lsnr.latch = latch;
+
+        for (Integer key : keys) {
+            log.info("Put: " + key);
+
+            cache.put(key, key);
+        }
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return backupQueue.isEmpty();
+            }
+        }, 3000);
+
+        assertTrue("Backup queue is not cleared: " + backupQueue, backupQueue.size() < BACKUP_ACK_THRESHOLD);
+
+        if (!latch.await(5, SECONDS))
+            fail("Failed to wait for notifications [exp=" + keys.size() + ", left=" + lsnr.latch.getCount() + ']');
+
+        cur.close();
+    }
+
+    /**
+     * @param ignite Ignite.
+     * @return Backup queue for test query.
+     */
+    private Collection<Object> backupQueue(Ignite ignite) {
+        GridContinuousProcessor proc = ((IgniteKernal)ignite).context().continuous();
+
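+        // Read remote continuous routine infos via reflection and pick the handler registered
+        // for the continuous query on the default cache.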
+        ConcurrentMap<Object, Object> infos = GridTestUtils.getFieldValue(proc, "rmtInfos");
+
+        Collection<Object> backupQueue = null;
+
+        for (Object info : infos.values()) {
+            GridContinuousHandler hnd = GridTestUtils.getFieldValue(info, "hnd");
+
+            if (hnd.isForQuery() && hnd.cacheName() == null) {
+                backupQueue = GridTestUtils.getFieldValue(hnd, "backupQueue");
+
+                break;
+            }
+        }
+
+        assertNotNull(backupQueue);
+
+        return backupQueue;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailover() throws Exception {
+        final int SRV_NODES = 4;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+        final CacheEventListener2 lsnr = new CacheEventListener2();
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = qryClientCache.query(qry);
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        final AtomicReference<CountDownLatch> checkLatch = new AtomicReference<>();
+
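+        // Restart thread: repeatedly starts and stops an extra node, then waits for the update
+        // thread to verify the events received so far.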
+        IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                final int idx = SRV_NODES + 1;
+
+                while (!stop.get() && !err) {
+                    log.info("Start node: " + idx);
+
+                    startGrid(idx);
+
+                    Thread.sleep(3000);
+
+                    log.info("Stop node: " + idx);
+
+                    stopGrid(idx);
+
+                    CountDownLatch latch = new CountDownLatch(1);
+
+                    assertTrue(checkLatch.compareAndSet(null, latch));
+
+                    if (!stop.get()) {
+                        log.info("Wait for event check.");
+
+                        assertTrue(latch.await(1, MINUTES));
+                    }
+                }
+
+                return null;
+            }
+        });
+
+        final Map<Integer, Integer> vals = new HashMap<>();
+
+        final Map<Integer, List<T2<Integer, Integer>>> expEvts = new HashMap<>();
+
+        try {
+            long stopTime = System.currentTimeMillis() + 3 * 60_000;
+
+            final int PARTS = qryClient.affinity(null).partitions();
+
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            while (System.currentTimeMillis() < stopTime) {
+                Integer key = rnd.nextInt(PARTS);
+
+                Integer prevVal = vals.get(key);
+                Integer val = vals.get(key);
+
+                if (val == null)
+                    val = 0;
+                else
+                    val = val + 1;
+
+                qryClientCache.put(key, val);
+
+                vals.put(key, val);
+
+                List<T2<Integer, Integer>> keyEvts = expEvts.get(key);
+
+                if (keyEvts == null) {
+                    keyEvts = new ArrayList<>();
+
+                    expEvts.put(key, keyEvts);
+                }
+
+                keyEvts.add(new T2<>(val, prevVal));
+
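+                // If the restart thread requested a check, verify all expected events before it continues.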
+                CountDownLatch latch = checkLatch.get();
+
+                if (latch != null) {
+                    log.info("Check events.");
+
+                    checkLatch.set(null);
+
+                    boolean success = false;
+
+                    try {
+                        if (err)
+                            break;
+
+                        boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                            @Override public boolean apply() {
+                                return checkEvents(false, expEvts, lsnr);
+                            }
+                        }, 10_000);
+
+                        if (!check)
+                            assertTrue(checkEvents(true, expEvts, lsnr));
+
+                        success = true;
+
+                        log.info("Events checked.");
+                    }
+                    finally {
+                        if (!success)
+                            err = true;
+
+                        latch.countDown();
+                    }
+                }
+            }
+        }
+        finally {
+            stop.set(true);
+        }
+
+        CountDownLatch latch = checkLatch.get();
+
+        if (latch != null)
+            latch.countDown();
+
+        restartFut.get();
+
+        boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return checkEvents(false, expEvts, lsnr);
+            }
+        }, 10_000);
+
+        if (!check)
+            assertTrue(checkEvents(true, expEvts, lsnr));
+
+        cur.close();
+
+        assertFalse("Unexpected error during test, see log for details.", err);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultiThreaded() throws Exception {
+        final int SRV_NODES = 3;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        final IgniteCache<Object, Object> cache = qryClient.cache(null);
+
+        CacheEventListener1 lsnr = new CacheEventListener1(true);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        qry.setLocalListener(lsnr);
+
+        QueryCursor<?> cur = cache.query(qry);
+
+        client = false;
+
+        final int SRV_IDX = SRV_NODES - 1;
+
+        List<Integer> keys = primaryKeys(ignite(SRV_IDX).cache(null), 10);
+
+        final int THREADS = 10;
+
+        for (int i = 0; i < keys.size(); i++) {
+            log.info("Iteration: " + i);
+
+            Ignite srv = ignite(SRV_IDX);
+
+            TestCommunicationSpi spi = (TestCommunicationSpi)srv.configuration().getCommunicationSpi();
+
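+            // Let only the first continuous query message from this node through; the rest are
+            // dropped to emulate lost notifications before the node is stopped.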
+            spi.sndFirstOnly = new AtomicBoolean(false);
+
+            final Integer key = keys.get(i);
+
+            final AtomicInteger val = new AtomicInteger();
+
+            CountDownLatch latch = new CountDownLatch(THREADS);
+
+            lsnr.latch = latch;
+
+            IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    Integer val0 = val.getAndIncrement();
+
+                    cache.put(key, val0);
+
+                    return null;
+                }
+            }, THREADS, "update-thread");
+
+            fut.get();
+
+            stopGrid(SRV_IDX);
+
+            if (!latch.await(5, SECONDS))
+                fail("Failed to wait for notifications [exp=" + THREADS + ", left=" + lsnr.latch.getCount() + ']');
+
+            assertEquals(THREADS, lsnr.allEvts.size());
+
+            Set<Integer> vals = new HashSet<>();
+
+            boolean err = false;
+
+            for (CacheEntryEvent<?, ?> evt : lsnr.allEvts) {
+                assertEquals(key, evt.getKey());
+                assertNotNull(evt.getValue());
+
+                if (!vals.add((Integer)evt.getValue())) {
+                    err = true;
+
+                    log.info("Extra event: " + evt);
+                }
+            }
+
+            for (int v = 0; v < THREADS; v++) {
+                if (!vals.contains(v)) {
+                    err = true;
+
+                    log.info("Event for value not received: " + v);
+                }
+            }
+
+            assertFalse("Invalid events, see log for details.", err);
+
+            lsnr.allEvts.clear();
+
+            startGrid(SRV_IDX);
+        }
+
+        cur.close();
+    }
+
+    /**
+     * @param logAll If {@code true} logs all unexpected values.
+     * @param expEvts Expected values.
+     * @param lsnr Listener.
+     * @return Check status.
+     */
+    @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+    private boolean checkEvents(boolean logAll,
+        Map<Integer, List<T2<Integer, Integer>>> expEvts,
+        CacheEventListener2 lsnr) {
+        assertTrue(!expEvts.isEmpty());
+
+        boolean pass = true;
+
+        for (Map.Entry<Integer, List<T2<Integer, Integer>>> e : expEvts.entrySet()) {
+            Integer key = e.getKey();
+            List<T2<Integer, Integer>> exp = e.getValue();
+
+            List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(key);
+
+            if (rcvdEvts == null) {
+                pass = false;
+
+                log.info("No events for key [key=" + key + ", exp=" + e.getValue() + ']');
+
+                if (!logAll)
+                    return false;
+            }
+            else {
+                synchronized (rcvdEvts) {
+                    if (rcvdEvts.size() != exp.size()) {
+                        pass = false;
+
+                        log.info("Missed or extra events for key [key=" + key +
+                            ", exp=" + e.getValue() +
+                            ", rcvd=" + rcvdEvts + ']');
+
+                        if (!logAll)
+                            return false;
+                    }
+
+                    int cnt = Math.min(rcvdEvts.size(), exp.size());
+
+                    for (int i = 0; i < cnt; i++) {
+                        T2<Integer, Integer> expEvt = exp.get(i);
+                        CacheEntryEvent<?, ?> rcvdEvt = rcvdEvts.get(i);
+
+                        assertEquals(key, rcvdEvt.getKey());
+                        assertEquals(expEvt.get1(), rcvdEvt.getValue());
+                        assertEquals(expEvt.get2(), rcvdEvt.getOldValue());
+                    }
+                }
+            }
+        }
+
+        if (pass) {
+            expEvts.clear();
+            lsnr.evts.clear();
+        }
+
+        return pass;
+    }
+
+    /**
+     * Listener that records received events, counts down the current latch for each event and
+     * optionally keeps all events for later inspection.
+     */
+    private static class CacheEventListener1 implements CacheEntryUpdatedListener<Object, Object> {
+        /** */
+        private volatile CountDownLatch latch;
+
+        /** */
+        private GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>();
+
+        /** */
+        private ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
+
+        /** */
+        private List<CacheEntryEvent<?, ?>> allEvts;
+
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /**
+         * @param saveAll Save all events flag.
+         */
+        CacheEventListener1(boolean saveAll) {
+            if (saveAll)
+                allEvts = new ArrayList<>();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
+            try {
+                for (CacheEntryEvent<?, ?> evt : evts) {
+                    CountDownLatch latch = this.latch;
+
+                    log.info("Received cache event [evt=" + evt +
+                        ", left=" + (latch != null ? latch.getCount() : null) + ']');
+
+                    this.evts.put(evt.getKey(), evt);
+
+                    keys.add((Integer)evt.getKey());
+
+                    if (allEvts != null)
+                        allEvts.add(evt);
+
+                    assertTrue(latch != null);
+                    assertTrue(latch.getCount() > 0);
+
+                    latch.countDown();
+
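+                    // All expected events for this iteration were received: reset latch and key set.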
+                    if (latch.getCount() == 0) {
+                        this.latch = null;
+
+                        keys.clear();
+                    }
+                }
+            }
+            catch (Throwable e) {
+                err = true;
+
+                log.error("Unexpected error", e);
+            }
+        }
+    }
+
+    /**
+     * Listener that checks per-key event ordering: each new value must be the previous value plus one
+     * with a matching old value; duplicates caused by automatic put retries are tolerated.
+     */
+    private static class CacheEventListener2 implements CacheEntryUpdatedListener<Object, Object> {
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** */
+        private final ConcurrentHashMap<Integer, Integer> vals = new ConcurrentHashMap<>();
+
+        /** */
+        private final ConcurrentHashMap<Integer, List<CacheEntryEvent<?, ?>>> evts = new ConcurrentHashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
+            throws CacheEntryListenerException {
+            try {
+                for (CacheEntryEvent<?, ?> evt : evts) {
+                    Integer key = (Integer)evt.getKey();
+                    Integer val = (Integer)evt.getValue();
+
+                    assertNotNull(key);
+                    assertNotNull(val);
+
+                    Integer prevVal = vals.get(key);
+
+                    boolean dup = false;
+
+                    if (prevVal != null) {
+                        if (prevVal.equals(val)) // Can get this event with automatic put retry.
+                            dup = true;
+                        else {
+                            assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val);
+                            assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue());
+                        }
+                    }
+                    else {
+                        assertEquals("Unexpected event: " + evt, (Object)0, val);
+                        assertNull("Unexpected event: " + evt, evt.getOldValue());
+                    }
+
+                    if (!dup) {
+                        vals.put(key, val);
+
+                        List<CacheEntryEvent<?, ?>> keyEvts = this.evts.get(key);
+
+                        if (keyEvts == null) {
+                            keyEvts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>());
+
+                            this.evts.put(key, keyEvts);
+                        }
+
+                        keyEvts.add(evt);
+                    }
+                }
+            }
+            catch (Throwable e) {
+                err = true;
+
+                log.error("Unexpected error", e);
+            }
+        }
+    }
+
+    /**
+     * Communication SPI that can drop continuous query messages to emulate lost notifications.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** */
+        private volatile boolean skipMsg;
+
+        /** */
+        private volatile AtomicBoolean sndFirstOnly;
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
+            throws IgniteSpiException {
+            Object msg0 = ((GridIoMessage)msg).message();
+
+            if (msg0 instanceof GridContinuousMessage) {
+                if (skipMsg) {
+                    log.info("Skip continuous message: " + msg0);
+
+                    return;
+                }
+                else {
+                    AtomicBoolean sndFirstOnly = this.sndFirstOnly;
+
+                    if (sndFirstOnly != null && !sndFirstOnly.compareAndSet(false, true)) {
+                        log.info("Skip continuous message: " + msg0);
+
+                        return;
+                    }
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
new file mode 100644
index 0000000..4ddcf0d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+
+/**
+ * Continuous query failover test for an atomic cache with {@link CacheAtomicWriteOrderMode#PRIMARY} write order mode.
+ */
+public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderTest extends CacheContinuousQueryFailoverAtomicTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
+        return PRIMARY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
new file mode 100644
index 0000000..8fc58d3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicReplicatedTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Continuous query failover test for a replicated atomic cache.
+ */
+public class CacheContinuousQueryFailoverAtomicReplicatedTest extends CacheContinuousQueryFailoverAtomicTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicWriteOrderMode writeOrderMode() {
+        return PRIMARY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
new file mode 100644
index 0000000..fb50387
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Continuous query failover test for a partitioned atomic cache.
+ */
+public class CacheContinuousQueryFailoverAtomicTest extends CacheContinuousQueryFailoverAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java
new file mode 100644
index 0000000..746f0eb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxReplicatedTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Continuous query failover test for a replicated transactional cache.
+ */
+public class CacheContinuousQueryFailoverTxReplicatedTest extends CacheContinuousQueryFailoverTxTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxTest.java
new file mode 100644
index 0000000..8e3a575
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ * Continuous query failover test for a partitioned transactional cache.
+ */
+public class CacheContinuousQueryFailoverTxTest extends CacheContinuousQueryFailoverAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+}