You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/08/30 12:24:16 UTC

[10/45] ignite git commit: IGNITE-4191: MVCC and transactional SQL support. Joint multi-man-years efforts of Semen Boikov, Igor Seliverstov, Alexander Paschenko, Igor Sapego, Sergey Kalashnikov, Roman Kondakov, Pavel Kuznetsov, Ivan Pavlukhin, Andrey Mas

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
new file mode 100644
index 0000000..71e004b
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
@@ -0,0 +1,808 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxQueryEnlistResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
+import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Backups tests.
+ */
+@SuppressWarnings("unchecked")
+public abstract class CacheMvccBackupsAbstractTest extends CacheMvccAbstractTest {
+
+    /** Test timeout. */
+    private final long txLongTimeout = getTestTimeout() / 4;
+
+    /**
+     * Tests backup consistency.
+     *
+     * @throws Exception If fails.
+     */
+    public void testBackupsCoherenceSimple() throws Exception {
+        disableScheduledVacuum = true;
+
+        ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 2, DFLT_PARTITION_COUNT)
+            .setIndexedTypes(Integer.class, Integer.class);
+
+        final int KEYS_CNT = 5_000;
+        assert KEYS_CNT % 2 == 0;
+
+        startGrids(3);
+
+        Ignite node0 = grid(0);
+        Ignite node1 = grid(1);
+        Ignite node2 = grid(2);
+
+        client = true;
+
+        Ignite client = startGrid();
+
+        awaitPartitionMapExchange();
+
+        IgniteCache clientCache = client.cache(DEFAULT_CACHE_NAME);
+        IgniteCache cache0 = node0.cache(DEFAULT_CACHE_NAME);
+        IgniteCache cache1 = node1.cache(DEFAULT_CACHE_NAME);
+        IgniteCache cache2 = node2.cache(DEFAULT_CACHE_NAME);
+
+        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            tx.timeout(txLongTimeout);
+
+            for (int i = 0; i < KEYS_CNT / 2; i += 2) {
+                SqlFieldsQuery qry = new SqlFieldsQuery("INSERT INTO Integer (_key, _val) values ("
+                    + i + ',' + i * 2 + "), (" + (i + 1) + ',' + (i + 1) * 2 + ')');
+
+                clientCache.query(qry).getAll();
+            }
+
+            tx.commit();
+        }
+
+        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            tx.timeout(txLongTimeout);
+
+            for (int i = 0; i < 10; i++) {
+                SqlFieldsQuery qry = new SqlFieldsQuery("DELETE from Integer WHERE _key = " + i);
+
+                clientCache.query(qry).getAll();
+            }
+
+            for (int i = 10; i < KEYS_CNT + 1; i++) {
+                SqlFieldsQuery qry = new SqlFieldsQuery("UPDATE Integer SET _val=" + i * 10 + " WHERE _key = " + i);
+
+                clientCache.query(qry).getAll();
+            }
+
+            tx.commit();
+        }
+
+        Map<KeyCacheObject, List<CacheDataRow>> vers0 = allVersions(cache0);
+
+        List res0 = getAll(cache0, "Integer");
+
+        stopGrid(0);
+
+        awaitPartitionMapExchange();
+
+        Map<KeyCacheObject, List<CacheDataRow>> vers1 = allVersions(cache1);
+
+        assertVersionsEquals(vers0, vers1);
+
+        List res1 = getAll(cache1, "Integer");
+
+        assertEqualsCollections(res0, res1);
+
+        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            tx.timeout(txLongTimeout);
+
+            for (int i = 10; i < 20; i++) {
+                SqlFieldsQuery qry = new SqlFieldsQuery("DELETE from Integer WHERE _key = " + i);
+
+                clientCache.query(qry).getAll();
+            }
+
+            for (int i = 20; i < KEYS_CNT + 1; i++) {
+                SqlFieldsQuery qry = new SqlFieldsQuery("UPDATE Integer SET _val=" + i * 100 + " WHERE _key = " + i);
+
+                clientCache.query(qry).getAll();
+            }
+
+            tx.commit();
+        }
+
+        vers1 = allVersions(cache1);
+
+        res1 = getAll(cache2, "Integer");
+
+        stopGrid(1);
+
+        awaitPartitionMapExchange();
+
+        Map<KeyCacheObject, List<CacheDataRow>> vers2 = allVersions(cache2);
+
+        assertVersionsEquals(vers1, vers2);
+
+        List res2 = getAll(cache2, "Integer");
+
+        assertEqualsCollections(res1, res2);
+    }
+
+    /**
+     * Checks cache backups consistency with large queries.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBackupsCoherenceWithLargeOperations() throws Exception {
+        disableScheduledVacuum = true;
+
+        ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 1, DFLT_PARTITION_COUNT)
+            .setIndexedTypes(Integer.class, Integer.class);
+
+        final int KEYS_CNT = 50_000;
+        assert KEYS_CNT % 2 == 0;
+
+        startGrids(2);
+
+        Ignite node1 = grid(0);
+        Ignite node2 = grid(1);
+
+        client = true;
+
+        Ignite client = startGrid();
+
+        awaitPartitionMapExchange();
+
+        IgniteCache clientCache = client.cache(DEFAULT_CACHE_NAME);
+        IgniteCache cache1 = node1.cache(DEFAULT_CACHE_NAME);
+        IgniteCache cache2 = node2.cache(DEFAULT_CACHE_NAME);
+
+        StringBuilder insert = new StringBuilder("INSERT INTO Integer (_key, _val) values ");
+
+        boolean first = true;
+
+        for (int key = 0; key < KEYS_CNT; key++) {
+            if (!first)
+                insert.append(',');
+            else
+                first = false;
+
+            insert.append('(').append(key).append(',').append(key * 10).append(')');
+        }
+
+        String qryStr = insert.toString();
+
+        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            tx.timeout(txLongTimeout);
+
+            SqlFieldsQuery qry = new SqlFieldsQuery(qryStr);
+
+            clientCache.query(qry).getAll();
+
+            tx.commit();
+        }
+
+        qryStr = "SELECT * FROM Integer WHERE _key >= " + KEYS_CNT / 2 + " FOR UPDATE";
+
+        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            tx.timeout(txLongTimeout);
+
+            SqlFieldsQuery qry = new SqlFieldsQuery(qryStr);
+
+            clientCache.query(qry).getAll();
+
+            tx.commit();
+        }
+
+
+        qryStr = "DELETE FROM Integer WHERE _key >= " + KEYS_CNT / 2;
+
+        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            tx.timeout(txLongTimeout);
+
+            SqlFieldsQuery qry = new SqlFieldsQuery(qryStr);
+
+            clientCache.query(qry).getAll();
+
+            tx.commit();
+        }
+
+        Map<KeyCacheObject, List<CacheDataRow>> cache1Vers = allVersions(cache1);
+
+        List res1 = getAll(cache1, "Integer");
+
+        stopGrid(0);
+
+        awaitPartitionMapExchange();
+
+        Map<KeyCacheObject, List<CacheDataRow>> cache2Vers = allVersions(cache2);
+
+        assertVersionsEquals(cache1Vers, cache2Vers);
+
+        List res2 = getAll(cache2, "Integer");
+
+        assertEqualsCollections(res1, res2);
+    }
+
+    /**
+     * Checks cache backups consistency with in-flight batches overflow.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBackupsCoherenceWithInFlightBatchesOverflow() throws Exception {
+        testSpi = true;
+
+        disableScheduledVacuum = true;
+
+        ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 1, DFLT_PARTITION_COUNT)
+            .setIndexedTypes(Integer.class, Integer.class);
+
+        final int KEYS_CNT = 30_000;
+        assert KEYS_CNT % 2 == 0;
+
+        startGrids(2);
+
+        Ignite node1 = grid(0);
+        Ignite node2 = grid(1);
+
+        client = true;
+
+        Ignite client = startGrid();
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<?,?> clientCache = client.cache(DEFAULT_CACHE_NAME);
+        IgniteCache<?,?> cache1 = node1.cache(DEFAULT_CACHE_NAME);
+        IgniteCache<?,?> cache2 = node2.cache(DEFAULT_CACHE_NAME);
+
+        AtomicInteger keyGen = new AtomicInteger();
+        Affinity affinity = affinity(clientCache);
+
+        ClusterNode cNode1 = ((IgniteEx)node1).localNode();
+        ClusterNode cNode2 = ((IgniteEx)node2).localNode();
+
+        StringBuilder insert = new StringBuilder("INSERT INTO Integer (_key, _val) values ");
+
+        for (int i = 0; i < KEYS_CNT; i++) {
+            if (i > 0)
+                insert.append(',');
+
+            // To make big batches in near results future.
+            Integer key = i < KEYS_CNT / 2 ? keyForNode(affinity, keyGen, cNode1) : keyForNode(affinity, keyGen, cNode2);
+
+            assert key != null;
+
+            insert.append('(').append(key).append(',').append(key * 10).append(')');
+        }
+
+        String qryStr = insert.toString();
+
+        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            tx.timeout(txLongTimeout);
+
+            SqlFieldsQuery qry = new SqlFieldsQuery(qryStr);
+
+            clientCache.query(qry).getAll();
+
+            tx.commit();
+        }
+
+        // Add a delay to simulate batches overflow.
+        TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(node1);
+        TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(node2);
+
+        spi1.closure(new IgniteBiInClosure<ClusterNode, Message>() {
+            @Override public void apply(ClusterNode node, Message msg) {
+                if (msg instanceof GridDhtTxQueryEnlistResponse)
+                    doSleep(100);
+            }
+        });
+
+        spi2.closure(new IgniteBiInClosure<ClusterNode, Message>() {
+            @Override public void apply(ClusterNode node, Message msg) {
+                if (msg instanceof GridDhtTxQueryEnlistResponse)
+                    doSleep(100);
+            }
+        });
+
+        qryStr = "DELETE FROM Integer WHERE _key >= " + 10;
+
+        try (Transaction tx = client.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            tx.timeout(txLongTimeout);
+
+            SqlFieldsQuery qry = new SqlFieldsQuery(qryStr);
+
+            clientCache.query(qry).getAll();
+
+            tx.commit();
+        }
+
+        Map<KeyCacheObject, List<CacheDataRow>> cache1Vers = allVersions(cache1);
+
+        List res1 = getAll(cache1, "Integer");
+
+        stopGrid(0);
+
+        awaitPartitionMapExchange();
+
+        Map<KeyCacheObject, List<CacheDataRow>> cache2Vers = allVersions(cache2);
+
+        assertVersionsEquals(cache1Vers, cache2Vers);
+
+        List res2 = getAll(cache2, "Integer");
+
+        assertEqualsCollections(res1, res2);
+    }
+
+    /**
+     * Tests concurrent updates backups coherence.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBackupsCoherenceWithConcurrentUpdates2ServersNoClients() throws Exception {
+        checkBackupsCoherenceWithConcurrentUpdates(2, 0);
+    }
+
+    /**
+     * Tests concurrent updates backups coherence.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBackupsCoherenceWithConcurrentUpdates4ServersNoClients() throws Exception {
+        checkBackupsCoherenceWithConcurrentUpdates(4, 0);
+    }
+
+    /**
+     * Tests concurrent updates backups coherence.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBackupsCoherenceWithConcurrentUpdates3Servers1Client() throws Exception {
+        checkBackupsCoherenceWithConcurrentUpdates(3, 1);
+    }
+
+    /**
+     * Tests concurrent updates backups coherence.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBackupsCoherenceWithConcurrentUpdates5Servers2Clients() throws Exception {
+        checkBackupsCoherenceWithConcurrentUpdates(5, 2);
+    }
+
+    /**
+     * Tests concurrent updates backups coherence.
+     *
+     * @throws Exception If failed.
+     */
+    private void checkBackupsCoherenceWithConcurrentUpdates(int srvs, int clients) throws Exception {
+        assert srvs > 1;
+
+        disableScheduledVacuum = true;
+
+        accountsTxReadAll(srvs, clients, srvs - 1, DFLT_PARTITION_COUNT,
+            new InitIndexing(Integer.class, MvccTestAccount.class), true, SQL, DML, 5_000, null);
+
+        for (int i = 0; i < srvs - 1; i++) {
+            Ignite node1 = grid(i);
+
+            IgniteCache cache1 = node1.cache(DEFAULT_CACHE_NAME);
+
+            Map<KeyCacheObject, List<CacheDataRow>> vers1 = allVersions(cache1);
+
+            List res1 = getAll(cache1, "MvccTestAccount");
+
+            stopGrid(i);
+
+            awaitPartitionMapExchange();
+
+            Ignite node2 = grid(i + 1);
+
+            IgniteCache cache2 = node2.cache(DEFAULT_CACHE_NAME);
+
+            Map<KeyCacheObject, List<CacheDataRow>> vers2 = allVersions(cache2);
+
+            assertVersionsEquals(vers1, vers2);
+
+            List res2 = getAll(cache2, "MvccTestAccount");
+
+            assertEqualsCollections(res1, res2);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoForceKeyRequestDelayedRebalanceNoVacuum() throws Exception {
+        disableScheduledVacuum = true;
+
+        doTestRebalanceNodeAdd(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoForceKeyRequestDelayedRebalance() throws Exception {
+        doTestRebalanceNodeAdd(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoForceKeyRequestNoVacuum() throws Exception {
+        disableScheduledVacuum = true;
+
+        doTestRebalanceNodeAdd(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoForceKeyRequest() throws Exception {
+        doTestRebalanceNodeAdd(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTestRebalanceNodeAdd(boolean delayRebalance) throws Exception {
+        testSpi = true;
+
+        final Ignite node1 = startGrid(0);
+
+        final IgniteCache<Object, Object> cache = node1.createCache(
+            cacheConfiguration(cacheMode(), FULL_SYNC, 1, 16)
+                .setIndexedTypes(Integer.class, Integer.class));
+
+        try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+
+            SqlFieldsQuery qry = new SqlFieldsQuery("INSERT INTO Integer (_key, _val) values " +
+                "(1,1),(2,2),(3,3),(4,4),(5,5)");
+
+            cache.query(qry).getAll();
+
+            tx.commit();
+        }
+
+        TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(node1);
+
+        // Check for a force key request.
+        spi.closure(new IgniteBiInClosure<ClusterNode, Message>() {
+            @Override public void apply(ClusterNode node, Message msg) {
+                if (delayRebalance && msg instanceof GridDhtPartitionSupplyMessage)
+                    doSleep(500);
+
+                if (msg instanceof GridDhtForceKeysResponse)
+                    fail("Force key request");
+            }
+        });
+
+        final Ignite node2 = startGrid(1);
+
+        TestRecordingCommunicationSpi.spi(node2).closure(
+            new IgniteBiInClosure<ClusterNode, Message>() {
+                @Override public void apply(ClusterNode node, Message msg) {
+                    if (msg instanceof GridDhtForceKeysRequest)
+                        fail("Force key request");
+                }
+            }
+        );
+
+        IgniteCache<Object, Object> cache2 = node2.cache(DEFAULT_CACHE_NAME);
+
+        try (Transaction tx = node2.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            SqlFieldsQuery qry = new SqlFieldsQuery("DELETE FROM Integer WHERE _key IN " +
+                "(1,2,3,4,5)");
+
+            cache2.query(qry).getAll();
+
+            tx.commit();
+        }
+
+        awaitPartitionMapExchange();
+
+        doSleep(2000);
+
+        stopGrid(1);
+
+        try (Transaction tx = node1.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+
+            SqlFieldsQuery qry = new SqlFieldsQuery("INSERT INTO Integer (_key, _val) values " +
+                "(1,1),(2,2),(3,3),(4,4),(5,5)");
+
+            cache.query(qry).getAll();
+
+            tx.commit();
+        }
+
+        doSleep(1000);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRebalanceNodeLeaveClient() throws Exception {
+        doTestRebalanceNodeLeave(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRebalanceNodeLeaveServer() throws Exception {
+        doTestRebalanceNodeLeave(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void doTestRebalanceNodeLeave(boolean startClient) throws Exception {
+        testSpi = true;
+        disableScheduledVacuum = true;
+
+        startGridsMultiThreaded(4);
+
+        client = true;
+
+        final Ignite node = startClient ? startGrid(4) : grid(0);
+
+        final IgniteCache<Object, Object> cache = node.createCache(
+            cacheConfiguration(cacheMode(), FULL_SYNC, 2, 16)
+                .setIndexedTypes(Integer.class, Integer.class));
+
+        List<Integer> keys = new ArrayList<>();
+
+        for (int i = 0; i < 4; i++)
+            keys.addAll(primaryKeys(grid(i).cache(DEFAULT_CACHE_NAME), 2));
+
+        try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            StringBuilder sb = new StringBuilder("INSERT INTO Integer (_key, _val) values ");
+
+            for (int i = 0; i < keys.size(); i++) {
+                if (i > 0)
+                    sb.append(", ");
+
+                sb.append("(" + keys.get(i) + ", " + keys.get(i) + ")");
+            }
+
+            SqlFieldsQuery qry = new SqlFieldsQuery(sb.toString());
+
+            cache.query(qry).getAll();
+
+            tx.commit();
+        }
+
+        stopGrid(3);
+
+        awaitPartitionMapExchange();
+
+        try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            SqlFieldsQuery qry = new SqlFieldsQuery("UPDATE Integer SET _val = 10*_key");
+
+            cache.query(qry).getAll();
+
+            tx.commit();
+        }
+
+        awaitPartitionMapExchange();
+
+        for (Integer key : keys) {
+            List<CacheDataRow> vers = null;
+
+            for (int i = 0; i < 3; i++) {
+                ClusterNode n = grid(i).cluster().localNode();
+
+                if (node.affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(n, key)) {
+
+                    List<CacheDataRow> vers0 = allKeyVersions(grid(i).cache(DEFAULT_CACHE_NAME), key);
+
+                    if (vers != null)
+                        assertKeyVersionsEquals(vers, vers0);
+
+                    vers = vers0;
+                }
+            }
+        }
+    }
+
+    /**
+     * Retrieves all versions of all keys from cache.
+     *
+     * @param cache Cache.
+     * @return {@link Map} of keys to its versions.
+     * @throws IgniteCheckedException If failed.
+     */
+    private Map<KeyCacheObject, List<CacheDataRow>> allVersions(IgniteCache cache) throws IgniteCheckedException {
+        IgniteCacheProxy cache0 = (IgniteCacheProxy)cache;
+        GridCacheContext cctx = cache0.context();
+
+        assert cctx.mvccEnabled();
+
+        Map<KeyCacheObject, List<CacheDataRow>> vers = new HashMap<>();
+
+        for (Object e : cache) {
+            IgniteBiTuple entry = (IgniteBiTuple)e;
+
+            KeyCacheObject key = cctx.toCacheKeyObject(entry.getKey());
+
+            GridCursor<CacheDataRow> cur = cctx.offheap().mvccAllVersionsCursor(cctx, key, null);
+
+            List<CacheDataRow> rows = new ArrayList<>();
+
+            while (cur.next()) {
+                CacheDataRow row = cur.get();
+
+                rows.add(row);
+            }
+
+            vers.put(key, rows);
+        }
+
+        return vers;
+    }
+
+    /**
+     * @param cache Cache.
+     * @param key Key.
+     * @return Collection of versioned rows.
+     * @throws IgniteCheckedException if failed.
+     */
+    private List<CacheDataRow> allKeyVersions(IgniteCache cache, Object key) throws IgniteCheckedException {
+        IgniteCacheProxy cache0 = (IgniteCacheProxy)cache;
+        GridCacheContext cctx = cache0.context();
+
+        KeyCacheObject key0 = cctx.toCacheKeyObject(key);
+
+        GridCursor<CacheDataRow> cur = cctx.offheap().mvccAllVersionsCursor(cctx, key0, null);
+
+        List<CacheDataRow> rows = new ArrayList<>();
+
+        while (cur.next()) {
+            CacheDataRow row = cur.get();
+
+            rows.add(row);
+        }
+
+        return rows;
+    }
+
+    /**
+     * Checks stored versions equality.
+     *
+     * @param left Keys versions to compare.
+     * @param right Keys versions to compare.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void assertVersionsEquals(Map<KeyCacheObject, List<CacheDataRow>> left,
+        Map<KeyCacheObject, List<CacheDataRow>> right) throws IgniteCheckedException {
+        assertNotNull(left);
+        assertNotNull(right);
+
+        assertTrue(!left.isEmpty());
+        assertTrue(!right.isEmpty());
+
+        assertEqualsCollections(left.keySet(), right.keySet());
+
+        for (KeyCacheObject key : right.keySet()) {
+            List<CacheDataRow> leftRows = left.get(key);
+            List<CacheDataRow> rightRows = right.get(key);
+
+            assertKeyVersionsEquals(leftRows, rightRows);
+        }
+    }
+
+    /**
+     *
+     * @param leftRows Left rows.
+     * @param rightRows Right rows.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void assertKeyVersionsEquals(List<CacheDataRow> leftRows, List<CacheDataRow> rightRows)
+        throws IgniteCheckedException {
+
+        assertNotNull(leftRows);
+        assertNotNull(rightRows);
+
+        assertEquals("leftRows=" + leftRows + ", rightRows=" + rightRows, leftRows.size(), rightRows.size());
+
+        for (int i = 0; i < leftRows.size(); i++) {
+            CacheDataRow leftRow = leftRows.get(i);
+            CacheDataRow rightRow = rightRows.get(i);
+
+            assertNotNull(leftRow);
+            assertNotNull(rightRow);
+
+            assertTrue(leftRow instanceof MvccDataRow);
+            assertTrue(rightRow instanceof MvccDataRow);
+
+            leftRow.key().valueBytes(null);
+
+            assertEquals(leftRow.expireTime(), rightRow.expireTime());
+            assertEquals(leftRow.partition(), rightRow.partition());
+            assertArrayEquals(leftRow.value().valueBytes(null), rightRow.value().valueBytes(null));
+            assertEquals(leftRow.version(), rightRow.version());
+            assertEquals(leftRow.cacheId(), rightRow.cacheId());
+            assertEquals(leftRow.hash(), rightRow.hash());
+            assertEquals(leftRow.key(), rightRow.key());
+            assertTrue(MvccUtils.compare(leftRow, rightRow.mvccVersion()) == 0);
+            assertTrue(MvccUtils.compareNewVersion(leftRow, rightRow.newMvccVersion()) == 0);
+            assertEquals(leftRow.newMvccCoordinatorVersion(), rightRow.newMvccCoordinatorVersion());
+            assertEquals(leftRow.newMvccCounter(), rightRow.newMvccCounter());
+            assertEquals(leftRow.newMvccOperationCounter(), rightRow.newMvccOperationCounter());
+        }
+    }
+
+    /**
+     * Retrieves all table rows from local node.
+     * @param cache Cache.
+     * @param tblName Table name.
+     * @return All table rows.
+     */
+    private List getAll(IgniteCache cache, String tblName) {
+        List res = cache.query(new SqlFieldsQuery("SELECT * FROM " + tblName)).getAll();
+
+        Collections.sort(res, new Comparator<Object>() {
+            @Override public int compare(Object o1, Object o2) {
+                List<?> l1 = (List<?>)o1;
+                List<?> l2 = (List<?>)o2;
+
+                int res =  ((Comparable)l1.get(0)).compareTo((Comparable)l2.get(0));
+
+                if (res == 0 && l1.size() > 1)
+                    return ((Comparable)l1.get(1)).compareTo((Comparable)l2.get(1));
+                else
+                    return res;
+
+            }
+        });
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBulkLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBulkLoadTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBulkLoadTest.java
new file mode 100644
index 0000000..d620e84
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBulkLoadTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.io.File;
+import java.io.Serializable;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
+/**
+ *
+ */
+public class CacheMvccBulkLoadTest extends CacheMvccAbstractTest {
+    /** */
+    private IgniteCache<Object, Object> sqlNexus;
+
+    /** */
+    private Statement stmt;
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        Ignite ignite = startGrid(0);
+        sqlNexus = ignite.getOrCreateCache(new CacheConfiguration<>("sqlNexus").setSqlSchema("PUBLIC"));
+        sqlNexus.query(q("" +
+            "create table person(" +
+            "  id int not null primary key," +
+            "  name varchar not null" +
+            ") with \"atomicity=transactional\""
+        ));
+        stmt = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1").createStatement();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCopyStoresData() throws Exception {
+        String csvFilePath = new File(getClass().getResource("mvcc_person.csv").toURI()).getAbsolutePath();
+        stmt.executeUpdate("copy from '" + csvFilePath + "' into person (id, name) format csv");
+
+        List<List<?>> rows = sqlNexus.query(q("select * from person")).getAll();
+
+        List<List<? extends Serializable>> exp = asList(
+            asList(1, "John"),
+            asList(2, "Jack")
+        );
+        assertEquals(exp, rows);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCopyDoesNotOverwrite() throws Exception {
+        sqlNexus.query(q("insert into person values(1, 'Old')"));
+        String csvFilePath = new File(getClass().getResource("mvcc_person.csv").toURI()).getAbsolutePath();
+        stmt.executeUpdate("copy from '" + csvFilePath + "' into person (id, name) format csv");
+
+        List<List<?>> rows = sqlNexus.query(q("select * from person")).getAll();
+
+        List<List<? extends Serializable>> exp = asList(
+            asList(1, "Old"),
+            asList(2, "Jack")
+        );
+        assertEquals(exp, rows);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCopyLeavesPartialResultsInCaseOfFailure() throws Exception {
+        String csvFilePath = new File(getClass().getResource("mvcc_person_broken.csv").toURI()).getAbsolutePath();
+        try {
+            stmt.executeUpdate("copy from '" + csvFilePath + "' into person (id, name) format csv");
+            fail();
+        }
+        catch (SQLException ignored) {
+            // assert exception is thrown
+        }
+
+        List<List<?>> rows = sqlNexus.query(q("select * from person")).getAll();
+
+        List<List<? extends Serializable>> exp = singletonList(
+            asList(1, "John")
+        );
+        assertEquals(exp, rows);
+    }
+
+    /** */
+    private static SqlFieldsQuery q(String sql) {
+        return new SqlFieldsQuery(sql);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccDmlSimpleTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccDmlSimpleTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccDmlSimpleTest.java
new file mode 100644
index 0000000..bb5e753
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccDmlSimpleTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+
+import static java.util.Arrays.asList;
+
+/**
+ *
+ */
+public class CacheMvccDmlSimpleTest extends CacheMvccAbstractTest {
+    /** */
+    private IgniteCache<?, ?> cache;
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cache = startGrid(0).getOrCreateCache(
+            new CacheConfiguration<>("test")
+                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
+                .setSqlSchema("PUBLIC")
+                .setIndexedTypes(Integer.class, Integer.class)
+        );
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testInsert() throws Exception {
+        int cnt = update("insert into Integer(_key, _val) values(1, 1),(2, 2)");
+
+        assertEquals(2, cnt);
+
+        assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+
+        try {
+            update("insert into Integer(_key, _val) values(3, 3),(1, 1)");
+        } catch (CacheException e) {
+            assertTrue(e.getCause() instanceof IgniteSQLException);
+            assertEquals(IgniteQueryErrorCode.DUPLICATE_KEY, ((IgniteSQLException)e.getCause()).statusCode());
+        }
+
+        assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testMerge() throws Exception {
+        {
+            int cnt = update("merge into Integer(_key, _val) values(1, 1),(2, 2)");
+
+            assertEquals(2, cnt);
+            assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+        }
+
+        {
+            int cnt = update("merge into Integer(_key, _val) values(3, 3),(1, 1)");
+
+            assertEquals(2, cnt);
+            assertEquals(asSet(asList(1, 1), asList(2, 2), asList(3, 3)), query("select * from Integer"));
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testUpdate() throws Exception {
+        {
+            int cnt = update("update Integer set _val = 42 where _key = 42");
+
+            assertEquals(0, cnt);
+            assertTrue(query("select * from Integer").isEmpty());
+        }
+
+        update("insert into Integer(_key, _val) values(1, 1),(2, 2)");
+
+        {
+            int cnt = update("update Integer set _val = 42 where _key = 42");
+
+            assertEquals(0, cnt);
+            assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+        }
+
+        {
+            int cnt = update("update Integer set _val = 42 where _key >= 42");
+
+            assertEquals(0, cnt);
+            assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+        }
+
+        {
+            int cnt = update("update Integer set _val = 11 where _key = 1");
+
+            assertEquals(1, cnt);
+            assertEquals(asSet(asList(1, 11), asList(2, 2)), query("select * from Integer"));
+        }
+
+        {
+            int cnt = update("update Integer set _val = 12 where _key <= 2");
+
+            assertEquals(asSet(asList(1, 12), asList(2, 12)), query("select * from Integer"));
+            assertEquals(2, cnt);
+        }
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testDelete() throws Exception {
+        {
+            int cnt = update("delete from Integer where _key = 42");
+
+            assertEquals(0, cnt);
+        }
+
+        update("insert into Integer(_key, _val) values(1, 1),(2, 2)");
+
+        {
+            int cnt = update("delete from Integer where _key = 42");
+
+            assertEquals(0, cnt);
+            assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+        }
+
+        {
+            int cnt = update("delete from Integer where _key >= 42");
+
+            assertEquals(0, cnt);
+            assertEquals(asSet(asList(1, 1), asList(2, 2)), query("select * from Integer"));
+        }
+
+        {
+            int cnt = update("delete from Integer where _key = 1");
+
+            assertEquals(1, cnt);
+            assertEquals(asSet(asList(2, 2)), query("select * from Integer"));
+        }
+
+        {
+            int cnt = update("delete from Integer where _key <= 2");
+
+            assertTrue(query("select * from Integer").isEmpty());
+            assertEquals(1, cnt);
+        }
+    }
+
+    /**
+     * @param q Query.
+     * @return Row set.
+     */
+    private Set<List<?>> query(String q) {
+        return new HashSet<>(cache.query(new SqlFieldsQuery(q)).getAll());
+    }
+
+    /**
+     * @param q Query.
+     * @return Updated rows count.
+     */
+    private int update(String q) {
+        return Integer.parseInt(cache.query(new SqlFieldsQuery(q)).getAll().get(0).get(0).toString());
+    }
+
+    /** */
+    private Set<List<?>> asSet(List<?>... ls) {
+        return new HashSet<>(asList(ls));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccIteratorWithConcurrentJdbcTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccIteratorWithConcurrentJdbcTransactionTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccIteratorWithConcurrentJdbcTransactionTest.java
new file mode 100644
index 0000000..235d87f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccIteratorWithConcurrentJdbcTransactionTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+/**
+ *
+ */
+public class CacheMvccIteratorWithConcurrentJdbcTransactionTest extends CacheMvccIteratorWithConcurrentTransactionTest {
+    /** {@inheritDoc} */
+    @Override boolean jdbcTx() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest.java
new file mode 100644
index 0000000..97c062f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+/**
+ *
+ */
+public class CacheMvccLocalEntriesWithConcurrentJdbcTransactionTest extends
+    CacheMvccLocalEntriesWithConcurrentTransactionTest {
+    /** {@inheritDoc} */
+    @Override boolean jdbcTx() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedBackupsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedBackupsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedBackupsTest.java
new file mode 100644
index 0000000..71d832c
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedBackupsTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/** */
+public class CacheMvccPartitionedBackupsTest extends CacheMvccBackupsAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSelectForUpdateQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSelectForUpdateQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSelectForUpdateQueryTest.java
new file mode 100644
index 0000000..12209ab
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSelectForUpdateQueryTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/** */
+public class CacheMvccPartitionedSelectForUpdateQueryTest extends CacheMvccSelectForUpdateQueryAbstractTest {
+    /** {@inheritDoc} */
+    public CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /**
+     *
+     */
+    public void testSelectForUpdateDistributedSegmented() throws Exception {
+        doTestSelectForUpdateDistributed("PersonSeg", false);
+    }
+
+    /**
+     *
+     */
+    public void testSelectForUpdateLocalSegmented() throws Exception {
+        doTestSelectForUpdateLocal("PersonSeg", false);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
new file mode 100644
index 0000000..1362b4a
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SCAN;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * SQL Mvcc coordinator failover test for partitioned caches.
+ */
+public class CacheMvccPartitionedSqlCoordinatorFailoverTest extends CacheMvccAbstractSqlCoordinatorFailoverTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.PARTITIONED;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_ClientServer_Backups2_CoordinatorFails() throws Exception {
+        accountsTxReadAll(4, 2, 2, DFLT_PARTITION_COUNT,
+            new InitIndexing(Integer.class, MvccTestAccount.class), true, SQL, DML, DFLT_TEST_TIME, RestartMode.RESTART_CRD);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_Server_Backups1_CoordinatorFails_Persistence() throws Exception {
+        persistence = true;
+
+        accountsTxReadAll(2, 0, 1, 64,
+            new InitIndexing(Integer.class, MvccTestAccount.class), true, SQL, DML, DFLT_TEST_TIME, RestartMode.RESTART_CRD);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllGetAll_ClientServer_Backups3_RestartCoordinator_ScanDml() throws Exception {
+        putAllGetAll(RestartMode.RESTART_CRD  , 5, 2, 3, DFLT_PARTITION_COUNT,
+            new InitIndexing(Integer.class, Integer.class), SCAN, DML);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator_ScanDml_Persistence() throws Exception {
+        persistence = true;
+
+        putAllGetAll(RestartMode.RESTART_CRD  , 2, 1, 2, DFLT_PARTITION_COUNT,
+            new InitIndexing(Integer.class, Integer.class), SCAN, DML);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllGetAll_ClientServer_Backups2_RestartCoordinator_SqlDml_Persistence() throws Exception {
+        persistence = true;
+
+        putAllGetAll(RestartMode.RESTART_CRD, 4, 2, 2, 64,
+            new InitIndexing(Integer.class, Integer.class), SQL, DML);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator_SqlDml() throws Exception {
+        putAllGetAll(RestartMode.RESTART_CRD, 2, 1, 1, 64,
+            new InitIndexing(Integer.class, Integer.class), SQL, DML);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdate_N_Objects_ClientServer_Backups2_Sql() throws Exception {
+        updateNObjectsTest(7, 3, 2, 2, DFLT_PARTITION_COUNT, DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, Integer.class), SQL, DML, RestartMode.RESTART_CRD);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdate_N_Objects_ClientServer_Backups1_Sql_Persistence() throws Exception {
+        persistence = true;
+
+        updateNObjectsTest(10, 2, 1, 1, DFLT_PARTITION_COUNT, DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, Integer.class), SQL, DML, RestartMode.RESTART_CRD);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSqlReadInProgressCoordinatorFails() throws Exception {
+        readInProgressCoordinatorFails(false, false, PESSIMISTIC, REPEATABLE_READ, SQL, DML, new InitIndexing(Integer.class, Integer.class));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSqlReadInsideTxInProgressCoordinatorFails() throws Exception {
+        readInProgressCoordinatorFails(false, true, PESSIMISTIC, REPEATABLE_READ, SQL, DML, new InitIndexing(Integer.class, Integer.class));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSqlReadInProgressCoordinatorFails_ReadDelay() throws Exception {
+        readInProgressCoordinatorFails(true, false, PESSIMISTIC, REPEATABLE_READ, SQL, DML, new InitIndexing(Integer.class, Integer.class));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSqlReadInsideTxInProgressCoordinatorFails_ReadDelay() throws Exception {
+        readInProgressCoordinatorFails(true, true, PESSIMISTIC, REPEATABLE_READ, SQL, DML, new InitIndexing(Integer.class, Integer.class));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception {
+        readInProgressCoordinatorFailsSimple(false, new InitIndexing(Integer.class, Integer.class), SQL, DML);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlQueriesTest.java
new file mode 100644
index 0000000..e0b4a24
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlQueriesTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/** */
+public class CacheMvccPartitionedSqlQueriesTest extends CacheMvccSqlQueriesAbstractTest {
+    /** {@inheritDoc} */
+    protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java
new file mode 100644
index 0000000..199cfad
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/** */
+public class CacheMvccPartitionedSqlTxQueriesTest extends CacheMvccSqlTxQueriesAbstractTest {
+    /** {@inheritDoc} */
+    protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesWithReducerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesWithReducerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesWithReducerTest.java
new file mode 100644
index 0000000..03de543
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlTxQueriesWithReducerTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/** */
+public class CacheMvccPartitionedSqlTxQueriesWithReducerTest extends CacheMvccSqlTxQueriesWithReducerAbstractTest {
+    /** {@inheritDoc} */
+    protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedBackupsTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedBackupsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedBackupsTest.java
new file mode 100644
index 0000000..02de0a3
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedBackupsTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/** */
+public class CacheMvccReplicatedBackupsTest extends CacheMvccBackupsAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSelectForUpdateQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSelectForUpdateQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSelectForUpdateQueryTest.java
new file mode 100644
index 0000000..a458319
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSelectForUpdateQueryTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/** */
+public class CacheMvccReplicatedSelectForUpdateQueryTest extends CacheMvccSelectForUpdateQueryAbstractTest {
+    /** {@inheritDoc} */
+    public CacheMode cacheMode() {
+        return REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlCoordinatorFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlCoordinatorFailoverTest.java
new file mode 100644
index 0000000..2f72bce
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlCoordinatorFailoverTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+/**
+ * SQL Mvcc coordinator failover test for replicated caches.
+ */
+public class CacheMvccReplicatedSqlCoordinatorFailoverTest extends CacheMvccAbstractSqlCoordinatorFailoverTest {
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return CacheMode.REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlQueriesTest.java
new file mode 100644
index 0000000..ba8a5c3
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlQueriesTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/** */
+public class CacheMvccReplicatedSqlQueriesTest extends CacheMvccSqlQueriesAbstractTest {
+    /** {@inheritDoc} */
+    protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesTest.java
new file mode 100644
index 0000000..bde2c5d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/** */
+public class CacheMvccReplicatedSqlTxQueriesTest extends CacheMvccSqlTxQueriesAbstractTest {
+    /** {@inheritDoc} */
+    protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        ccfgs = null;
+        ccfg = null;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedJoinPartitionedClient() throws Exception {
+        checkReplicatedJoinPartitioned(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testReplicatedJoinPartitionedServer() throws Exception {
+        checkReplicatedJoinPartitioned(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void checkReplicatedJoinPartitioned(boolean client) throws Exception {
+        ccfgs = new CacheConfiguration[] {
+            cacheConfiguration(REPLICATED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)
+                .setName("int")
+                .setIndexedTypes(Integer.class, Integer.class),
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 2, DFLT_PARTITION_COUNT)
+                .setIndexedTypes(Integer.class,
+                CacheMvccSqlTxQueriesAbstractTest.MvccTestSqlIndexValue.class),
+            cacheConfiguration(REPLICATED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)
+                .setName("target")
+                .setIndexedTypes(Integer.class, Integer.class)
+        };
+
+        startGridsMultiThreaded(3);
+
+        this.client = true;
+
+        startGrid(3);
+
+        Ignite node = client ? grid(3) : grid(0);
+
+        List<List<?>> r;
+
+        try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            tx.timeout(TX_TIMEOUT);
+
+            r = runSql(node, "INSERT INTO \"int\".Integer(_key, _val) VALUES (1,1), (2,2), (3,3)");
+
+            assertEquals(3L, r.get(0).get(0));
+
+            tx.commit();
+        }
+
+        try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            tx.timeout(TX_TIMEOUT);
+
+            r = runSql(node, "INSERT INTO \"default\".MvccTestSqlIndexValue(_key, idxVal1) " +
+                "VALUES (1,10), (2, 20), (3, 30)");
+
+            assertEquals(3L, r.get(0).get(0));
+
+            tx.commit();
+        }
+
+        try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            tx.timeout(TX_TIMEOUT);
+
+            r = runSql(node, "INSERT INTO \"target\".Integer(_key, _val) " +
+                "SELECT a._key, a.idxVal1*b._val FROM \"default\".MvccTestSqlIndexValue a " +
+                "JOIN \"int\".Integer b ON a._key = b._key");
+
+            assertEquals(3L, r.get(0).get(0));
+
+            tx.commit();
+        }
+
+        for (int n = 0; n < 3; ++n) {
+            node = grid(n);
+
+            r = runSqlLocal(node, "SELECT _key, _val FROM \"target\".Integer ORDER BY _key");
+
+            assertEquals(3L, r.size());
+
+            assertEquals(1, r.get(0).get(0));
+            assertEquals(2, r.get(1).get(0));
+            assertEquals(3, r.get(2).get(0));
+
+            assertEquals(10, r.get(0).get(1));
+            assertEquals(40, r.get(1).get(1));
+            assertEquals(90, r.get(2).get(1));
+        }
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testReplicatedAndPartitionedUpdateSingleTransaction() throws Exception {
+        ccfgs = new CacheConfiguration[] {
+            cacheConfiguration(REPLICATED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)
+                .setName("rep")
+                .setIndexedTypes(Integer.class, Integer.class),
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT)
+                .setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class)
+                .setName("part"),
+        };
+
+        startGridsMultiThreaded(3);
+
+        client = true;
+
+        startGrid(3);
+
+        Random rnd = ThreadLocalRandom.current();
+
+        Ignite node = grid(rnd.nextInt(4));
+
+        List<List<?>> r;
+
+        Cache<Integer, Integer> repCache = node.cache("rep");
+
+        repCache.put(1, 1);
+        repCache.put(2, 2);
+        repCache.put(3, 3);
+
+        Cache<Integer, MvccTestSqlIndexValue> partCache = node.cache("part");
+
+        partCache.put(1, new MvccTestSqlIndexValue(1));
+        partCache.put(2, new MvccTestSqlIndexValue(2));
+        partCache.put(3, new MvccTestSqlIndexValue(3));
+
+        try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            tx.timeout(TX_TIMEOUT);
+
+            r = runSql(node, "UPDATE \"rep\".Integer SET _val = _key * 10");
+
+            assertEquals(3L, r.get(0).get(0));
+
+            r = runSql(node, "UPDATE  \"part\".MvccTestSqlIndexValue SET idxVal1 = _key * 10");
+
+            assertEquals(3L, r.get(0).get(0));
+
+            tx.commit();
+        }
+
+        r = runSql(node, "SELECT COUNT(1) FROM \"rep\".Integer r JOIN \"part\".MvccTestSqlIndexValue p" +
+            " ON r._key = p._key WHERE r._val = p.idxVal1");
+
+        assertEquals(3L, r.get(0).get(0));
+
+        for (int n = 0; n < 3; ++n) {
+            node = grid(n);
+
+            r = runSqlLocal(node, "SELECT _key, _val FROM \"rep\".Integer ORDER BY _key");
+
+            assertEquals(3L, r.size());
+
+            assertEquals(1, r.get(0).get(0));
+            assertEquals(2, r.get(1).get(0));
+            assertEquals(3, r.get(2).get(0));
+
+            assertEquals(10, r.get(0).get(1));
+            assertEquals(20, r.get(1).get(1));
+            assertEquals(30, r.get(2).get(1));
+        }
+    }
+
+    /**
+     * Run query.
+     *
+     * @param node Node.
+     * @param sqlText Query.
+     * @return Results.
+     */
+    private List<List<?>> runSql(Ignite node, String sqlText) {
+        GridQueryProcessor qryProc = ((IgniteEx)node).context().query();
+
+        return qryProc.querySqlFields(new SqlFieldsQuery(sqlText), false).getAll();
+    }
+
+    /**
+     * Run query locally.
+     *
+     * @param node Node.
+     * @param sqlText Query.
+     * @return Results.
+     */
+    private List<List<?>> runSqlLocal(Ignite node, String sqlText) {
+        GridQueryProcessor qryProc = ((IgniteEx)node).context().query();
+
+        return qryProc.querySqlFields(new SqlFieldsQuery(sqlText).setLocal(true), false).getAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesWithReducerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesWithReducerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesWithReducerTest.java
new file mode 100644
index 0000000..173c43f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccReplicatedSqlTxQueriesWithReducerTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import org.apache.ignite.cache.CacheMode;
+
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/** */
+public class CacheMvccReplicatedSqlTxQueriesWithReducerTest extends CacheMvccSqlTxQueriesWithReducerAbstractTest {
+    /** {@inheritDoc} */
+    protected CacheMode cacheMode() {
+        return REPLICATED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccScanQueryWithConcurrentJdbcTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccScanQueryWithConcurrentJdbcTransactionTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccScanQueryWithConcurrentJdbcTransactionTest.java
new file mode 100644
index 0000000..7272def
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccScanQueryWithConcurrentJdbcTransactionTest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+/**
+ *
+ */
+public class CacheMvccScanQueryWithConcurrentJdbcTransactionTest extends
+    CacheMvccScanQueryWithConcurrentTransactionTest {
+    /** {@inheritDoc} */
+    @Override boolean jdbcTx() {
+        return true;
+    }
+}