You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/11 08:24:54 UTC

[6/7] ignite git commit: ignite-3478 Support for removes

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
index c829afb..a1d0127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -28,6 +28,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
 /**
  *
  */
@@ -66,16 +69,23 @@ public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.Tr
     {
         boolean visible = true;
 
+        RowLinkIO rowIo = (RowLinkIO)io;
+
+        long crdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+
         if (ver.activeTransactions().size() > 0) {
-            RowLinkIO rowIo = (RowLinkIO)io;
+            long rowCrdVer = unmaskCoordinatorVersion(crdVerMasked);
 
             // TODO IGNITE-3478 sort active transactions?
-            if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) == ver.coordinatorVersion())
+            if (rowCrdVer == ver.coordinatorVersion())
                 visible = !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx));
         }
 
         if (visible) {
-            resRow = ((CacheDataTree) tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+            if (versionForRemovedValue(crdVerMasked))
+                resRow = null;
+            else
+                resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
 
             return false; // Stop search.
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 6ced2f9..30145ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -134,11 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
     /** Version which is less then any version generated on coordinator. */
     private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
-        new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L) {
-            @Override public boolean initialLoad() {
-                return true;
-            }
-        };
+        new MvccCoordinatorVersionResponse(1L, CacheCoordinatorsProcessor.START_VER, 0L);
 
     /** Cache receiver. */
     private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
new file mode 100644
index 0000000..ed7b62d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccClusterRestartTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class CacheMvccClusterRestartTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setConsistentId(gridName);
+
+        cfg.setMvccEnabled(true);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+        memCfg.setPageSize(1024);
+        memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024);
+
+        cfg.setMemoryConfiguration(memCfg);
+
+        cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        GridTestUtils.deleteDbFiles();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        GridTestUtils.deleteDbFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        GridTestUtils.deleteDbFiles();
+
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestart1() throws Exception {
+       restart1(3, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestart2() throws Exception {
+        restart1(1, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRestart3() throws Exception {
+        restart1(3, 1);
+    }
+
+    /**
+     * @param srvBefore Number of servers before restart.
+     * @param srvAfter Number of servers after restart.
+     * @throws Exception If failed.
+     */
+    private void restart1(int srvBefore, int srvAfter) throws Exception {
+        Ignite srv0 = startGridsMultiThreaded(srvBefore);
+
+        srv0.active(true);
+
+        IgniteCache<Object, Object> cache = srv0.createCache(cacheConfiguration());
+
+        Set<Integer> keys = new HashSet<>(primaryKeys(cache, 1, 0));
+
+        try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (Integer k : keys)
+                cache.put(k, k);
+
+            tx.commit();
+        }
+
+        stopAllGrids();
+
+        srv0 = startGridsMultiThreaded(srvAfter);
+
+        srv0.active(true);
+
+        cache = srv0.cache(DEFAULT_CACHE_NAME);
+
+        Map<Object, Object> res = cache.getAll(keys);
+
+        assertEquals(keys.size(), res.size());
+
+        for (Integer k : keys)
+            assertEquals(k, cache.get(k));
+
+        try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (Integer k : keys)
+                cache.put(k, k + 1);
+
+            tx.commit();
+        }
+
+        for (Integer k : keys)
+            assertEquals(k + 1, cache.get(k));
+    }
+
+    /**
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration() {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setBackups(2);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 115e8a2..8bf9e39 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
+import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -45,6 +47,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
@@ -119,6 +122,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /** */
     private String nodeAttr;
 
+    /** */
+    private static final int PAGE_SIZE = MemoryConfiguration.DFLT_PAGE_SIZE;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -137,6 +143,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
         if (nodeAttr != null)
             cfg.setUserAttributes(F.asMap(nodeAttr, true));
 
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+
+        memCfg.setPageSize(PAGE_SIZE);
+
+        cfg.setMemoryConfiguration(memCfg);
+
         return cfg;
     }
 
@@ -376,6 +388,147 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testSimplePutRemove() throws Exception {
+        simplePutRemove(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimplePutRemove_LargeKeys() throws Exception {
+        simplePutRemove(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     * @param largeKeys {@code True} to use large keys (not fitting in single page).
+     */
+    private void simplePutRemove(boolean largeKeys) throws Exception {
+        Ignite node = startGrid(0);
+
+        IgniteTransactions txs = node.transactions();
+
+        final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+        final int KEYS = 100;
+
+        checkValues(new HashMap<>(), cache);
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 0; k < KEYS; k++)
+                cache.remove(testKey(largeKeys, k));
+
+            tx.commit();
+        }
+
+        checkValues(new HashMap<>(), cache);
+
+        Map<Object, Object> expVals = new HashMap<>();
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 0; k < KEYS; k++) {
+                Object key = testKey(largeKeys, k);
+
+                expVals.put(key, k);
+
+                cache.put(key, k);
+            }
+
+            tx.commit();
+        }
+
+        checkValues(expVals, cache);
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 0; k < KEYS; k++) {
+                if (k % 2 == 0) {
+                    Object key = testKey(largeKeys, k);
+
+                    cache.remove(key);
+
+                    expVals.remove(key);
+                }
+            }
+
+            tx.commit();
+        }
+
+        checkValues(expVals, cache);
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        Object key = testKey(largeKeys, 0);
+
+        for (int i = 0; i < 500; i++) {
+            boolean rmvd;
+
+            try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                if (rnd.nextBoolean()) {
+                    cache.remove(key);
+
+                    rmvd = true;
+                }
+                else {
+                    cache.put(key, i);
+
+                    rmvd = false;
+                }
+
+                tx.commit();
+            }
+
+            if (rmvd) {
+                assertNull(cache.get(key));
+                assertTrue(cache.getAll(F.asSet(key)).isEmpty());
+            }
+            else {
+                assertEquals(i, cache.get(key));
+
+                Map<Object, Object> res = cache.getAll(F.asSet(key));
+
+                assertEquals(i, res.get(key));
+            }
+        }
+    }
+
+    /**
+     * @param largeKeys {@code True} to use large keys (not fitting in single page).
+     * @param idx Index.
+     * @return Key instance.
+     */
+    private static Object testKey(boolean largeKeys, int idx) {
+        if (largeKeys) {
+            int payloadSize = PAGE_SIZE + ThreadLocalRandom.current().nextInt(PAGE_SIZE * 10);
+
+            return new TestKey(idx, payloadSize);
+        }
+        else
+            return idx;
+    }
+
+    /**
+     * @param expVals Expected values.
+     * @param cache Cache.
+     */
+    private void checkValues(Map<Object, Object> expVals, IgniteCache<Object, Object> cache) {
+        for (Map.Entry<Object, Object> e : expVals.entrySet())
+            assertEquals(e.getValue(), cache.get(e.getKey()));
+
+        Map<Object, Object> res = cache.getAll(expVals.keySet());
+
+        assertEquals(expVals, res);
+
+        res = new HashMap<>();
+
+        for (IgniteCache.Entry<Object, Object> e : cache)
+            res.put(e.getKey(), e.getValue());
+
+        assertEquals(expVals, res);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testThreadUpdatesAreVisibleForThisThread() throws Exception {
         final Ignite ignite = startGrid(0);
 
@@ -1181,42 +1334,49 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_SingleNode() throws Exception {
-        accountsTxGetAll(1, 0, 0, 64, ReadMode.GET_ALL);
+        accountsTxGetAll(1, 0, 0, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_SingleNode_SinglePartition() throws Exception {
-        accountsTxGetAll(1, 0, 0, 1, ReadMode.GET_ALL);
+        accountsTxGetAll(1, 0, 0, 1, false, ReadMode.GET_ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxGetAll_WithRemoves_SingleNode_SinglePartition() throws Exception {
+        accountsTxGetAll(1, 0, 0, 1, true, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_ClientServer_Backups0() throws Exception {
-        accountsTxGetAll(4, 2, 0, 64, ReadMode.GET_ALL);
+        accountsTxGetAll(4, 2, 0, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_ClientServer_Backups1() throws Exception {
-        accountsTxGetAll(4, 2, 1, 64, ReadMode.GET_ALL);
+        accountsTxGetAll(4, 2, 1, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxGetAll_ClientServer_Backups2() throws Exception {
-        accountsTxGetAll(4, 2, 2, 64, ReadMode.GET_ALL);
+        accountsTxGetAll(4, 2, 2, 64, false, ReadMode.GET_ALL);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAccountsTxScan_SingleNode_SinglePartition() throws Exception {
-        accountsTxGetAll(1, 0, 0, 1, ReadMode.SCAN);
+        accountsTxGetAll(1, 0, 0, 1, false, ReadMode.SCAN);
     }
 
     /**
@@ -1224,6 +1384,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
      * @param clients Number of client nodes.
      * @param cacheBackups Number of cache backups.
      * @param cacheParts Number of cache partitions.
+     * @param withRmvs If {@code true} then in addition to puts tests also executes removes.
      * @param readMode Read mode.
      * @throws Exception If failed.
      */
@@ -1232,6 +1393,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
         final int clients,
         int cacheBackups,
         int cacheParts,
+        final boolean withRmvs,
         final ReadMode readMode
     )
         throws Exception
@@ -1261,6 +1423,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             }
         };
 
+        final Set<Integer> rmvdIds = new HashSet<>();
+
         GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean> writer =
             new GridInClosure3<Integer, List<IgniteCache>, AtomicBoolean>() {
                 @Override public void apply(Integer idx, List<IgniteCache> caches, AtomicBoolean stop) {
@@ -1285,8 +1449,8 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                         keys.add(id1);
                         keys.add(id2);
 
-                        Integer cntr1;
-                        Integer cntr2;
+                        Integer cntr1 = null;
+                        Integer cntr2 = null;
 
                         try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
                             MvccTestAccount a1;
@@ -1297,28 +1461,74 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                             a1 = accounts.get(id1);
                             a2 = accounts.get(id2);
 
-                            assertNotNull(a1);
-                            assertNotNull(a2);
+                            if (!withRmvs) {
+                                assertNotNull(a1);
+                                assertNotNull(a2);
 
-                            cntr1 = a1.updateCnt + 1;
-                            cntr2 = a2.updateCnt + 1;
+                                cntr1 = a1.updateCnt + 1;
+                                cntr2 = a2.updateCnt + 1;
 
-                            cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1));
-                            cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2));
+                                cache.put(id1, new MvccTestAccount(a1.val + 1, cntr1));
+                                cache.put(id2, new MvccTestAccount(a2.val - 1, cntr2));
+                            }
+                            else {
+                                if (a1 != null || a2 != null) {
+                                    if (a1 != null && a2 != null) {
+                                        Integer rmvd = null;
+
+                                        if (rnd.nextInt(10) == 0) {
+                                            synchronized (rmvdIds) {
+                                                if (rmvdIds.size() < ACCOUNTS / 2) {
+                                                    rmvd = rnd.nextBoolean() ? id1 : id2;
+
+                                                    assertTrue(rmvdIds.add(rmvd));
+                                                }
+                                            }
+                                        }
+
+                                        if (rmvd != null) {
+                                            cache.remove(rmvd);
+
+                                            cache.put(rmvd.equals(id1) ? id2 : id1,
+                                                new MvccTestAccount(a1.val + a2.val, 1));
+                                        }
+                                        else {
+                                            cache.put(id1, new MvccTestAccount(a1.val + 1, 1));
+                                            cache.put(id2, new MvccTestAccount(a2.val - 1, 1));
+                                        }
+                                    }
+                                    else {
+                                        if (a1 == null) {
+                                            cache.put(id1, new MvccTestAccount(100, 1));
+                                            cache.put(id2, new MvccTestAccount(a2.val - 100, 1));
+
+                                            assertTrue(rmvdIds.remove(id1));
+                                        }
+                                        else {
+                                            cache.put(id1, new MvccTestAccount(a1.val - 100, 1));
+                                            cache.put(id2, new MvccTestAccount(100, 1));
+
+                                            assertTrue(rmvdIds.remove(id2));
+                                        }
+                                    }
+                                }
+                            }
 
                             tx.commit();
                         }
 
-                        Map<Integer, MvccTestAccount> accounts = cache.getAll(keys);
+                        if (!withRmvs) {
+                            Map<Integer, MvccTestAccount> accounts = cache.getAll(keys);
 
-                        MvccTestAccount a1 = accounts.get(id1);
-                        MvccTestAccount a2 = accounts.get(id2);
+                            MvccTestAccount a1 = accounts.get(id1);
+                            MvccTestAccount a2 = accounts.get(id2);
 
-                        assertNotNull(a1);
-                        assertNotNull(a2);
+                            assertNotNull(a1);
+                            assertNotNull(a2);
 
-                        assertTrue(a1.updateCnt >= cntr1);
-                        assertTrue(a2.updateCnt >= cntr2);
+                            assertTrue(a1.updateCnt >= cntr1);
+                            assertTrue(a2.updateCnt >= cntr2);
+                        }
                     }
 
                     info("Writer finished, updates: " + cnt);
@@ -1354,23 +1564,26 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                         else
                             accounts = cache.getAll(keys);
 
-                        assertEquals(ACCOUNTS, accounts.size());
+                        if (!withRmvs)
+                            assertEquals(ACCOUNTS, accounts.size());
 
                         int sum = 0;
 
                         for (int i = 0; i < ACCOUNTS; i++) {
                             MvccTestAccount account = accounts.get(i);
 
-                            assertNotNull(account);
+                            if (account != null) {
+                                sum += account.val;
 
-                            sum += account.val;
+                                Integer cntr = lastUpdateCntrs.get(i);
 
-                            Integer cntr = lastUpdateCntrs.get(i);
+                                if (cntr != null)
+                                    assertTrue(cntr <= account.updateCnt);
 
-                            if (cntr != null)
-                                assertTrue(cntr <= account.updateCnt);
-
-                            lastUpdateCntrs.put(i, cntr);
+                                lastUpdateCntrs.put(i, cntr);
+                            }
+                            else
+                                assertTrue(withRmvs);
                         }
 
                         assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
@@ -1386,9 +1599,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                         for (int i = 0; i < ACCOUNTS; i++) {
                             MvccTestAccount account = accounts.get(i);
 
-                            info("Account [id=" + i + ", val=" + account.val + ']');
+                            assertTrue(account != null || withRmvs);
 
-                            sum += account.val;
+                            info("Account [id=" + i + ", val=" + (account != null ? account.val : null) + ']');
+
+                            if (account != null)
+                                sum += account.val;
                         }
 
                         info("Sum: " + sum);
@@ -1601,7 +1817,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testRebalance1() throws Exception {
+    public void testSimpleRebalance() throws Exception {
         Ignite srv0 = startGrid(0);
 
         IgniteCache<Integer, Integer> cache =  (IgniteCache)srv0.createCache(
@@ -1664,6 +1880,58 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testSimpleRebalanceWithRemovedValues() throws Exception {
+        Ignite node = startGrid(0);
+
+        IgniteTransactions txs = node.transactions();
+
+        final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64));
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 0; k < 100; k++)
+                cache.remove(k);
+
+            tx.commit();
+        }
+
+        Map<Object, Object> expVals = new HashMap<>();
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 100; k < 200; k++) {
+                cache.put(k, k);
+
+                expVals.put(k, k);
+            }
+
+            tx.commit();
+        }
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 100; k < 200; k++) {
+                if (k % 2 == 0) {
+                    cache.remove(k);
+
+                    expVals.remove(k);
+                }
+            }
+
+            tx.commit();
+        }
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        checkValues(expVals, jcache(1));
+
+        stopGrid(0);
+
+        checkValues(expVals, jcache(1));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testCoordinatorFailurePessimisticTx() throws Exception {
         testSpi = true;
 
@@ -2722,9 +2990,55 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             assertEquals(KEYS, cache.size());
         }
 
-        // TODO IGNITE-3478: test removes.
-    }
+        int size = KEYS;
+
+        for (int i = 0; i < KEYS; i++) {
+            if (i % 2 == 0) {
+                final Integer key = i;
+
+                try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.remove(key);
+
+                    tx.commit();
+                }
+
+                size--;
+
+                assertEquals(size, cache.size());
+            }
+        }
+
+        // Check size does not change if remove already removed keys.
+        for (int i = 0; i < KEYS; i++) {
+            if (i % 2 == 0) {
+                final Integer key = i;
 
+                try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.remove(key);
+
+                    tx.commit();
+                }
+
+                assertEquals(size, cache.size());
+            }
+        }
+
+        for (int i = 0; i < KEYS; i++) {
+            if (i % 2 == 0) {
+                final Integer key = i;
+
+                try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(key, i);
+
+                    tx.commit();
+                }
+
+                size++;
+
+                assertEquals(size, cache.size());
+            }
+        }
+    }
 
     /**
      * @throws IgniteCheckedException If failed.
@@ -2792,7 +3106,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                 key0,
                 vers.get(0).get1());
 
-            MvccCoordinatorVersionResponse ver = version(crd.currentCoordinator().coordinatorVersion(), 100000);
+            MvccCoordinatorVersionResponse ver = version(vers.get(0).get2().coordinatorVersion(), 100000);
 
             for (int v = 0; v < vers.size(); v++) {
                 MvccCounter cntr = vers.get(v).get2();
@@ -3074,4 +3388,54 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             return null;
         }
     }
+
+    /**
+     *
+     */
+    static class TestKey implements Serializable {
+        /** */
+        private final int key;
+
+        /** */
+        private final byte[] payload;
+
+        /**
+         * @param key Key.
+         * @param payloadSize Payload size.
+         */
+        public TestKey(int key, int payloadSize) {
+            this.key = key;
+            this.payload = new byte[payloadSize];
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey testKey = (TestKey)o;
+
+            if (key != testKey.key)
+                return false;
+
+            return Arrays.equals(payload, testKey.payload);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = key;
+
+            res = 31 * res + Arrays.hashCode(payload);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "TestKey [k=" + key + ", payloadLen=" + payload.length + ']';
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
index d0897e1..600c8df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
@@ -435,6 +435,11 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest {
         @Override public long mvccCounter() {
             return 0;
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean removed() {
+            return false;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/970cf47a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 392301c..1819cfe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -174,4 +174,9 @@ public abstract class GridH2Row implements SearchRow, CacheDataRow, Row {
     @Override public long mvccCounter() {
         throw new UnsupportedOperationException();
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean removed() {
+        throw new UnsupportedOperationException();
+    }
 }
\ No newline at end of file