You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/25 12:16:04 UTC

[1/4] ignite git commit: ignite-3478 Mvcc support for sql indexes

Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 00bd4794a -> 6150f3a0a


http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
index 7ba1b32..e77a3f1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
@@ -17,11 +17,42 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.util.lang.GridInClosure3;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
 /**
- *
+ * TODO IGNITE-3478: text/spatial indexes with mvcc.
+ * TODO IGNITE-3478: indexingSpi with mvcc.
+ * TODO IGNITE-3478: setQueryParallelism with mvcc.
+ * TODO IGNITE-3478: dynamic index create.
  */
 @SuppressWarnings("unchecked")
 public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
@@ -29,11 +60,1538 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
      * @throws Exception If failed.
      */
     public void testAccountsTxSql_SingleNode_SinglePartition() throws Exception {
-        accountsTxReadAll(1, 0, 0, 1, new IgniteInClosure<CacheConfiguration>() {
-            @Override public void apply(CacheConfiguration ccfg) {
-                ccfg.setIndexedTypes(Integer.class, MvccTestAccount.class);
+        accountsTxReadAll(1, 0, 0, 1, new InitIndexing(Integer.class, MvccTestAccount.class), false, ReadMode.SQL_ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_WithRemoves_SingleNode_SinglePartition() throws Exception {
+        accountsTxReadAll(1, 0, 0, 1, new InitIndexing(Integer.class, MvccTestAccount.class), true, ReadMode.SQL_ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_SingleNode() throws Exception {
+        accountsTxReadAll(1, 0, 0, 64, new InitIndexing(Integer.class, MvccTestAccount.class), false, ReadMode.SQL_ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSumSql_SingleNode() throws Exception {
+        accountsTxReadAll(1, 0, 0, 64, new InitIndexing(Integer.class, MvccTestAccount.class), false, ReadMode.SQL_SUM);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_WithRemoves_SingleNode() throws Exception {
+        accountsTxReadAll(1, 0, 0, 64, new InitIndexing(Integer.class, MvccTestAccount.class), true, ReadMode.SQL_ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_ClientServer_Backups2() throws Exception {
+        accountsTxReadAll(4, 2, 2, 64, new InitIndexing(Integer.class, MvccTestAccount.class), false, ReadMode.SQL_ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateSingleValue_SingleNode() throws Exception {
+        updateSingleValue(true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateSingleValue_LocalQuery_SingleNode() throws Exception {
+        updateSingleValue(true, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateSingleValue_ClientServer() throws Exception {
+        updateSingleValue(false, false);
+    }
+
+    /**
+     * @param singleNode {@code True} for test with single node.
+     * @param locQry Local query flag.
+     * @throws Exception If failed.
+     */
+    private void updateSingleValue(boolean singleNode, final boolean locQry) throws Exception {
+        final int VALS = 100;
+
+        final int writers = 4;
+
+        final int readers = 4;
+
+        final int INC_BY = 110;
+
+        final IgniteInClosure<IgniteCache<Object, Object>> init = new IgniteInClosure<IgniteCache<Object, Object>>() {
+            @Override public void apply(IgniteCache<Object, Object> cache) {
+                Map<Integer, MvccTestSqlIndexValue> vals = new HashMap<>();
+
+                for (int i = 0; i < VALS; i++)
+                    vals.put(i, new MvccTestSqlIndexValue(i));
+
+                cache.putAll(vals);
+            }
+        };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            Integer key = rnd.nextInt(VALS);
+
+                            cache.cache.invoke(key, new CacheEntryProcessor<Integer, MvccTestSqlIndexValue, Object>() {
+                                @Override public Object process(MutableEntry<Integer, MvccTestSqlIndexValue> e, Object... args) {
+                                    Integer key = e.getKey();
+
+                                    MvccTestSqlIndexValue val = e.getValue();
+
+                                    int newIdxVal;
+
+                                    if (val.idxVal1 < INC_BY) {
+                                        assertEquals(key.intValue(), val.idxVal1);
+
+                                        newIdxVal = val.idxVal1 + INC_BY;
+                                    }
+                                    else {
+                                        assertEquals(INC_BY + key, val.idxVal1);
+
+                                        newIdxVal = key;
+                                    }
+
+                                    e.setValue(new MvccTestSqlIndexValue(newIdxVal));
+
+                                    return null;
+                                }
+                            });
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    List<SqlFieldsQuery> fieldsQrys = new ArrayList<>();
+
+                    fieldsQrys.add(
+                        new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=?").setLocal(locQry));
+
+                    fieldsQrys.add(new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=? or idxVal1=?").setLocal(locQry));
+
+                    fieldsQrys.add(new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where _key=?").setLocal(locQry));
+
+                    List<SqlQuery<Integer, MvccTestSqlIndexValue>> sqlQrys = new ArrayList<>();
+
+                    sqlQrys.add(new SqlQuery<Integer, MvccTestSqlIndexValue>(MvccTestSqlIndexValue.class, "idxVal1=?").setLocal(locQry));
+
+                    sqlQrys.add(new SqlQuery<Integer, MvccTestSqlIndexValue>(MvccTestSqlIndexValue.class, "idxVal1=? or idxVal1=?").setLocal(locQry));
+
+                    sqlQrys.add(new SqlQuery<Integer, MvccTestSqlIndexValue>(MvccTestSqlIndexValue.class, "_key=?").setLocal(locQry));
+
+                    while (!stop.get()) {
+                        Integer key = rnd.nextInt(VALS);
+
+                        int qryIdx = rnd.nextInt(3);
+
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        List<List<?>> res;
+
+                        try {
+                            if (rnd.nextBoolean()) {
+                                SqlFieldsQuery qry = fieldsQrys.get(qryIdx);
+
+                                if (qryIdx == 1)
+                                    qry.setArgs(key, key + INC_BY);
+                                else
+                                    qry.setArgs(key);
+
+                                res = cache.cache.query(qry).getAll();
+                            }
+                            else {
+                                SqlQuery<Integer, MvccTestSqlIndexValue> qry = sqlQrys.get(qryIdx);
+
+                                if (qryIdx == 1)
+                                    qry.setArgs(key, key + INC_BY);
+                                else
+                                    qry.setArgs(key);
+
+                                res = new ArrayList<>();
+
+                                for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.cache.query(qry).getAll()) {
+                                    List<Object> row = new ArrayList<>(2);
+
+                                    row.add(e.getKey());
+                                    row.add(e.getValue().idxVal1);
+
+                                    res.add(row);
+                                }
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+
+                        assertTrue(qryIdx == 0 || !res.isEmpty());
+
+                        if (!res.isEmpty()) {
+                            assertEquals(1, res.size());
+
+                            List<?> resVals = res.get(0);
+
+                            Integer key0 = (Integer)resVals.get(0);
+                            Integer val0 = (Integer)resVals.get(1);
+
+                            assertEquals(key, key0);
+                            assertTrue(val0.equals(key) || val0.equals(key + INC_BY));
+                        }
+                    }
+
+                    if (idx == 0) {
+                        SqlFieldsQuery qry = new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue");
+
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        List<List<?>> res;
+
+                        try {
+                            res = cache.cache.query(qry).getAll();
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+
+                        assertEquals(VALS, res.size());
+
+                        for (List<?> vals : res)
+                            info("Value: " + vals);
+                    }
+                }
+            };
+
+        int srvs;
+        int clients;
+
+        if (singleNode) {
+            srvs = 1;
+            clients = 0;
+        }
+        else {
+            srvs = 4;
+            clients = 2;
+        }
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, MvccTestSqlIndexValue.class),
+            init,
+            writer,
+            reader);
+
+        for (Ignite node : G.allGrids())
+            checkActiveQueriesCleanup(node);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinTransactional_SingleNode() throws Exception {
+        joinTransactional(true, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinTransactional_ClientServer() throws Exception {
+        joinTransactional(false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinTransactional_DistributedJoins_ClientServer() throws Exception {
+        joinTransactional(false, true);
+    }
+
+    /**
+     * @param singleNode {@code True} for test with single node.
+     * @param distributedJoin {@code True} to test distributed joins.
+     * @throws Exception If failed.
+     */
+    private void joinTransactional(boolean singleNode, final boolean distributedJoin) throws Exception {
+        final int KEYS = 100;
+
+        final int writers = 4;
+
+        final int readers = 4;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
+
+                        IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions();
+
+                        try {
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                Integer key = rnd.nextInt(KEYS);
+
+                                JoinTestChildKey childKey = new JoinTestChildKey(key);
+
+                                JoinTestChild child = (JoinTestChild)cache.cache.get(childKey);
+
+                                if (child == null) {
+                                    Integer parentKey = distributedJoin ? key + 100 : key;
+
+                                    child = new JoinTestChild(parentKey);
+
+                                    cache.cache.put(childKey, child);
+
+                                    JoinTestParent parent = new JoinTestParent(parentKey);
+
+                                    cache.cache.put(new JoinTestParentKey(parentKey), parent);
+                                }
+                                else {
+                                    cache.cache.remove(childKey);
+
+                                    cache.cache.remove(new JoinTestParentKey(child.parentId));
+                                }
+
+                                tx.commit();
+                            }
+
+                            cnt++;
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    List<SqlFieldsQuery> qrys = new ArrayList<>();
+
+                    qrys.add(new SqlFieldsQuery("select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id)").
+                        setDistributedJoins(distributedJoin));
+
+                    qrys.add(new SqlFieldsQuery("select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id) where p.id = 10").
+                        setDistributedJoins(distributedJoin));
+
+                    qrys.add(new SqlFieldsQuery("select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id) where p.id != 10").
+                        setDistributedJoins(distributedJoin));
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
+
+                        try {
+                            for (SqlFieldsQuery qry : qrys) {
+                                List<List<?>> res = cache.cache.query(qry).getAll();
+
+                                if (!res.isEmpty()) {
+                                    for (List<?> resRow : res) {
+                                        Integer parentId = (Integer)resRow.get(1);
+
+                                        assertNotNull(parentId);
+                                    }
+                                }
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    if (idx == 0) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
+
+                        try {
+                            List<List<?>> res = cache.cache.query(qrys.get(0)).getAll();
+
+                            info("Reader finished, result: " + res);
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+                }
+            };
+
+        int srvs;
+        int clients;
+
+        if (singleNode) {
+            srvs = 1;
+            clients = 0;
+        }
+        else {
+            srvs = 4;
+            clients = 2;
+        }
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(JoinTestParentKey.class, JoinTestParent.class,
+                JoinTestChildKey.class, JoinTestChild.class),
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testJoinTransactional_DistributedJoins_ClientServer2() throws Exception {
+        final int KEYS = 100;
+
+        final int writers = 1;
+
+        final int readers = 4;
+
+        final int CHILDREN_CNT = 10;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
+
+                        IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions();
+
+                        try {
+                            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                                Integer key = rnd.nextInt(KEYS);
+
+                                JoinTestParentKey parentKey = new JoinTestParentKey(key);
+
+                                JoinTestParent parent = (JoinTestParent)cache.cache.get(parentKey);
+
+                                if (parent == null) {
+                                    for (int i = 0; i < CHILDREN_CNT; i++)
+                                        cache.cache.put(new JoinTestChildKey(key * 10_000 + i), new JoinTestChild(key));
+
+                                    cache.cache.put(parentKey, new JoinTestParent(key));
+                                }
+                                else {
+                                    for (int i = 0; i < CHILDREN_CNT; i++)
+                                        cache.cache.remove(new JoinTestChildKey(key * 10_000 + i));
+
+                                    cache.cache.remove(parentKey);
+                                }
+
+                                tx.commit();
+                            }
+
+                            cnt++;
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    SqlFieldsQuery qry = new SqlFieldsQuery("select c.parentId, p.id from " +
+                        "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id) where p.id=?").
+                        setDistributedJoins(true);
+
+                    int cnt = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Object, Object> cache = randomCache(caches, rnd);
+
+                        qry.setArgs(rnd.nextInt(KEYS));
+
+                        try {
+                            List<List<?>> res = cache.cache.query(qry).getAll();
+
+                            if (!res.isEmpty())
+                                assertEquals(CHILDREN_CNT, res.size());
+
+                            cnt++;
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Reader finished, read count: " + cnt);
+                }
+            };
+
+        readWriteTest(
+            null,
+            4,
+            2,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(JoinTestParentKey.class, JoinTestParent.class,
+                JoinTestChildKey.class, JoinTestChild.class),
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDistributedJoinSimple() throws Exception {
+        startGridsMultiThreaded(4);
+
+        Ignite srv0 = ignite(0);
+
+        int[] backups = {0, 1, 2};
+
+        for (int b : backups) {
+            IgniteCache<Object, Object> cache = srv0.createCache(
+                cacheConfiguration(PARTITIONED, FULL_SYNC, b, DFLT_PARTITION_COUNT).
+                    setIndexedTypes(JoinTestParentKey.class, JoinTestParent.class, JoinTestChildKey.class, JoinTestChild.class));
+
+            int cntr = 0;
+
+            int expCnt = 0;
+
+            for (int i = 0; i < 10; i++) {
+                JoinTestParentKey parentKey = new JoinTestParentKey(i);
+
+                cache.put(parentKey, new JoinTestParent(i));
+
+                for (int c = 0; c < i; c++) {
+                    JoinTestChildKey childKey = new JoinTestChildKey(cntr++);
+
+                    cache.put(childKey, new JoinTestChild(i));
+
+                    expCnt++;
+                }
+            }
+
+            SqlFieldsQuery qry = new SqlFieldsQuery("select c.parentId, p.id from " +
+                "JoinTestChild c join JoinTestParent p on (c.parentId = p.id)").
+                setDistributedJoins(true);
+
+            Map<Integer, Integer> resMap = new HashMap<>();
+
+            List<List<?>> res = cache.query(qry).getAll();
+
+            assertEquals(expCnt, res.size());
+
+            for (List<?> resRow : res) {
+                Integer parentId = (Integer)resRow.get(0);
+
+                Integer cnt = resMap.get(parentId);
+
+                if (cnt == null)
+                    resMap.put(parentId, 1);
+                else
+                    resMap.put(parentId, cnt + 1);
+            }
+
+            for (int i = 1; i < 10; i++)
+                assertEquals(i, (Object)resMap.get(i));
+
+            srv0.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheRecreate() throws Exception {
+        cacheRecreate(new InitIndexing(Integer.class, MvccTestAccount.class));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheRecreateChangeIndexedType() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        final int PARTS = 64;
+
+        {
+            CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS).
+                setIndexedTypes(Integer.class, MvccTestAccount.class);
+
+            IgniteCache<Integer, MvccTestAccount> cache = (IgniteCache)srv0.createCache(ccfg);
+
+            for (int k = 0; k < PARTS * 2; k++) {
+                assertNull(cache.get(k));
+
+                int vals = k % 3 + 1;
+
+                for (int v = 0; v < vals; v++)
+                    cache.put(k, new MvccTestAccount(v, 1));
+
+                assertEquals(vals - 1, cache.get(k).val);
+            }
+
+            assertEquals(PARTS * 2, cache.query(new SqlQuery<>(MvccTestAccount.class, "true")).getAll().size());
+
+            srv0.destroyCache(cache.getName());
+        }
+
+        {
+            CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS).
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class);
+
+            IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache)srv0.createCache(ccfg);
+
+            for (int k = 0; k < PARTS * 2; k++) {
+                assertNull(cache.get(k));
+
+                int vals = k % 3 + 1;
+
+                for (int v = 0; v < vals; v++)
+                    cache.put(k, new MvccTestSqlIndexValue(v));
+
+                assertEquals(vals - 1, cache.get(k).idxVal1);
+            }
+
+            assertEquals(PARTS * 2, cache.query(new SqlQuery<>(MvccTestSqlIndexValue.class, "true")).getAll().size());
+
+            srv0.destroyCache(cache.getName());
+        }
+
+        {
+            CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS).
+                setIndexedTypes(Long.class, Long.class);
+
+            IgniteCache<Long, Long> cache = (IgniteCache)srv0.createCache(ccfg);
+
+            for (int k = 0; k < PARTS * 2; k++) {
+                assertNull(cache.get((long)k));
+
+                int vals = k % 3 + 1;
+
+                for (int v = 0; v < vals; v++)
+                    cache.put((long)k, (long)v);
+
+                assertEquals((long)(vals - 1), (Object)cache.get((long)k));
             }
-        }, false, ReadMode.SQL_ALL);
+
+            assertEquals(PARTS * 2, cache.query(new SqlQuery<>(Long.class, "true")).getAll().size());
+
+            srv0.destroyCache(cache.getName());
+        }
     }
 
+    /**
+     * @throws Exception If failed.
+     */
+    public void testChangeValueType1() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+            setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class, Integer.class, Integer.class);
+
+        IgniteCache<Object, Object> cache = srv0.createCache(ccfg);
+
+        cache.put(1, new MvccTestSqlIndexValue(1));
+        cache.put(1, new MvccTestSqlIndexValue(2));
+
+        checkSingleResult(cache, new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue"), 2);
+
+        cache.put(1, 1);
+
+        assertEquals(0, cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue")).getAll().size());
+
+        checkSingleResult(cache, new SqlFieldsQuery("select _val from Integer"), 1);
+
+        cache.put(1, 2);
+
+        checkSingleResult(cache, new SqlFieldsQuery("select _val from Integer"), 2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testChangeValueType2() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+            setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class, Integer.class, Integer.class);
+
+        IgniteCache<Object, Object> cache = srv0.createCache(ccfg);
+
+        cache.put(1, new MvccTestSqlIndexValue(1));
+        cache.put(1, new MvccTestSqlIndexValue(2));
+
+        checkSingleResult(cache, new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue"), 2);
+
+        cache.remove(1);
+
+        assertEquals(0, cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue")).getAll().size());
+
+        cache.put(1, 1);
+
+        assertEquals(0, cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue")).getAll().size());
+
+        checkSingleResult(cache, new SqlFieldsQuery("select _val from Integer"), 1);
+
+        cache.put(1, 2);
+
+        checkSingleResult(cache, new SqlFieldsQuery("select _val from Integer"), 2);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCountTransactional_SingleNode() throws Exception {
+      countTransactional(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCountTransactional_ClientServer() throws Exception {
+        countTransactional(false);
+    }
+
+    /**
+     * @param singleNode {@code True} for test with single node.
+     * @throws Exception If failed.
+     */
+    private void countTransactional(boolean singleNode) throws Exception {
+        final int writers = 4;
+
+        final int readers = 4;
+
+        final int THREAD_KEY_RANGE = 100;
+
+        final int VAL_RANGE = 10;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int min = idx * THREAD_KEY_RANGE;
+                    int max = min + THREAD_KEY_RANGE;
+
+                    info("Thread range [min=" + min + ", max=" + max + ']');
+
+                    int cnt = 0;
+
+                    Set<Integer> keys = new LinkedHashSet<>();
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            // Add or remove 10 keys.
+                            if (!keys.isEmpty() && (keys.size() == THREAD_KEY_RANGE || rnd.nextInt(3) == 0 )) {
+                                Set<Integer> rmvKeys = new HashSet<>();
+
+                                for (Integer key : keys) {
+                                    rmvKeys.add(key);
+
+                                    if (rmvKeys.size() == 10)
+                                        break;
+                                }
+
+                                assertEquals(10, rmvKeys.size());
+
+                                cache.cache.removeAll(rmvKeys);
+
+                                keys.removeAll(rmvKeys);
+                            }
+                            else {
+                                TreeMap<Integer, MvccTestSqlIndexValue> map = new TreeMap<>();
+
+                                while (map.size() != 10) {
+                                    Integer key = rnd.nextInt(min, max);
+
+                                    if (keys.add(key))
+                                        map.put(key, new MvccTestSqlIndexValue(rnd.nextInt(VAL_RANGE)));
+                                }
+
+                                assertEquals(10, map.size());
+
+                                cache.cache.putAll(map);
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    List<SqlFieldsQuery> qrys = new ArrayList<>();
+
+                    qrys.add(new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue"));
+
+                    qrys.add(new SqlFieldsQuery(
+                        "select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0 and idxVal1 <= " + VAL_RANGE));
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            for (SqlFieldsQuery qry : qrys) {
+                                List<List<?>> res = cache.cache.query(qry).getAll();
+
+                                assertEquals(1, res.size());
+
+                                Long cnt = (Long)res.get(0).get(0);
+
+                                assertTrue(cnt % 10 == 0);
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+                }
+            };
+
+        int srvs;
+        int clients;
+
+        if (singleNode) {
+            srvs = 1;
+            clients = 0;
+        }
+        else {
+            srvs = 4;
+            clients = 2;
+        }
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, MvccTestSqlIndexValue.class),
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMaxMinTransactional_SingleNode() throws Exception {
+        maxMinTransactional(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMaxMinTransactional_ClientServer() throws Exception {
+        maxMinTransactional(false);
+    }
+
+    /**
+     * @param singleNode {@code True} for test with single node.
+     * @throws Exception If failed.
+     */
+    private void maxMinTransactional(boolean singleNode) throws Exception {
+        final int writers = 1;
+
+        final int readers = 1;
+
+        final int THREAD_OPS = 10;
+
+        final int OP_RANGE = 10;
+
+        final int THREAD_KEY_RANGE = OP_RANGE * THREAD_OPS;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int min = idx * THREAD_KEY_RANGE;
+
+                    info("Thread range [start=" + min + ']');
+
+                    int cnt = 0;
+
+                    boolean add = true;
+
+                    int op = 0;
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            int startKey = min + op * OP_RANGE;
+
+                            if (add) {
+                                Map<Integer, MvccTestSqlIndexValue> vals = new HashMap<>();
+
+                                for (int i = 0; i < 10; i++) {
+                                    Integer key = startKey + i + 1;
+
+                                    vals.put(key, new MvccTestSqlIndexValue(key));
+                                }
+
+                                cache.cache.putAll(vals);
+
+                                // info("put " + vals.keySet());
+                            }
+                            else {
+                                Set<Integer> rmvKeys = new HashSet<>();
+
+                                for (int i = 0; i < 10; i++)
+                                    rmvKeys.add(startKey + i + 1);
+
+                                cache.cache.removeAll(rmvKeys);
+
+                                // info("remove " + rmvKeys);
+                            }
+
+                            if (++op == THREAD_OPS) {
+                                add = !add;
+
+                                op = 0;
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    List<SqlFieldsQuery> maxQrys = new ArrayList<>();
+                    List<SqlFieldsQuery> minQrys = new ArrayList<>();
+
+                    maxQrys.add(new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue"));
+                    maxQrys.add(new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 >= 0"));
+
+                    minQrys.add(new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue"));
+                    minQrys.add(new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 >= 0"));
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd);
+
+                        try {
+                            for (SqlFieldsQuery qry : maxQrys) {
+                                List<List<?>> res = cache.cache.query(qry).getAll();
+
+                                assertEquals(1, res.size());
+
+                                Integer m = (Integer)res.get(0).get(0);
+
+                                assertTrue(m == null || m % 10 == 0);
+                            }
+
+                            for (SqlFieldsQuery qry : minQrys) {
+                                List<List<?>> res = cache.cache.query(qry).getAll();
+
+                                assertEquals(1, res.size());
+
+                                Integer m = (Integer)res.get(0).get(0);
+
+                                assertTrue(m == null || m % 10 == 1);
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+                }
+            };
+
+        int srvs;
+        int clients;
+
+        if (singleNode) {
+            srvs = 1;
+            clients = 0;
+        }
+        else {
+            srvs = 4;
+            clients = 2;
+        }
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, MvccTestSqlIndexValue.class),
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSqlQueriesWithMvcc() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache)srv0.createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class));
+
+        for (int i = 0; i < 10; i++)
+            cache.put(i, new MvccTestSqlIndexValue(i));
+
+        sqlQueriesWithMvcc(cache, true);
+
+        sqlQueriesWithMvcc(cache, false);
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        sqlQueriesWithMvcc(cache, false);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param loc Local query flag.
+     */
+    private void sqlQueriesWithMvcc(IgniteCache<Integer, MvccTestSqlIndexValue> cache, boolean loc) {
+        assertEquals(10,
+            cache.query(new SqlQuery<>(MvccTestSqlIndexValue.class, "true").setLocal(loc)).getAll().size());
+
+        assertEquals(10,
+            cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue").setLocal(loc)).getAll().size());
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue").setLocal(loc), 9);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 > 0").setLocal(loc), 9);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 5").setLocal(loc), 4);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue").setLocal(loc), 0);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 100").setLocal(loc), 0);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 5").setLocal(loc), 0);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 > 5").setLocal(loc), 6);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue").setLocal(loc), 10L);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0").setLocal(loc), 10L);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0 and idxVal1 < 100").setLocal(loc), 10L);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >0 and idxVal1 < 5").setLocal(loc), 4L);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 1").setLocal(loc), 9L);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 > 100").setLocal(loc), 0L);
+
+        checkSingleResult(cache,
+            new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 = 1").setLocal(loc), 1L);
+    }
+
+    /**
+     * @param cache Cache.
+     * @param qry Query.
+     * @param exp Expected value.
+     */
+    private void checkSingleResult(IgniteCache cache, SqlFieldsQuery qry, Object exp) {
+        List<List<?>> res = cache.query(qry).getAll();
+
+        assertEquals(1, res.size());
+
+        List<?> row = res.get(0);
+
+        assertEquals(1, row.size());
+
+        assertEquals(exp, row.get(0));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSqlSimple() throws Exception {
+        startGrid(0);
+
+        for (int i = 0; i < 4; i++)
+            sqlSimple(i * 512);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 5; i++)
+            sqlSimple(rnd.nextInt(2048));
+    }
+
+    /**
+     * @param inlineSize Inline size.
+     * @throws Exception If failed.
+     */
+    private void sqlSimple(int inlineSize) throws Exception {
+        Ignite srv0 = ignite(0);
+
+        IgniteCache<Integer, MvccTestSqlIndexValue> cache =  (IgniteCache)srv0.createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class).
+                setSqlIndexMaxInlineSize(inlineSize));
+
+        Map<Integer, Integer> expVals = new HashMap<>();
+
+        checkValues(expVals, cache);
+
+        cache.put(1, new MvccTestSqlIndexValue(1));
+        expVals.put(1, 1);
+
+        checkValues(expVals, cache);
+
+        cache.put(1, new MvccTestSqlIndexValue(2));
+        expVals.put(1, 2);
+
+        checkValues(expVals, cache);
+
+        cache.put(2, new MvccTestSqlIndexValue(1));
+        expVals.put(2, 1);
+        cache.put(3, new MvccTestSqlIndexValue(1));
+        expVals.put(3, 1);
+        cache.put(4, new MvccTestSqlIndexValue(1));
+        expVals.put(4, 1);
+
+        checkValues(expVals, cache);
+
+        cache.remove(1);
+        expVals.remove(1);
+
+        checkValues(expVals, cache);
+
+        checkNoValue(1, cache);
+
+        cache.put(1, new MvccTestSqlIndexValue(10));
+        expVals.put(1, 10);
+
+        checkValues(expVals, cache);
+
+        checkActiveQueriesCleanup(srv0);
+
+        srv0.destroyCache(cache.getName());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSqlSimplePutRemoveRandom() throws Exception {
+        startGrid(0);
+
+        testSqlSimplePutRemoveRandom(0);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 3; i++)
+            testSqlSimplePutRemoveRandom(rnd.nextInt(2048));
+    }
+
+    /**
+     * @param inlineSize Inline size.
+     * @throws Exception If failed.
+     */
+    private void testSqlSimplePutRemoveRandom(int inlineSize) throws Exception {
+        Ignite srv0 = grid(0);
+
+        IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache) srv0.createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT).
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class).
+                setSqlIndexMaxInlineSize(inlineSize));
+
+        Map<Integer, Integer> expVals = new HashMap<>();
+
+        final int KEYS = 100;
+        final int VALS = 10;
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        long stopTime = System.currentTimeMillis() + 5_000;
+
+        for (int i = 0; i < 100_000; i++) {
+            Integer key = rnd.nextInt(KEYS);
+
+            if (rnd.nextInt(5) == 0) {
+                cache.remove(key);
+
+                expVals.remove(key);
+            }
+            else {
+                Integer val = rnd.nextInt(VALS);
+
+                cache.put(key, new MvccTestSqlIndexValue(val));
+
+                expVals.put(key, val);
+            }
+
+            checkValues(expVals, cache);
+
+            if (System.currentTimeMillis() > stopTime) {
+                info("Stop test, iteration: " + i);
+
+                break;
+            }
+        }
+
+        for (int i = 0; i < KEYS; i++) {
+            if (!expVals.containsKey(i))
+                checkNoValue(i, cache);
+        }
+
+        checkActiveQueriesCleanup(srv0);
+
+        srv0.destroyCache(cache.getName());
+    }
+
+    /**
+     * @param key Key.
+     * @param cache Cache.
+     */
+    private void checkNoValue(Object key, IgniteCache cache) {
+        SqlQuery<Integer, MvccTestSqlIndexValue> qry;
+
+        qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = ?");
+
+        qry.setArgs(key);
+
+        List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = cache.query(qry).getAll();
+
+        assertTrue(res.isEmpty());
+    }
+
+    /**
+     * @param expVals Expected values.
+     * @param cache Cache.
+     */
+    private void checkValues(Map<Integer, Integer> expVals, IgniteCache<Integer, MvccTestSqlIndexValue> cache) {
+        SqlFieldsQuery cntQry = new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue");
+
+        Long cnt = (Long)cache.query(cntQry).getAll().get(0).get(0);
+
+        assertEquals((long)expVals.size(), (Object)cnt);
+
+        SqlQuery<Integer, MvccTestSqlIndexValue> qry;
+
+        qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "true");
+
+        Map<Integer, Integer> vals = new HashMap<>();
+
+        for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll())
+            assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+        assertEquals(expVals, vals);
+
+        qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key >= 0");
+
+        vals = new HashMap<>();
+
+        for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll())
+            assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+        assertEquals(expVals, vals);
+
+        qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "idxVal1 >= 0");
+
+        vals = new HashMap<>();
+
+        for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll())
+            assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+        assertEquals(expVals, vals);
+
+        Map<Integer, Set<Integer>> expIdxVals = new HashMap<>();
+
+        for (Map.Entry<Integer, Integer> e : expVals.entrySet()) {
+            qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = ?");
+
+            qry.setArgs(e.getKey());
+
+            List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = cache.query(qry).getAll();
+
+            assertEquals(1, res.size());
+            assertEquals(e.getKey(), res.get(0).getKey());
+            assertEquals(e.getValue(), (Integer)res.get(0).getValue().idxVal1);
+
+            SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where _key=?");
+            fieldsQry.setArgs(e.getKey());
+
+            List<List<?>> fieldsRes = cache.query(fieldsQry).getAll();
+
+            assertEquals(1, fieldsRes.size());
+            assertEquals(e.getKey(), fieldsRes.get(0).get(0));
+            assertEquals(e.getValue(), fieldsRes.get(0).get(1));
+
+            Integer val = e.getValue();
+
+            Set<Integer> keys = expIdxVals.get(val);
+
+            if (keys == null)
+                expIdxVals.put(val, keys = new HashSet<>());
+
+            assertTrue(keys.add(e.getKey()));
+        }
+
+        for (Map.Entry<Integer, Set<Integer>> expE : expIdxVals.entrySet()) {
+            qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "idxVal1 = ?");
+            qry.setArgs(expE.getKey());
+
+            vals = new HashMap<>();
+
+            for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll()) {
+                assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+                assertEquals(expE.getKey(), (Integer)e.getValue().idxVal1);
+
+                assertTrue(expE.getValue().contains(e.getKey()));
+            }
+
+            assertEquals(expE.getValue().size(), vals.size());
+        }
+    }
+
+    /**
+     *
+     */
+    static class JoinTestParentKey implements Serializable {
+        /** */
+        private int key;
+
+        /**
+         * @param key Key.
+         */
+        JoinTestParentKey(int key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            JoinTestParentKey that = (JoinTestParentKey)o;
+
+            return key == that.key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key;
+        }
+    }
+
+    /**
+     *
+     */
+    static class JoinTestParent {
+        /** */
+        @QuerySqlField(index = true)
+        private int id;
+
+        /**
+         * @param id ID.
+         */
+        JoinTestParent(int id) {
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(JoinTestParent.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class JoinTestChildKey implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        private int key;
+
+        /**
+         * @param key Key.
+         */
+        JoinTestChildKey(int key) {
+            this.key = key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            JoinTestChildKey that = (JoinTestChildKey)o;
+
+            return key == that.key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return key;
+        }
+    }
+
+    /**
+     *
+     */
+    static class JoinTestChild {
+        /** */
+        @QuerySqlField(index = true)
+        private int parentId;
+
+        /**
+         * @param parentId Parent ID.
+         */
+        JoinTestChild(int parentId) {
+            this.parentId = parentId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(JoinTestChild.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class MvccTestSqlIndexValue implements Serializable {
+        /** */
+        @QuerySqlField(index = true)
+        private int idxVal1;
+
+        /**
+         * @param idxVal1 Indexed value 1.
+         */
+        MvccTestSqlIndexValue(int idxVal1) {
+            this.idxVal1 = idxVal1;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(MvccTestSqlIndexValue.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    static class InitIndexing implements IgniteInClosure<CacheConfiguration> {
+        /** */
+        private final Class[] idxTypes;
+
+        /**
+         * @param idxTypes Indexed types.
+         */
+        InitIndexing(Class<?>... idxTypes) {
+            this.idxTypes = idxTypes;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(CacheConfiguration cfg) {
+            cfg.setIndexedTypes(idxTypes);
+        }
+    }
 }


[4/4] ignite git commit: ignite-3478 Mvcc support for sql indexes

Posted by sb...@apache.org.
ignite-3478 Mvcc support for sql indexes


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6150f3a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6150f3a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6150f3a0

Branch: refs/heads/ignite-3478
Commit: 6150f3a0ad310810606ec5bafbd007804808ff25
Parents: 00bd479
Author: sboikov <sb...@gridgain.com>
Authored: Wed Oct 25 15:15:56 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Oct 25 15:15:56 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    |  181 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |   26 +-
 .../cache/mvcc/CoordinatorAckRequestTx.java     |    2 +-
 .../cache/mvcc/PreviousCoordinatorQueries.java  |    4 +-
 .../cache/persistence/CacheDataRowAdapter.java  |    6 +-
 .../cache/persistence/tree/BPlusTree.java       |   42 +-
 .../cache/persistence/tree/io/IOVersions.java   |    7 +
 .../cache/persistence/tree/io/PageIO.java       |   85 +-
 .../cache/query/GridCacheQueryManager.java      |   11 +-
 .../cache/tree/AbstractDataInnerIO.java         |    8 +-
 .../cache/tree/AbstractDataLeafIO.java          |    6 +-
 .../cache/tree/CacheDataRowStore.java           |    6 +-
 .../processors/cache/tree/CacheDataTree.java    |    2 +-
 .../cache/tree/CacheIdAwareDataInnerIO.java     |    2 +-
 .../cache/tree/CacheIdAwareDataLeafIO.java      |    2 +-
 .../processors/cache/tree/DataInnerIO.java      |    2 +-
 .../processors/cache/tree/DataLeafIO.java       |    2 +-
 .../internal/processors/cache/tree/DataRow.java |   17 +-
 .../processors/cache/tree/MvccCleanupRow.java   |   48 +
 .../processors/cache/tree/MvccDataRow.java      |   25 +-
 .../processors/cache/tree/MvccUpdateRow.java    |   23 +-
 .../processors/cache/tree/SearchRow.java        |    2 +-
 .../datastreamer/DataStreamerImpl.java          |    2 +-
 .../processors/query/GridQueryIndexing.java     |    8 +-
 .../processors/query/GridQueryProcessor.java    |   43 +-
 ...IgniteClientCacheInitializationFailTest.java |    4 +-
 .../cache/mvcc/CacheMvccAbstractTest.java       |  123 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |   78 +-
 .../processors/database/BPlusTreeSelfTest.java  |  106 +-
 .../query/h2/opt/GridH2SpatialIndex.java        |    5 +
 .../cache/query/GridCacheTwoStepQuery.java      |   18 +
 .../processors/query/h2/IgniteH2Indexing.java   |   41 +-
 .../query/h2/database/H2PkHashIndex.java        |   11 +-
 .../query/h2/database/H2RowFactory.java         |   30 +-
 .../processors/query/h2/database/H2Tree.java    |  102 +-
 .../query/h2/database/H2TreeIndex.java          |   74 +-
 .../h2/database/H2TreeMvccFilterClosure.java    |  106 ++
 .../h2/database/io/AbstractH2ExtrasInnerIO.java |  190 +++
 .../h2/database/io/AbstractH2ExtrasLeafIO.java  |  187 +++
 .../query/h2/database/io/AbstractH2InnerIO.java |  106 ++
 .../query/h2/database/io/AbstractH2LeafIO.java  |  108 ++
 .../query/h2/database/io/H2ExtrasInnerIO.java   |  115 +-
 .../query/h2/database/io/H2ExtrasLeafIO.java    |  111 +-
 .../query/h2/database/io/H2IOUtils.java         |  113 ++
 .../query/h2/database/io/H2InnerIO.java         |   41 +-
 .../query/h2/database/io/H2LeafIO.java          |   41 +-
 .../h2/database/io/H2MvccExtrasInnerIO.java     |   77 +
 .../h2/database/io/H2MvccExtrasLeafIO.java      |   76 +
 .../query/h2/database/io/H2MvccInnerIO.java     |   42 +
 .../query/h2/database/io/H2MvccLeafIO.java      |   42 +
 .../query/h2/database/io/H2RowLinkIO.java       |   33 +
 .../query/h2/opt/GridH2IndexBase.java           |   27 +-
 .../query/h2/opt/GridH2KeyValueRowOnheap.java   |   30 +-
 .../query/h2/opt/GridH2MetaTable.java           |    5 +
 .../query/h2/opt/GridH2PlainRowFactory.java     |   17 +-
 .../query/h2/opt/GridH2QueryContext.java        |   27 +-
 .../processors/query/h2/opt/GridH2Row.java      |   24 +-
 .../query/h2/opt/GridH2RowDescriptor.java       |   12 +-
 .../query/h2/opt/GridH2SearchRow.java           |   41 +
 .../query/h2/opt/GridH2SearchRowAdapter.java    |   13 +-
 .../processors/query/h2/opt/GridH2Table.java    |   53 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |   38 +-
 .../h2/twostep/GridMergeIndexIterator.java      |   16 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |   46 +-
 .../h2/twostep/msg/GridH2QueryRequest.java      |   83 +-
 .../cache/mvcc/CacheMvccSqlQueriesTest.java     | 1568 +++++++++++++++++-
 66 files changed, 3955 insertions(+), 587 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 1280e75..8ce47bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -39,9 +39,7 @@ import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
@@ -57,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.tree.DataRow;
+import org.apache.ignite.internal.processors.cache.tree.MvccCleanupRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound;
 import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound;
 import org.apache.ignite.internal.processors.cache.tree.MvccRemoveRow;
@@ -88,6 +87,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_START_CNTR;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
 
@@ -1419,12 +1419,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                 // TODO IGNITE-3478: null is passed for loaded from store, need handle better.
                 if (mvccVer == null) {
-                    mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L);
+                    mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, MVCC_START_CNTR, 0L);
 
                     newVal = true;
                 }
                 else
-                    assert val != null || CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion());
+                    assert val != null || versionForRemovedValue(mvccVer.coordinatorVersion());
 
                 if (val != null) {
                     val.valueBytes(coCtx);
@@ -1476,8 +1476,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                 assert !old;
 
-                if (val != null)
+                if (val != null) {
                     incrementSize(cctx.cacheId());
+
+                    if (cctx.queries().enabled())
+                        cctx.queries().store(updateRow, mvccVer, null);
+                }
             }
             finally {
                 busyLock.leaveBusy();
@@ -1531,6 +1535,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                 if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
                     assert !primary : updateRow;
+
+                    cleanup(cctx, updateRow.cleanupRows(), false);
+
+                    return null;
                 }
                 else {
                     rowStore.addRow(updateRow);
@@ -1543,7 +1551,19 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                         incrementSize(cctx.cacheId());
                 }
 
-                cleanup(updateRow.cleanupRows(), false);
+                CacheDataRow oldRow = updateRow.oldRow();
+
+                if (oldRow != null)
+                    oldRow.key(key);
+
+                GridCacheQueryManager qryMgr = cctx.queries();
+
+                if (qryMgr.enabled())
+                    qryMgr.store(updateRow, mvccVer, oldRow);
+
+                updatePendingEntries(cctx, updateRow, oldRow);
+
+                cleanup(cctx, updateRow.cleanupRows(), false);
 
                 return updateRow.activeTransactions();
             }
@@ -1590,18 +1610,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
                     assert !primary : updateRow;
 
-                    cleanup(updateRow.cleanupRows(), false);
+                    cleanup(cctx, updateRow.cleanupRows(), false);
+
+                    return null;
                 }
                 else {
                     if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
                         decrementSize(cacheId);
 
-                    CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
+                    long rmvRowLink = cleanup(cctx, updateRow.cleanupRows(), true);
 
-                    if (rmvRow == null)
+                    if (rmvRowLink == 0)
                         rowStore.addRow(updateRow);
                     else
-                        updateRow.link(rmvRow.link());
+                        updateRow.link(rmvRowLink);
 
                     assert updateRow.link() != 0L;
 
@@ -1610,6 +1632,21 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                     assert !old;
                 }
 
+                CacheDataRow oldRow = updateRow.oldRow();
+
+                if (oldRow != null) {
+                    assert oldRow.link() != 0 : oldRow;
+
+                    oldRow.key(key);
+
+                    GridCacheQueryManager qryMgr = cctx.queries();
+
+                    if (qryMgr.enabled())
+                        qryMgr.remove(key, oldRow, mvccVer);
+
+                    clearPendingEntries(cctx, oldRow);
+                }
+
                 return updateRow.activeTransactions();
             }
             finally {
@@ -1623,26 +1660,40 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
             int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
+            boolean cleanup = cctx.queries().enabled() || hasPendingEntries;
+
             GridCursor<CacheDataRow> cur = dataTree.find(
                 new MvccSearchRow(cacheId, key, Long.MAX_VALUE, Long.MAX_VALUE),
                 new MvccSearchRow(cacheId, key, 1, 1),
-                CacheDataRowAdapter.RowData.KEY_ONLY);
+                cleanup ? CacheDataRowAdapter.RowData.NO_KEY : CacheDataRowAdapter.RowData.LINK_ONLY);
 
             boolean first = true;
 
             while (cur.next()) {
                 CacheDataRow row = cur.get();
 
+                row.key(key);
+
                 assert row.link() != 0 : row;
 
                 boolean rmvd = dataTree.removex(row);
 
-                assert rmvd;
+                assert rmvd : row;
+
+                boolean rmvdVal = versionForRemovedValue(row.mvccCoordinatorVersion());
+
+                if (cleanup && !rmvdVal) {
+                    if (cctx.queries().enabled())
+                        cctx.queries().remove(key, row, null);
+
+                    if (first)
+                        clearPendingEntries(cctx, row);
+                }
 
                 rowStore.removeRow(row.link());
 
                 if (first) {
-                    if (!versionForRemovedValue(row.mvccCoordinatorVersion()))
+                    if (!rmvdVal)
                         decrementSize(cctx.cacheId());
 
                     first = false;
@@ -1651,36 +1702,48 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /**
+         * @param cctx Cache context.
          * @param cleanupRows Rows to cleanup.
          * @param findRmv {@code True} if need keep removed row entry.
-         * @return Removed row entry if found.
+         * @return Removed row link of {@code 0} if not found.
          * @throws IgniteCheckedException If failed.
          */
-        @Nullable private CacheSearchRow cleanup(@Nullable List<CacheSearchRow> cleanupRows, boolean findRmv)
+        private long cleanup(GridCacheContext cctx, @Nullable List<MvccCleanupRow> cleanupRows, boolean findRmv)
             throws IgniteCheckedException {
-            CacheSearchRow rmvRow = null;
+            long rmvRowLink = 0;
 
             if (cleanupRows != null) {
+                GridCacheQueryManager qryMgr = cctx.queries();
+
                 for (int i = 0; i < cleanupRows.size(); i++) {
-                    CacheSearchRow oldRow = cleanupRows.get(i);
+                    MvccCleanupRow cleanupRow = cleanupRows.get(i);
+
+                    assert cleanupRow.link() != 0 : cleanupRow;
 
-                    assert oldRow.link() != 0L : oldRow;
+                    if (qryMgr.enabled() && !versionForRemovedValue(cleanupRow.mvccCoordinatorVersion())) {
+                        CacheDataRow oldRow = dataTree.remove(cleanupRow);
 
-                    boolean rmvd = dataTree.removex(oldRow);
+                        assert oldRow != null : cleanupRow;
 
-                    assert rmvd;
+                        qryMgr.remove(oldRow.key(), oldRow, null);
+                    }
+                    else {
+                        boolean rmvd = dataTree.removex(cleanupRow);
+
+                        assert rmvd;
+                    }
 
                     if (findRmv &&
-                        rmvRow == null &&
-                        versionForRemovedValue(oldRow.mvccCoordinatorVersion())) {
-                        rmvRow = oldRow;
+                        rmvRowLink == 0 &&
+                        versionForRemovedValue(cleanupRow.mvccCoordinatorVersion())) {
+                        rmvRowLink = cleanupRow.link();
                     }
                     else
-                        rowStore.removeRow(oldRow.link());
+                        rowStore.removeRow(cleanupRow.link());
                 }
             }
 
-            return rmvRow;
+            return rmvRowLink;
         }
 
         /** {@inheritDoc} */
@@ -1753,32 +1816,48 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
             KeyCacheObject key = newRow.key();
 
-            long expireTime = newRow.expireTime();
-
             GridCacheQueryManager qryMgr = cctx.queries();
 
-            int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
-
             if (qryMgr.enabled())
-                qryMgr.store(newRow, oldRow);
+                qryMgr.store(newRow, null, oldRow);
+
+            updatePendingEntries(cctx, newRow, oldRow);
 
             if (oldRow != null) {
                 assert oldRow.link() != 0 : oldRow;
 
-                if (pendingEntries != null && oldRow.expireTime() != 0)
-                    pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
-
                 if (newRow.link() != oldRow.link())
                     rowStore.removeRow(oldRow.link());
             }
 
+            updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value());
+        }
+
+        /**
+         * @param cctx Cache context.
+         * @param newRow
+         * @param oldRow
+         * @throws IgniteCheckedException If failed.
+         */
+        private void updatePendingEntries(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow oldRow)
+            throws IgniteCheckedException
+        {
+            long expireTime = newRow.expireTime();
+
+            int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+
+            if (oldRow != null) {
+                assert oldRow.link() != 0 : oldRow;
+
+                if (pendingEntries != null && oldRow.expireTime() != 0)
+                    pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
+            }
+
             if (pendingEntries != null && expireTime != 0) {
                 pendingEntries.putx(new PendingRow(cacheId, expireTime, newRow.link()));
 
                 hasPendingEntries = true;
             }
-
-            updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value());
         }
 
         /** {@inheritDoc} */
@@ -1792,7 +1871,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
 
                 GridCacheQueryManager qryMgr = cctx.queries();
 
-                qryMgr.store(row, null);
+                qryMgr.store(row, null, null); // TODO IGNITE-3478.
             }
         }
 
@@ -1821,14 +1900,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
          */
         private void finishRemove(GridCacheContext cctx, KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
             if (oldRow != null) {
-                int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
-
-                assert oldRow.link() != 0 : oldRow;
-                assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId :
-                    "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "].";
-
-                if (pendingEntries != null && oldRow.expireTime() != 0)
-                    pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
+                clearPendingEntries(cctx, oldRow);
 
                 decrementSize(cctx.cacheId());
             }
@@ -1836,7 +1908,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             GridCacheQueryManager qryMgr = cctx.queries();
 
             if (qryMgr.enabled())
-                qryMgr.remove(key, oldRow);
+                qryMgr.remove(key, oldRow, null);
 
             if (oldRow != null)
                 rowStore.removeRow(oldRow.link());
@@ -1844,6 +1916,23 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), null);
         }
 
+        /**
+         * @param cctx
+         * @param oldRow
+         * @throws IgniteCheckedException
+         */
+        private void clearPendingEntries(GridCacheContext cctx, CacheDataRow oldRow)
+            throws IgniteCheckedException {
+            int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
+
+            assert oldRow.link() != 0 : oldRow;
+            assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId :
+                "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "].";
+
+            if (pendingEntries != null && oldRow.expireTime() != 0)
+                pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
+        }
+
         /** {@inheritDoc} */
         @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
             key.valueBytes(cctx.cacheObjectContext());
@@ -1985,7 +2074,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                         if (curKey != null && row.key().equals(curKey))
                             continue;
 
-                        if (CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked)) {
+                        if (versionForRemovedValue(rowCrdVerMasked)) {
                             curKey = row.key();
 
                             continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index fd3c2af..07e30d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -75,10 +75,10 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
  */
 public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     /** */
-    public static final long COUNTER_NA = 0L;
+    public static final long MVCC_COUNTER_NA = 0L;
 
     /** */
-    public static final long START_VER = 1L;
+    public static final long MVCC_START_CNTR = 1L;
 
     /** */
     private static final boolean STAT_CNTRS = false;
@@ -99,7 +99,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     private volatile MvccCoordinator curCrd;
 
     /** */
-    private final AtomicLong mvccCntr = new AtomicLong(START_VER);
+    private final AtomicLong mvccCntr = new AtomicLong(MVCC_START_CNTR);
 
     /** */
     private final GridAtomicLong committedCntr = new GridAtomicLong(1L);
@@ -148,6 +148,18 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param crdVer Mvcc coordinator version.
+     * @param cntr Counter.
+     * @return Always {@code true}.
+     */
+    public static boolean assertMvccVersionValid(long crdVer, long cntr) {
+        assert unmaskCoordinatorVersion(crdVer) > 0;
+        assert cntr != MVCC_COUNTER_NA;
+
+        return true;
+    }
+
+    /**
      * @param crdVer Coordinator version.
      * @return Coordinator version with removed value flag.
      */
@@ -651,7 +663,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
     private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorAckRequestTx msg) {
         onTxDone(msg.txCounter());
 
-        if (msg.queryCounter() != COUNTER_NA) {
+        if (msg.queryCounter() != MVCC_COUNTER_NA) {
             if (msg.queryCoordinatorVersion() == 0)
                 onQueryDone(nodeId, msg.queryCounter());
             else
@@ -824,7 +836,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
             else
                 qryCnt.incrementAndGet();
 
-            res.init(futId, crdVer, mvccCntr, COUNTER_NA);
+            res.init(futId, crdVer, mvccCntr, MVCC_COUNTER_NA);
 
             return res;
         }
@@ -909,7 +921,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
 //            }
 //        }
 //
-//        res.init(futId, crdVer, mvccCntr, COUNTER_NA);
+//        res.init(futId, crdVer, mvccCntr, MVCC_COUNTER_NA);
 //
 //        return res;
     }
@@ -1197,7 +1209,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
          * @param res Response.
          */
         void onResponse(MvccCoordinatorVersionResponse res) {
-            assert res.counter() != COUNTER_NA;
+            assert res.counter() != MVCC_COUNTER_NA;
 
             if (lsnr != null)
                 lsnr.onMvccResponse(crd.nodeId(), res);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
index c0512f0..5ab3d3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
@@ -60,7 +60,7 @@ public class CoordinatorAckRequestTx implements MvccCoordinatorMessage {
 
     /** {@inheritDoc} */
     long queryCounter() {
-        return CacheCoordinatorsProcessor.COUNTER_NA;
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
index 5c56f40..521e989 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousCoordinatorQueries.java
@@ -26,9 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -165,7 +163,7 @@ class PreviousCoordinatorQueries {
      */
     void onQueryDone(UUID nodeId, long crdVer, long cntr) {
         assert crdVer != 0;
-        assert cntr != CacheCoordinatorsProcessor.COUNTER_NA;
+        assert cntr != CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
 
         synchronized (this) {
             MvccCounter mvccCntr = new MvccCounter(crdVer, cntr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 1e3a229..29bb6bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -60,6 +60,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
     protected CacheObject val;
 
     /** */
+    @GridToStringInclude
     protected long expireTime = -1;
 
     /** */
@@ -599,7 +600,10 @@ public class CacheDataRowAdapter implements CacheDataRow {
         KEY_ONLY,
 
         /** */
-        NO_KEY
+        NO_KEY,
+
+        /** */
+        LINK_ONLY,
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index b31a61f..1ebb1e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -884,12 +884,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
     /**
      * @param upper Upper bound.
+     * @param c Filter closure.
      * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
      * @return Cursor.
      * @throws IgniteCheckedException If failed.
      */
-    private GridCursor<T> findLowerUnbounded(L upper, Object x) throws IgniteCheckedException {
-        ForwardCursor cursor = new ForwardCursor(null, upper, x);
+    private GridCursor<T> findLowerUnbounded(L upper, TreeRowClosure<L, T> c, Object x) throws IgniteCheckedException {
+        ForwardCursor cursor = new ForwardCursor(null, upper, c, x);
 
         long firstPageId;
 
@@ -946,13 +947,25 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      * @throws IgniteCheckedException If failed.
      */
     public final GridCursor<T> find(L lower, L upper, Object x) throws IgniteCheckedException {
+        return find(lower, upper, null, x);
+    }
+
+    /**
+     * @param lower Lower bound inclusive or {@code null} if unbounded.
+     * @param upper Upper bound inclusive or {@code null} if unbounded.
+     * @param c Filter closure.
+     * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+     * @return Cursor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public final GridCursor<T> find(L lower, L upper, TreeRowClosure<L, T> c, Object x) throws IgniteCheckedException {
         checkDestroyed();
 
         try {
             if (lower == null)
-                return findLowerUnbounded(upper, x);
+                return findLowerUnbounded(upper, c, x);
 
-            ForwardCursor cursor = new ForwardCursor(lower, upper, x);
+            ForwardCursor cursor = new ForwardCursor(lower, upper, c, x);
 
             cursor.find();
 
@@ -4751,14 +4764,19 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         /** */
         private int row = -1;
 
+        /** */
+        private final TreeRowClosure<L, T> c;
+
         /**
          * @param lowerBound Lower bound.
          * @param upperBound Upper bound.
+         * @param c Filter closure.
          * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
          */
-        ForwardCursor(L lowerBound, L upperBound, Object x) {
+        ForwardCursor(L lowerBound, L upperBound, TreeRowClosure<L, T> c, Object x) {
             super(lowerBound, upperBound);
 
+            this.c = c;
             this.x = x;
         }
 
@@ -4782,15 +4800,21 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             if (rows == EMPTY)
                 rows = (T[])new Object[cnt];
 
+            int resCnt = 0;
+
             for (int i = 0; i < cnt; i++) {
-                T r = getRow(io, pageAddr, startIdx + i, x);
+                int itemIdx = startIdx + i;
 
-                rows = GridArrays.set(rows, i, r);
+                if (c == null || c.apply(BPlusTree.this, io, pageAddr, itemIdx)) {
+                    T r = getRow(io, pageAddr, itemIdx, x);
+
+                    rows = GridArrays.set(rows, resCnt++, r);
+                }
             }
 
-            GridArrays.clearTail(rows, cnt);
+            GridArrays.clearTail(rows, resCnt);
 
-            return true;
+            return resCnt > 0;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
index d74d344..9dcad9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/IOVersions.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.tree.io;
 
+import org.apache.ignite.internal.util.typedef.internal.S;
+
 /**
  * Registry for IO versions.
  */
@@ -99,4 +101,9 @@ public final class IOVersions<V extends PageIO> {
 
         return res;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IOVersions.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index 2de0b8c..0a42129 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -88,6 +88,12 @@ public abstract class PageIO {
     /** */
     private static IOVersions<? extends BPlusLeafIO<?>> h2LeafIOs;
 
+    /** */
+    private static IOVersions<? extends BPlusInnerIO<?>> h2MvccInnerIOs;
+
+    /** */
+    private static IOVersions<? extends BPlusLeafIO<?>> h2MvccLeafIOs;
+
     /** Maximum payload size. */
     public static final short MAX_PAYLOAD_SIZE = 2048;
 
@@ -98,6 +104,12 @@ public abstract class PageIO {
     private static List<IOVersions<? extends BPlusLeafIO<?>>> h2ExtraLeafIOs = new ArrayList<>(MAX_PAYLOAD_SIZE);
 
     /** */
+    private static List<IOVersions<? extends BPlusInnerIO<?>>> h2ExtraMvccInnerIOs = new ArrayList<>(MAX_PAYLOAD_SIZE);
+
+    /** */
+    private static List<IOVersions<? extends BPlusLeafIO<?>>> h2ExtraMvccLeafIOs = new ArrayList<>(MAX_PAYLOAD_SIZE);
+
+    /** */
     public static final int TYPE_OFF = 0;
 
     /** */
@@ -184,24 +196,42 @@ public abstract class PageIO {
     public static final short T_PART_CNTRS = 20;
 
     /** Index for payload == 1. */
-    public static final short T_H2_EX_REF_LEAF_START = 10000;
+    public static final short T_H2_EX_REF_LEAF_START = 10_000;
 
     /** */
     public static final short T_H2_EX_REF_LEAF_END = T_H2_EX_REF_LEAF_START + MAX_PAYLOAD_SIZE - 1;
 
     /** */
-    public static final short T_H2_EX_REF_INNER_START = 20000;
+    public static final short T_H2_EX_REF_INNER_START = 20_000;
 
     /** */
     public static final short T_H2_EX_REF_INNER_END = T_H2_EX_REF_INNER_START + MAX_PAYLOAD_SIZE - 1;
 
     /** */
+    public static final short T_H2_EX_REF_MVCC_LEAF_START = 23_000;
+
+    /** */
+    public static final short T_H2_EX_REF_MVCC_LEAF_END = T_H2_EX_REF_MVCC_LEAF_START + MAX_PAYLOAD_SIZE - 1;
+
+    /** */
+    public static final short T_H2_EX_REF_MVCC_INNER_START = 26_000;
+
+    /** */
+    public static final short T_H2_EX_REF_MVCC_INNER_END = T_H2_EX_REF_MVCC_INNER_START + MAX_PAYLOAD_SIZE - 1;
+
+    /** */
     public static final short T_DATA_REF_MVCC_INNER = 21;
 
     /** */
     public static final short T_DATA_REF_MVCC_LEAF = 22;
 
     /** */
+    public static final short T_H2_MVCC_REF_LEAF = 23;
+
+    /** */
+    public static final short T_H2_MVCC_REF_INNER = 24;
+
+    /** */
     private final int ver;
 
     /** */
@@ -334,13 +364,19 @@ public abstract class PageIO {
      *
      * @param innerIOs Inner IO versions.
      * @param leafIOs Leaf IO versions.
+     * @param mvccInnerIOs Inner IO versions with mvcc enabled.
+     * @param mvccLeafIOs Leaf IO versions with mvcc enabled.
      */
     public static void registerH2(
         IOVersions<? extends BPlusInnerIO<?>> innerIOs,
-        IOVersions<? extends BPlusLeafIO<?>> leafIOs
+        IOVersions<? extends BPlusLeafIO<?>> leafIOs,
+        IOVersions<? extends BPlusInnerIO<?>> mvccInnerIOs,
+        IOVersions<? extends BPlusLeafIO<?>> mvccLeafIOs
     ) {
         h2InnerIOs = innerIOs;
         h2LeafIOs = leafIOs;
+        h2MvccInnerIOs = mvccInnerIOs;
+        h2MvccLeafIOs = mvccLeafIOs;
     }
 
     /**
@@ -348,8 +384,10 @@ public abstract class PageIO {
      *
      * @param innerExtIOs Extra versions.
      */
-    public static void registerH2ExtraInner(IOVersions<? extends BPlusInnerIO<?>> innerExtIOs) {
-        h2ExtraInnerIOs.add(innerExtIOs);
+    public static void registerH2ExtraInner(IOVersions<? extends BPlusInnerIO<?>> innerExtIOs, boolean mvcc) {
+        List<IOVersions<? extends BPlusInnerIO<?>>> ios = mvcc ? h2ExtraMvccInnerIOs : h2ExtraInnerIOs;
+
+        ios.add(innerExtIOs);
     }
 
     /**
@@ -357,24 +395,30 @@ public abstract class PageIO {
      *
      * @param leafExtIOs Extra versions.
      */
-    public static void registerH2ExtraLeaf(IOVersions<? extends BPlusLeafIO<?>> leafExtIOs) {
-        h2ExtraLeafIOs.add(leafExtIOs);
+    public static void registerH2ExtraLeaf(IOVersions<? extends BPlusLeafIO<?>> leafExtIOs, boolean mvcc) {
+        List<IOVersions<? extends BPlusLeafIO<?>>> ios = mvcc ? h2ExtraMvccLeafIOs : h2ExtraLeafIOs;
+
+        ios.add(leafExtIOs);
     }
 
     /**
      * @param idx Index.
      * @return IOVersions for given idx.
      */
-    public static IOVersions<? extends BPlusInnerIO<?>> getInnerVersions(int idx) {
-        return h2ExtraInnerIOs.get(idx);
+    public static IOVersions<? extends BPlusInnerIO<?>> getInnerVersions(int idx, boolean mvcc) {
+        List<IOVersions<? extends BPlusInnerIO<?>>> ios = mvcc ? h2ExtraMvccInnerIOs : h2ExtraInnerIOs;
+
+        return ios.get(idx);
     }
 
     /**
      * @param idx Index.
      * @return IOVersions for given idx.
      */
-    public static IOVersions<? extends BPlusLeafIO<?>> getLeafVersions(int idx) {
-        return h2ExtraLeafIOs.get(idx);
+    public static IOVersions<? extends BPlusLeafIO<?>> getLeafVersions(int idx, boolean mvcc) {
+        List<IOVersions<? extends BPlusLeafIO<?>>> ios = mvcc ? h2ExtraMvccLeafIOs : h2ExtraLeafIOs;
+
+        return ios.get(idx);
     }
 
     /**
@@ -493,13 +537,18 @@ public abstract class PageIO {
      */
     @SuppressWarnings("unchecked")
     public static <Q extends BPlusIO<?>> Q getBPlusIO(int type, int ver) throws IgniteCheckedException {
-
         if (type >= T_H2_EX_REF_LEAF_START && type <= T_H2_EX_REF_LEAF_END)
             return (Q)h2ExtraLeafIOs.get(type - T_H2_EX_REF_LEAF_START).forVersion(ver);
 
         if (type >= T_H2_EX_REF_INNER_START && type <= T_H2_EX_REF_INNER_END)
             return (Q)h2ExtraInnerIOs.get(type - T_H2_EX_REF_INNER_START).forVersion(ver);
 
+        if (type >= T_H2_EX_REF_MVCC_LEAF_START && type <= T_H2_EX_REF_MVCC_LEAF_END)
+            return (Q)h2ExtraMvccLeafIOs.get(type - T_H2_EX_REF_MVCC_LEAF_START).forVersion(ver);
+
+        if (type >= T_H2_EX_REF_MVCC_INNER_START && type <= T_H2_EX_REF_MVCC_INNER_END)
+            return (Q)h2ExtraMvccInnerIOs.get(type - T_H2_EX_REF_MVCC_INNER_START).forVersion(ver);
+
         switch (type) {
             case T_H2_REF_INNER:
                 if (h2InnerIOs == null)
@@ -513,6 +562,18 @@ public abstract class PageIO {
 
                 return (Q)h2LeafIOs.forVersion(ver);
 
+            case T_H2_MVCC_REF_INNER:
+                if (h2MvccInnerIOs == null)
+                    break;
+
+                return (Q)h2MvccInnerIOs.forVersion(ver);
+
+            case T_H2_MVCC_REF_LEAF:
+                if (h2MvccLeafIOs == null)
+                    break;
+
+                return (Q)h2MvccLeafIOs.forVersion(ver);
+
             case T_DATA_REF_INNER:
                 return (Q)DataInnerIO.VERSIONS.forVersion(ver);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 59b7613..fb5728a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -382,10 +382,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
     /**
      * @param newRow New row.
+     * @param mvccVer Mvcc version for update.
      * @param prevRow Previous row.
      * @throws IgniteCheckedException In case of error.
      */
-    public void store(CacheDataRow newRow, @Nullable CacheDataRow prevRow)
+    public void store(CacheDataRow newRow, @Nullable MvccCoordinatorVersion mvccVer, @Nullable CacheDataRow prevRow)
         throws IgniteCheckedException {
         assert enabled();
         assert newRow != null && newRow.value() != null && newRow.link() != 0 : newRow;
@@ -405,7 +406,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
             }
 
             if (qryProcEnabled)
-                qryProc.store(cctx, newRow, prevRow);
+                qryProc.store(cctx, newRow, mvccVer, prevRow);
         }
         finally {
             invalidateResultCache();
@@ -417,9 +418,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
     /**
      * @param key Key.
      * @param prevRow Previous row.
+     * @param newVer Mvcc version for remove operation.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow) throws IgniteCheckedException {
+    public void remove(KeyCacheObject key, @Nullable CacheDataRow prevRow, @Nullable MvccCoordinatorVersion newVer)
+        throws IgniteCheckedException {
         if (!QueryUtils.isEnabled(cctx.config()))
             return; // No-op.
 
@@ -435,7 +438,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
 
             // val may be null if we have no previous value. We should not call processor in this case.
             if (qryProcEnabled && prevRow != null)
-                qryProc.remove(cctx, prevRow);
+                qryProc.remove(cctx, prevRow, newVer);
         }
         finally {
             invalidateResultCache();

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index 31aa2ca..c36d5cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInne
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteInClosure;
 
-import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
 
 /**
@@ -62,7 +62,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
 
         if (storeMvccVersion()) {
             assert unmaskCoordinatorVersion(row.mvccCoordinatorVersion()) > 0 : row;
-            assert row.mvccCounter() != COUNTER_NA : row;
+            assert row.mvccCounter() != MVCC_COUNTER_NA : row;
 
             PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion());
             off += 8;
@@ -82,7 +82,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
             long mvccCntr = getMvccCounter(pageAddr, idx);
 
             assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer;
-            assert mvccCntr != COUNTER_NA;
+            assert mvccCntr != MVCC_COUNTER_NA;
 
             return ((CacheDataTree)tree).rowStore().mvccRow(cacheId,
                 hash,
@@ -128,7 +128,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
             long mvccCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
 
             assert unmaskCoordinatorVersion(mvccTopVer) > 0 : mvccTopVer;
-            assert mvccCntr != COUNTER_NA;
+            assert mvccCntr != MVCC_COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index 47d8a6f..d60aef2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeaf
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteInClosure;
 
-import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
 
 /**
@@ -64,7 +64,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateCntr = row.mvccCounter();
 
             assert unmaskCoordinatorVersion(mvccCrdVer) > 0 : mvccCrdVer;
-            assert mvccUpdateCntr != COUNTER_NA;
+            assert mvccUpdateCntr != MVCC_COUNTER_NA;
 
             PageUtils.putLong(pageAddr, off, mvccCrdVer);
             off += 8;
@@ -100,7 +100,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
 
             assert unmaskCoordinatorVersion(mvccUpdateTopVer) > 0 : mvccUpdateCntr;
-            assert mvccUpdateCntr != COUNTER_NA;
+            assert mvccUpdateCntr != MVCC_COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
index 85624d5..5537794 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
@@ -73,9 +73,9 @@ public class CacheDataRowStore extends RowStore {
      * @return Search row.
      */
     MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long crdVer, long mvccCntr) {
-        if (rowData != CacheDataRowAdapter.RowData.KEY_ONLY && versionForRemovedValue(crdVer)) {
-            if (rowData == CacheDataRowAdapter.RowData.NO_KEY)
-                return MvccDataRow.removedRowNoKey(partId, cacheId, crdVer, mvccCntr);
+        if (versionForRemovedValue(crdVer)) {
+            if (rowData == CacheDataRowAdapter.RowData.NO_KEY || rowData == CacheDataRowAdapter.RowData.LINK_ONLY)
+                return MvccDataRow.removedRowNoKey(link, partId, cacheId, crdVer, mvccCntr);
             else
                 rowData = CacheDataRowAdapter.RowData.KEY_ONLY;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index a699cd3..9f85640 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -169,7 +169,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
 
         long mvccCntr = io.getMvccCounter(pageAddr, idx);
 
-        assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA;
+        assert row.mvccCounter() != CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
 
         cmp = Long.compare(row.mvccCounter(), mvccCntr);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
index 3d02b27..36ffd49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataInnerIO.java
@@ -59,6 +59,6 @@ public final class CacheIdAwareDataInnerIO extends AbstractDataInnerIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccCounter(long pageAddr, int idx) {
-        return CacheCoordinatorsProcessor.COUNTER_NA;
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
index 58ae9ff..ae6fc0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheIdAwareDataLeafIO.java
@@ -59,6 +59,6 @@ public final class CacheIdAwareDataLeafIO extends AbstractDataLeafIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccCounter(long pageAddr, int idx) {
-        return CacheCoordinatorsProcessor.COUNTER_NA;
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
index 19a5c47..98a5450 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataInnerIO.java
@@ -59,6 +59,6 @@ public final class DataInnerIO extends AbstractDataInnerIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccCounter(long pageAddr, int idx) {
-        return CacheCoordinatorsProcessor.COUNTER_NA;
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
index ab10b96..b644e6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataLeafIO.java
@@ -59,6 +59,6 @@ public final class DataLeafIO extends AbstractDataLeafIO {
 
     /** {@inheritDoc} */
     @Override public long getMvccCounter(long pageAddr, int idx) {
-        return CacheCoordinatorsProcessor.COUNTER_NA;
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
index d1e90d4..8853d6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
@@ -50,15 +50,13 @@ public class DataRow extends CacheDataRowAdapter {
         this.part = part;
 
         try {
-            // We can not init data row lazily because underlying buffer can be concurrently cleared.
-            initFromLink(grp, rowData);
+            // We can not init data row lazily outside of entry lock because underlying buffer can be concurrently cleared.
+            if (rowData != RowData.LINK_ONLY)
+                initFromLink(grp, rowData);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
         }
-
-        if (key != null)
-            key.partition(part);
     }
 
     /**
@@ -84,11 +82,18 @@ public class DataRow extends CacheDataRowAdapter {
     /**
      *
      */
-    protected DataRow() {
+    DataRow() {
         super(0);
     }
 
     /** {@inheritDoc} */
+    @Override public void key(KeyCacheObject key) {
+        super.key(key);
+
+        hash = key.hashCode();
+    }
+
+    /** {@inheritDoc} */
     @Override public int partition() {
         return part;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java
new file mode 100644
index 0000000..92caf70
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccCleanupRow.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+
+/**
+ * Row contains only link.
+ */
+public class MvccCleanupRow extends MvccSearchRow {
+    /** */
+    private final long link;
+
+    /**
+     * @param cacheId Cache ID.
+     * @param key Key.
+     * @param crdVer Mvcc coordinator version.
+     * @param mvccCntr Mvcc counter.
+     * @param link Link.
+     */
+    MvccCleanupRow(int cacheId, KeyCacheObject key, long crdVer, long mvccCntr, long link) {
+        super(cacheId, key, crdVer, mvccCntr);
+
+        assert link != 0L;
+
+        this.link = link;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long link() {
+        return link;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
index 916ea93..a2cf079 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
@@ -18,10 +18,9 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
-import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid;
 
 /**
  *
@@ -34,6 +33,13 @@ public class MvccDataRow extends DataRow {
     private long mvccCntr;
 
     /**
+     *
+     */
+    private MvccDataRow() {
+        // No-op.
+    }
+
+    /**
      * @param grp Context.
      * @param hash Key hash.
      * @param link Link.
@@ -42,24 +48,17 @@ public class MvccDataRow extends DataRow {
      * @param crdVer Mvcc coordinator version.
      * @param mvccCntr Mvcc counter.
      */
-    MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long crdVer, long mvccCntr) {
+    public MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long crdVer, long mvccCntr) {
         super(grp, hash, link, part, rowData);
 
-        assert unmaskCoordinatorVersion(crdVer) > 0 : crdVer;
-        assert mvccCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+        assertMvccVersionValid(crdVer, mvccCntr);
 
         this.crdVer = crdVer;
         this.mvccCntr = mvccCntr;
     }
 
     /**
-     *
-     */
-    private MvccDataRow() {
-        // No-op.
-    }
-
-    /**
+     * @param link Link.
      * @param part Partition.
      * @param cacheId Cache ID.
      * @param crdVer Mvcc coordinator version.
@@ -67,12 +66,14 @@ public class MvccDataRow extends DataRow {
      * @return Row.
      */
     static MvccDataRow removedRowNoKey(
+        long link,
         int part,
         int cacheId,
         long crdVer,
         long mvccCntr) {
         MvccDataRow row = new MvccDataRow();
 
+        row.link = link;
         row.cacheId = cacheId;
         row.part = part;
         row.crdVer = crdVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index fb2a6cf..0b37a94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -22,7 +22,6 @@ import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
@@ -34,6 +33,7 @@ import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
 
@@ -51,7 +51,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
     private GridLongList activeTxs;
 
     /** */
-    private List<CacheSearchRow> cleanupRows;
+    private List<MvccCleanupRow> cleanupRows;
 
     /** */
     private final MvccCoordinatorVersion mvccVer;
@@ -66,7 +66,9 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
      * @param key Key.
      * @param val Value.
      * @param ver Version.
+     * @param expireTime Expire time.
      * @param mvccVer Mvcc version.
+     * @param needOld {@code True} if need previous value.
      * @param part Partition.
      * @param cacheId Cache ID.
      */
@@ -109,7 +111,7 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
     /**
      * @return Rows which are safe to cleanup.
      */
-    public List<CacheSearchRow> cleanupRows() {
+    public List<MvccCleanupRow> cleanupRows() {
         return cleanupRows;
     }
 
@@ -175,8 +177,6 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
                     if (needOld)
                         oldRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
                 }
-                res = versionForRemovedValue(rowCrdVerMasked) ?
-                    UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL;
             }
         }
 
@@ -199,26 +199,25 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
 
             int cmp;
 
+            long rowCntr = rowIo.getMvccCounter(pageAddr, idx);
+
             if (crdVer == rowCrdVer)
-                cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
+                cmp = Long.compare(mvccVer.cleanupVersion(), rowCntr);
             else
                 cmp = 1;
 
             if (cmp >= 0) {
                 // Do not cleanup oldest version.
                 if (canCleanup) {
-                    CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx);
-
-                    assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
+                    assert assertMvccVersionValid(rowCrdVer, rowCntr);
 
                     // Should not be possible to cleanup active tx.
-                    assert rowCrdVer != crdVer
-                        || !mvccVer.activeTransactions().contains(row.mvccCounter());
+                    assert rowCrdVer != crdVer || !mvccVer.activeTransactions().contains(rowCntr);
 
                     if (cleanupRows == null)
                         cleanupRows = new ArrayList<>();
 
-                    cleanupRows.add(row);
+                    cleanupRows.add(new MvccCleanupRow(cacheId, key, rowCrdVerMasked, rowCntr, rowIo.getLink(pageAddr, idx)));
                 }
                 else
                     canCleanup = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
index 5bdc495..5fd7e8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/SearchRow.java
@@ -83,7 +83,7 @@ public class SearchRow implements CacheSearchRow {
 
     /** {@inheritDoc} */
     @Override public long mvccCounter() {
-        return CacheCoordinatorsProcessor.COUNTER_NA;
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index e6300a9..dab2ec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -134,7 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
     /** Version which is less then any version generated on coordinator. */
     private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
-        new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L);
+        new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.MVCC_START_CNTR, 0L);
 
     /** Cache receiver. */
     private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index b0a3831..5bd4bc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
@@ -217,10 +218,13 @@ public interface GridQueryIndexing {
      * @param cctx Cache context.
      * @param type Type descriptor.
      * @param row New row.
+     * @param newVer Version of new mvcc value inserted for the same key.
      * @throws IgniteCheckedException If failed.
      */
-    public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row)
-        throws IgniteCheckedException;
+    public void store(GridCacheContext cctx,
+        GridQueryTypeDescriptor type,
+        CacheDataRow row,
+        @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException;
 
     /**
      * Removes index entry by key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 4886b1b..3b3dec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.StoredCacheData;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -1700,14 +1701,19 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     /**
      * @param cctx Cache context.
      * @param newRow New row.
+     * @param mvccVer Mvcc version for update.
      * @param prevRow Previous row.
      * @throws IgniteCheckedException In case of error.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    public void store(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow prevRow)
-        throws IgniteCheckedException {
+    public void store(GridCacheContext cctx,
+        CacheDataRow newRow,
+        @Nullable MvccCoordinatorVersion mvccVer,
+        @Nullable CacheDataRow prevRow) throws IgniteCheckedException
+    {
         assert cctx != null;
         assert newRow != null;
+        assert !cctx.mvccEnabled() || mvccVer != null;
 
         KeyCacheObject key = newRow.key();
 
@@ -1734,14 +1740,26 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                     prevRow.value(),
                     false);
 
-                if (prevValDesc != null && prevValDesc != desc)
+                if (prevValDesc != null && prevValDesc != desc) {
                     idx.remove(cctx, prevValDesc, prevRow);
+
+                    prevRow = null;
+                }
             }
 
             if (desc == null)
                 return;
 
-            idx.store(cctx, desc, newRow);
+            if (cctx.mvccEnabled()) {
+                // Add new mvcc value.
+                idx.store(cctx, desc, newRow, null);
+
+                // Set info about more recent version for previous record.
+                if (prevRow != null)
+                    idx.store(cctx, desc, prevRow, mvccVer);
+            }
+            else
+                idx.store(cctx, desc, newRow, null);
         }
         finally {
             busyLock.leaveBusy();
@@ -2304,12 +2322,14 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
     /**
      * @param cctx Cache context.
-     * @param val Row.
+     * @param val Value removed from cache.
+     * @param newVer Mvcc version for remove operation.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    public void remove(GridCacheContext cctx, CacheDataRow val)
+    public void remove(GridCacheContext cctx, CacheDataRow val, @Nullable MvccCoordinatorVersion newVer)
         throws IgniteCheckedException {
         assert val != null;
+        assert cctx.mvccEnabled() || newVer == null;
 
         if (log.isDebugEnabled())
             log.debug("Remove [cacheName=" + cctx.name() + ", key=" + val.key()+ ", val=" + val.value() + "]");
@@ -2330,7 +2350,16 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             if (desc == null)
                 return;
 
-            idx.remove(cctx, desc, val);
+            if (cctx.mvccEnabled()) {
+                if (newVer != null) {
+                    // Set info about more recent version for previous record.
+                    idx.store(cctx, desc, val, newVer);
+                }
+                else
+                    idx.remove(cctx, desc, val);
+            }
+            else
+                idx.remove(cctx, desc, val);
         }
         finally {
             busyLock.leaveBusy();

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
index b0b758a..d77fb81 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientCacheInitializationFailTest.java
@@ -43,6 +43,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryIndexing;
@@ -310,7 +311,8 @@ public class IgniteClientCacheInitializationFailTest extends GridCommonAbstractT
         }
 
         /** {@inheritDoc} */
-        @Override public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow val) {
+        @Override public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row,
+            @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException {
             // No-op.
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index 999144f..1949cd2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -160,6 +161,71 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param cfgC Optional closure applied to cache configuration.
+     * @throws Exception If failed.
+     */
+    final void cacheRecreate(@Nullable IgniteInClosure<CacheConfiguration> cfgC) throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        final int PARTS = 64;
+
+        CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS);
+
+        if (cfgC != null)
+            cfgC.apply(ccfg);
+
+        IgniteCache<Integer, MvccTestAccount> cache = (IgniteCache)srv0.createCache(ccfg);
+
+        for (int k = 0; k < PARTS * 2; k++) {
+            assertNull(cache.get(k));
+
+            int vals = k % 3 + 1;
+
+            for (int v = 0; v < vals; v++)
+                cache.put(k, new MvccTestAccount(v, 1));
+
+            assertEquals(vals - 1, cache.get(k).val);
+        }
+
+        srv0.destroyCache(cache.getName());
+
+        ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS);
+
+        if (cfgC != null)
+            cfgC.apply(ccfg);
+
+        cache = (IgniteCache)srv0.createCache(ccfg);
+
+        for (int k = 0; k < PARTS * 2; k++) {
+            assertNull(cache.get(k));
+
+            int vals = k % 3 + 2;
+
+            for (int v = 0; v < vals; v++)
+                cache.put(k, new MvccTestAccount(v + 100, 1));
+
+            assertEquals(vals - 1 + 100, cache.get(k).val);
+        }
+
+        srv0.destroyCache(cache.getName());
+
+        ccfg = cacheConfiguration(PARTITIONED, FULL_SYNC, 0, PARTS);
+
+        IgniteCache<Long, Long> cache0 = (IgniteCache)srv0.createCache(ccfg);
+
+        for (long k = 0; k < PARTS * 2; k++) {
+            assertNull(cache0.get(k));
+
+            int vals = (int)(k % 3 + 2);
+
+            for (long v = 0; v < vals; v++)
+                cache0.put(k, v);
+
+            assertEquals((long)(vals - 1), (Object)cache0.get(k));
+        }
+    }
+
+    /**
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.
      * @param cacheBackups Number of cache backups.
@@ -332,13 +398,15 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
 
                     Map<Integer, Integer> lastUpdateCntrs = new HashMap<>();
 
+                    SqlFieldsQuery sumQry = new SqlFieldsQuery("select sum(val) from MvccTestAccount");
+
                     while (!stop.get()) {
                         while (keys.size() < ACCOUNTS)
                             keys.add(rnd.nextInt(ACCOUNTS));
 
                         TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd);
 
-                        Map<Integer, MvccTestAccount> accounts;
+                        Map<Integer, MvccTestAccount> accounts = null;
 
                         try {
                             switch (readMode) {
@@ -378,7 +446,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
 
                                         for (List<?> row : cache.cache.query(qry)) {
                                             Integer id = (Integer)row.get(0);
-                                            Integer val = (Integer)row.get(0);
+                                            Integer val = (Integer)row.get(1);
 
                                             MvccTestAccount old = accounts.put(id, new MvccTestAccount(val, 1));
 
@@ -389,6 +457,18 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                                     break;
                                 }
 
+                                case SQL_SUM: {
+                                    List<List<?>> res = cache.cache.query(sumQry).getAll();
+
+                                    assertEquals(1, res.size());
+
+                                    BigDecimal sum = (BigDecimal)res.get(0).get(0);
+
+                                    assertEquals(ACCOUNT_START_VAL * ACCOUNTS, sum.intValue());
+
+                                    break;
+                                }
+
                                 default: {
                                     fail();
 
@@ -400,29 +480,31 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                             cache.readUnlock();
                         }
 
-                        if (!withRmvs)
-                            assertEquals(ACCOUNTS, accounts.size());
+                        if (accounts != null) {
+                            if (!withRmvs)
+                                assertEquals(ACCOUNTS, accounts.size());
 
-                        int sum = 0;
+                            int sum = 0;
 
-                        for (int i = 0; i < ACCOUNTS; i++) {
-                            MvccTestAccount account = accounts.get(i);
+                            for (int i = 0; i < ACCOUNTS; i++) {
+                                MvccTestAccount account = accounts.get(i);
 
-                            if (account != null) {
-                                sum += account.val;
+                                if (account != null) {
+                                    sum += account.val;
 
-                                Integer cntr = lastUpdateCntrs.get(i);
+                                    Integer cntr = lastUpdateCntrs.get(i);
 
-                                if (cntr != null)
-                                    assertTrue(cntr <= account.updateCnt);
+                                    if (cntr != null)
+                                        assertTrue(cntr <= account.updateCnt);
 
-                                lastUpdateCntrs.put(i, cntr);
+                                    lastUpdateCntrs.put(i, cntr);
+                                }
+                                else
+                                    assertTrue(withRmvs);
                             }
-                            else
-                                assertTrue(withRmvs);
-                        }
 
-                        assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
+                            assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
+                        }
                     }
 
                     if (idx == 0) {
@@ -713,7 +795,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
      * @param node Node.
      * @throws Exception If failed.
      */
-    final void checkActiveQueriesCleanup(Ignite node) throws Exception {
+    protected final void checkActiveQueriesCleanup(Ignite node) throws Exception {
         final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators();
 
         assertTrue("Active queries not cleared: " + node.name(), GridTestUtils.waitForCondition(
@@ -827,7 +909,10 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
         SCAN,
 
         /** */
-        SQL_ALL
+        SQL_ALL,
+
+        /** */
+        SQL_SUM
     }
 
     /**


[2/4] ignite git commit: ignite-3478 Mvcc support for sql indexes

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java
index 85dcf50..8954de0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2LeafIO.java
@@ -17,20 +17,12 @@
 
 package org.apache.ignite.internal.processors.query.h2.database.io;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
-import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
-import org.h2.result.SearchRow;
 
 /**
  * Leaf page for H2 row references.
  */
-public class H2LeafIO extends BPlusLeafIO<SearchRow> implements H2RowLinkIO {
+public class H2LeafIO extends AbstractH2LeafIO {
     /** */
     public static final IOVersions<H2LeafIO> VERSIONS = new IOVersions<>(
         new H2LeafIO(1)
@@ -39,36 +31,7 @@ public class H2LeafIO extends BPlusLeafIO<SearchRow> implements H2RowLinkIO {
     /**
      * @param ver Page format version.
      */
-    protected H2LeafIO(int ver) {
+    private H2LeafIO(int ver) {
         super(T_H2_REF_LEAF, ver, 8);
     }
-
-    /** {@inheritDoc} */
-    @Override public void storeByOffset(long pageAddr, int off, SearchRow row) {
-        GridH2Row row0 = (GridH2Row)row;
-
-        assert row0.link() != 0;
-
-        PageUtils.putLong(pageAddr, off, row0.link());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<SearchRow> srcIo, long srcPageAddr, int srcIdx) {
-        assert srcIo == this;
-
-        PageUtils.putLong(dstPageAddr, offset(dstIdx), getLink(srcPageAddr, srcIdx));
-    }
-
-    /** {@inheritDoc} */
-    @Override public SearchRow getLookupRow(BPlusTree<SearchRow,?> tree, long pageAddr, int idx)
-        throws IgniteCheckedException {
-        long link = getLink(pageAddr, idx);
-
-        return ((H2Tree)tree).getRowFactory().getRow(link);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLink(long pageAddr, int idx) {
-        return PageUtils.getLong(pageAddr, offset(idx));
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java
new file mode 100644
index 0000000..fa6978e
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasInnerIO.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.pagemem.PageUtils;
+
+/**
+ *
+ */
+class H2MvccExtrasInnerIO extends AbstractH2ExtrasInnerIO {
+    /** */
+    private final int crdVerOff;
+
+    /** */
+    private final int cntrOff;
+
+    /** */
+    private final int newCrdVerOff;
+
+    /** */
+    private final int newCntrOff;
+
+    /**
+     * @param type Page type.
+     * @param ver Page format version.
+     * @param payloadSize Payload size.
+     */
+    H2MvccExtrasInnerIO(short type, int ver, int payloadSize) {
+        super(type, ver, 40, payloadSize);
+
+        crdVerOff = payloadSize + 8;
+        cntrOff = payloadSize + 16;
+        newCrdVerOff = payloadSize + 24;
+        newCntrOff = payloadSize + 32;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + crdVerOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCounter(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + cntrOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + newCrdVerOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCounter(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + newCntrOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeMvccInfo() {
+        return true;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java
new file mode 100644
index 0000000..2448e76
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccExtrasLeafIO.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.pagemem.PageUtils;
+
+/**
+ * Leaf page for H2 row references.
+ */
+class H2MvccExtrasLeafIO extends AbstractH2ExtrasLeafIO {
+    /** */
+    private final int crdVerOff;
+
+    /** */
+    private final int cntrOff;
+
+    /** */
+    private final int newCrdVerOff;
+
+    /** */
+    private final int newCntrOff;
+
+    /**
+     * @param type Page type.
+     * @param ver Page format version.
+     * @param payloadSize Payload size.
+     */
+    H2MvccExtrasLeafIO(short type, int ver, int payloadSize) {
+        super(type, ver, 40, payloadSize);
+
+        crdVerOff = payloadSize + 8;
+        cntrOff = payloadSize + 16;
+        newCrdVerOff = payloadSize + 24;
+        newCntrOff = payloadSize + 32;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + crdVerOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCounter(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + cntrOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + newCrdVerOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCounter(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + newCntrOff);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeMvccInfo() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java
new file mode 100644
index 0000000..e64ab43
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccInnerIO.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+
+/**
+ * Inner page for H2 row references.
+ */
+public class H2MvccInnerIO extends AbstractH2InnerIO {
+    /** */
+    public static final IOVersions<H2MvccInnerIO> VERSIONS = new IOVersions<>(
+        new H2MvccInnerIO(1)
+    );
+
+    /**
+     * @param ver Page format version.
+     */
+    private H2MvccInnerIO(int ver) {
+        super(T_H2_MVCC_REF_INNER, ver, 40);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeMvccInfo() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java
new file mode 100644
index 0000000..a364432
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2MvccLeafIO.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+
+/**
+ *
+ */
+public class H2MvccLeafIO extends AbstractH2LeafIO {
+    /** */
+    public static final IOVersions<H2MvccLeafIO> VERSIONS = new IOVersions<>(
+        new H2MvccLeafIO(1)
+    );
+
+    /**
+     * @param ver Page format version.
+     */
+    private H2MvccLeafIO(int ver) {
+        super(T_H2_MVCC_REF_LEAF, ver, 40);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeMvccInfo() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java
index ce69197..d828c44 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2RowLinkIO.java
@@ -27,4 +27,37 @@ public interface H2RowLinkIO {
      * @return Row link.
      */
     public long getLink(long pageAddr, int idx);
+
+    /**
+     * @param pageAddr Page address.
+     * @param idx Index.
+     * @return Mvcc coordinator version.
+     */
+    public long getMvccCoordinatorVersion(long pageAddr, int idx);
+
+    /**
+     * @param pageAddr Page address.
+     * @param idx Index.
+     * @return Mvcc counter.
+     */
+    public long getMvccCounter(long pageAddr, int idx);
+
+    /**
+     * @param pageAddr Page address.
+     * @param idx Index.
+     * @return Mvcc coordinator version.
+     */
+    public long getNewMvccCoordinatorVersion(long pageAddr, int idx);
+
+    /**
+     * @param pageAddr Page address.
+     * @param idx Index.
+     * @return Mvcc counter.
+     */
+    public long getNewMvccCounter(long pageAddr, int idx);
+
+    /**
+     * @return {@code True} if IO stores mvcc information.
+     */
+    public boolean storeMvccInfo();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 92b7d10..96b331a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage;
@@ -200,6 +201,12 @@ public abstract class GridH2IndexBase extends BaseIndex {
     public abstract GridH2Row put(GridH2Row row);
 
     /**
+     * @param row Row.
+     * @return {@code True} if replaced existing row.
+     */
+    public abstract boolean putx(GridH2Row row);
+
+    /**
      * Remove row from index.
      *
      * @param row Row.
@@ -426,7 +433,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
                     // This is the first request containing all the search rows.
                     assert !msg.bounds().isEmpty() : "empty bounds";
 
-                    src = new RangeSource(msg.bounds(), msg.segment(), qctx.filter());
+                    src = new RangeSource(msg.bounds(), msg.segment(), qctx.filter(), qctx.mvccFilter());
                 }
                 else {
                     // This is request to fetch next portion of data.
@@ -1469,20 +1476,28 @@ public abstract class GridH2IndexBase extends BaseIndex {
         /** */
         final IndexingQueryFilter filter;
 
+        /** */
+        private final H2TreeMvccFilterClosure mvccFilter;
+
         /** Iterator. */
         Iterator<GridH2Row> iter = emptyIterator();
 
         /**
          * @param bounds Bounds.
+         * @param segment Segment.
          * @param filter Filter.
+         * @param mvccFilter Mvcc filter.
          */
         RangeSource(
             Iterable<GridH2RowRangeBounds> bounds,
             int segment,
-            IndexingQueryFilter filter
+            IndexingQueryFilter filter,
+            H2TreeMvccFilterClosure mvccFilter
         ) {
             this.segment = segment;
             this.filter = filter;
+            this.mvccFilter = mvccFilter;
+
             boundsIter = bounds.iterator();
         }
 
@@ -1540,7 +1555,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
                 IgniteTree t = treeForRead(segment);
 
-                iter = new CursorIteratorWrapper(doFind0(t, first, true, last, filter));
+                iter = new CursorIteratorWrapper(doFind0(t, first, last, filter, mvccFilter));
 
                 if (!iter.hasNext()) {
                     // We have to return empty range here.
@@ -1565,17 +1580,17 @@ public abstract class GridH2IndexBase extends BaseIndex {
     /**
      * @param t Tree.
      * @param first Lower bound.
-     * @param includeFirst Whether lower bound should be inclusive.
      * @param last Upper bound always inclusive.
      * @param filter Filter.
+     * @param mvccFilter Mvcc filter.
      * @return Iterator over rows in given range.
      */
     protected GridCursor<GridH2Row> doFind0(
         IgniteTree t,
         @Nullable SearchRow first,
-        boolean includeFirst,
         @Nullable SearchRow last,
-        IndexingQueryFilter filter) {
+        IndexingQueryFilter filter,
+        H2TreeMvccFilterClosure mvccFilter) {
         throw new UnsupportedOperationException();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
index e855536..62b459a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOnheap.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -56,17 +58,24 @@ public class GridH2KeyValueRowOnheap extends GridH2Row {
     /** */
     private Value ver;
 
+    /** */
+    private final MvccCoordinatorVersion newVer;
+
     /**
      * Constructor.
      *
      * @param desc Row descriptor.
      * @param row Row.
+     * @param newVer Version of new mvcc value inserted for the same key.
      * @param keyType Key type.
      * @param valType Value type.
      * @throws IgniteCheckedException If failed.
      */
-    public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc, CacheDataRow row, int keyType, int valType)
-        throws IgniteCheckedException {
+    public GridH2KeyValueRowOnheap(GridH2RowDescriptor desc,
+        CacheDataRow row,
+        MvccCoordinatorVersion newVer,
+        int keyType,
+        int valType) throws IgniteCheckedException {
         super(row);
 
         this.desc = desc;
@@ -78,6 +87,23 @@ public class GridH2KeyValueRowOnheap extends GridH2Row {
 
         if (row.version() != null)
             this.ver = desc.wrap(row.version(), Value.JAVA_OBJECT);
+
+        this.newVer = newVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long newMvccCoordinatorVersion() {
+        return newVer != null ? newVer.coordinatorVersion() : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long newMvccCounter() {
+        return newVer != null ? newVer.counter(): CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean indexSearchRow() {
+        return false;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
index 5e09a86..38ad9d0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2MetaTable.java
@@ -284,6 +284,11 @@ public class GridH2MetaTable extends TableBase {
                     throw new IllegalStateException("Index: " + idx);
             }
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean indexSearchRow() {
+            return false; // TODO IGNITE-3478, check meta table with mvcc.
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
index fd8a613..d24dc08 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2PlainRowFactory.java
@@ -70,7 +70,7 @@ public class GridH2PlainRowFactory extends RowFactory {
         /**
          * @param key Key.
          */
-        public RowKey(Value key) {
+        RowKey(Value key) {
             this.key = key;
         }
 
@@ -92,6 +92,11 @@ public class GridH2PlainRowFactory extends RowFactory {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean indexSearchRow() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(RowKey.class, this);
         }
@@ -138,6 +143,11 @@ public class GridH2PlainRowFactory extends RowFactory {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean indexSearchRow() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(RowPair.class, this);
         }
@@ -174,6 +184,11 @@ public class GridH2PlainRowFactory extends RowFactory {
         }
 
         /** {@inheritDoc} */
+        @Override public boolean indexSearchRow() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(RowSimple.class, this);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 91f0aef..b490179 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
@@ -83,6 +84,9 @@ public class GridH2QueryContext {
     /** */
     private GridH2CollocationModel qryCollocationMdl;
 
+    /** */
+    private H2TreeMvccFilterClosure mvccFilter;
+
     /**
      * @param locNodeId Local node ID.
      * @param nodeId The node who initiated the query.
@@ -102,13 +106,34 @@ public class GridH2QueryContext {
      * @param segmentId Index segment ID.
      * @param type Query type.
      */
-    public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
+    public GridH2QueryContext(UUID locNodeId,
+        UUID nodeId,
+        long qryId,
+        int segmentId,
+        GridH2QueryType type) {
         assert segmentId == 0 || type == MAP;
 
         key = new Key(locNodeId, nodeId, qryId, segmentId, type);
     }
 
     /**
+     * @return Mvcc version.
+     */
+    @Nullable public H2TreeMvccFilterClosure mvccFilter() {
+        return mvccFilter;
+    }
+
+    /**
+     * @param mvccFilter Mvcc filter.
+     * @return {@code this}.
+     */
+    public GridH2QueryContext mvccFilter(H2TreeMvccFilterClosure mvccFilter) {
+        this.mvccFilter = mvccFilter;
+
+        return this;
+    }
+
+    /**
      * @return Type.
      */
     public GridH2QueryType type() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 54e0417..785b791 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.h2.opt;
 
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
@@ -88,16 +89,35 @@ public abstract class GridH2Row extends GridH2SearchRowAdapter implements CacheD
 
     /** {@inheritDoc} */
     @Override public long mvccCoordinatorVersion() {
-        throw new UnsupportedOperationException();
+        return row.mvccCoordinatorVersion();
     }
 
     /** {@inheritDoc} */
     @Override public long mvccCounter() {
-        throw new UnsupportedOperationException();
+        return row.mvccCounter();
     }
 
     /** {@inheritDoc} */
     @Override public boolean removed() {
         throw new UnsupportedOperationException();
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean indexSearchRow() {
+        return false;
+    }
+
+    /**
+     * @return Part of new mvcc version.
+     */
+    public long newMvccCoordinatorVersion() {
+        return 0;
+    }
+
+    /**
+     * @return Part of new mvcc version.
+     */
+    public long newMvccCounter() {
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 1d915e5..ad91deb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -29,6 +29,7 @@ import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
@@ -58,6 +59,7 @@ import org.h2.value.ValueString;
 import org.h2.value.ValueTime;
 import org.h2.value.ValueTimestamp;
 import org.h2.value.ValueUuid;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.KEY_COL;
@@ -273,17 +275,21 @@ public class GridH2RowDescriptor {
      * Creates new row.
      *
      * @param dataRow Data row.
+     * @param newVer Version of new mvcc value inserted for the same key.
      * @return Row.
      * @throws IgniteCheckedException If failed.
      */
-    public GridH2Row createRow(CacheDataRow dataRow) throws IgniteCheckedException {
+    public GridH2Row createRow(CacheDataRow dataRow, @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException {
         GridH2Row row;
 
         try {
-            if (dataRow.value() == null) // Only can happen for remove operation, can create simple search row.
+            if (dataRow.value() == null) { // Only can happen for remove operation, can create simple search row.
+                assert newVer == null;
+
                 row = new GridH2KeyRowOnheap(dataRow, wrap(dataRow.key(), keyType));
+            }
             else
-                row = new GridH2KeyValueRowOnheap(this, dataRow, keyType, valType);
+                row = new GridH2KeyValueRowOnheap(this, dataRow, newVer, keyType, valType);
         }
         catch (ClassCastException e) {
             throw new IgniteCheckedException("Failed to convert key to SQL type. " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java
new file mode 100644
index 0000000..4b3940c
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRow.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.h2.result.Row;
+
+/**
+ *
+ */
+public interface GridH2SearchRow extends Row {
+    /**
+     * @return Mvcc coordinator version.
+     */
+    public long mvccCoordinatorVersion();
+
+    /**
+     * @return Mvcc counter.
+     */
+    public long mvccCounter();
+
+    /**
+     * @return {@code True} for rows used for index search (as opposed to rows stored in {@link H2Tree}.
+     */
+    public boolean indexSearchRow();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java
index 24a90b3..4fc8ee5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SearchRowAdapter.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2.opt;
 
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
 import org.h2.store.Data;
@@ -25,7 +26,7 @@ import org.h2.value.Value;
 /**
  * Dummy H2 search row adadpter.
  */
-public abstract class GridH2SearchRowAdapter implements Row {
+public abstract class GridH2SearchRowAdapter implements GridH2SearchRow {
     /** {@inheritDoc} */
     @Override public void setKeyAndVersion(SearchRow old) {
         throw new UnsupportedOperationException();
@@ -100,4 +101,14 @@ public abstract class GridH2SearchRowAdapter implements Row {
     @Override public Value[] getValueList() {
         throw new UnsupportedOperationException();
     }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCoordinatorVersion() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long mvccCounter() {
+        return CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 6c353e9..ca9c1f5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -28,6 +28,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -399,15 +401,16 @@ public class GridH2Table extends TableBase {
      * otherwise value and expiration time will be updated or new row will be added.
      *
      * @param row Row.
+     * @param newVer Version of new mvcc value inserted for the same key.
      * @param rmv If {@code true} then remove, else update row.
      * @return {@code true} If operation succeeded.
      * @throws IgniteCheckedException If failed.
      */
-    public boolean update(CacheDataRow row, boolean rmv)
+    public boolean update(CacheDataRow row, @Nullable MvccCoordinatorVersion newVer, boolean rmv)
         throws IgniteCheckedException {
         assert desc != null;
 
-        GridH2Row h2Row = desc.createRow(row);
+        GridH2Row h2Row = desc.createRow(row, newVer);
 
         if (rmv)
             return doUpdate(h2Row, true);
@@ -454,6 +457,8 @@ public class GridH2Table extends TableBase {
      */
     @SuppressWarnings("LockAcquiredButNotSafelyReleased")
     private boolean doUpdate(final GridH2Row row, boolean del) throws IgniteCheckedException {
+        assert !cctx.mvccEnabled() || row.mvccCounter() != CacheCoordinatorsProcessor.MVCC_COUNTER_NA : row;
+
         // Here we assume that each key can't be updated concurrently and case when different indexes
         // getting updated from different threads with different rows with the same key is impossible.
         lock(false);
@@ -466,10 +471,25 @@ public class GridH2Table extends TableBase {
             if (!del) {
                 assert rowFactory == null || row.link() != 0 : row;
 
-                GridH2Row old = pk.put(row); // Put to PK.
+                GridH2Row old;
+
+                // Put to PK.
+                if (cctx.mvccEnabled()) {
+                    boolean replaced = pk.putx(row);
+
+                    assert replaced == (row.newMvccCoordinatorVersion() != 0);
 
-                if (old == null)
-                    size.increment();
+                    old = null;
+
+                    if (!replaced)
+                        size.increment();
+                }
+                else {
+                    old = pk.put(row);
+
+                    if (old == null)
+                        size.increment();
+                }
 
                 int len = idxs.size();
 
@@ -536,17 +556,24 @@ public class GridH2Table extends TableBase {
     private void addToIndex(GridH2IndexBase idx, Index pk, GridH2Row row, GridH2Row old, boolean tmp) {
         assert !idx.getIndexType().isUnique() : "Unique indexes are not supported: " + idx;
 
-        GridH2Row old2 = idx.put(row);
+        if (idx.ctx.mvccEnabled()) {
+            boolean replaced = idx.putx(row);
 
-        if (old2 != null) { // Row was replaced in index.
-            if (!tmp) {
-                if (!eq(pk, old2, old))
-                    throw new IllegalStateException("Row conflict should never happen, unique indexes are " +
-                        "not supported [idx=" + idx + ", old=" + old + ", old2=" + old2 + ']');
+            assert replaced == (row.newMvccCoordinatorVersion() != 0);
+        }
+        else {
+            GridH2Row old2 = idx.put(row);
+
+            if (old2 != null) { // Row was replaced in index.
+                if (!tmp) {
+                    if (!eq(pk, old2, old))
+                        throw new IllegalStateException("Row conflict should never happen, unique indexes are " +
+                                "not supported [idx=" + idx + ", old=" + old + ", old2=" + old2 + ']');
+                }
             }
+            else if (old != null) // Row was not replaced, need to remove manually.
+                idx.removex(old);
         }
-        else if (old != null) // Row was not replaced, need to remove manually.
-            idx.removex(old);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 77b928f..fe21b1d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.UpdateResult;
+import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure;
 import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
@@ -482,7 +484,8 @@ public class GridMapQueryExecutor {
                     false, // Replicated is always false here (see condition above).
                     req.timeout(),
                     params,
-                    true); // Lazy = true.
+                    true,
+                    req.mvccVersion()); // Lazy = true.
             }
             else {
                 ctx.closure().callLocal(
@@ -504,7 +507,8 @@ public class GridMapQueryExecutor {
                                 false,
                                 req.timeout(),
                                 params,
-                                false); // Lazy = false.
+                                false,
+                                req.mvccVersion()); // Lazy = false.
 
                             return null;
                         }
@@ -528,7 +532,8 @@ public class GridMapQueryExecutor {
             replicated,
             req.timeout(),
             params,
-            lazy);
+            lazy,
+            req.mvccVersion());
     }
 
     /**
@@ -544,6 +549,7 @@ public class GridMapQueryExecutor {
      * @param pageSize Page size.
      * @param distributedJoinMode Query distributed join mode.
      * @param lazy Streaming flag.
+     * @param mvccVer Mvcc version.
      */
     private void onQueryRequest0(
         final ClusterNode node,
@@ -561,7 +567,8 @@ public class GridMapQueryExecutor {
         final boolean replicated,
         final int timeout,
         final Object[] params,
-        boolean lazy
+        boolean lazy,
+        @Nullable final MvccCoordinatorVersion mvccVer
     ) {
         if (lazy && MapQueryLazyWorker.currentWorker() == null) {
             // Lazy queries must be re-submitted to dedicated workers.
@@ -570,8 +577,24 @@ public class GridMapQueryExecutor {
 
             worker.submit(new Runnable() {
                 @Override public void run() {
-                    onQueryRequest0(node, reqId, segmentId, schemaName, qrys, cacheIds, topVer, partsMap, parts,
-                        pageSize, distributedJoinMode, enforceJoinOrder, replicated, timeout, params, true);
+                    onQueryRequest0(
+                        node,
+                        reqId,
+                        segmentId,
+                        schemaName,
+                        qrys,
+                        cacheIds,
+                        topVer,
+                        partsMap,
+                        parts,
+                        pageSize,
+                        distributedJoinMode,
+                        enforceJoinOrder,
+                        replicated,
+                        timeout,
+                        params,
+                        true,
+                        mvccVer);
                 }
             });
 
@@ -639,6 +662,9 @@ public class GridMapQueryExecutor {
                 .topologyVersion(topVer)
                 .reservations(reserved);
 
+            if (mvccVer != null)
+                qctx.mvccFilter(new H2TreeMvccFilterClosure(mvccVer));
+
             Connection conn = h2.connectionForSchema(schemaName);
 
             H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
index 1c0efb3..4518d14 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
@@ -25,8 +25,10 @@ import java.util.NoSuchElementException;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.h2.index.Cursor;
 import org.h2.result.Row;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Iterator that transparently and sequentially traverses a bunch of {@link GridMergeIndex} objects.
@@ -59,6 +61,9 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
     /** Whether remote resources were released. */
     private boolean released;
 
+    /** */
+    private MvccQueryTracker mvccTracker;
+
     /**
      * Constructor.
      *
@@ -69,14 +74,19 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
      * @param distributedJoins Distributed joins.
      * @throws IgniteCheckedException if failed.
      */
-    GridMergeIndexIterator(GridReduceQueryExecutor rdcExec, Collection<ClusterNode> nodes, ReduceQueryRun run,
-        long qryReqId, boolean distributedJoins)
+    GridMergeIndexIterator(GridReduceQueryExecutor rdcExec,
+        Collection<ClusterNode> nodes,
+        ReduceQueryRun run,
+        long qryReqId,
+        boolean distributedJoins,
+        @Nullable MvccQueryTracker mvccTracker)
         throws IgniteCheckedException {
         this.rdcExec = rdcExec;
         this.nodes = nodes;
         this.run = run;
         this.qryReqId = qryReqId;
         this.distributedJoins = distributedJoins;
+        this.mvccTracker = mvccTracker;
 
         this.idxIter = run.indexes().iterator();
 
@@ -155,7 +165,7 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable {
     private void releaseIfNeeded() {
         if (!released) {
             try {
-                rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins);
+                rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins, mvccTracker);
             }
             finally {
                 released = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index f85cd94..80b1970 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -59,6 +59,8 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -83,11 +85,13 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryReq
 import org.apache.ignite.internal.util.GridIntIterator;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.CIX2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.command.ddl.CreateTableData;
@@ -562,6 +566,31 @@ public class GridReduceQueryExecutor {
 
             List<Integer> cacheIds = qry.cacheIds();
 
+            MvccQueryTracker mvccTracker = null;
+
+            // TODO IGNITE-3478.
+            if (qry.mvccEnabled()) {
+                assert !cacheIds.isEmpty();
+
+                final GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+                mvccTracker = new MvccQueryTracker(cacheContext(cacheIds.get(0)), true,
+                    new IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException>() {
+                    @Override public void apply(AffinityTopologyVersion topVer, IgniteCheckedException e) {
+                        fut.onDone(null, e);
+                    }
+                });
+
+                mvccTracker.requestVersion(topVer);
+
+                try {
+                    fut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new CacheException(e);
+                }
+            }
+
             Collection<ClusterNode> nodes = null;
 
             // Explicit partition mapping for unstable topology.
@@ -730,6 +759,9 @@ public class GridReduceQueryExecutor {
                     .timeout(timeoutMillis)
                     .schemaName(schemaName);
 
+                if (mvccTracker != null)
+                    req.mvccVersion(mvccTracker.mvccVersion());
+
                 if (send(nodes, req, parts == null ? null : new ExplicitPartitionsSpecializer(qryMap), false)) {
                     awaitAllReplies(r, nodes, cancel);
 
@@ -763,7 +795,12 @@ public class GridReduceQueryExecutor {
 
                 if (!retry) {
                     if (skipMergeTbl) {
-                        resIter = new GridMergeIndexIterator(this, finalNodes, r, qryReqId, qry.distributedJoins());
+                        resIter = new GridMergeIndexIterator(this,
+                            finalNodes,
+                            r,
+                            qryReqId,
+                            qry.distributedJoins(),
+                            mvccTracker);
 
                         release = false;
                     }
@@ -833,7 +870,7 @@ public class GridReduceQueryExecutor {
             }
             finally {
                 if (release) {
-                    releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins());
+                    releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins(), mvccTracker);
 
                     if (!skipMergeTbl) {
                         for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
@@ -1028,7 +1065,10 @@ public class GridReduceQueryExecutor {
      * @param distributedJoins Distributed join flag.
      */
     public void releaseRemoteResources(Collection<ClusterNode> nodes, ReduceQueryRun r, long qryReqId,
-        boolean distributedJoins) {
+        boolean distributedJoins, MvccQueryTracker mvccTracker) {
+        if (mvccTracker != null)
+            mvccTracker.onQueryDone();
+
         // For distributedJoins need always send cancel request to cleanup resources.
         if (distributedJoins)
             send(nodes, new GridQueryCancelRequest(qryReqId), null, false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 4e1fadb..347b88c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.QueryTable;
@@ -42,6 +43,7 @@ import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
 
@@ -133,6 +135,9 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     /** Schema name. */
     private String schemaName;
 
+    /** */
+    private MvccCoordinatorVersion mvccVer;
+
     /**
      * Required by {@link Externalizable}
      */
@@ -157,6 +162,24 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
         params = req.params;
         paramsBytes = req.paramsBytes;
         schemaName = req.schemaName;
+        mvccVer = req.mvccVer;
+    }
+
+    /**
+     * @return Mvcc version.
+     */
+    @Nullable public MvccCoordinatorVersion mvccVersion() {
+        return mvccVer;
+    }
+
+    /**
+     * @param mvccVer Mvcc version.
+     * @return {@code this}.
+     */
+    public GridH2QueryRequest mvccVersion(MvccCoordinatorVersion mvccVer) {
+        this.mvccVer = mvccVer;
+
+        return this;
     }
 
     /**
@@ -435,65 +458,71 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeInt("pageSize", pageSize))
+                if (!writer.writeMessage("mvccVer", mvccVer))
                     return false;
 
                 writer.incrementState();
 
             case 3:
-                if (!writer.writeByteArray("paramsBytes", paramsBytes))
+                if (!writer.writeInt("pageSize", pageSize))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
+                if (!writer.writeByteArray("paramsBytes", paramsBytes))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG))
+                if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeLong("reqId", reqId))
+                if (!writer.writeIntArray("qryParts", qryParts))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeInt("timeout", timeout))
+                if (!writer.writeLong("reqId", reqId))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeString("schemaName", schemaName))
                     return false;
 
                 writer.incrementState();
 
-
             case 10:
-                if (!writer.writeIntArray("qryParts", qryParts))
+                if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeString("schemaName", schemaName))
+                if (!writer.writeInt("timeout", timeout))
                     return false;
 
                 writer.incrementState();
+
+            case 12:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -524,7 +553,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 2:
-                pageSize = reader.readInt("pageSize");
+                mvccVer = reader.readMessage("mvccVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -532,7 +561,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 3:
-                paramsBytes = reader.readByteArray("paramsBytes");
+                pageSize = reader.readInt("pageSize");
 
                 if (!reader.isLastRead())
                     return false;
@@ -540,7 +569,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 4:
-                parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
+                paramsBytes = reader.readByteArray("paramsBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -548,7 +577,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 5:
-                qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG);
+                parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
 
                 if (!reader.isLastRead())
                     return false;
@@ -556,7 +585,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 6:
-                reqId = reader.readLong("reqId");
+                qryParts = reader.readIntArray("qryParts");
 
                 if (!reader.isLastRead())
                     return false;
@@ -564,7 +593,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 7:
-                tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG);
+                qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -572,7 +601,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 8:
-                timeout = reader.readInt("timeout");
+                reqId = reader.readLong("reqId");
 
                 if (!reader.isLastRead())
                     return false;
@@ -580,16 +609,15 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 9:
-                topVer = reader.readMessage("topVer");
+                schemaName = reader.readString("schemaName");
 
                 if (!reader.isLastRead())
                     return false;
 
                 reader.incrementState();
 
-
             case 10:
-                qryParts = reader.readIntArray("qryParts");
+                tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -597,12 +625,21 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 11:
-                schemaName = reader.readString("schemaName");
+                timeout = reader.readInt("timeout");
 
                 if (!reader.isLastRead())
                     return false;
 
                 reader.incrementState();
+
+            case 12:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridH2QueryRequest.class);
@@ -615,7 +652,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 12;
+        return 13;
     }
 
     /** {@inheritDoc} */


[3/4] ignite git commit: ignite-3478 Mvcc support for sql indexes

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index dbe4ce5..df9f21e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -36,26 +36,24 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.TouchedExpiryPolicy;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.TestCacheNodeExcludingFilter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -275,6 +273,13 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testCacheRecreate() throws Exception {
+        cacheRecreate(null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testActiveQueriesCleanup() throws Exception {
         activeQueriesCleanup(false);
     }
@@ -3626,6 +3631,67 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
                     checkRow(cctx, row, key0, vers.get(v + 1).get1());
             }
         }
+
+        KeyCacheObject key = cctx.toCacheKeyObject(KEYS);
+
+        cache.put(key, 0);
+
+        cache.remove(key);
+
+        cctx.offheap().mvccRemoveAll((GridCacheMapEntry)cctx.cache().entryEx(key));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExpiration() throws Exception {
+        final IgniteEx node = startGrid(0);
+
+        IgniteCache cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64));
+
+        final IgniteCache expiryCache =
+            cache.withExpiryPolicy(new TouchedExpiryPolicy(new Duration(TimeUnit.SECONDS, 1)));
+
+        for (int i = 0; i < 10; i++)
+            expiryCache.put(1, i);
+
+        assertTrue("Failed to wait for expiration", GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return expiryCache.localPeek(1) == null;
+            }
+        }, 5000));
+
+        for (int i = 0; i < 11; i++) {
+            if (i % 2 == 0)
+                expiryCache.put(1, i);
+            else
+                expiryCache.remove(1);
+        }
+
+        assertTrue("Failed to wait for expiration", GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                return expiryCache.localPeek(1) == null;
+            }
+        }, 5000));
+
+        expiryCache.put(1, 1);
+
+        assertTrue("Failed to wait for expiration", GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    GridCacheContext cctx = node.context().cache().context().cacheContext(CU.cacheId(DEFAULT_CACHE_NAME));
+
+                    KeyCacheObject key = cctx.toCacheKeyObject(1);
+
+                    return cctx.offheap().read(cctx, key) == null;
+                }
+                catch (Exception e) {
+                    fail();
+
+                    return false;
+                }
+            }
+        }, 5000));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 27804d9..335279f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -18,11 +18,14 @@
 package org.apache.ignite.internal.processors.database;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
@@ -218,6 +221,53 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     /**
      * @throws IgniteCheckedException If failed.
      */
+    public void testFindWithClosure() throws IgniteCheckedException {
+        TestTree tree = createTestTree(true);
+        TreeMap<Long, Long> map = new TreeMap<>();
+
+        long size = CNT * CNT;
+
+        for (long i = 1; i <= size; i++) {
+            tree.put(i);
+            map.put(i, i);
+        }
+
+        checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(Collections.<Long>emptySet()), null),
+            Collections.<Long>emptyList().iterator());
+
+        checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(map.keySet()), null),
+            map.values().iterator());
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        for (int i = 0; i < 100; i++) {
+            Long val = rnd.nextLong(size) + 1;
+
+            checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(Collections.singleton(val)), null),
+                Collections.singleton(val).iterator());
+        }
+
+        for (int i = 0; i < 200; i++) {
+            long vals = rnd.nextLong(size) + 1;
+
+            TreeSet<Long> exp = new TreeSet<>();
+
+            for (long k = 0; k < vals; k++)
+                exp.add(rnd.nextLong(size) + 1);
+
+            checkCursor(tree.find(null, null, new TestTreeFindFilteredClosure(exp), null), exp.iterator());
+
+            checkCursor(tree.find(0L, null, new TestTreeFindFilteredClosure(exp), null), exp.iterator());
+
+            checkCursor(tree.find(0L, size, new TestTreeFindFilteredClosure(exp), null), exp.iterator());
+
+            checkCursor(tree.find(null, size, new TestTreeFindFilteredClosure(exp), null), exp.iterator());
+        }
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
     public void _testBenchInvoke() throws IgniteCheckedException {
         MAX_PER_PAGE = 10;
 
@@ -625,12 +675,12 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @param tree
-     * @param lower
-     * @param upper
-     * @param exp
-     * @param expFound
-     * @throws IgniteCheckedException
+     * @param tree Tree.
+     * @param lower Lower bound.
+     * @param upper Upper bound.
+     * @param exp Value to find.
+     * @param expFound {@code True} if value should be found.
+     * @throws IgniteCheckedException If failed.
      */
     private void checkIterate(TestTree tree, long lower, long upper, Long exp, boolean expFound)
         throws IgniteCheckedException {
@@ -641,6 +691,14 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
         assertEquals(expFound, c.found);
     }
 
+    /**
+     * @param tree Tree.
+     * @param lower Lower bound.
+     * @param upper Upper bound.
+     * @param c Closure.
+     * @param expFound {@code True} if value should be found.
+     * @throws IgniteCheckedException If failed.
+     */
     private void checkIterateC(TestTree tree, long lower, long upper, TestTreeRowClosure c, boolean expFound)
         throws IgniteCheckedException {
         c.found = false;
@@ -1307,7 +1365,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testIterateConcurrentPutRemove() throws Exception {
-        findOneBoundedConcurrentPutRemove();
+        iterateConcurrentPutRemove();
     }
 
     /**
@@ -1316,7 +1374,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     public void testIterateConcurrentPutRemove_1() throws Exception {
         MAX_PER_PAGE = 1;
 
-        findOneBoundedConcurrentPutRemove();
+        iterateConcurrentPutRemove();
     }
 
     /**
@@ -1325,7 +1383,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     public void testIterateConcurrentPutRemove_5() throws Exception {
         MAX_PER_PAGE = 5;
 
-        findOneBoundedConcurrentPutRemove();
+        iterateConcurrentPutRemove();
     }
 
     /**
@@ -1334,13 +1392,13 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     public void testIteratePutRemove_10() throws Exception {
         MAX_PER_PAGE = 10;
 
-        findOneBoundedConcurrentPutRemove();
+        iterateConcurrentPutRemove();
     }
 
     /**
      * @throws Exception If failed.
      */
-    private void findOneBoundedConcurrentPutRemove() throws Exception {
+    private void iterateConcurrentPutRemove() throws Exception {
         final TestTree tree = createTestTree(true);
 
         final int KEYS = 10_000;
@@ -1474,7 +1532,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     *
+     * @throws Exception If failed.
      */
     public void testConcurrentGrowDegenerateTreeAndConcurrentRemove() throws Exception {
         //calculate tree size when split happens
@@ -2132,6 +2190,7 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
         /** */
         private Long val;
 
+
         /** {@inheritDoc} */
         @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
             throws IgniteCheckedException {
@@ -2142,4 +2201,27 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
             return false;
         }
     }
+
+    /**
+     *
+     */
+    static class TestTreeFindFilteredClosure implements BPlusTree.TreeRowClosure<Long, Long> {
+        /** */
+        private final Set<Long> vals;
+
+        /**
+         * @param vals Values to allow in filter.
+         */
+        TestTreeFindFilteredClosure(Set<Long> vals) {
+            this.vals = vals;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean apply(BPlusTree<Long, Long> tree, BPlusIO<Long> io, long pageAddr, int idx)
+            throws IgniteCheckedException {
+            Long val = io.getLookupRow(tree, pageAddr, idx);
+
+            return vals.contains(val);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
----------------------------------------------------------------------
diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
index 2cd36b3..fce18f1 100644
--- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
+++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java
@@ -200,6 +200,11 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean putx(GridH2Row row) {
+        return put(row) != null;
+    }
+
     /**
      * @param row Row.
      * @param rowId Row id.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 4a93aaf..f5c5e60 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -65,6 +65,9 @@ public class GridCacheTwoStepQuery {
     /** */
     private CacheQueryPartitionInfo[] derivedPartitions;
 
+    /** */
+    private boolean mvccEnabled;
+
     /**
      * @param originalSql Original query SQL.
      * @param tbls Tables in query.
@@ -241,6 +244,7 @@ public class GridCacheTwoStepQuery {
         cp.distributedJoins = distributedJoins;
         cp.derivedPartitions = derivedPartitions;
         cp.local = local;
+        cp.mvccEnabled = mvccEnabled;
 
         for (int i = 0; i < mapQrys.size(); i++)
             cp.mapQrys.add(mapQrys.get(i).copy());
@@ -262,6 +266,20 @@ public class GridCacheTwoStepQuery {
         return tbls;
     }
 
+    /**
+     * @return Mvcc flag.
+     */
+    public boolean mvccEnabled() {
+        return mvccEnabled;
+    }
+
+    /**
+     * @param mvccEnabled Mvcc flag.
+     */
+    public void mvccEnabled(boolean mvccEnabled) {
+        this.mvccEnabled = mvccEnabled;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheTwoStepQuery.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index eed1f19..6dc93c4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.query.CacheQueryPartitionInfo;
@@ -98,6 +99,8 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerI
 import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasLeafIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccInnerIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.H2MvccLeafIO;
 import org.apache.ignite.internal.processors.query.h2.ddl.DdlStatementsProcessor;
 import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
@@ -183,7 +186,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * Register IO for indexes.
      */
     static {
-        PageIO.registerH2(H2InnerIO.VERSIONS, H2LeafIO.VERSIONS);
+        PageIO.registerH2(H2InnerIO.VERSIONS, H2LeafIO.VERSIONS, H2MvccInnerIO.VERSIONS, H2MvccLeafIO.VERSIONS);
         H2ExtrasInnerIO.register();
         H2ExtrasLeafIO.register();
 
@@ -537,8 +540,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public void store(GridCacheContext cctx, GridQueryTypeDescriptor type, CacheDataRow row)
-        throws IgniteCheckedException {
+    @Override public void store(GridCacheContext cctx,
+        GridQueryTypeDescriptor type,
+        CacheDataRow row,
+        @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException
+    {
         String cacheName = cctx.name();
 
         H2TableDescriptor tbl = tableDescriptor(schema(cacheName), cacheName, type.name());
@@ -546,7 +552,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (tbl == null)
             return; // Type was rejected.
 
-        tbl.table().update(row, false);
+        tbl.table().update(row, newVer, false);
 
         if (tbl.luceneIndex() != null) {
             long expireTime = row.expireTime();
@@ -575,7 +581,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (tbl == null)
             return;
 
-        if (tbl.table().update(row, true)) {
+        if (tbl.table().update(row, null, true)) {
             if (tbl.luceneIndex() != null)
                 tbl.luceneIndex().remove(row.key());
         }
@@ -671,7 +677,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             SchemaIndexCacheVisitorClosure clo = new SchemaIndexCacheVisitorClosure() {
                 @Override public void apply(CacheDataRow row) throws IgniteCheckedException {
-                    GridH2Row h2Row = rowDesc.createRow(row);
+                    GridH2Row h2Row = rowDesc.createRow(row, null);
 
                     h2Idx.put(h2Row);
                 }
@@ -1546,7 +1552,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (partitions == null && twoStepQry.derivedPartitions() != null) {
             try {
                 partitions = calculateQueryPartitions(twoStepQry.derivedPartitions(), args);
-            } catch (IgniteCheckedException e) {
+            }
+            catch (IgniteCheckedException e) {
                 throw new CacheException("Failed to calculate derived partitions: [qry=" + sqlQry + ", params=" +
                     Arrays.deepToString(args) + "]", e);
             }
@@ -1585,9 +1592,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * @param cacheIds Cache IDs.
+     * @param twoStepQry Query.
      * @throws IllegalStateException if segmented indices used with non-segmented indices.
      */
-    private void checkCacheIndexSegmentation(List<Integer> cacheIds) {
+    private void processCaches(List<Integer> cacheIds, GridCacheTwoStepQuery twoStepQry) {
         if (cacheIds.isEmpty())
             return; // Nothing to check
 
@@ -1595,11 +1604,21 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         int expectedParallelism = 0;
 
-        for (Integer cacheId : cacheIds) {
+        boolean mvccEnabled = false;
+
+        for (int i = 0; i < cacheIds.size(); i++) {
+            Integer cacheId = cacheIds.get(i);
+
             GridCacheContext cctx = sharedCtx.cacheContext(cacheId);
 
             assert cctx != null;
 
+            if (i == 0)
+                mvccEnabled = cctx.mvccEnabled();
+            else if (cctx.mvccEnabled() != mvccEnabled)
+                throw new IllegalStateException("Using caches with different mvcc settings in same query is " +
+                    "forbidden.");
+
             if (!cctx.isPartitioned())
                 continue;
 
@@ -1610,6 +1629,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     "forbidden.");
             }
         }
+
+        twoStepQry.mvccEnabled(mvccEnabled);
     }
 
     /**
@@ -2522,7 +2543,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             //Prohibit usage indices with different numbers of segments in same query.
             List<Integer> cacheIds = new ArrayList<>(caches0);
 
-            checkCacheIndexSegmentation(cacheIds);
+            processCaches(cacheIds, twoStepQry);
 
             return cacheIds;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index 59bf153..9a99c62 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -124,13 +124,20 @@ public class H2PkHashIndex extends GridH2IndexBase {
     @SuppressWarnings("StatementWithEmptyBody")
     @Override public GridH2Row put(GridH2Row row) {
         // Should not be called directly. Rows are inserted into underlying cache data stores.
-
         assert false;
 
         throw DbException.getUnsupportedException("put");
     }
 
     /** {@inheritDoc} */
+    @Override public boolean putx(GridH2Row row) {
+        // Should not be called directly. Rows are inserted into underlying cache data stores.
+        assert false;
+
+        throw DbException.getUnsupportedException("putx");
+    }
+
+    /** {@inheritDoc} */
     @Override public GridH2Row remove(SearchRow row) {
         // Should not be called directly. Rows are removed from underlying cache data stores.
 
@@ -197,7 +204,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
             try {
                 CacheDataRow dataRow = cursor.get();
 
-                return tbl.rowDescriptor().createRow(dataRow);
+                return tbl.rowDescriptor().createRow(dataRow, null);
             }
             catch (IgniteCheckedException e) {
                 throw DbException.convert(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java
index 40b9b0a..e9ec9e6 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2RowFactory.java
@@ -18,9 +18,9 @@
 package org.apache.ignite.internal.processors.query.h2.database;
 
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
+import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 
@@ -60,17 +60,29 @@ public class H2RowFactory {
 
         rowBuilder.initFromLink(cctx.group(), CacheDataRowAdapter.RowData.FULL);
 
-        GridH2Row row;
-
-        try {
-            row = rowDesc.createRow(rowBuilder);
-        }
-        catch (IgniteCheckedException e) {
-            throw new IgniteException(e);
-        }
+        GridH2Row row = rowDesc.createRow(rowBuilder, null);
 
         assert row.version() != null;
 
         return row;
     }
+
+    /**
+     * @param link Link.
+     * @param mvccCrdVer Mvcc coordinator version.
+     * @param mvccCntr Mvcc counter.
+     * @return Row.
+     * @throws IgniteCheckedException If failed.
+     */
+    public GridH2Row getMvccRow(long link, long mvccCrdVer, long mvccCntr) throws IgniteCheckedException {
+        MvccDataRow row = new MvccDataRow(cctx.group(),
+            0,
+            link,
+            -1, // TODO IGNITE-3478: get partition from link.
+            null,
+            mvccCrdVer,
+            mvccCntr);
+
+        return rowDesc.createRow(row, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
index fcfeb16..df77f7a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
@@ -29,15 +29,22 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusMeta
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerIO;
 import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasLeafIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.result.SearchRow;
 import org.h2.table.IndexColumn;
 import org.h2.value.Value;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
 /**
  */
-public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
+public abstract class H2Tree extends BPlusTree<GridH2SearchRow, GridH2Row> {
     /** */
     private final H2RowFactory rowStore;
 
@@ -54,6 +61,9 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
     private final int[] columnIds;
 
     /** */
+    private final boolean mvccEnabled;
+
+    /** */
     private final Comparator<Value> comp = new Comparator<Value>() {
         @Override public int compare(Value o1, Value o2) {
             return compareValues(o1, o2);
@@ -69,9 +79,10 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
      * @param rowStore Row data store.
      * @param metaPageId Meta page ID.
      * @param initNew Initialize new index.
+     * @param mvccEnabled Mvcc flag.
      * @throws IgniteCheckedException If failed.
      */
-    protected H2Tree(
+    H2Tree(
         String name,
         ReuseList reuseList,
         int grpId,
@@ -83,7 +94,8 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
         boolean initNew,
         IndexColumn[] cols,
         List<InlineIndexHelper> inlineIdxs,
-        int inlineSize
+        int inlineSize,
+        boolean mvccEnabled
     ) throws IgniteCheckedException {
         super(name, grpId, pageMem, wal, globalRmvId, metaPageId, reuseList);
 
@@ -93,6 +105,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
         }
 
         this.inlineSize = inlineSize;
+        this.mvccEnabled = mvccEnabled;
 
         assert rowStore != null;
 
@@ -105,7 +118,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
         for (int i = 0; i < cols.length; i++)
             columnIds[i] = cols[i].column.getColumnId();
 
-        setIos(H2ExtrasInnerIO.getVersions(inlineSize), H2ExtrasLeafIO.getVersions(inlineSize));
+        setIos(H2ExtrasInnerIO.getVersions(inlineSize, mvccEnabled), H2ExtrasLeafIO.getVersions(inlineSize, mvccEnabled));
 
         initTree(initNew, inlineSize);
     }
@@ -118,7 +131,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
     }
 
     /** {@inheritDoc} */
-    @Override protected GridH2Row getRow(BPlusIO<SearchRow> io, long pageAddr, int idx, Object ignore)
+    @Override protected GridH2Row getRow(BPlusIO<GridH2SearchRow> io, long pageAddr, int idx, Object ignore)
         throws IgniteCheckedException {
         return (GridH2Row)io.getLookupRow(this, pageAddr, idx);
     }
@@ -159,8 +172,8 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
 
     /** {@inheritDoc} */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    @Override protected int compare(BPlusIO<SearchRow> io, long pageAddr, int idx,
-        SearchRow row) throws IgniteCheckedException {
+    @Override protected int compare(BPlusIO<GridH2SearchRow> io, long pageAddr, int idx,
+        GridH2SearchRow row) throws IgniteCheckedException {
         if (inlineSize() == 0)
             return compareRows(getRow(io, pageAddr, idx), row);
         else {
@@ -195,7 +208,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
             }
 
             if (lastIdxUsed == cols.length)
-                return 0;
+                return mvccCompare((H2RowLinkIO)io, pageAddr, idx, row);
 
             SearchRow rowData = getRow(io, pageAddr, idx);
 
@@ -207,7 +220,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
 
                 if (v2 == null) {
                     // Can't compare further.
-                    return 0;
+                    return mvccCompare((H2RowLinkIO)io, pageAddr, idx, row);
                 }
 
                 Value v1 = rowData.getValue(idx0);
@@ -218,7 +231,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
                     return InlineIndexHelper.fixSort(c, col.sortType);
             }
 
-            return 0;
+            return mvccCompare((H2RowLinkIO)io, pageAddr, idx, row);
         }
     }
 
@@ -229,7 +242,9 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
      * @param r2 Row 2.
      * @return Compare result.
      */
-    private int compareRows(GridH2Row r1, SearchRow r2) {
+    private int compareRows(GridH2Row r1, GridH2SearchRow r2) {
+        assert !mvccEnabled || r2.indexSearchRow() || assertMvccVersionValid(r2.mvccCoordinatorVersion(), r2.mvccCounter()) : r2;
+
         if (r1 == r2)
             return 0;
 
@@ -241,7 +256,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
 
             if (v1 == null || v2 == null) {
                 // Can't compare further.
-                return 0;
+                return mvccCompare(r1, r2);
             }
 
             int c = compareValues(v1, v2);
@@ -250,6 +265,64 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
                 return InlineIndexHelper.fixSort(c, cols[i].sortType);
         }
 
+        return mvccCompare(r1, r2);
+    }
+
+    /**
+     * @param io IO.
+     * @param pageAddr Page address.
+     * @param idx Item index.
+     * @param r2 Search row.
+     * @return Comparison result.
+     */
+    private int mvccCompare(H2RowLinkIO io, long pageAddr, int idx, GridH2SearchRow r2) {
+        if (mvccEnabled && !r2.indexSearchRow()) {
+            long crdVer1 = io.getMvccCoordinatorVersion(pageAddr, idx);
+            long crdVer2 = r2.mvccCoordinatorVersion();
+
+            assert crdVer1 != 0;
+            assert crdVer2 != 0 : r2;
+
+            int c = Long.compare(unmaskCoordinatorVersion(crdVer1), unmaskCoordinatorVersion(crdVer2));
+
+            if (c != 0)
+                return c;
+
+            long cntr = io.getMvccCounter(pageAddr, idx);
+
+            assert cntr != MVCC_COUNTER_NA;
+            assert r2.mvccCounter() != MVCC_COUNTER_NA : r2;
+
+            return Long.compare(cntr, r2.mvccCounter());
+        }
+
+        return 0;
+    }
+
+    /**
+     * @param r1 First row.
+     * @param r2 Second row.
+     * @return Comparison result.
+     */
+    private int mvccCompare(GridH2Row r1, GridH2SearchRow r2) {
+        if (mvccEnabled && !r2.indexSearchRow()) {
+            long crdVer1 = r1.mvccCoordinatorVersion();
+            long crdVer2 = r2.mvccCoordinatorVersion();
+
+            assert crdVer1 != 0 : r1;
+            assert crdVer2 != 0 : r2;
+
+            int c = Long.compare(unmaskCoordinatorVersion(crdVer1), unmaskCoordinatorVersion(crdVer2));
+
+            if (c != 0)
+                return c;
+
+            assert r1.mvccCounter() != MVCC_COUNTER_NA : r1;
+            assert r2.mvccCounter() != MVCC_COUNTER_NA : r2;
+
+            return Long.compare(r1.mvccCounter(), r2.mvccCounter());
+        }
+
         return 0;
     }
 
@@ -259,4 +332,9 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
      * @return Comparison result.
      */
     public abstract int compareValues(Value v1, Value v2);
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(H2Tree.class, this, "super", super.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index 3c0ab5e..87a6eca 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -29,7 +29,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.query.h2.H2Cursor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2IndexBase;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridCursor;
@@ -118,7 +120,8 @@ public class H2TreeIndex extends GridH2IndexBase {
                     page.isAllocated(),
                     cols,
                     inlineIdxs,
-                    computeInlineSize(inlineIdxs, inlineSize)) {
+                    computeInlineSize(inlineIdxs, inlineSize),
+                    cctx.mvccEnabled()) {
                     @Override public int compareValues(Value v1, Value v2) {
                         return v1 == v2 ? 0 : table.compareTypeSafe(v1, v2);
                     }
@@ -165,20 +168,33 @@ public class H2TreeIndex extends GridH2IndexBase {
     /** {@inheritDoc} */
     @Override public Cursor find(Session ses, SearchRow lower, SearchRow upper) {
         try {
-            IndexingQueryFilter f = threadLocalFilter();
+            assert lower == null || lower instanceof GridH2SearchRow : lower;
+            assert upper == null || upper instanceof GridH2SearchRow : upper;
+
             IndexingQueryCacheFilter p = null;
+            H2TreeMvccFilterClosure mvccFilter = null;
+
+            GridH2QueryContext qctx = GridH2QueryContext.get();
+
+            if (qctx != null) {
+                IndexingQueryFilter f = qctx.filter();
+
+                if (f != null) {
+                    String cacheName = getTable().cacheName();
 
-            if (f != null) {
-                String cacheName = getTable().cacheName();
+                    p = f.forCache(cacheName);
+                }
 
-                p = f.forCache(cacheName);
+                mvccFilter = qctx.mvccFilter();
             }
 
             int seg = threadLocalSegment();
 
             H2Tree tree = treeForRead(seg);
 
-            return new H2Cursor(tree.find(lower, upper), p);
+            assert !cctx.mvccEnabled() || mvccFilter != null;
+
+            return new H2Cursor(tree.find((GridH2SearchRow)lower, (GridH2SearchRow)upper, mvccFilter, null), p);
         }
         catch (IgniteCheckedException e) {
             throw DbException.convert(e);
@@ -205,7 +221,28 @@ public class H2TreeIndex extends GridH2IndexBase {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean putx(GridH2Row row) {
+        try {
+            InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
+
+            int seg = segmentForRow(row);
+
+            H2Tree tree = treeForRead(seg);
+
+            return tree.putx(row);
+        }
+        catch (IgniteCheckedException e) {
+            throw DbException.convert(e);
+        }
+        finally {
+            InlineIndexHelper.clearCurrentInlineIndexes();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public GridH2Row remove(SearchRow row) {
+        assert row instanceof GridH2SearchRow : row;
+
         try {
             InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
 
@@ -213,7 +250,7 @@ public class H2TreeIndex extends GridH2IndexBase {
 
             H2Tree tree = treeForRead(seg);
 
-            return tree.remove(row);
+            return tree.remove((GridH2SearchRow)row);
         }
         catch (IgniteCheckedException e) {
             throw DbException.convert(e);
@@ -225,6 +262,8 @@ public class H2TreeIndex extends GridH2IndexBase {
 
     /** {@inheritDoc} */
     @Override public void removex(SearchRow row) {
+        assert row instanceof GridH2SearchRow : row;
+
         try {
             InlineIndexHelper.setCurrentInlineIndexes(inlineIdxs);
 
@@ -232,7 +271,7 @@ public class H2TreeIndex extends GridH2IndexBase {
 
             H2Tree tree = treeForRead(seg);
 
-            tree.removex(row);
+            tree.removex((GridH2SearchRow)row);
         }
         catch (IgniteCheckedException e) {
             throw DbException.convert(e);
@@ -282,6 +321,17 @@ public class H2TreeIndex extends GridH2IndexBase {
 
             H2Tree tree = treeForRead(seg);
 
+            if (cctx.mvccEnabled()) {
+                GridH2QueryContext qctx = GridH2QueryContext.get();
+
+                assert qctx != null;
+
+                H2TreeMvccFilterClosure mvccFilter = qctx.mvccFilter();
+
+                assert mvccFilter != null;
+                // TODO IGNITE-3478 (support filter for first/last)
+            }
+
             GridH2Row row = b ? tree.findFirst(): tree.findLast();
 
             return new SingleRowCursor(row);
@@ -321,11 +371,13 @@ public class H2TreeIndex extends GridH2IndexBase {
     @Override protected GridCursor<GridH2Row> doFind0(
         IgniteTree t,
         @Nullable SearchRow first,
-        boolean includeFirst,
         @Nullable SearchRow last,
-        IndexingQueryFilter filter) {
+        IndexingQueryFilter filter,
+        H2TreeMvccFilterClosure mvccFilter) {
         try {
-            GridCursor<GridH2Row> range = t.find(first, last);
+            assert !cctx.mvccEnabled() || mvccFilter != null;
+
+            GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, mvccFilter, null);
 
             if (range == null)
                 return EMPTY_CURSOR;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeMvccFilterClosure.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeMvccFilterClosure.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeMvccFilterClosure.java
new file mode 100644
index 0000000..6ae2312
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeMvccFilterClosure.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.query.h2.database.io.H2RowLinkIO;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
+/**
+ *
+ */
+public class H2TreeMvccFilterClosure implements H2Tree.TreeRowClosure<GridH2SearchRow, GridH2Row> {
+    /** */
+    private final MvccCoordinatorVersion mvccVer;
+
+    /**
+     * @param mvccVer Mvcc version.
+     */
+    public H2TreeMvccFilterClosure(MvccCoordinatorVersion mvccVer) {
+        assert mvccVer != null;
+
+        this.mvccVer = mvccVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(BPlusTree<GridH2SearchRow, GridH2Row> tree,
+        BPlusIO<GridH2SearchRow> io,
+        long pageAddr,
+        int idx)  throws IgniteCheckedException {
+        H2RowLinkIO rowIo = (H2RowLinkIO)io;
+
+        assert rowIo.storeMvccInfo() : rowIo;
+
+        long rowCrdVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+
+        assert unmaskCoordinatorVersion(rowCrdVer) == rowCrdVer : rowCrdVer;
+        assert rowCrdVer > 0 : rowCrdVer;
+
+        int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
+
+        if (cmp == 0) {
+            long rowCntr = rowIo.getMvccCounter(pageAddr, idx);
+
+            cmp = Long.compare(mvccVer.counter(), rowCntr);
+
+            return cmp >= 0 &&
+                !newVersionAvailable(rowIo, pageAddr, idx) &&
+                !mvccVer.activeTransactions().contains(rowCntr);
+        }
+        else
+            return cmp > 0;
+    }
+
+    /**
+     * @param rowIo Row IO.
+     * @param pageAddr Page address.
+     * @param idx Item index.
+     * @return {@code True}
+     */
+    private boolean newVersionAvailable(H2RowLinkIO rowIo, long pageAddr, int idx) {
+        long newCrdVer = rowIo.getNewMvccCoordinatorVersion(pageAddr, idx);
+
+        if (newCrdVer == 0)
+            return false;
+
+        int cmp = Long.compare(mvccVer.coordinatorVersion(), newCrdVer);
+
+        if (cmp == 0) {
+            long newCntr = rowIo.getNewMvccCounter(pageAddr, idx);
+
+            assert assertMvccVersionValid(newCrdVer, newCntr);
+
+            return newCntr <= mvccVer.counter() && !mvccVer.activeTransactions().contains(newCntr);
+        }
+        else
+            return cmp < 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(H2TreeMvccFilterClosure.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasInnerIO.java
new file mode 100644
index 0000000..550aade
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasInnerIO.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelper;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+
+/**
+ * Inner page for H2 row references.
+ */
+public abstract class AbstractH2ExtrasInnerIO extends BPlusInnerIO<GridH2SearchRow> implements H2RowLinkIO {
+    /** Payload size. */
+    private final int payloadSize;
+
+    /** */
+    public static void register() {
+        register(false);
+
+        register(true);
+    }
+
+    /**
+     * @param mvcc Mvcc flag.
+     */
+    private static void register(boolean mvcc) {
+        short type = mvcc ? PageIO.T_H2_EX_REF_MVCC_INNER_START : PageIO.T_H2_EX_REF_INNER_START;
+
+        for (short payload = 1; payload <= PageIO.MAX_PAYLOAD_SIZE; payload++) {
+            IOVersions<? extends AbstractH2ExtrasInnerIO> io =
+                getVersions((short)(type + payload - 1), payload, mvcc);
+
+            PageIO.registerH2ExtraInner(io, mvcc);
+        }
+    }
+
+    /**
+     * @param payload Payload size.
+     * @param mvccEnabled Mvcc flag.
+     * @return IOVersions for given payload.
+     */
+    @SuppressWarnings("unchecked")
+    public static IOVersions<? extends BPlusInnerIO<GridH2SearchRow>> getVersions(int payload, boolean mvccEnabled) {
+        assert payload >= 0 && payload <= PageIO.MAX_PAYLOAD_SIZE;
+
+        if (payload == 0)
+            return mvccEnabled ? H2MvccInnerIO.VERSIONS : H2InnerIO.VERSIONS;
+        else
+            return (IOVersions<BPlusInnerIO<GridH2SearchRow>>)PageIO.getInnerVersions((short)(payload - 1), mvccEnabled);
+    }
+
+    /**
+     * @param type Type.
+     * @param payload Payload size.
+     * @param mvcc Mvcc flag.
+     * @return Instance of IO versions.
+     */
+    private static IOVersions<? extends AbstractH2ExtrasInnerIO> getVersions(short type, short payload, boolean mvcc) {
+        return new IOVersions<>(mvcc ? new H2MvccExtrasInnerIO(type, 1, payload) : new H2ExtrasInnerIO(type, 1, payload));
+    }
+
+    /**
+     * @param type Page type.
+     * @param ver Page format version.
+     * @param itemSize Item size.
+     * @param payloadSize Payload size.
+     */
+    AbstractH2ExtrasInnerIO(short type, int ver, int itemSize, int payloadSize) {
+        super(type, ver, true, itemSize + payloadSize);
+
+        this.payloadSize = payloadSize;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public final void storeByOffset(long pageAddr, int off, GridH2SearchRow row) {
+        GridH2Row row0 = (GridH2Row)row;
+
+        assert row0.link() != 0 : row0;
+
+        List<InlineIndexHelper> inlineIdxs = InlineIndexHelper.getCurrentInlineIndexes();
+
+        assert inlineIdxs != null : "no inline index helpers";
+
+
+        int fieldOff = 0;
+
+        for (int i = 0; i < inlineIdxs.size(); i++) {
+            InlineIndexHelper idx = inlineIdxs.get(i);
+
+            int size = idx.put(pageAddr, off + fieldOff, row.getValue(idx.columnIndex()), payloadSize - fieldOff);
+
+            if (size == 0)
+                break;
+
+            fieldOff += size;
+        }
+
+        H2IOUtils.storeRow(row0, pageAddr, off + payloadSize, storeMvccInfo());
+    }
+
+    /** {@inheritDoc} */
+    @Override public final GridH2SearchRow getLookupRow(BPlusTree<GridH2SearchRow, ?> tree, long pageAddr, int idx)
+        throws IgniteCheckedException {
+        long link = getLink(pageAddr, idx);
+
+        assert link != 0;
+
+        if (storeMvccInfo()) {
+            long mvccCrdVer = getMvccCoordinatorVersion(pageAddr, idx);
+            long mvccCntr = getMvccCounter(pageAddr, idx);
+
+            return ((H2Tree)tree).getRowFactory().getMvccRow(link, mvccCrdVer, mvccCntr);
+        }
+
+        return ((H2Tree)tree).getRowFactory().getRow(link);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void store(long dstPageAddr, int dstIdx, BPlusIO<GridH2SearchRow> srcIo, long srcPageAddr, int srcIdx) {
+        int srcOff = srcIo.offset(srcIdx);
+
+        byte[] payload = PageUtils.getBytes(srcPageAddr, srcOff, payloadSize);
+        long link = PageUtils.getLong(srcPageAddr, srcOff + payloadSize);
+
+        assert link != 0;
+
+        int dstOff = offset(dstIdx);
+
+        PageUtils.putBytes(dstPageAddr, dstOff, payload);
+
+        H2IOUtils.store(dstPageAddr, dstOff + payloadSize, srcIo, srcPageAddr, srcIdx, storeMvccInfo());
+    }
+
+    /** {@inheritDoc} */
+    @Override public final long getLink(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + payloadSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCounter(long pageAddr, int idx) {
+        return MVCC_COUNTER_NA;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCounter(long pageAddr, int idx) {
+        return MVCC_COUNTER_NA;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeMvccInfo() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasLeafIO.java
new file mode 100644
index 0000000..7beecf2
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2ExtrasLeafIO.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelper;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_COUNTER_NA;
+
+/**
+ * Leaf page for H2 row references.
+ */
+public class AbstractH2ExtrasLeafIO extends BPlusLeafIO<GridH2SearchRow> implements H2RowLinkIO {
+    /** Payload size. */
+    private final int payloadSize;
+
+    /** */
+    public static void register() {
+        register(false);
+
+        register(true);
+    }
+
+    /**
+     * @param mvcc Mvcc flag.
+     */
+    private static void register(boolean mvcc) {
+        short type = mvcc ? PageIO.T_H2_EX_REF_MVCC_LEAF_START : PageIO.T_H2_EX_REF_LEAF_START;
+
+        for (short payload = 1; payload <= PageIO.MAX_PAYLOAD_SIZE; payload++) {
+            IOVersions<? extends AbstractH2ExtrasLeafIO> io =
+                getVersions((short)(type + payload - 1), payload, mvcc);
+
+            PageIO.registerH2ExtraLeaf(io, mvcc);
+        }
+    }
+
+    /**
+     * @param payload Payload size.
+     * @param mvccEnabled Mvcc flag.
+     * @return IOVersions for given payload.
+     */
+    @SuppressWarnings("unchecked")
+    public static IOVersions<? extends BPlusLeafIO<GridH2SearchRow>> getVersions(int payload, boolean mvccEnabled) {
+        assert payload >= 0 && payload <= PageIO.MAX_PAYLOAD_SIZE;
+
+        if (payload == 0)
+            return mvccEnabled ? H2MvccLeafIO.VERSIONS : H2LeafIO.VERSIONS;
+        else
+            return (IOVersions<BPlusLeafIO<GridH2SearchRow>>)PageIO.getLeafVersions((short)(payload - 1), mvccEnabled);
+    }
+
+    /**
+     * @param type Type.
+     * @param payload Payload size.
+     * @param mvcc Mvcc flag.
+     * @return Versions.
+     */
+    private static IOVersions<? extends AbstractH2ExtrasLeafIO> getVersions(short type, short payload, boolean mvcc) {
+        return new IOVersions<>(mvcc ? new H2MvccExtrasLeafIO(type, 1, payload) : new H2ExtrasLeafIO(type, 1, payload));
+    }
+
+    /**
+     * @param type Page type.
+     * @param ver Page format version.
+     * @param itemSize Item size.
+     * @param payloadSize Payload size.
+     */
+    AbstractH2ExtrasLeafIO(short type, int ver, int itemSize, int payloadSize) {
+        super(type, ver, itemSize + payloadSize);
+
+        this.payloadSize = payloadSize;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Override public final void storeByOffset(long pageAddr, int off, GridH2SearchRow row) {
+        GridH2Row row0 = (GridH2Row)row;
+
+        assert row0.link() != 0;
+
+        List<InlineIndexHelper> inlineIdxs = InlineIndexHelper.getCurrentInlineIndexes();
+
+        assert inlineIdxs != null : "no inline index helpers";
+
+        int fieldOff = 0;
+
+        for (int i = 0; i < inlineIdxs.size(); i++) {
+            InlineIndexHelper idx = inlineIdxs.get(i);
+
+            int size = idx.put(pageAddr, off + fieldOff, row.getValue(idx.columnIndex()), payloadSize - fieldOff);
+
+            if (size == 0)
+                break;
+
+            fieldOff += size;
+        }
+
+        H2IOUtils.storeRow(row0, pageAddr, off + payloadSize, storeMvccInfo());
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void store(long dstPageAddr, int dstIdx, BPlusIO<GridH2SearchRow> srcIo, long srcPageAddr, int srcIdx) {
+        int srcOff = srcIo.offset(srcIdx);
+
+        byte[] payload = PageUtils.getBytes(srcPageAddr, srcOff, payloadSize);
+        long link = PageUtils.getLong(srcPageAddr, srcOff + payloadSize);
+
+        assert link != 0;
+
+        int dstOff = offset(dstIdx);
+
+        PageUtils.putBytes(dstPageAddr, dstOff, payload);
+
+        H2IOUtils.store(dstPageAddr, dstOff + payloadSize, srcIo, srcPageAddr, srcIdx, storeMvccInfo());
+    }
+
+    /** {@inheritDoc} */
+    @Override public final GridH2SearchRow getLookupRow(BPlusTree<GridH2SearchRow, ?> tree, long pageAddr, int idx)
+        throws IgniteCheckedException {
+        long link = getLink(pageAddr, idx);
+
+        if (storeMvccInfo()) {
+            long mvccCrdVer = getMvccCoordinatorVersion(pageAddr, idx);
+            long mvccCntr = getMvccCounter(pageAddr, idx);
+
+            return ((H2Tree)tree).getRowFactory().getMvccRow(link, mvccCrdVer, mvccCntr);
+        }
+
+        return ((H2Tree)tree).getRowFactory().getRow(link);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final long getLink(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx) + payloadSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCounter(long pageAddr, int idx) {
+        return MVCC_COUNTER_NA;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCounter(long pageAddr, int idx) {
+        return MVCC_COUNTER_NA;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeMvccInfo() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2InnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2InnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2InnerIO.java
new file mode 100644
index 0000000..2f12e75
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2InnerIO.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+
+/**
+ * Inner page for H2 row references.
+ */
+public abstract class AbstractH2InnerIO extends BPlusInnerIO<GridH2SearchRow> implements H2RowLinkIO {
+    /**
+     * @param type Page type.
+     * @param ver Page format version.
+     * @param itemSize Single item size on page.
+     */
+    AbstractH2InnerIO(int type, int ver, int itemSize) {
+        super(type, ver, true, itemSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeMvccInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void storeByOffset(long pageAddr, int off, GridH2SearchRow row) {
+        GridH2Row row0 = (GridH2Row)row;
+
+        H2IOUtils.storeRow(row0, pageAddr, off, storeMvccInfo());
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridH2SearchRow getLookupRow(BPlusTree<GridH2SearchRow, ?> tree, long pageAddr, int idx)
+        throws IgniteCheckedException {
+        long link = getLink(pageAddr, idx);
+
+        if (storeMvccInfo()) {
+            long mvccCrdVer = getMvccCoordinatorVersion(pageAddr, idx);
+            long mvccCntr = getMvccCounter(pageAddr, idx);
+
+            return ((H2Tree)tree).getRowFactory().getMvccRow(link, mvccCrdVer, mvccCntr);
+        }
+
+        return ((H2Tree)tree).getRowFactory().getRow(link);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<GridH2SearchRow> srcIo, long srcPageAddr, int srcIdx) {
+        H2IOUtils.store(dstPageAddr, offset(dstIdx), srcIo, srcPageAddr, srcIdx, storeMvccInfo());
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getLink(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx));
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+        assert storeMvccInfo();
+
+        return PageUtils.getLong(pageAddr, offset(idx) + 8);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCounter(long pageAddr, int idx) {
+        assert storeMvccInfo();
+
+        return PageUtils.getLong(pageAddr, offset(idx) + 16);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+        assert storeMvccInfo();
+
+        return PageUtils.getLong(pageAddr, offset(idx) + 24);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCounter(long pageAddr, int idx) {
+        assert storeMvccInfo();
+
+        return PageUtils.getLong(pageAddr, offset(idx) + 32);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2LeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2LeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2LeafIO.java
new file mode 100644
index 0000000..a5cf7c2
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/AbstractH2LeafIO.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+
+/**
+ * Leaf page for H2 row references.
+ */
+public abstract class AbstractH2LeafIO extends BPlusLeafIO<GridH2SearchRow> implements H2RowLinkIO {
+    /**
+     * @param type Page type.
+     * @param ver Page format version.
+     * @param itemSize Single item size on page.
+     */
+    AbstractH2LeafIO(int type, int ver, int itemSize) {
+        super(type, ver, itemSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeMvccInfo() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void storeByOffset(long pageAddr, int off, GridH2SearchRow row) {
+        GridH2Row row0 = (GridH2Row)row;
+
+        H2IOUtils.storeRow(row0, pageAddr, off, storeMvccInfo());
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void store(long dstPageAddr, int dstIdx, BPlusIO<GridH2SearchRow> srcIo, long srcPageAddr, int srcIdx) {
+        assert srcIo == this;
+
+        H2IOUtils.store(dstPageAddr, offset(dstIdx), srcIo, srcPageAddr, srcIdx, storeMvccInfo());
+    }
+
+    /** {@inheritDoc} */
+    @Override public final GridH2SearchRow getLookupRow(BPlusTree<GridH2SearchRow,?> tree, long pageAddr, int idx)
+        throws IgniteCheckedException {
+        long link = getLink(pageAddr, idx);
+
+        if (storeMvccInfo()) {
+            long mvccCrdVer = getMvccCoordinatorVersion(pageAddr, idx);
+            long mvccCntr = getMvccCounter(pageAddr, idx);
+
+            return ((H2Tree)tree).getRowFactory().getMvccRow(link, mvccCrdVer, mvccCntr);
+        }
+
+        return ((H2Tree)tree).getRowFactory().getRow(link);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getLink(long pageAddr, int idx) {
+        return PageUtils.getLong(pageAddr, offset(idx));
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCoordinatorVersion(long pageAddr, int idx) {
+        assert storeMvccInfo();
+
+        return PageUtils.getLong(pageAddr, offset(idx) + 8);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getMvccCounter(long pageAddr, int idx) {
+        assert storeMvccInfo();
+
+        return PageUtils.getLong(pageAddr, offset(idx) + 16);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCoordinatorVersion(long pageAddr, int idx) {
+        assert storeMvccInfo();
+
+        return PageUtils.getLong(pageAddr, offset(idx) + 24);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long getNewMvccCounter(long pageAddr, int idx) {
+        assert storeMvccInfo();
+
+        return PageUtils.getLong(pageAddr, offset(idx) + 32);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java
index b8877e9..8dc8c96 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasInnerIO.java
@@ -17,124 +17,17 @@
 
 package org.apache.ignite.internal.processors.query.h2.database.io;
 
-import java.util.List;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
-import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
-import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelper;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
-import org.h2.result.SearchRow;
-
 /**
  * Inner page for H2 row references.
  */
-public class H2ExtrasInnerIO extends BPlusInnerIO<SearchRow> {
-    /** Payload size. */
-    private final int payloadSize;
-
-    /** */
-    public static void register() {
-        for (short payload = 1; payload <= PageIO.MAX_PAYLOAD_SIZE; payload++)
-            PageIO.registerH2ExtraInner(getVersions((short)(PageIO.T_H2_EX_REF_INNER_START + payload - 1), payload));
-    }
-
-    /**
-     * @param payload Payload size.
-     * @return IOVersions for given payload.
-     */
-    @SuppressWarnings("unchecked")
-    public static IOVersions<? extends BPlusInnerIO<SearchRow>> getVersions(int payload) {
-        assert payload >= 0 && payload <= PageIO.MAX_PAYLOAD_SIZE;
-
-        if (payload == 0)
-            return H2InnerIO.VERSIONS;
-        else
-            return (IOVersions<BPlusInnerIO<SearchRow>>)PageIO.getInnerVersions((short)(payload - 1));
-    }
-
-    /**
-     * @param type Type.
-     * @param payload Payload size.
-     * @return Instance of IO versions.
-     */
-    private static IOVersions<H2ExtrasInnerIO> getVersions(short type, short payload) {
-        return new IOVersions<>(new H2ExtrasInnerIO(type, 1, payload));
-    }
-
+public class H2ExtrasInnerIO extends AbstractH2ExtrasInnerIO implements H2RowLinkIO {
     /**
      * @param type Page type.
      * @param ver Page format version.
      * @param payloadSize Payload size.
      */
-    private H2ExtrasInnerIO(short type, int ver, int payloadSize) {
-        super(type, ver, true, 8 + payloadSize);
-        this.payloadSize = payloadSize;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    @Override public void storeByOffset(long pageAddr, int off, SearchRow row) {
-        GridH2Row row0 = (GridH2Row)row;
-
-        assert row0.link() != 0 : row0;
-
-        List<InlineIndexHelper> inlineIdxs = InlineIndexHelper.getCurrentInlineIndexes();
-
-        assert inlineIdxs != null : "no inline index helpers";
-
-
-        int fieldOff = 0;
-
-        for (int i = 0; i < inlineIdxs.size(); i++) {
-            InlineIndexHelper idx = inlineIdxs.get(i);
-
-            int size = idx.put(pageAddr, off + fieldOff, row.getValue(idx.columnIndex()), payloadSize - fieldOff);
-
-            if (size == 0)
-                break;
-
-            fieldOff += size;
-        }
-
-        PageUtils.putLong(pageAddr, off + payloadSize, row0.link());
-    }
-
-    /** {@inheritDoc} */
-    @Override public SearchRow getLookupRow(BPlusTree<SearchRow, ?> tree, long pageAddr, int idx)
-        throws IgniteCheckedException {
-        long link = getLink(pageAddr, idx);
-
-        assert link != 0;
-
-        return ((H2Tree)tree).getRowFactory().getRow(link);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<SearchRow> srcIo, long srcPageAddr, int srcIdx) {
-        int srcOff = srcIo.offset(srcIdx);
-
-        byte[] payload = PageUtils.getBytes(srcPageAddr, srcOff, payloadSize);
-        long link = PageUtils.getLong(srcPageAddr, srcOff + payloadSize);
-
-        assert link != 0;
-
-        int dstOff = offset(dstIdx);
-
-        PageUtils.putBytes(dstPageAddr, dstOff, payload);
-        PageUtils.putLong(dstPageAddr, dstOff + payloadSize, link);
-    }
-
-    /**
-     * @param pageAddr Page address.
-     * @param idx Index.
-     * @return Link to row.
-     */
-    private long getLink(long pageAddr, int idx) {
-        return PageUtils.getLong(pageAddr, offset(idx) + payloadSize);
+    H2ExtrasInnerIO(short type, int ver, int payloadSize) {
+        super(type, ver, 8, payloadSize);
     }
 }
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java
index 6161f8d..085f98b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2ExtrasLeafIO.java
@@ -17,121 +17,16 @@
 
 package org.apache.ignite.internal.processors.query.h2.database.io;
 
-import java.util.List;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
-import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
-import org.apache.ignite.internal.processors.query.h2.database.InlineIndexHelper;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
-import org.h2.result.SearchRow;
-
 /**
  * Leaf page for H2 row references.
  */
-public class H2ExtrasLeafIO extends BPlusLeafIO<SearchRow> {
-    /** Payload size. */
-    private final int payloadSize;
-
-    /** */
-    public static void register() {
-        for (short payload = 1; payload <= PageIO.MAX_PAYLOAD_SIZE; payload++)
-            PageIO.registerH2ExtraLeaf(getVersions((short)(PageIO.T_H2_EX_REF_LEAF_START + payload - 1), payload));
-    }
-
-    /**
-     * @param payload Payload size.
-     * @return IOVersions for given payload.
-     */
-    @SuppressWarnings("unchecked")
-    public static IOVersions<? extends BPlusLeafIO<SearchRow>> getVersions(int payload) {
-        assert payload >= 0 && payload <= PageIO.MAX_PAYLOAD_SIZE;
-
-        if (payload == 0)
-            return H2LeafIO.VERSIONS;
-        else
-            return (IOVersions<BPlusLeafIO<SearchRow>>)PageIO.getLeafVersions((short)(payload - 1));
-    }
-
-    /**
-     * @param type Type.
-     * @param payload Payload size.
-     * @return Versions.
-     */
-    private static IOVersions<H2ExtrasLeafIO> getVersions(short type, short payload) {
-        return new IOVersions<>(new H2ExtrasLeafIO(type, 1, payload));
-    }
-
+public class H2ExtrasLeafIO extends AbstractH2ExtrasLeafIO {
     /**
      * @param type Page type.
      * @param ver Page format version.
      * @param payloadSize Payload size.
      */
-    private H2ExtrasLeafIO(short type, int ver, int payloadSize) {
-        super(type, ver, 8 + payloadSize);
-        this.payloadSize = payloadSize;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("ForLoopReplaceableByForEach")
-    @Override public void storeByOffset(long pageAddr, int off, SearchRow row) {
-        GridH2Row row0 = (GridH2Row)row;
-
-        assert row0.link() != 0;
-
-        List<InlineIndexHelper> inlineIdxs = InlineIndexHelper.getCurrentInlineIndexes();
-
-        assert inlineIdxs != null : "no inline index helpers";
-
-        int fieldOff = 0;
-
-        for (int i = 0; i < inlineIdxs.size(); i++) {
-            InlineIndexHelper idx = inlineIdxs.get(i);
-
-            int size = idx.put(pageAddr, off + fieldOff, row.getValue(idx.columnIndex()), payloadSize - fieldOff);
-
-            if (size == 0)
-                break;
-
-            fieldOff += size;
-        }
-
-        PageUtils.putLong(pageAddr, off + payloadSize, row0.link());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<SearchRow> srcIo, long srcPageAddr, int srcIdx) {
-        int srcOff = srcIo.offset(srcIdx);
-
-        byte[] payload = PageUtils.getBytes(srcPageAddr, srcOff, payloadSize);
-        long link = PageUtils.getLong(srcPageAddr, srcOff + payloadSize);
-
-        assert link != 0;
-
-        int dstOff = offset(dstIdx);
-
-        PageUtils.putBytes(dstPageAddr, dstOff, payload);
-        PageUtils.putLong(dstPageAddr, dstOff + payloadSize, link);
-    }
-
-    /** {@inheritDoc} */
-    @Override public SearchRow getLookupRow(BPlusTree<SearchRow, ?> tree, long pageAddr, int idx)
-        throws IgniteCheckedException {
-        long link = getLink(pageAddr, idx);
-
-        return ((H2Tree)tree).getRowFactory().getRow(link);
-    }
-
-    /**
-     * @param pageAddr Page address.
-     * @param idx Index.
-     * @return Link to row.
-     */
-    private long getLink(long pageAddr, int idx) {
-        return PageUtils.getLong(pageAddr, offset(idx) + payloadSize);
+    H2ExtrasLeafIO(short type, int ver, int payloadSize) {
+        super(type, ver, 8, payloadSize);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2IOUtils.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2IOUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2IOUtils.java
new file mode 100644
index 0000000..c0b2314
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2IOUtils.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.database.io;
+
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2SearchRow;
+
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.assertMvccVersionValid;
+
+/**
+ *
+ */
+class H2IOUtils {
+    /**
+     *
+     */
+    private H2IOUtils() {}
+
+    /**
+     * @param row Row.
+     * @param pageAddr Page address.
+     * @param off Offset.
+     * @param storeMvcc {@code True} to store mvcc data.
+     */
+    static void storeRow(GridH2Row row, long pageAddr, int off, boolean storeMvcc) {
+        assert row.link() != 0;
+
+        PageUtils.putLong(pageAddr, off, row.link());
+
+        if (storeMvcc) {
+            long mvccCrdVer = row.mvccCoordinatorVersion();
+            long mvccCntr = row.mvccCounter();
+
+            assert assertMvccVersionValid(mvccCrdVer, mvccCntr);
+
+            PageUtils.putLong(pageAddr, off + 8, mvccCrdVer);
+            PageUtils.putLong(pageAddr, off + 16, mvccCntr);
+
+            long newMvccCrdVer = row.newMvccCoordinatorVersion();
+
+            PageUtils.putLong(pageAddr, off + 24, newMvccCrdVer);
+
+            if (newMvccCrdVer != 0) {
+                long newMvccCntr = row.newMvccCounter();
+
+                assert assertMvccVersionValid(newMvccCrdVer, newMvccCntr);
+
+                PageUtils.putLong(pageAddr, off + 32, newMvccCntr);
+            }
+        }
+    }
+
+    /**
+     * @param dstPageAddr Destination page address.
+     * @param dstOff Destination page offset.
+     * @param srcIo Source IO.
+     * @param srcPageAddr Source page address.
+     * @param srcIdx Source index.
+     * @param storeMvcc {@code True} to store mvcc data.
+     */
+    static void store(long dstPageAddr,
+        int dstOff,
+        BPlusIO<GridH2SearchRow> srcIo,
+        long srcPageAddr,
+        int srcIdx,
+        boolean storeMvcc)
+    {
+        H2RowLinkIO rowIo = (H2RowLinkIO)srcIo;
+
+        long link = rowIo.getLink(srcPageAddr, srcIdx);
+
+        PageUtils.putLong(dstPageAddr, dstOff, link);
+
+        if (storeMvcc) {
+            long mvccCrdVer = rowIo.getMvccCoordinatorVersion(srcPageAddr, srcIdx);
+            long mvccCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
+
+            assert assertMvccVersionValid(mvccCrdVer, mvccCntr);
+
+            PageUtils.putLong(dstPageAddr, dstOff + 8, mvccCrdVer);
+            PageUtils.putLong(dstPageAddr, dstOff + 16, mvccCntr);
+
+            long newMvccCrdVer = rowIo.getNewMvccCoordinatorVersion(srcPageAddr, srcIdx);
+
+            PageUtils.putLong(dstPageAddr, dstOff + 24, newMvccCrdVer);
+
+            if (newMvccCrdVer != 0) {
+                long newMvccCntr = rowIo.getNewMvccCounter(srcPageAddr, srcIdx);
+
+                assertMvccVersionValid(newMvccCrdVer, newMvccCntr);
+
+                PageUtils.putLong(dstPageAddr, dstOff + 32, newMvccCntr);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6150f3a0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java
index a1f1ce9..9baff7a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/io/H2InnerIO.java
@@ -17,20 +17,12 @@
 
 package org.apache.ignite.internal.processors.query.h2.database.io;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
-import org.apache.ignite.internal.processors.query.h2.database.H2Tree;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
-import org.h2.result.SearchRow;
 
 /**
  * Inner page for H2 row references.
  */
-public class H2InnerIO extends BPlusInnerIO<SearchRow> implements H2RowLinkIO {
+public class H2InnerIO extends AbstractH2InnerIO {
     /** */
     public static final IOVersions<H2InnerIO> VERSIONS = new IOVersions<>(
         new H2InnerIO(1)
@@ -40,35 +32,6 @@ public class H2InnerIO extends BPlusInnerIO<SearchRow> implements H2RowLinkIO {
      * @param ver Page format version.
      */
     private H2InnerIO(int ver) {
-        super(T_H2_REF_INNER, ver, true, 8);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void storeByOffset(long pageAddr, int off, SearchRow row) {
-        GridH2Row row0 = (GridH2Row)row;
-
-        assert row0.link() != 0;
-
-        PageUtils.putLong(pageAddr, off, row0.link());
-    }
-
-    /** {@inheritDoc} */
-    @Override public SearchRow getLookupRow(BPlusTree<SearchRow,?> tree, long pageAddr, int idx)
-        throws IgniteCheckedException {
-        long link = getLink(pageAddr, idx);
-
-        return ((H2Tree)tree).getRowFactory().getRow(link);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<SearchRow> srcIo, long srcPageAddr, int srcIdx) {
-        long link = ((H2RowLinkIO)srcIo).getLink(srcPageAddr, srcIdx);
-
-        PageUtils.putLong(dstPageAddr, offset(dstIdx), link);
-    }
-
-    /** {@inheritDoc} */
-    @Override public long getLink(long pageAddr, int idx) {
-        return PageUtils.getLong(pageAddr, offset(idx));
+        super(T_H2_REF_INNER, ver, 8);
     }
 }