You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/16 12:21:47 UTC

[1/3] ignite git commit: ignite-3478 Tests restructured

Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 e2853a2f5 -> b69f62eb6


ignite-3478 Tests restructured


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1fadab5a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1fadab5a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1fadab5a

Branch: refs/heads/ignite-3478
Commit: 1fadab5a069f5b7f4de49f39df63cb8ba6d6b5b1
Parents: 3f33d6a
Author: sboikov <sb...@gridgain.com>
Authored: Mon Oct 16 15:18:16 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Oct 16 15:18:16 2017 +0300

----------------------------------------------------------------------
 .../cache/mvcc/CacheMvccAbstractTest.java       | 841 +++++++++++++++++++
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 763 +----------------
 2 files changed, 842 insertions(+), 762 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1fadab5a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
new file mode 100644
index 0000000..3954bff
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -0,0 +1,841 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    static final int DFLT_PARTITION_COUNT = RendezvousAffinityFunction.DFLT_PARTITION_COUNT;
+
+    /** */
+    static final String CRD_ATTR = "testCrd";
+
+    /** */
+    static final long DFLT_TEST_TIME = 30_000;
+
+    /** */
+    protected static final int PAGE_SIZE = MemoryConfiguration.DFLT_PAGE_SIZE;
+
+    /** */
+    protected static final int SRVS = 4;
+
+    /** */
+    protected boolean client;
+
+    /** */
+    protected boolean testSpi;
+
+    /** */
+    protected String nodeAttr;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMvccEnabled(true);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        if (testSpi)
+            cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        cfg.setClientMode(client);
+
+        if (nodeAttr != null)
+            cfg.setUserAttributes(F.asMap(nodeAttr, true));
+
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+
+        memCfg.setPageSize(PAGE_SIZE);
+
+        cfg.setMemoryConfiguration(memCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return DFLT_TEST_TIME + 60_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        CacheCoordinatorsProcessor.coordinatorAssignClosure(null);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        try {
+            verifyCoordinatorInternalState();
+        }
+        finally {
+            stopAllGrids();
+        }
+
+        CacheCoordinatorsProcessor.coordinatorAssignClosure(null);
+
+        super.afterTest();
+    }
+
+    /**
+     * @param srvs Number of server nodes.
+     * @param clients Number of client nodes.
+     * @param cacheBackups Number of cache backups.
+     * @param cacheParts Number of cache partitions.
+     * @param withRmvs If {@code true} then in addition to puts tests also executes removes.
+     * @param readMode Read mode.
+     * @throws Exception If failed.
+     */
+    final void accountsTxReadAll(
+        final int srvs,
+        final int clients,
+        int cacheBackups,
+        int cacheParts,
+        final boolean withRmvs,
+        final ReadMode readMode
+    )
+        throws Exception
+    {
+        final int ACCOUNTS = 20;
+
+        final int ACCOUNT_START_VAL = 1000;
+
+        final int writers = 4;
+
+        final int readers = 4;
+
+        final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object, Object>>() {
+            @Override public void apply(IgniteCache<Object, Object> cache) {
+                final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
+
+                Map<Integer, MvccTestAccount> accounts = new HashMap<>();
+
+                for (int i = 0; i < ACCOUNTS; i++)
+                    accounts.put(i, new MvccTestAccount(ACCOUNT_START_VAL, 1));
+
+                try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.putAll(accounts);
+
+                    tx.commit();
+                }
+            }
+        };
+
+        final Set<Integer> rmvdIds = new HashSet<>();
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
+
+                        try {
+                            IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions();
+
+                            cnt++;
+
+                            Integer id1 = rnd.nextInt(ACCOUNTS);
+                            Integer id2 = rnd.nextInt(ACCOUNTS);
+
+                            while (id1.equals(id2))
+                                id2 = rnd.nextInt(ACCOUNTS);
+
+                            TreeSet<Integer> keys = new TreeSet<>();
+
+                            keys.add(id1);
+                            keys.add(id2);
+
+                            Integer cntr1 = null;
+                            Integer cntr2 = null;
+
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                MvccTestAccount a1;
+                                MvccTestAccount a2;
+
+                                Map<Integer, MvccTestAccount> accounts = cache.cache.getAll(keys);
+
+                                a1 = accounts.get(id1);
+                                a2 = accounts.get(id2);
+
+                                if (!withRmvs) {
+                                    assertNotNull(a1);
+                                    assertNotNull(a2);
+
+                                    cntr1 = a1.updateCnt + 1;
+                                    cntr2 = a2.updateCnt + 1;
+
+                                    cache.cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1));
+                                    cache.cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2));
+                                }
+                                else {
+                                    if (a1 != null || a2 != null) {
+                                        if (a1 != null && a2 != null) {
+                                            Integer rmvd = null;
+
+                                            if (rnd.nextInt(10) == 0) {
+                                                synchronized (rmvdIds) {
+                                                    if (rmvdIds.size() < ACCOUNTS / 2) {
+                                                        rmvd = rnd.nextBoolean() ? id1 : id2;
+
+                                                        assertTrue(rmvdIds.add(rmvd));
+                                                    }
+                                                }
+                                            }
+
+                                            if (rmvd != null) {
+                                                cache.cache.remove(rmvd);
+
+                                                cache.cache.put(rmvd.equals(id1) ? id2 : id1,
+                                                    new MvccTestAccount(a1.val + a2.val, 1));
+                                            }
+                                            else {
+                                                cache.cache.put(id1, new MvccTestAccount(a1.val + 1, 1));
+                                                cache.cache.put(id2, new MvccTestAccount(a2.val - 1, 1));
+                                            }
+                                        }
+                                        else {
+                                            if (a1 == null) {
+                                                cache.cache.put(id1, new MvccTestAccount(100, 1));
+                                                cache.cache.put(id2, new MvccTestAccount(a2.val - 100, 1));
+
+                                                assertTrue(rmvdIds.remove(id1));
+                                            }
+                                            else {
+                                                cache.cache.put(id1, new MvccTestAccount(a1.val - 100, 1));
+                                                cache.cache.put(id2, new MvccTestAccount(100, 1));
+
+                                                assertTrue(rmvdIds.remove(id2));
+                                            }
+                                        }
+                                    }
+                                }
+
+                                tx.commit();
+                            }
+
+                            if (!withRmvs) {
+                                Map<Integer, MvccTestAccount> accounts = cache.cache.getAll(keys);
+
+                                MvccTestAccount a1 = accounts.get(id1);
+                                MvccTestAccount a2 = accounts.get(id2);
+
+                                assertNotNull(a1);
+                                assertNotNull(a2);
+
+                                assertTrue(a1.updateCnt >= cntr1);
+                                assertTrue(a2.updateCnt >= cntr2);
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    Set<Integer> keys = new LinkedHashSet<>();
+
+                    Map<Integer, Integer> lastUpdateCntrs = new HashMap<>();
+
+                    while (!stop.get()) {
+                        while (keys.size() < ACCOUNTS)
+                            keys.add(rnd.nextInt(ACCOUNTS));
+
+                        TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
+
+                        Map<Integer, MvccTestAccount> accounts;
+
+                        try {
+                            if (readMode == ReadMode.SCAN) {
+                                accounts = new HashMap<>();
+
+                                for (IgniteCache.Entry<Integer, MvccTestAccount> e : cache.cache) {
+                                    MvccTestAccount old = accounts.put(e.getKey(), e.getValue());
+
+                                    assertNull(old);
+                                }
+                            }
+                            else
+                                accounts = cache.cache.getAll(keys);
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+
+                        if (!withRmvs)
+                            assertEquals(ACCOUNTS, accounts.size());
+
+                        int sum = 0;
+
+                        for (int i = 0; i < ACCOUNTS; i++) {
+                            MvccTestAccount account = accounts.get(i);
+
+                            if (account != null) {
+                                sum += account.val;
+
+                                Integer cntr = lastUpdateCntrs.get(i);
+
+                                if (cntr != null)
+                                    assertTrue(cntr <= account.updateCnt);
+
+                                lastUpdateCntrs.put(i, cntr);
+                            }
+                            else
+                                assertTrue(withRmvs);
+                        }
+
+                        assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
+                    }
+
+                    if (idx == 0) {
+                        TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
+
+                        Map<Integer, MvccTestAccount> accounts;
+
+                        try {
+                            accounts = cache.cache.getAll(keys);
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+
+                        int sum = 0;
+
+                        for (int i = 0; i < ACCOUNTS; i++) {
+                            MvccTestAccount account = accounts.get(i);
+
+                            assertTrue(account != null || withRmvs);
+
+                            info("Account [id=" + i + ", val=" + (account != null ? account.val : null) + ']');
+
+                            if (account != null)
+                                sum += account.val;
+                        }
+
+                        info("Sum: " + sum);
+                    }
+                }
+            };
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            cacheBackups,
+            cacheParts,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            init,
+            writer,
+            reader);
+    }
+
+    /**
+     * @param restartMode Restart mode.
+     * @param srvs Number of server nodes.
+     * @param clients Number of client nodes.
+     * @param cacheBackups Number of cache backups.
+     * @param cacheParts Number of cache partitions.
+     * @param time Test time.
+     * @param writers Number of writers.
+     * @param readers Number of readers.
+     * @param init Optional init closure.
+     * @param writer Writers threads closure.
+     * @param reader Readers threads closure.
+     * @throws Exception If failed.
+     */
+    final void readWriteTest(
+        final RestartMode restartMode,
+        final int srvs,
+        final int clients,
+        int cacheBackups,
+        int cacheParts,
+        final int writers,
+        final int readers,
+        final long time,
+        IgniteInClosure<IgniteCache<Object, Object>> init,
+        final GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer,
+        final GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader) throws Exception {
+        if (restartMode == RestartMode.RESTART_CRD)
+            CacheCoordinatorsProcessor.coordinatorAssignClosure(new CoordinatorAssignClosure());
+
+        Ignite srv0 = startGridsMultiThreaded(srvs);
+
+        if (clients > 0) {
+            client = true;
+
+            startGridsMultiThreaded(srvs, clients);
+
+            client = false;
+        }
+
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
+            FULL_SYNC,
+            cacheBackups,
+            cacheParts);
+
+        if (restartMode == RestartMode.RESTART_CRD)
+            ccfg.setNodeFilter(new CoordinatorNodeFilter());
+
+        IgniteCache<Object, Object> cache = srv0.createCache(ccfg);
+
+        int crdIdx = srvs + clients;
+
+        if (restartMode == RestartMode.RESTART_CRD) {
+            nodeAttr = CRD_ATTR;
+
+            startGrid(crdIdx);
+        }
+
+        if (init != null)
+            init.apply(cache);
+
+        final List<TestCache> caches = new ArrayList<>(srvs + clients);
+
+        for (int i = 0; i < srvs + clients; i++) {
+            Ignite node = grid(i);
+
+            caches.add(new TestCache(node.cache(cache.getName())));
+        }
+
+        final long stopTime = U.currentTimeMillis() + time;
+
+        final AtomicBoolean stop = new AtomicBoolean();
+
+        try {
+            final AtomicInteger writerIdx = new AtomicInteger();
+
+            IgniteInternalFuture<?> writeFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try {
+                        int idx = writerIdx.getAndIncrement();
+
+                        writer.apply(idx, caches, stop);
+                    }
+                    catch (Throwable e) {
+                        if (restartMode != null && X.hasCause(e, ClusterTopologyException.class)) {
+                            log.info("Writer error: " + e);
+
+                            return null;
+                        }
+
+                        error("Unexpected error: " + e, e);
+
+                        stop.set(true);
+
+                        fail("Unexpected error: " + e);
+                    }
+
+                    return null;
+                }
+            }, writers, "writer");
+
+            final AtomicInteger readerIdx = new AtomicInteger();
+
+            IgniteInternalFuture<?> readFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    try {
+                        int idx = readerIdx.getAndIncrement();
+
+                        reader.apply(idx, caches, stop);
+                    }
+                    catch (Throwable e) {
+                        error("Unexpected error: " + e, e);
+
+                        stop.set(true);
+
+                        fail("Unexpected error: " + e);
+                    }
+
+                    return null;
+                }
+            }, readers, "reader");
+
+            while (System.currentTimeMillis() < stopTime && !stop.get()) {
+                Thread.sleep(1000);
+
+                if (restartMode != null) {
+                    switch (restartMode) {
+                        case RESTART_CRD: {
+                            log.info("Start new coordinator: " + (crdIdx + 1));
+
+                            startGrid(crdIdx + 1);
+
+                            log.info("Stop current coordinator: " + crdIdx);
+
+                            stopGrid(crdIdx);
+
+                            crdIdx++;
+
+                            awaitPartitionMapExchange();
+
+                            break;
+                        }
+
+                        case RESTART_RND_SRV: {
+                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                            int idx = rnd.nextInt(srvs);
+
+                            TestCache cache0 = caches.get(idx);
+
+                            cache0.stopLock.writeLock().lock();
+
+                            log.info("Stop node: " + idx);
+
+                            stopGrid(idx);
+
+                            log.info("Start new node: " + idx);
+
+                            Ignite srv = startGrid(idx);
+
+                            synchronized (caches) {
+                                caches.set(idx, new TestCache(srv.cache(DEFAULT_CACHE_NAME)));
+                            }
+
+                            awaitPartitionMapExchange();
+
+                            break;
+                        }
+
+                        default:
+                            fail();
+                    }
+                }
+            }
+
+            stop.set(true);
+
+            writeFut.get();
+            readFut.get();
+        }
+        finally {
+            stop.set(true);
+        }
+    }
+
+    /**
+     * @param cacheMode Cache mode.
+     * @param syncMode Write synchronization mode.
+     * @param backups Number of backups.
+     * @param parts Number of partitions.
+     * @return Cache configuration.
+     */
+    final CacheConfiguration<Object, Object> cacheConfiguration(
+        CacheMode cacheMode,
+        CacheWriteSynchronizationMode syncMode,
+        int backups,
+        int parts) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(syncMode);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, parts));
+
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(backups);
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    final void verifyCoordinatorInternalState() throws Exception {
+        for (Ignite node : G.allGrids()) {
+            final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators();
+
+            Map activeTxs = GridTestUtils.getFieldValue(crd, "activeTxs");
+
+            assertTrue("Txs on node [node=" + node.name() + ", txs=" + activeTxs.toString() + ']',
+                activeTxs.isEmpty());
+
+            Map cntrFuts = GridTestUtils.getFieldValue(crd, "verFuts");
+
+            assertTrue(cntrFuts.isEmpty());
+
+            Map ackFuts = GridTestUtils.getFieldValue(crd, "ackFuts");
+
+            assertTrue(ackFuts.isEmpty());
+
+            // TODO IGNITE-3478
+            // checkActiveQueriesCleanup(node);
+        }
+    }
+
+    /**
+     * @param node Node.
+     * @throws Exception If failed.
+     */
+    final void checkActiveQueriesCleanup(Ignite node) throws Exception {
+        final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators();
+
+        assertTrue("Active queries not cleared: " + node.name(), GridTestUtils.waitForCondition(
+            new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    Object activeQueries = GridTestUtils.getFieldValue(crd, "activeQueries");
+
+                    synchronized (activeQueries) {
+                        Long minQry = GridTestUtils.getFieldValue(activeQueries, "minQry");
+
+                        if (minQry != null)
+                            log.info("Min query: " + minQry);
+
+                        Map<Object, Map> queriesMap = GridTestUtils.getFieldValue(activeQueries, "activeQueries");
+
+                        boolean empty = true;
+
+                        for (Map.Entry<Object, Map> e : queriesMap.entrySet()) {
+                            if (!e.getValue().isEmpty()) {
+                                empty = false;
+
+                                log.info("Active queries: " + e);
+                            }
+                        }
+
+                        return empty && minQry == null;
+                    }
+                }
+            }, 8_000)
+        );
+
+        assertTrue("Previous coordinator queries not empty: " + node.name(), GridTestUtils.waitForCondition(
+            new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    Map queries = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "activeQueries");
+                    Boolean prevDone = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "prevQueriesDone");
+
+                    if (!queries.isEmpty() || !prevDone)
+                        log.info("Previous coordinator state [prevDone=" + prevDone + ", queries=" + queries + ']');
+
+                    return queries.isEmpty();
+                }
+            }, 8_000)
+        );
+    }
+
+    /**
+     * @param caches Caches.
+     * @param rnd Random.
+     * @return Random cache.
+     */
+    static <K, V> TestCache<K, V> randomCache(
+        List<TestCache> caches,
+        ThreadLocalRandom rnd) {
+        synchronized (caches) {
+            if (caches.size() == 1)
+                return caches.get(0);
+
+            for (;;) {
+                int idx = rnd.nextInt(caches.size());
+
+                TestCache testCache = caches.get(idx);
+
+                if (testCache.readLock())
+                    return testCache;
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    static class MvccTestAccount {
+        /** */
+        final int val;
+
+        /** */
+        final int updateCnt;
+
+        /**
+         * @param val Value.
+         * @param updateCnt Updates counter.
+         */
+        MvccTestAccount(int val, int updateCnt) {
+            assert updateCnt > 0;
+
+            this.val = val;
+            this.updateCnt = updateCnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MvccTestAccount.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    enum ReadMode {
+        /** */
+        GET_ALL,
+
+        /** */
+        SCAN
+    }
+
+    /**
+     *
+     */
+    enum RestartMode {
+        /**
+         * Dedicated coordinator node is restarted during test.
+         */
+        RESTART_CRD,
+
+        /** */
+        RESTART_RND_SRV
+    }
+
+    /**
+     *
+     */
+    static class CoordinatorNodeFilter implements IgnitePredicate<ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public boolean apply(ClusterNode node) {
+            return node.attribute(CRD_ATTR) == null;
+        }
+    }
+
+    /**
+     *
+     */
+    static class CoordinatorAssignClosure implements IgniteClosure<Collection<ClusterNode>, ClusterNode> {
+        /** {@inheritDoc} */
+        @Override public ClusterNode apply(Collection<ClusterNode> clusterNodes) {
+            for (ClusterNode node : clusterNodes) {
+                if (node.attribute(CRD_ATTR) != null) {
+                    assert !CU.clientNode(node) : node;
+
+                    return node;
+                }
+            }
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    static class TestCache<K, V> {
+        /** */
+        final IgniteCache<K, V> cache;
+
+        /** Locks node to avoid node restart while test operation is in progress. */
+        final ReadWriteLock stopLock = new ReentrantReadWriteLock();
+
+        /**
+         * @param cache Cache.
+         */
+        TestCache(IgniteCache cache) {
+            this.cache = cache;
+        }
+
+        /**
+         * @return {@code True} if locked.
+         */
+        boolean readLock() {
+            return stopLock.readLock().tryLock();
+        }
+
+        /**
+         *
+         */
+        void readUnlock() {
+            stopLock.readLock().unlock();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fadab5a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 9da6876..3bfbb93 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -110,87 +110,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
  * TODO IGNITE-3478: add check for cleanup in all test (at the and do update for all keys, check there are 2 versions left).
  */
 @SuppressWarnings("unchecked")
-public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
-    /** */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** */
-    private static final int DFLT_PARTITION_COUNT = RendezvousAffinityFunction.DFLT_PARTITION_COUNT;
-
-    /** */
-    private static final String CRD_ATTR = "testCrd";
-
-    /** */
-    private static final long DFLT_TEST_TIME = 30_000;
-
-    /** */
-    private static final int SRVS = 4;
-
-    /** */
-    private boolean client;
-
-    /** */
-    private boolean testSpi;
-
-    /** */
-    private String nodeAttr;
-
-    /** */
-    private static final int PAGE_SIZE = MemoryConfiguration.DFLT_PAGE_SIZE;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setMvccEnabled(true);
-
-        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
-
-        if (testSpi)
-            cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
-
-        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
-
-        cfg.setClientMode(client);
-
-        if (nodeAttr != null)
-            cfg.setUserAttributes(F.asMap(nodeAttr, true));
-
-        MemoryConfiguration memCfg = new MemoryConfiguration();
-
-        memCfg.setPageSize(PAGE_SIZE);
-
-        cfg.setMemoryConfiguration(memCfg);
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long getTestTimeout() {
-        return DFLT_TEST_TIME + 60_000;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        super.beforeTest();
-
-        CacheCoordinatorsProcessor.coordinatorAssignClosure(null);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
-        try {
-            verifyCoordinatorInternalState();
-        }
-        finally {
-            stopAllGrids();
-        }
-
-        CacheCoordinatorsProcessor.coordinatorAssignClosure(null);
-
-        super.afterTest();
-    }
-
+public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
     /**
      * @throws Exception If failed.
      */
@@ -1612,271 +1532,6 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @param srvs Number of server nodes.
-     * @param clients Number of client nodes.
-     * @param cacheBackups Number of cache backups.
-     * @param cacheParts Number of cache partitions.
-     * @param withRmvs If {@code true} then in addition to puts tests also executes removes.
-     * @param readMode Read mode.
-     * @throws Exception If failed.
-     */
-    private void accountsTxReadAll(
-        final int srvs,
-        final int clients,
-        int cacheBackups,
-        int cacheParts,
-        final boolean withRmvs,
-        final ReadMode readMode
-    )
-        throws Exception
-    {
-        final int ACCOUNTS = 20;
-
-        final int ACCOUNT_START_VAL = 1000;
-
-        final int writers = 4;
-
-        final int readers = 4;
-
-        final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object, Object>>() {
-            @Override public void apply(IgniteCache<Object, Object> cache) {
-                final IgniteTransactions txs = cache.unwrap(Ignite.class).transactions();
-
-                Map<Integer, MvccTestAccount> accounts = new HashMap<>();
-
-                for (int i = 0; i < ACCOUNTS; i++)
-                    accounts.put(i, new MvccTestAccount(ACCOUNT_START_VAL, 1));
-
-                try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                    cache.putAll(accounts);
-
-                    tx.commit();
-                }
-            }
-        };
-
-        final Set<Integer> rmvdIds = new HashSet<>();
-
-        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
-            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
-                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
-                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-                    int cnt = 0;
-
-                    while (!stop.get()) {
-                        TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
-
-                        try {
-                            IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions();
-
-                            cnt++;
-
-                            Integer id1 = rnd.nextInt(ACCOUNTS);
-                            Integer id2 = rnd.nextInt(ACCOUNTS);
-
-                            while (id1.equals(id2))
-                                id2 = rnd.nextInt(ACCOUNTS);
-
-                            TreeSet<Integer> keys = new TreeSet<>();
-
-                            keys.add(id1);
-                            keys.add(id2);
-
-                            Integer cntr1 = null;
-                            Integer cntr2 = null;
-
-                            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
-                                MvccTestAccount a1;
-                                MvccTestAccount a2;
-
-                                Map<Integer, MvccTestAccount> accounts = cache.cache.getAll(keys);
-
-                                a1 = accounts.get(id1);
-                                a2 = accounts.get(id2);
-
-                                if (!withRmvs) {
-                                    assertNotNull(a1);
-                                    assertNotNull(a2);
-
-                                    cntr1 = a1.updateCnt + 1;
-                                    cntr2 = a2.updateCnt + 1;
-
-                                    cache.cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1));
-                                    cache.cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2));
-                                }
-                                else {
-                                    if (a1 != null || a2 != null) {
-                                        if (a1 != null && a2 != null) {
-                                            Integer rmvd = null;
-
-                                            if (rnd.nextInt(10) == 0) {
-                                                synchronized (rmvdIds) {
-                                                    if (rmvdIds.size() < ACCOUNTS / 2) {
-                                                        rmvd = rnd.nextBoolean() ? id1 : id2;
-
-                                                        assertTrue(rmvdIds.add(rmvd));
-                                                    }
-                                                }
-                                            }
-
-                                            if (rmvd != null) {
-                                                cache.cache.remove(rmvd);
-
-                                                cache.cache.put(rmvd.equals(id1) ? id2 : id1,
-                                                    new MvccTestAccount(a1.val + a2.val, 1));
-                                            }
-                                            else {
-                                                cache.cache.put(id1, new MvccTestAccount(a1.val + 1, 1));
-                                                cache.cache.put(id2, new MvccTestAccount(a2.val - 1, 1));
-                                            }
-                                        }
-                                        else {
-                                            if (a1 == null) {
-                                                cache.cache.put(id1, new MvccTestAccount(100, 1));
-                                                cache.cache.put(id2, new MvccTestAccount(a2.val - 100, 1));
-
-                                                assertTrue(rmvdIds.remove(id1));
-                                            }
-                                            else {
-                                                cache.cache.put(id1, new MvccTestAccount(a1.val - 100, 1));
-                                                cache.cache.put(id2, new MvccTestAccount(100, 1));
-
-                                                assertTrue(rmvdIds.remove(id2));
-                                            }
-                                        }
-                                    }
-                                }
-
-                                tx.commit();
-                            }
-
-                            if (!withRmvs) {
-                                Map<Integer, MvccTestAccount> accounts = cache.cache.getAll(keys);
-
-                                MvccTestAccount a1 = accounts.get(id1);
-                                MvccTestAccount a2 = accounts.get(id2);
-
-                                assertNotNull(a1);
-                                assertNotNull(a2);
-
-                                assertTrue(a1.updateCnt >= cntr1);
-                                assertTrue(a2.updateCnt >= cntr2);
-                            }
-                        }
-                        finally {
-                            cache.readUnlock();
-                        }
-                    }
-
-                    info("Writer finished, updates: " + cnt);
-                }
-            };
-
-        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
-            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
-                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
-                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-                    Set<Integer> keys = new LinkedHashSet<>();
-
-                    Map<Integer, Integer> lastUpdateCntrs = new HashMap<>();
-
-                    while (!stop.get()) {
-                        while (keys.size() < ACCOUNTS)
-                            keys.add(rnd.nextInt(ACCOUNTS));
-
-                        TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
-
-                        Map<Integer, MvccTestAccount> accounts;
-
-                        try {
-                            if (readMode == ReadMode.SCAN) {
-                                accounts = new HashMap<>();
-
-                                for (IgniteCache.Entry<Integer, MvccTestAccount> e : cache.cache) {
-                                    MvccTestAccount old = accounts.put(e.getKey(), e.getValue());
-
-                                    assertNull(old);
-                                }
-                            }
-                            else
-                                accounts = cache.cache.getAll(keys);
-                        }
-                        finally {
-                            cache.readUnlock();
-                        }
-
-                        if (!withRmvs)
-                            assertEquals(ACCOUNTS, accounts.size());
-
-                        int sum = 0;
-
-                        for (int i = 0; i < ACCOUNTS; i++) {
-                            MvccTestAccount account = accounts.get(i);
-
-                            if (account != null) {
-                                sum += account.val;
-
-                                Integer cntr = lastUpdateCntrs.get(i);
-
-                                if (cntr != null)
-                                    assertTrue(cntr <= account.updateCnt);
-
-                                lastUpdateCntrs.put(i, cntr);
-                            }
-                            else
-                                assertTrue(withRmvs);
-                        }
-
-                        assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
-                    }
-
-                    if (idx == 0) {
-                        TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
-
-                        Map<Integer, MvccTestAccount> accounts;
-
-                        try {
-                            accounts = cache.cache.getAll(keys);
-                        }
-                        finally {
-                            cache.readUnlock();
-                        }
-
-                        int sum = 0;
-
-                        for (int i = 0; i < ACCOUNTS; i++) {
-                            MvccTestAccount account = accounts.get(i);
-
-                            assertTrue(account != null || withRmvs);
-
-                            info("Account [id=" + i + ", val=" + (account != null ? account.val : null) + ']');
-
-                            if (account != null)
-                                sum += account.val;
-                        }
-
-                        info("Sum: " + sum);
-                    }
-                }
-            };
-
-        readWriteTest(
-            null,
-            srvs,
-            clients,
-            cacheBackups,
-            cacheParts,
-            writers,
-            readers,
-            DFLT_TEST_TIME,
-            init,
-            writer,
-            reader);
-    }
-
-    /**
      * @throws Exception If failed.
      */
     public void testPessimisticTxReadsSnapshot_SingleNode_SinglePartition() throws Exception {
@@ -3800,189 +3455,6 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @param restartMode Restart mode.
-     * @param srvs Number of server nodes.
-     * @param clients Number of client nodes.
-     * @param cacheBackups Number of cache backups.
-     * @param cacheParts Number of cache partitions.
-     * @param time Test time.
-     * @param writers Number of writers.
-     * @param readers Number of readers.
-     * @param init Optional init closure.
-     * @param writer Writers threads closure.
-     * @param reader Readers threads closure.
-     * @throws Exception If failed.
-     */
-    private void readWriteTest(
-        final RestartMode restartMode,
-        final int srvs,
-        final int clients,
-        int cacheBackups,
-        int cacheParts,
-        final int writers,
-        final int readers,
-        final long time,
-        IgniteInClosure<IgniteCache<Object, Object>> init,
-        final GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer,
-        final GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader) throws Exception {
-        if (restartMode == RestartMode.RESTART_CRD)
-            CacheCoordinatorsProcessor.coordinatorAssignClosure(new CoordinatorAssignClosure());
-
-        Ignite srv0 = startGridsMultiThreaded(srvs);
-
-        if (clients > 0) {
-            client = true;
-
-            startGridsMultiThreaded(srvs, clients);
-
-            client = false;
-        }
-
-        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED,
-            FULL_SYNC,
-            cacheBackups,
-            cacheParts);
-
-        if (restartMode == RestartMode.RESTART_CRD)
-            ccfg.setNodeFilter(new CoordinatorNodeFilter());
-
-        IgniteCache<Object, Object> cache = srv0.createCache(ccfg);
-
-        int crdIdx = srvs + clients;
-
-        if (restartMode == RestartMode.RESTART_CRD) {
-            nodeAttr = CRD_ATTR;
-
-            startGrid(crdIdx);
-        }
-
-        if (init != null)
-            init.apply(cache);
-
-        final List<TestCache> caches = new ArrayList<>(srvs + clients);
-
-        for (int i = 0; i < srvs + clients; i++) {
-            Ignite node = grid(i);
-
-            caches.add(new TestCache(node.cache(cache.getName())));
-        }
-
-        final long stopTime = U.currentTimeMillis() + time;
-
-        final AtomicBoolean stop = new AtomicBoolean();
-
-        try {
-            final AtomicInteger writerIdx = new AtomicInteger();
-
-            IgniteInternalFuture<?> writeFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
-                @Override public Void call() throws Exception {
-                    try {
-                        int idx = writerIdx.getAndIncrement();
-
-                        writer.apply(idx, caches, stop);
-                    }
-                    catch (Throwable e) {
-                        if (restartMode != null && X.hasCause(e, ClusterTopologyException.class)) {
-                            log.info("Writer error: " + e);
-
-                            return null;
-                        }
-
-                        error("Unexpected error: " + e, e);
-
-                        stop.set(true);
-
-                        fail("Unexpected error: " + e);
-                    }
-
-                    return null;
-                }
-            }, writers, "writer");
-
-            final AtomicInteger readerIdx = new AtomicInteger();
-
-            IgniteInternalFuture<?> readFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
-                @Override public Void call() throws Exception {
-                    try {
-                        int idx = readerIdx.getAndIncrement();
-
-                        reader.apply(idx, caches, stop);
-                    }
-                    catch (Throwable e) {
-                        error("Unexpected error: " + e, e);
-
-                        stop.set(true);
-
-                        fail("Unexpected error: " + e);
-                    }
-
-                    return null;
-                }
-            }, readers, "reader");
-
-            while (System.currentTimeMillis() < stopTime && !stop.get()) {
-                Thread.sleep(1000);
-
-                if (restartMode != null) {
-                    switch (restartMode) {
-                        case RESTART_CRD: {
-                            log.info("Start new coordinator: " + (crdIdx + 1));
-
-                            startGrid(crdIdx + 1);
-
-                            log.info("Stop current coordinator: " + crdIdx);
-
-                            stopGrid(crdIdx);
-
-                            crdIdx++;
-
-                            awaitPartitionMapExchange();
-
-                            break;
-                        }
-
-                        case RESTART_RND_SRV: {
-                            ThreadLocalRandom rnd = ThreadLocalRandom.current();
-
-                            int idx = rnd.nextInt(srvs);
-
-                            TestCache cache0 = caches.get(idx);
-
-                            cache0.stopLock.writeLock().lock();
-
-                            log.info("Stop node: " + idx);
-
-                            stopGrid(idx);
-
-                            log.info("Start new node: " + idx);
-
-                            Ignite srv = startGrid(idx);
-
-                            synchronized (caches) {
-                                caches.set(idx, new TestCache(srv.cache(DEFAULT_CACHE_NAME)));
-                            }
-
-                            awaitPartitionMapExchange();
-
-                            break;
-                        }
-
-                        default:
-                            fail();
-                    }
-                }
-            }
-
-            stop.set(true);
-
-            writeFut.get();
-            readFut.get();
-        }
-        finally {
-            stop.set(true);
-        }
-    }
-    /**
      * @throws IgniteCheckedException If failed.
      */
     public void testSize() throws Exception {
@@ -4219,155 +3691,6 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @param cacheMode Cache mode.
-     * @param syncMode Write synchronization mode.
-     * @param backups Number of backups.
-     * @param parts Number of partitions.
-     * @return Cache configuration.
-     */
-    private CacheConfiguration<Object, Object> cacheConfiguration(
-        CacheMode cacheMode,
-        CacheWriteSynchronizationMode syncMode,
-        int backups,
-        int parts) {
-        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
-
-        ccfg.setCacheMode(cacheMode);
-        ccfg.setAtomicityMode(TRANSACTIONAL);
-        ccfg.setWriteSynchronizationMode(syncMode);
-        ccfg.setAffinity(new RendezvousAffinityFunction(false, parts));
-
-        if (cacheMode == PARTITIONED)
-            ccfg.setBackups(backups);
-
-        return ccfg;
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    private void verifyCoordinatorInternalState() throws Exception {
-        for (Ignite node : G.allGrids()) {
-            final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators();
-
-            Map activeTxs = GridTestUtils.getFieldValue(crd, "activeTxs");
-
-            assertTrue("Txs on node [node=" + node.name() + ", txs=" + activeTxs.toString() + ']',
-                activeTxs.isEmpty());
-
-            Map cntrFuts = GridTestUtils.getFieldValue(crd, "verFuts");
-
-            assertTrue(cntrFuts.isEmpty());
-
-            Map ackFuts = GridTestUtils.getFieldValue(crd, "ackFuts");
-
-            assertTrue(ackFuts.isEmpty());
-
-            // TODO IGNITE-3478
-            // checkActiveQueriesCleanup(node);
-        }
-    }
-
-    /**
-     * @param node Node.
-     * @throws Exception If failed.
-     */
-    private void checkActiveQueriesCleanup(Ignite node) throws Exception {
-        final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators();
-
-        assertTrue("Active queries not cleared: " + node.name(), GridTestUtils.waitForCondition(
-            new GridAbsPredicate() {
-                @Override public boolean apply() {
-                    Object activeQueries = GridTestUtils.getFieldValue(crd, "activeQueries");
-
-                    synchronized (activeQueries) {
-                        Long minQry = GridTestUtils.getFieldValue(activeQueries, "minQry");
-
-                        if (minQry != null)
-                            log.info("Min query: " + minQry);
-
-                        Map<Object, Map> queriesMap = GridTestUtils.getFieldValue(activeQueries, "activeQueries");
-
-                        boolean empty = true;
-
-                        for (Map.Entry<Object, Map> e : queriesMap.entrySet()) {
-                            if (!e.getValue().isEmpty()) {
-                                empty = false;
-
-                                log.info("Active queries: " + e);
-                            }
-                        }
-
-                        return empty && minQry == null;
-                    }
-                }
-            }, 8_000)
-        );
-
-        assertTrue("Previous coordinator queries not empty: " + node.name(), GridTestUtils.waitForCondition(
-            new GridAbsPredicate() {
-                @Override public boolean apply() {
-                    Map queries = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "activeQueries");
-                    Boolean prevDone = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "prevQueriesDone");
-
-                    if (!queries.isEmpty() || !prevDone)
-                        log.info("Previous coordinator state [prevDone=" + prevDone + ", queries=" + queries + ']');
-
-                    return queries.isEmpty();
-                }
-            }, 8_000)
-        );
-    }
-
-    /**
-     * @param caches Caches.
-     * @param rnd Random.
-     * @return Random cache.
-     */
-    private static <K, V> TestCache<K, V> randomCache(List<TestCache> caches, ThreadLocalRandom rnd) {
-        synchronized (caches) {
-            if (caches.size() == 1)
-                return caches.get(0);
-
-            for (;;) {
-                int idx = rnd.nextInt(caches.size());
-
-                TestCache testCache = caches.get(idx);
-
-                if (testCache.readLock())
-                    return testCache;
-            }
-        }
-    }
-
-    /**
-     *
-     */
-    static class MvccTestAccount {
-        /** */
-        private final int val;
-
-        /** */
-        private final int updateCnt;
-
-        /**
-         * @param val Value.
-         * @param updateCnt Updates counter.
-         */
-        MvccTestAccount(int val, int updateCnt) {
-            assert updateCnt > 0;
-
-            this.val = val;
-            this.updateCnt = updateCnt;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MvccTestAccount.class, this);
-        }
-    }
-
-    /**
      *
      */
     static class Value {
@@ -4395,58 +3718,6 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      *
      */
-    enum ReadMode {
-        /** */
-        GET_ALL,
-
-        /** */
-        SCAN
-    }
-
-    /**
-     *
-     */
-    enum RestartMode {
-        /**
-         * Dedicated coordinator node is restarted during test.
-         */
-        RESTART_CRD,
-
-        /** */
-        RESTART_RND_SRV
-    }
-
-    /**
-     *
-     */
-    static class CoordinatorNodeFilter implements IgnitePredicate<ClusterNode> {
-        /** {@inheritDoc} */
-        @Override public boolean apply(ClusterNode node) {
-            return node.attribute(CRD_ATTR) == null;
-        }
-    }
-
-    /**
-     *
-     */
-    static class CoordinatorAssignClosure implements IgniteClosure<Collection<ClusterNode>, ClusterNode> {
-        /** {@inheritDoc} */
-        @Override public ClusterNode apply(Collection<ClusterNode> clusterNodes) {
-            for (ClusterNode node : clusterNodes) {
-                if (node.attribute(CRD_ATTR) != null) {
-                    assert !CU.clientNode(node) : node;
-
-                    return node;
-                }
-            }
-
-            return null;
-        }
-    }
-
-    /**
-     *
-     */
     static class TestKey implements Serializable {
         /** */
         private final int key;
@@ -4493,36 +3764,4 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             return "TestKey [k=" + key + ", payloadLen=" + payload.length + ']';
         }
     }
