You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2018/08/30 11:53:27 UTC

[05/38] ignite git commit: IGNITE-4191: MVCC and transactional SQL support. Joint multi-man-years efforts of Semen Boikov, Igor Seliverstov, Alexander Paschenko, Igor Sapego, Sergey Kalashnikov, Roman Kondakov, Pavel Kuznetsov, Ivan Pavlukhin, Andrey Mas

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
new file mode 100644
index 0000000..8492d2a
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSelectForUpdateQueryAbstractTest.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.internal.processors.cache.index.AbstractSchemaSelfTest.connect;
+import static org.apache.ignite.internal.processors.cache.index.AbstractSchemaSelfTest.execute;
+
+/**
+ * Test for {@code SELECT FOR UPDATE} queries.
+ */
+public abstract class CacheMvccSelectForUpdateQueryAbstractTest extends CacheMvccAbstractTest {
+    /** */
+    private static final int CACHE_SIZE = 50;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        disableScheduledVacuum = getName().equals("testSelectForUpdateAfterAbortedTx");
+
+        startGrids(3);
+
+        CacheConfiguration seg = new CacheConfiguration("segmented*");
+
+        seg.setCacheMode(cacheMode());
+
+        if (seg.getCacheMode() == PARTITIONED)
+            seg.setQueryParallelism(4);
+
+        grid(0).addCacheConfiguration(seg);
+
+        try (Connection c = connect(grid(0))) {
+            execute(c, "create table person (id int primary key, firstName varchar, lastName varchar) " +
+                "with \"atomicity=transactional,cache_name=Person\"");
+
+            execute(c, "create table person_seg (id int primary key, firstName varchar, lastName varchar) " +
+                "with \"atomicity=transactional,cache_name=PersonSeg,template=segmented\"");
+
+            try (Transaction tx = grid(0).transactions().txStart(TransactionConcurrency.PESSIMISTIC,
+                TransactionIsolation.REPEATABLE_READ)) {
+
+                for (int i = 1; i <= CACHE_SIZE; i++) {
+                    execute(c, "insert into person(id, firstName, lastName) values(" + i + ",'" + i + "','" + i + "')");
+
+                    execute(c, "insert into person_seg(id, firstName, lastName) " +
+                        "values(" + i + ",'" + i + "','" + i + "')");
+                }
+
+                tx.commit();
+            }
+        }
+
+        AffinityTopologyVersion curVer = grid(0).context().cache().context().exchange().readyAffinityVersion();
+
+        AffinityTopologyVersion nextVer = curVer.nextMinorVersion();
+
+        // Let's wait for rebalance to complete.
+        for (int i = 0; i < 3; i++) {
+            IgniteEx node = grid(i);
+
+            IgniteInternalFuture<AffinityTopologyVersion> fut =
+                node.context().cache().context().exchange().affinityReadyFuture(nextVer);
+
+            if (fut != null)
+                fut.get();
+        }
+    }
+
+    /**
+     *
+     */
+    public void testSelectForUpdateDistributed() throws Exception {
+        doTestSelectForUpdateDistributed("Person", false);
+    }
+
+
+    /**
+     *
+     */
+    public void testSelectForUpdateLocal() throws Exception {
+        doTestSelectForUpdateLocal("Person", false);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testSelectForUpdateOutsideTx() throws Exception {
+        doTestSelectForUpdateDistributed("Person", true);
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    public void testSelectForUpdateOutsideTxLocal() throws Exception {
+        doTestSelectForUpdateLocal("Person", true);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param outsideTx Whether select is executed outside transaction
+     * @throws Exception If failed.
+     */
+    void doTestSelectForUpdateLocal(String cacheName, boolean outsideTx) throws Exception {
+        Ignite node = grid(0);
+
+        IgniteCache<Integer, ?> cache = node.cache(cacheName);
+
+        Transaction ignored = outsideTx ? null : node.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
+            TransactionIsolation.REPEATABLE_READ);
+
+        try {
+            SqlFieldsQuery qry = new SqlFieldsQuery("select id, * from " + tableName(cache) + " order by id for update")
+                .setLocal(true);
+
+            FieldsQueryCursor<List<?>> query = cache.query(qry);
+
+            List<List<?>> res = query.getAll();
+
+            List<Integer> keys = new ArrayList<>();
+
+            for (List<?> r : res)
+                keys.add((Integer)r.get(0));
+
+            checkLocks(cacheName, keys, !outsideTx);
+        }
+        finally {
+            U.close(ignored, log);
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param outsideTx Whether select is executed outside transaction
+     * @throws Exception If failed.
+     */
+    void doTestSelectForUpdateDistributed(String cacheName, boolean outsideTx) throws Exception {
+        Ignite node = grid(0);
+
+        IgniteCache<Integer, ?> cache = node.cache(cacheName);
+
+        Transaction ignored = outsideTx ? null : node.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
+            TransactionIsolation.REPEATABLE_READ);
+
+        try {
+            SqlFieldsQuery qry = new SqlFieldsQuery("select id, * from " + tableName(cache) + " order by id for update")
+                .setPageSize(10);
+
+            FieldsQueryCursor<List<?>> query = cache.query(qry);
+
+            List<List<?>> res = query.getAll();
+
+            List<Integer> keys = new ArrayList<>();
+
+            for (List<?> r : res)
+                keys.add((Integer)r.get(0));
+
+            checkLocks(cacheName, keys, !outsideTx);
+        }
+        finally {
+            U.close(ignored, log);
+        }
+    }
+
+    /**
+     *
+     */
+    public void testSelectForUpdateWithUnion() {
+        assertQueryThrows("select id from person union select 1 for update",
+            "SELECT UNION FOR UPDATE is not supported.");
+    }
+
+    /**
+     *
+     */
+    public void testSelectForUpdateWithJoin() {
+        assertQueryThrows("select p1.id from person p1 join person p2 on p1.id = p2.id for update",
+            "SELECT FOR UPDATE with joins is not supported.");
+    }
+
+    /**
+     *
+     */
+    public void testSelectForUpdateWithLimit() {
+        assertQueryThrows("select id from person limit 0,5 for update",
+            "LIMIT/OFFSET clauses are not supported for SELECT FOR UPDATE.");
+    }
+
+    /**
+     *
+     */
+    public void testSelectForUpdateWithGroupings() {
+        assertQueryThrows("select count(*) from person for update",
+            "SELECT FOR UPDATE with aggregates and/or GROUP BY is not supported.");
+
+        assertQueryThrows("select lastName, count(*) from person group by lastName for update",
+            "SELECT FOR UPDATE with aggregates and/or GROUP BY is not supported.");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSelectForUpdateAfterAbortedTx() throws Exception {
+        assert disableScheduledVacuum;
+
+        Ignite node = grid(0);
+
+        IgniteCache<Integer, ?> cache = node.cache("Person");
+
+        List<List<?>> res;
+
+        try (Transaction tx = node.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
+            TransactionIsolation.REPEATABLE_READ)) {
+
+            res = cache.query(new SqlFieldsQuery("update person set lastName=UPPER(lastName)")).getAll();
+
+            assertEquals((long)CACHE_SIZE, res.get(0).get(0));
+
+            tx.rollback();
+        }
+
+        try (Transaction tx = node.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
+            TransactionIsolation.REPEATABLE_READ)) {
+
+            res = cache.query(new SqlFieldsQuery("select id, * from person order by id for update")).getAll();
+
+            assertEquals(CACHE_SIZE, res.size());
+
+            List<Integer> keys = new ArrayList<>();
+
+            for (List<?> r : res)
+                keys.add((Integer)r.get(0));
+
+            checkLocks("Person", keys, true);
+
+            tx.rollback();
+        }
+    }
+
+    /**
+     * Check that an attempt to get a lock on any key from given list fails by timeout.
+     *
+     * @param cacheName Cache name to check.
+     * @param keys Keys to check.
+     * @param locked Whether the key is locked
+     * @throws Exception if failed.
+     */
+    @SuppressWarnings({"ThrowableNotThrown", "unchecked"})
+    private void checkLocks(String cacheName, List<Integer> keys, boolean locked) throws Exception {
+        Ignite node = ignite(2);
+        IgniteCache cache = node.cache(cacheName);
+
+        List<IgniteInternalFuture<Integer>> calls = new ArrayList<>();
+
+        for (int key : keys) {
+            calls.add(GridTestUtils.runAsync(new Callable<Integer>() {
+                /** {@inheritDoc} */
+                @Override public Integer call() {
+                    try (Transaction ignored = node.transactions().txStart(TransactionConcurrency.PESSIMISTIC,
+                        TransactionIsolation.REPEATABLE_READ)) {
+                        List<List<?>> res = cache
+                            .query(
+                                new SqlFieldsQuery("select * from " + tableName(cache) +
+                                    " where id = " + key + " for update").setTimeout(1, TimeUnit.SECONDS)
+                            )
+                            .getAll();
+
+                        return (Integer)res.get(0).get(0);
+                    }
+                }
+            }));
+        }
+
+        for (IgniteInternalFuture fut : calls) {
+            if (!locked)
+                fut.get(TX_TIMEOUT);
+            else {
+                GridTestUtils.assertThrows(null, new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        try {
+                            return fut.get(TX_TIMEOUT);
+                        }
+                        catch (IgniteCheckedException e) {
+                            if (X.hasCause(e, CacheException.class))
+                                throw X.cause(e, CacheException.class);
+
+                            throw e;
+                        }
+                    }
+                }, CacheException.class, "IgniteTxTimeoutCheckedException");
+            }
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     * @return Name of the table contained by this cache.
+     */
+    @SuppressWarnings("unchecked")
+    private static String tableName(IgniteCache<?, ?> cache) {
+        return ((Collection<QueryEntity>)cache.getConfiguration(CacheConfiguration.class).getQueryEntities())
+            .iterator().next().getTableName();
+    }
+
+    /**
+     * Test that query throws exception with expected message.
+     * @param qry SQL.
+     * @param exMsg Expected message.
+     */
+    private void assertQueryThrows(String qry, String exMsg) {
+        assertQueryThrows(qry, exMsg, false);
+
+        assertQueryThrows(qry, exMsg, true);
+    }
+
+    /**
+     * Test that query throws exception with expected message.
+     * @param qry SQL.
+     * @param exMsg Expected message.
+     * @param loc Local query flag.
+     */
+    private void assertQueryThrows(String qry, String exMsg, boolean loc) {
+        Ignite node = grid(0);
+
+        GridTestUtils.assertThrows(null, new Callable<Object>() {
+            @Override public Object call() {
+                return node.cache("Person").query(new SqlFieldsQuery(qry).setLocal(loc)).getAll();
+            }
+        }, IgniteSQLException.class, exMsg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeWithConcurrentJdbcTransactionTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeWithConcurrentJdbcTransactionTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeWithConcurrentJdbcTransactionTest.java
new file mode 100644
index 0000000..437195f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSizeWithConcurrentJdbcTransactionTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+/**
+ *
+ */
+public class CacheMvccSizeWithConcurrentJdbcTransactionTest extends CacheMvccSizeWithConcurrentTransactionTest {
+    /** {@inheritDoc} */
+    @Override boolean jdbcTx() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
new file mode 100644
index 0000000..4ad667b9
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesAbstractTest.java
@@ -0,0 +1,1611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL_SUM;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * TODO IGNITE-6739: text/spatial indexes with mvcc.
+ * TODO IGNITE-6739: indexingSpi with mvcc.
+ * TODO IGNITE-6739: setQueryParallelism with mvcc.
+ * TODO IGNITE-6739: dynamic index create.
+ */
+@SuppressWarnings("unchecked")
+public abstract class CacheMvccSqlQueriesAbstractTest extends CacheMvccAbstractTest {
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_SingleNode_SinglePartition() throws Exception {
+        accountsTxReadAll(1, 0, 0, 1,
+            new InitIndexing(Integer.class, MvccTestAccount.class), false, SQL, PUT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_WithRemoves_SingleNode_SinglePartition() throws Exception {
+        accountsTxReadAll(1, 0, 0, 1,
+            new InitIndexing(Integer.class, MvccTestAccount.class), true, SQL, PUT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_SingleNode() throws Exception {
+        accountsTxReadAll(1, 0, 0, 64,
+            new InitIndexing(Integer.class, MvccTestAccount.class), false, SQL, PUT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_SingleNode_Persistence() throws Exception {
+        persistence = true;
+
+        testAccountsTxSql_SingleNode();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSumSql_SingleNode() throws Exception {
+        accountsTxReadAll(1, 0, 0, 64,
+            new InitIndexing(Integer.class, MvccTestAccount.class), false, SQL_SUM, PUT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_WithRemoves_SingleNode() throws Exception {
+        accountsTxReadAll(1, 0, 0, 64,
+            new InitIndexing(Integer.class, MvccTestAccount.class), true, SQL, PUT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_WithRemoves_SingleNode_Persistence() throws Exception {
+        persistence = true;
+
+        testAccountsTxSql_WithRemoves_SingleNode();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_ClientServer_Backups2() throws Exception {
+        accountsTxReadAll(4, 2, 2, 64,
+            new InitIndexing(Integer.class, MvccTestAccount.class), false, SQL, PUT);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateSingleValue_SingleNode() throws Exception {
+        updateSingleValue(true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateSingleValue_LocalQuery_SingleNode() throws Exception {
+        updateSingleValue(true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateSingleValue_ClientServer() throws Exception {
+        updateSingleValue(false, false);
+    }
+
+    /**
+     * @param singleNode {@code True} for test with single node.
+     * @param locQry Local query flag.
+     * @throws Exception If failed.
+     */
+    private void updateSingleValue(boolean singleNode, final boolean locQry) throws Exception {
+        final int VALS = 100;
+
+        final int writers = 4;
+
+        final int readers = 4;
+
+        final int INC_BY = 110;
+
+        final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object, Object>>() {
+            @Override public void apply(IgniteCache<Object, Object> cache) {
+                Map<Integer, MvccTestSqlIndexValue> vals = new HashMap<>();
+
+                for (int i = 0; i < VALS; i++)
+                    vals.put(i, new MvccTestSqlIndexValue(i));
+
+                cache.putAll(vals);
+            }
+        };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            Integer key = rnd.nextInt(VALS);
+
+                            cache.cache.invoke(key, new CacheEntryProcessor<Integer, MvccTestSqlIndexValue, Object>() {
+                                @Override public Object process(MutableEntry<Integer, MvccTestSqlIndexValue> e, Object... args) {
+                                    Integer key = e.getKey();
+
+                                    MvccTestSqlIndexValue val = e.getValue();
+
+                                    int newIdxVal;
+
+                                    if (val.idxVal1 < INC_BY) {
+                                        assertEquals(key.intValue(), val.idxVal1);
+
+                                        newIdxVal = val.idxVal1 + INC_BY;
+                                    }
+                                    else {
+                                        assertEquals(INC_BY + key, val.idxVal1);
+
+                                        newIdxVal = key;
+                                    }
+
+                                    e.setValue(new MvccTestSqlIndexValue(newIdxVal));
+
+                                    return null;
+                                }
+                            });
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    List<SqlFieldsQuery> fieldsQrys = new ArrayList<>();
+
+                    fieldsQrys.add(
+                        new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=?").setLocal(locQry));
+
+                    fieldsQrys.add(new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=? or idxVal1=?").setLocal(locQry));
+
+                    fieldsQrys.add(new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where _key=?").setLocal(locQry));
+
+                    List<SqlQuery<Integer, MvccTestSqlIndexValue>> sqlQrys = new ArrayList<>();
+
+                    sqlQrys.add(new SqlQuery<Integer, MvccTestSqlIndexValue>(MvccTestSqlIndexValue.class, "idxVal1=?").setLocal(locQry));
+
+                    sqlQrys.add(new SqlQuery<Integer, MvccTestSqlIndexValue>(MvccTestSqlIndexValue.class, "idxVal1=? or idxVal1=?").setLocal(locQry));
+
+                    sqlQrys.add(new SqlQuery<Integer, MvccTestSqlIndexValue>(MvccTestSqlIndexValue.class, "_key=?").setLocal(locQry));
+
+                    while (!stop.get()) {
+                        Integer key = rnd.nextInt(VALS);
+
+                        int qryIdx = rnd.nextInt(3);
+
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        List<List<?>> res;
+
+                        try {
+                            if (rnd.nextBoolean()) {
+                                SqlFieldsQuery qry = fieldsQrys.get(qryIdx);
+
+                                if (qryIdx == 1)
+                                    qry.setArgs(key, key + INC_BY);
+                                else
+                                    qry.setArgs(key);
+
+                                res = cache.cache.query(qry).getAll();
+                            }
+                            else {
+                                SqlQuery<Integer, MvccTestSqlIndexValue> qry = sqlQrys.get(qryIdx);
+
+                                if (qryIdx == 1)
+                                    qry.setArgs(key, key + INC_BY);
+                                else
+                                    qry.setArgs(key);
+
+                                res = new ArrayList<>();
+
+                                for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.cache.query(qry).getAll()) {
+                                    List<Object> row = new ArrayList<>(2);
+
+                                    row.add(e.getKey());
+                                    row.add(e.getValue().idxVal1);
+
+                                    res.add(row);
+                                }
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+
+                        assertTrue(qryIdx == 0 || !res.isEmpty());
+
+                        if (!res.isEmpty()) {
+                            assertEquals(1, res.size());
+
+                            List<?> resVals = res.get(0);
+
+                            Integer key0 = (Integer)resVals.get(0);
+                            Integer val0 = (Integer)resVals.get(1);
+
+                            assertEquals(key, key0);
+                            assertTrue(val0.equals(key) || val0.equals(key + INC_BY));
+                        }
+                    }
+
+                    if (idx == 0) {
+                        SqlFieldsQuery qry = new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue");
+
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        List<List<?>> res;
+
+                        try {
+                            res = cache.cache.query(qry).getAll();
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+
+                        assertEquals(VALS, res.size());
+
+                        for (List<?> vals : res)
+                            info("Value: " + vals);
+                    }
+                }
+            };
+
+        int srvs;
+        int clients;
+
+        if (singleNode) {
+            srvs = 1;
+            clients = 0;
+        }
+        else {
+            srvs = 4;
+            clients = 2;
+        }
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, MvccTestSqlIndexValue.class),
+            init,
+            writer,
+            reader);
+
+        for (Ignite node : G.allGrids())
+            checkActiveQueriesCleanup(node);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinTransactional_SingleNode() throws Exception {
+        joinTransactional(true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinTransactional_ClientServer() throws Exception {
+        joinTransactional(false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinTransactional_DistributedJoins_ClientServer() throws Exception {
+        joinTransactional(false, true);
+    }
+
+    /**
+     * @param singleNode {@code True} for test with single node.
+     * @param distributedJoin {@code True} to test distributed joins.
+     * @throws Exception If failed.
+     */
+    private void joinTransactional(boolean singleNode, final boolean distributedJoin) throws Exception {
+        final int KEYS = 100;
+
+        final int writers = 4;
+
+        final int readers = 4;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
+
+                        IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions();
+
+                        try {
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                Integer key = rnd.nextInt(KEYS);
+
+                                JoinTestChildKey childKey = new JoinTestChildKey(key);
+
+                                JoinTestChild child = (JoinTestChild)cache.cache.get(childKey);
+
+                                if (child == null) {
+                                    Integer parentKey = distributedJoin ? key + 100 : key;
+
+                                    child = new JoinTestChild(parentKey);
+
+                                    cache.cache.put(childKey, child);
+
+                                    JoinTestParent parent = new JoinTestParent(parentKey);
+
+                                    cache.cache.put(new JoinTestParentKey(parentKey), parent);
+                                }
+                                else {
+                                    cache.cache.remove(childKey);
+
+                                    cache.cache.remove(new JoinTestParentKey(child.parentId));
+                                }
+
+                                tx.commit();
+                            }
+
+                            cnt++;
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    List<SqlFieldsQuery> qrys = new ArrayList<>();
+
+                    qrys.add(new SqlFieldsQuery("select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id)").
+                        setDistributedJoins(distributedJoin));
+
+                    qrys.add(new SqlFieldsQuery("select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id) where p.id = 10").
+                        setDistributedJoins(distributedJoin));
+
+                    qrys.add(new SqlFieldsQuery("select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id) where p.id != 10").
+                        setDistributedJoins(distributedJoin));
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
+
+                        try {
+                            for (SqlFieldsQuery qry : qrys) {
+                                List<List<?>> res = cache.cache.query(qry).getAll();
+
+                                if (!res.isEmpty()) {
+                                    for (List<?> resRow : res) {
+                                        Integer parentId = (Integer)resRow.get(1);
+
+                                        assertNotNull(parentId);
+                                    }
+                                }
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    if (idx == 0) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
+
+                        try {
+                            List<List<?>> res = cache.cache.query(qrys.get(0)).getAll();
+
+                            info("Reader finished, result: " + res);
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+                }
+            };
+
+        int srvs;
+        int clients;
+
+        if (singleNode) {
+            srvs = 1;
+            clients = 0;
+        }
+        else {
+            srvs = 4;
+            clients = 2;
+        }
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(JoinTestParentKey.class, JoinTestParent.class,
+                JoinTestChildKey.class, JoinTestChild.class),
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinTransactional_DistributedJoins_ClientServer2() throws Exception {
+        final int KEYS = 100;
+
+        final int writers = 1;
+
+        final int readers = 4;
+
+        final int CHILDREN_CNT = 10;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
+
+                        IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions();
+
+                        try {
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                Integer key = rnd.nextInt(KEYS);
+
+                                JoinTestParentKey parentKey = new JoinTestParentKey(key);
+
+                                JoinTestParent parent = (JoinTestParent)cache.cache.get(parentKey);
+
+                                if (parent == null) {
+                                    for (int i = 0; i < CHILDREN_CNT; i++)
+                                        cache.cache.put(new JoinTestChildKey(key * 10_000 + i), new JoinTestChild(key));
+
+                                    cache.cache.put(parentKey, new JoinTestParent(key));
+                                }
+                                else {
+                                    for (int i = 0; i < CHILDREN_CNT; i++)
+                                        cache.cache.remove(new JoinTestChildKey(key * 10_000 + i));
+
+                                    cache.cache.remove(parentKey);
+                                }
+
+                                tx.commit();
+                            }
+
+                            cnt++;
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    SqlFieldsQuery qry = new SqlFieldsQuery("select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id) where p.id=?").
+                        setDistributedJoins(true);
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
+
+                        qry.setArgs(rnd.nextInt(KEYS));
+
+                        try {
+                            List<List<?>> res = cache.cache.query(qry).getAll();
+
+                            if (!res.isEmpty())
+                                assertEquals(CHILDREN_CNT, res.size());
+
+                            cnt++;
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Reader finished, read count: " + cnt);
+                }
+            };
+
+        readWriteTest(
+            null,
+            4,
+            2,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(JoinTestParentKey.class, JoinTestParent.class,
+                JoinTestChildKey.class, JoinTestChild.class),
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDistributedJoinSimple() throws Exception {
+        startGridsMultiThreaded(4);
+
+        Ignite srv0 = ignite(0);
+
+        int[] backups = {0, 1, 2};
+
+        for (int b : backups) {
+            IgniteCache<Object, Object> cache = srv0.createCache(
+                cacheConfiguration(cacheMode(), FULL_SYNC, b, DFLT_PARTITION_COUNT).
+                    setIndexedTypes(JoinTestParentKey.class, JoinTestParent.class, JoinTestChildKey.class, JoinTestChild.class));
+
+            int cntr = 0;
+
+            int expCnt = 0;
+
+            for (int i = 0; i < 10; i++) {
+                JoinTestParentKey parentKey = new JoinTestParentKey(i);
+
+                cache.put(parentKey, new JoinTestParent(i));
+
+                for (int c = 0; c < i; c++) {
+                    JoinTestChildKey childKey = new JoinTestChildKey(cntr++);
+
+                    cache.put(childKey, new JoinTestChild(i));
+
+                    expCnt++;
+                }
+            }
+
+            SqlFieldsQuery qry = new SqlFieldsQuery("select c.parentId, p.id from " +
+                "JoinTestChild c join JoinTestParent p on (c.parentId = p.id)").
+                setDistributedJoins(true);
+
+            Map<Integer, Integer> resMap = new HashMap<>();
+
+            List<List<?>> res = cache.query(qry).getAll();
+
+            assertEquals(expCnt, res.size());
+
+            for (List<?> resRow : res) {
+                Integer parentId = (Integer)resRow.get(0);
+
+                Integer cnt = resMap.get(parentId);
+
+                if (cnt == null)
+                    resMap.put(parentId, 1);
+                else
+                    resMap.put(parentId, cnt + 1);
+            }
+
+            for (int i = 1; i < 10; i++)
+                assertEquals(i, (Object)resMap.get(i));
+
+            srv0.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheRecreate() throws Exception {
+        cacheRecreate(new InitIndexing(Integer.class, MvccTestAccount.class));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheRecreateChangeIndexedType() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        final int PARTS = 64;
+
+        {
+            CacheConfiguration<Object, Object> ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 0, PARTS).
+                setIndexedTypes(Integer.class, MvccTestAccount.class);
+
+            IgniteCache<Integer, MvccTestAccount> cache = (IgniteCache)srv0.createCache(ccfg);
+
+            for (int k = 0; k < PARTS * 2; k++) {
+                assertNull(cache.get(k));
+
+                int vals = k % 3 + 1;
+
+                for (int v = 0; v < vals; v++)
+                    cache.put(k, new MvccTestAccount(v, 1));
+
+                assertEquals(vals - 1, cache.get(k).val);
+            }
+
+            assertEquals(PARTS * 2, cache.query(new SqlQuery<>(MvccTestAccount.class, "true")).getAll().size());
+
+            srv0.destroyCache(cache.getName());
+        }
+
+        {
+            CacheConfiguration<Object, Object> ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 0, PARTS).
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class);
+
+            IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache)srv0.createCache(ccfg);
+
+            for (int k = 0; k < PARTS * 2; k++) {
+                assertNull(cache.get(k));
+
+                int vals = k % 3 + 1;
+
+                for (int v = 0; v < vals; v++)
+                    cache.put(k, new MvccTestSqlIndexValue(v));
+
+                assertEquals(vals - 1, cache.get(k).idxVal1);
+            }
+
+            assertEquals(PARTS * 2, cache.query(new SqlQuery<>(MvccTestSqlIndexValue.class, "true")).getAll().size());
+
+            srv0.destroyCache(cache.getName());
+        }
+
+        {
+            CacheConfiguration<Object, Object> ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 0, PARTS).
+                setIndexedTypes(Long.class, Long.class);
+
+            IgniteCache<Long, Long> cache = (IgniteCache)srv0.createCache(ccfg);
+
+            for (int k = 0; k < PARTS * 2; k++) {
+                assertNull(cache.get((long)k));
+
+                int vals = k % 3 + 1;
+
+                for (int v = 0; v < vals; v++)
+                    cache.put((long)k, (long)v);
+
+                assertEquals((long)(vals - 1), (Object)cache.get((long)k));
+            }
+
+            assertEquals(PARTS * 2, cache.query(new SqlQuery<>(Long.class, "true")).getAll().size());
+
+            srv0.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testChangeValueType1() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+            setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class, Integer.class, Integer.class);
+
+        IgniteCache<Object, Object> cache = srv0.createCache(ccfg);
+
+        cache.put(1, new MvccTestSqlIndexValue(1));
+        cache.put(1, new MvccTestSqlIndexValue(2));
+
+        checkSingleResult(cache, new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue"), 2);
+
+        cache.put(1, 1);
+
+        assertEquals(0, cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue")).getAll().size());
+
+        checkSingleResult(cache, new SqlFieldsQuery("select _val from Integer"), 1);
+
+        cache.put(1, 2);
+
+        checkSingleResult(cache, new SqlFieldsQuery("select _val from Integer"), 2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testChangeValueType2() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+            setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class, Integer.class, Integer.class);
+
+        IgniteCache<Object, Object> cache = srv0.createCache(ccfg);
+
+        cache.put(1, new MvccTestSqlIndexValue(1));
+        cache.put(1, new MvccTestSqlIndexValue(2));
+
+        checkSingleResult(cache, new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue"), 2);
+
+        cache.remove(1);
+
+        assertEquals(0, cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue")).getAll().size());
+
+        cache.put(1, 1);
+
+        assertEquals(0, cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue")).getAll().size());
+
+        checkSingleResult(cache, new SqlFieldsQuery("select _val from Integer"), 1);
+
+        cache.put(1, 2);
+
+        checkSingleResult(cache, new SqlFieldsQuery("select _val from Integer"), 2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCountTransactional_SingleNode() throws Exception {
+        countTransactional(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCountTransactional_ClientServer() throws Exception {
+        countTransactional(false);
+    }
+
+    /**
+     * @param singleNode {@code True} for test with single node.
+     * @throws Exception If failed.
+     */
+    private void countTransactional(boolean singleNode) throws Exception {
+        final int writers = 4;
+
+        final int readers = 4;
+
+        final int THREAD_KEY_RANGE = 100;
+
+        final int VAL_RANGE = 10;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int min = idx * THREAD_KEY_RANGE;
+                    int max = min + THREAD_KEY_RANGE;
+
+                    info("Thread range [min=" + min + ", max=" + max + ']');
+
+                    int cnt = 0;
+
+                    Set<Integer> keys = new LinkedHashSet<>();
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            // Add or remove 10 keys.
+                            if (!keys.isEmpty() && (keys.size() == THREAD_KEY_RANGE || rnd.nextInt(3) == 0)) {
+                                Set<Integer> rmvKeys = new HashSet<>();
+
+                                for (Integer key : keys) {
+                                    rmvKeys.add(key);
+
+                                    if (rmvKeys.size() == 10)
+                                        break;
+                                }
+
+                                assertEquals(10, rmvKeys.size());
+
+                                cache.cache.removeAll(rmvKeys);
+
+                                keys.removeAll(rmvKeys);
+                            }
+                            else {
+                                TreeMap<Integer, MvccTestSqlIndexValue> map = new TreeMap<>();
+
+                                while (map.size() != 10) {
+                                    Integer key = rnd.nextInt(min, max);
+
+                                    if (keys.add(key))
+                                        map.put(key, new MvccTestSqlIndexValue(rnd.nextInt(VAL_RANGE)));
+                                }
+
+                                assertEquals(10, map.size());
+
+                                cache.cache.putAll(map);
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    List<SqlFieldsQuery> qrys = new ArrayList<>();
+
+                    qrys.add(new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue"));
+
+                    qrys.add(new SqlFieldsQuery(
+                        "select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0 and idxVal1 <= " + VAL_RANGE));
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            for (SqlFieldsQuery qry : qrys) {
+                                List<List<?>> res = cache.cache.query(qry).getAll();
+
+                                assertEquals(1, res.size());
+
+                                Long cnt = (Long)res.get(0).get(0);
+
+                                assertTrue(cnt % 10 == 0);
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+                }
+            };
+
+        int srvs;
+        int clients;
+
+        if (singleNode) {
+            srvs = 1;
+            clients = 0;
+        }
+        else {
+            srvs = 4;
+            clients = 2;
+        }
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, MvccTestSqlIndexValue.class),
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMaxMinTransactional_SingleNode() throws Exception {
+        maxMinTransactional(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMaxMinTransactional_ClientServer() throws Exception {
+        maxMinTransactional(false);
+    }
+
+    /**
+     * @param singleNode {@code True} for test with single node.
+     * @throws Exception If failed.
+     */
+    private void maxMinTransactional(boolean singleNode) throws Exception {
+        final int writers = 1;
+
+        final int readers = 1;
+
+        final int THREAD_OPS = 10;
+
+        final int OP_RANGE = 10;
+
+        final int THREAD_KEY_RANGE = OP_RANGE * THREAD_OPS;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int min = idx * THREAD_KEY_RANGE;
+
+                    info("Thread range [start=" + min + ']');
+
+                    int cnt = 0;
+
+                    boolean add = true;
+
+                    int op = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            int startKey = min + op * OP_RANGE;
+
+                            if (add) {
+                                Map<Integer, MvccTestSqlIndexValue> vals = new HashMap<>();
+
+                                for (int i = 0; i < 10; i++) {
+                                    Integer key = startKey + i + 1;
+
+                                    vals.put(key, new MvccTestSqlIndexValue(key));
+                                }
+
+                                cache.cache.putAll(vals);
+
+                                // info("put " + vals.keySet());
+                            }
+                            else {
+                                Set<Integer> rmvKeys = new HashSet<>();
+
+                                for (int i = 0; i < 10; i++)
+                                    rmvKeys.add(startKey + i + 1);
+
+                                cache.cache.removeAll(rmvKeys);
+
+                                // info("remove " + rmvKeys);
+                            }
+
+                            if (++op == THREAD_OPS) {
+                                add = !add;
+
+                                op = 0;
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    List<SqlFieldsQuery> maxQrys = new ArrayList<>();
+                    List<SqlFieldsQuery> minQrys = new ArrayList<>();
+
+                    maxQrys.add(new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue"));
+                    maxQrys.add(new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 >= 0"));
+
+                    minQrys.add(new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue"));
+                    minQrys.add(new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 >= 0"));
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            for (SqlFieldsQuery qry : maxQrys) {
+                                List<List<?>> res = cache.cache.query(qry).getAll();
+
+                                assertEquals(1, res.size());
+
+                                Integer m = (Integer)res.get(0).get(0);
+
+                                assertTrue(m == null || m % 10 == 0);
+                            }
+
+                            for (SqlFieldsQuery qry : minQrys) {
+                                List<List<?>> res = cache.cache.query(qry).getAll();
+
+                                assertEquals(1, res.size());
+
+                                Integer m = (Integer)res.get(0).get(0);
+
+                                assertTrue(m == null || m % 10 == 1);
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+                }
+            };
+
+        int srvs;
+        int clients;
+
+        if (singleNode) {
+            srvs = 1;
+            clients = 0;
+        }
+        else {
+            srvs = 4;
+            clients = 2;
+        }
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, MvccTestSqlIndexValue.class),
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSqlQueriesWithMvcc() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache)srv0.createCache(
+            cacheConfiguration(cacheMode(), FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class));
+
+        for (int i = 0; i < 10; i++)
+            cache.put(i, new MvccTestSqlIndexValue(i));
+
+        sqlQueriesWithMvcc(cache, true);
+
+        sqlQueriesWithMvcc(cache, false);
+
+        // TODO IGNITE-8031
+//        startGrid(1);
+//
+//        awaitPartitionMapExchange();
+//
+//        sqlQueriesWithMvcc(cache, false);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param loc Local query flag.
+     */
+    private void sqlQueriesWithMvcc(IgniteCache<Integer, MvccTestSqlIndexValue> cache, boolean loc) {
+        assertEquals(10,
+            cache.query(new SqlQuery<>(MvccTestSqlIndexValue.class, "true").setLocal(loc)).getAll().size());
+
+        assertEquals(10,
+            cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue").setLocal(loc)).getAll().size());
+
+        assertEquals(10,
+            cache.query(new SqlFieldsQuery("" +
+                "select (select count (*) from MvccTestSqlIndexValue where idxVal1 = t1.idxVal1) as c1," +
+                " (select 0 from dual) as c2" +
+                " from MvccTestSqlIndexValue as t1" +
+                " join (select * from MvccTestSqlIndexValue) as t2 on t1.idxVal1 = t2.idxVal1").setLocal(loc)).getAll().size());
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue").setLocal(loc), 9);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 > 0").setLocal(loc), 9);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 5").setLocal(loc), 4);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue").setLocal(loc), 0);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 100").setLocal(loc), 0);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 5").setLocal(loc), 0);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 > 5").setLocal(loc), 6);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue").setLocal(loc), 10L);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0").setLocal(loc), 10L);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0 and idxVal1 < 100").setLocal(loc), 10L);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >0 and idxVal1 < 5").setLocal(loc), 4L);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 1").setLocal(loc), 9L);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 > 100").setLocal(loc), 0L);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 = 1").setLocal(loc), 1L);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param qry Query.
+     * @param exp Expected value.
+     */
+    private void checkSingleResult(IgniteCache cache, SqlFieldsQuery qry, Object exp) {
+        List<List<?>> res = cache.query(qry).getAll();
+
+        assertEquals(1, res.size());
+
+        List<?> row = res.get(0);
+
+        assertEquals(1, row.size());
+
+        assertEquals(exp, row.get(0));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSqlSimple() throws Exception {
+        startGrid(0);
+
+        for (int i = 0; i < 4; i++)
+            sqlSimple(i * 512);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 5; i++)
+            sqlSimple(rnd.nextInt(2048));
+    }
+
+    /**
+     * @param inlineSize Inline size.
+     * @throws Exception If failed.
+     */
+    private void sqlSimple(int inlineSize) throws Exception {
+        Ignite srv0 = ignite(0);
+
+        IgniteCache<Integer, MvccTestSqlIndexValue> cache =  (IgniteCache)srv0.createCache(
+            cacheConfiguration(cacheMode(), FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class).
+                setSqlIndexMaxInlineSize(inlineSize));
+
+        Map<Integer, Integer> expVals = new HashMap<>();
+
+        checkValues(expVals, cache);
+
+        cache.put(1, new MvccTestSqlIndexValue(1));
+        expVals.put(1, 1);
+
+        checkValues(expVals, cache);
+
+        cache.put(1, new MvccTestSqlIndexValue(2));
+        expVals.put(1, 2);
+
+        checkValues(expVals, cache);
+
+        cache.put(2, new MvccTestSqlIndexValue(1));
+        expVals.put(2, 1);
+        cache.put(3, new MvccTestSqlIndexValue(1));
+        expVals.put(3, 1);
+        cache.put(4, new MvccTestSqlIndexValue(1));
+        expVals.put(4, 1);
+
+        checkValues(expVals, cache);
+
+        cache.remove(1);
+        expVals.remove(1);
+
+        checkValues(expVals, cache);
+
+        checkNoValue(1, cache);
+
+        cache.put(1, new MvccTestSqlIndexValue(10));
+        expVals.put(1, 10);
+
+        checkValues(expVals, cache);
+
+        checkActiveQueriesCleanup(srv0);
+
+        srv0.destroyCache(cache.getName());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSqlSimplePutRemoveRandom() throws Exception {
+        startGrid(0);
+
+        testSqlSimplePutRemoveRandom(0);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 3; i++)
+            testSqlSimplePutRemoveRandom(rnd.nextInt(2048));
+    }
+
+    /**
+     * @param inlineSize Inline size.
+     * @throws Exception If failed.
+     */
+    private void testSqlSimplePutRemoveRandom(int inlineSize) throws Exception {
+        Ignite srv0 = grid(0);
+
+        IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache) srv0.createCache(
+            cacheConfiguration(cacheMode(), FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class).
+                setSqlIndexMaxInlineSize(inlineSize));
+
+        Map<Integer, Integer> expVals = new HashMap<>();
+
+        final int KEYS = 100;
+        final int VALS = 10;
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        long stopTime = System.currentTimeMillis() + 5_000;
+
+        for (int i = 0; i < 100_000; i++) {
+            Integer key = rnd.nextInt(KEYS);
+
+            if (rnd.nextInt(5) == 0) {
+                cache.remove(key);
+
+                expVals.remove(key);
+            }
+            else {
+                Integer val = rnd.nextInt(VALS);
+
+                cache.put(key, new MvccTestSqlIndexValue(val));
+
+                expVals.put(key, val);
+            }
+
+            checkValues(expVals, cache);
+
+            if (System.currentTimeMillis() > stopTime) {
+                info("Stop test, iteration: " + i);
+
+                break;
+            }
+        }
+
+        for (int i = 0; i < KEYS; i++) {
+            if (!expVals.containsKey(i))
+                checkNoValue(i, cache);
+        }
+
+        checkActiveQueriesCleanup(srv0);
+
+        srv0.destroyCache(cache.getName());
+    }
+
+    /**
+     * @param key Key.
+     * @param cache Cache.
+     */
+    private void checkNoValue(Object key, IgniteCache cache) {
+        SqlQuery<Integer, MvccTestSqlIndexValue> qry;
+
+        qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = ?");
+
+        qry.setArgs(key);
+
+        List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = cache.query(qry).getAll();
+
+        assertTrue(res.isEmpty());
+    }
+
+    /**
+     * @param expVals Expected values.
+     * @param cache Cache.
+     */
+    private void checkValues(Map<Integer, Integer> expVals, IgniteCache<Integer, MvccTestSqlIndexValue> cache) {
+        SqlFieldsQuery cntQry = new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue");
+
+        Long cnt = (Long)cache.query(cntQry).getAll().get(0).get(0);
+
+        assertEquals((long)expVals.size(), (Object)cnt);
+
+        SqlQuery<Integer, MvccTestSqlIndexValue> qry;
+
+        qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "true");
+
+        Map<Integer, Integer> vals = new HashMap<>();
+
+        for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll())
+            assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+        assertEquals(expVals, vals);
+
+        qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key >= 0");
+
+        vals = new HashMap<>();
+
+        for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll())
+            assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+        assertEquals(expVals, vals);
+
+        qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "idxVal1 >= 0");
+
+        vals = new HashMap<>();
+
+        for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll())
+            assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+        assertEquals(expVals, vals);
+
+        Map<Integer, Set<Integer>> expIdxVals = new HashMap<>();
+
+        for (Map.Entry<Integer, Integer> e : expVals.entrySet()) {
+            qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = ?");
+
+            qry.setArgs(e.getKey());
+
+            List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = cache.query(qry).getAll();
+
+            assertEquals(1, res.size());
+            assertEquals(e.getKey(), res.get(0).getKey());
+            assertEquals(e.getValue(), (Integer)res.get(0).getValue().idxVal1);
+
+            SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where _key=?");
+            fieldsQry.setArgs(e.getKey());
+
+            List<List<?>> fieldsRes = cache.query(fieldsQry).getAll();
+
+            assertEquals(1, fieldsRes.size());
+            assertEquals(e.getKey(), fieldsRes.get(0).get(0));
+            assertEquals(e.getValue(), fieldsRes.get(0).get(1));
+
+            Integer val = e.getValue();
+
+            Set<Integer> keys = expIdxVals.get(val);
+
+            if (keys == null)
+                expIdxVals.put(val, keys = new HashSet<>());
+
+            assertTrue(keys.add(e.getKey()));
+        }
+
+        for (Map.Entry<Integer, Set<Integer>> expE : expIdxVals.entrySet()) {
+            qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "idxVal1 = ?");
+            qry.setArgs(expE.getKey());
+
+            vals = new HashMap<>();
+
+            for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll()) {
+                assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+                assertEquals(expE.getKey(), (Integer)e.getValue().idxVal1);
+
+                assertTrue(expE.getValue().contains(e.getKey()));
+            }
+
+            assertEquals(expE.getValue().size(), vals.size());
+        }
+    }
+
+    /**
+     *
+     */
+    static class JoinTestParentKey implements Serializable {
+        /** */
+        private int key;
+
+        /**
+         * @param key Key.
+         */
+        JoinTestParentKey(int key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            JoinTestParentKey that = (JoinTestParentKey)o;
+
+            return key == that.key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key;
+        }
+    }
+
+    /**
+     *
+     */
+    static class JoinTestParent {
+        /** */
+        @QuerySqlField(index = true)
+        private int id;
+
+        /**
+         * @param id ID.
+         */
+        JoinTestParent(int id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(JoinTestParent.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class JoinTestChildKey implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        private int key;
+
+        /**
+         * @param key Key.
+         */
+        JoinTestChildKey(int key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            JoinTestChildKey that = (JoinTestChildKey)o;
+
+            return key == that.key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key;
+        }
+    }
+
+    /**
+     *
+     */
+    static class JoinTestChild {
+        /** */
+        @QuerySqlField(index = true)
+        private int parentId;
+
+        /**
+         * @param parentId Parent ID.
+         */
+        JoinTestChild(int parentId) {
+            this.parentId = parentId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(JoinTestChild.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class MvccTestSqlIndexValue implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        private int idxVal1;
+
+        /**
+         * @param idxVal1 Indexed value 1.
+         */
+        MvccTestSqlIndexValue(int idxVal1) {
+            this.idxVal1 = idxVal1;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MvccTestSqlIndexValue.class, this);
+        }
+    }
+}