-
-    /**
-     *
-     */
-    static class TestCache<K, V> {
-        /** */
-        private final IgniteCache<K, V> cache;
-
-        /** Locks node to avoid node restart while test operation is in progress. */
-        private final ReadWriteLock stopLock = new ReentrantReadWriteLock();
-
-        /**
-         * @param cache Cache.
-         */
-        TestCache(IgniteCache cache) {
-            this.cache = cache;
-        }
-
-        /**
-         * @return {@code True} if locked.
-         */
-        boolean readLock() {
-            return stopLock.readLock().tryLock();
-        }
-
-        /**
-         *
-         */
-        void readUnlock() {
-            stopLock.readLock().unlock();
-        }
-    }
 }


[2/3] ignite git commit: Merge remote-tracking branch 'origin/ignite-3478' into ignite-3478

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-3478' into ignite-3478

# Conflicts:
#	modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/68cf84a2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/68cf84a2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/68cf84a2

Branch: refs/heads/ignite-3478
Commit: 68cf84a2f6370aa4a1ea3f499393c2ac83bb19b4
Parents: 1fadab5 e2853a2
Author: sboikov <sb...@gridgain.com>
Authored: Mon Oct 16 15:19:05 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Oct 16 15:19:05 2017 +0300

----------------------------------------------------------------------

----------------------------------------------------------------------



[3/3] ignite git commit: ignite-3478 Tests restructured

Posted by sb...@apache.org.
ignite-3478 Tests restructured


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b69f62eb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b69f62eb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b69f62eb

Branch: refs/heads/ignite-3478
Commit: b69f62eb6f293553502b9dd984b1fa6526a77401
Parents: 68cf84a
Author: sboikov <sb...@gridgain.com>
Authored: Mon Oct 16 15:21:27 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Oct 16 15:21:27 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/mvcc/CacheMvccAbstractTest.java        | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b69f62eb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index 3954bff..f9ac96f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -714,8 +714,13 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
         List<TestCache> caches,
         ThreadLocalRandom rnd) {
         synchronized (caches) {
-            if (caches.size() == 1)
-                return caches.get(0);
+            if (caches.size() == 1) {
+                TestCache cache = caches.get(0);
+
+                assertTrue(cache.readLock());
+
+                return cache;
+            }
 
             for (;;) {
                 int idx = rnd.nextInt(caches.size());