You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/10 12:44:17 UTC

[13/15] ignite git commit: ignite-5937

ignite-5937


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/21bc6338
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/21bc6338
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/21bc6338

Branch: refs/heads/ignite-5937
Commit: 21bc6338da4e9d794e624b0c84ae5a5615f86d53
Parents: c1b2c03
Author: sboikov <sb...@gridgain.com>
Authored: Tue Oct 10 13:31:40 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Oct 10 14:41:34 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    | 141 ++++++-----
 .../dht/preloader/GridDhtPartitionSupplier.java |  24 +-
 .../GridDhtPartitionSupplyMessage.java          |   4 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |  27 +-
 .../cache/mvcc/MvccCoordinatorVersion.java      |   2 +-
 .../cache/persistence/CacheDataRow.java         |   5 +
 .../cache/persistence/CacheDataRowAdapter.java  |   5 +
 .../cache/persistence/CacheSearchRow.java       |   6 +
 .../persistence/GridCacheOffheapManager.java    |   5 +
 .../processors/cache/persistence/RowStore.java  |   2 +
 .../persistence/freelist/FreeListImpl.java      |  11 +-
 .../cache/persistence/tree/io/DataPageIO.java   |  22 +-
 .../cache/tree/AbstractDataInnerIO.java         |  10 +-
 .../cache/tree/AbstractDataLeafIO.java          |  10 +-
 .../cache/tree/CacheDataRowStore.java           |  18 +-
 .../processors/cache/tree/CacheDataTree.java    |   8 +-
 .../internal/processors/cache/tree/DataRow.java |   7 +
 .../processors/cache/tree/MvccDataRow.java      |  32 ++-
 .../cache/tree/MvccKeyMaxVersionBound.java      |   9 +-
 .../processors/cache/tree/MvccRemoveRow.java    |  57 +++++
 .../processors/cache/tree/MvccUpdateRow.java    |  67 +++--
 .../cache/tree/MvccVersionBasedSearchRow.java   |  16 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   | 245 ++++++++++++++++++-
 .../database/FreeListImplSelfTest.java          |   5 +
 .../processors/query/h2/opt/GridH2Row.java      |   5 +
 25 files changed, 604 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 80d36c1..380ec94 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
@@ -54,9 +55,9 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.tree.DataRow;
-import org.apache.ignite.internal.processors.cache.tree.MvccDataRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccKeyMaxVersionBound;
 import org.apache.ignite.internal.processors.cache.tree.MvccKeyMinVersionBound;
+import org.apache.ignite.internal.processors.cache.tree.MvccRemoveRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccSearchRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccUpdateRow;
 import org.apache.ignite.internal.processors.cache.tree.MvccVersionBasedSearchRow;
@@ -85,6 +86,8 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
 
 /**
  *
@@ -1374,60 +1377,43 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 // Make sure value bytes initialized.
                 key.valueBytes(coCtx);
 
-                MvccUpdateRow updateRow = new MvccUpdateRow(
+                MvccRemoveRow updateRow = new MvccRemoveRow(
                     key,
-                    null,
-                    null,
                     mvccVer,
                     partId,
                     cacheId);
 
-                rowStore.addRow(updateRow);
-
-                assert updateRow.link() != 0 : updateRow;
-
                 if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
                     updateRow.cacheId(cctx.cacheId());
 
-                GridLongList waitTxs = null;
+                dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
 
-                if (mvccVer.initialLoad()) {
-                    boolean old = dataTree.putx(updateRow);
+                MvccUpdateRow.UpdateResult res = updateRow.updateResult();
 
-                    assert !old;
+                if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+                    assert !primary : updateRow;
 
-                    incrementSize(cctx.cacheId());
+                    cleanup(updateRow.cleanupRows(), false);
                 }
                 else {
-                    dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
-
-                    boolean old = dataTree.putx(updateRow);
-
-                    assert !old;
+                    if (res == MvccUpdateRow.UpdateResult.PREV_NOT_NULL)
+                        decrementSize(cacheId);
 
-                    if (!updateRow.previousNotNull())
-                        incrementSize(cctx.cacheId());
+                    CacheSearchRow rmvRow = cleanup(updateRow.cleanupRows(), true);
 
-                    waitTxs = updateRow.activeTransactions();
-
-                    List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
-
-                    if (cleanupRows != null) {
-                        for (int i = 0; i < cleanupRows.size(); i++) {
-                            CacheSearchRow oldRow = cleanupRows.get(i);
-
-                            assert oldRow.link() != 0L : oldRow;
+                    if (rmvRow == null)
+                        rowStore.addRow(updateRow);
+                    else
+                        updateRow.link(rmvRow.link());
 
-                            boolean rmvd = dataTree.removex(oldRow);
+                    assert updateRow.link() != 0L;
 
-                            assert rmvd;
+                    boolean old = dataTree.putx(updateRow);
 
-                            rowStore.removeRow(oldRow.link());
-                        }
-                    }
+                    assert !old;
                 }
 
-                return waitTxs;
+                return updateRow.activeTransactions();
             }
             finally {
                 busyLock.leaveBusy();
@@ -1464,16 +1450,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                     partId,
                     cacheId);
 
-                rowStore.addRow(updateRow);
-
-                assert updateRow.link() != 0 : updateRow;
-
                 if (grp.sharedGroup() && updateRow.cacheId() == CU.UNDEFINED_CACHE_ID)
                     updateRow.cacheId(cctx.cacheId());
 
                 GridLongList waitTxs = null;
 
                 if (mvccVer.initialLoad()) {
+                    rowStore.addRow(updateRow);
+
                     boolean old = dataTree.putx(updateRow);
 
                     assert !old;
@@ -1483,30 +1467,25 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                 else {
                     dataTree.iterate(updateRow, new MvccKeyMinVersionBound(cacheId, key), updateRow);
 
-                    boolean old = dataTree.putx(updateRow);
-
-                    assert !old;
-
-                    if (!updateRow.previousNotNull())
-                        incrementSize(cctx.cacheId());
-
-                    waitTxs = updateRow.activeTransactions();
+                    MvccUpdateRow.UpdateResult res = updateRow.updateResult();
 
-                    List<CacheSearchRow> cleanupRows = updateRow.cleanupRows();
+                    if (res == MvccUpdateRow.UpdateResult.VERSION_FOUND) {
+                        assert !primary : updateRow;
+                    }
+                    else {
+                        rowStore.addRow(updateRow);
 
-                    if (cleanupRows != null) {
-                        for (int i = 0; i < cleanupRows.size(); i++) {
-                            CacheSearchRow oldRow = cleanupRows.get(i);
+                        boolean old = dataTree.putx(updateRow);
 
-                            assert oldRow.link() != 0L : oldRow;
+                        assert !old;
 
-                            boolean rmvd = dataTree.removex(oldRow);
+                        if (res == MvccUpdateRow.UpdateResult.PREV_NULL)
+                            incrementSize(cctx.cacheId());
+                    }
 
-                            assert rmvd;
+                    cleanup(updateRow.cleanupRows(), false);
 
-                            rowStore.removeRow(oldRow.link());
-                        }
-                    }
+                    waitTxs = updateRow.activeTransactions();
                 }
 
                 return waitTxs;
@@ -1516,6 +1495,39 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             }
         }
 
+        /**
+         * @param cleanupRows Rows to cleanup.
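+         * @param findRmv {@code True} if the first removed-value row found should be kept in the row store.
+         * @return Kept removed-value row entry, or {@code null} if none was found or {@code findRmv} is {@code false}.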
+         * @throws IgniteCheckedException If failed.
+         */
+        @Nullable private CacheSearchRow cleanup(@Nullable List<CacheSearchRow> cleanupRows, boolean findRmv)
+            throws IgniteCheckedException {
+            CacheSearchRow rmvRow = null;
+
+            if (cleanupRows != null) {
+                for (int i = 0; i < cleanupRows.size(); i++) {
+                    CacheSearchRow oldRow = cleanupRows.get(i);
+
+                    assert oldRow.link() != 0L : oldRow;
+
+                    boolean rmvd = dataTree.removex(oldRow);
+
+                    assert rmvd;
+
+                    if (findRmv &&
+                        rmvRow == null &&
+                        versionForRemovedValue(oldRow.mvccCoordinatorVersion())) {
+                        rmvRow = oldRow;
+                    }
+                    else
+                        rowStore.removeRow(oldRow.link());
+                }
+            }
+
+            return rmvRow;
+        }
+
         /** {@inheritDoc} */
         @Override public void update(GridCacheContext cctx,
             KeyCacheObject key,
@@ -1832,18 +1844,27 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
                     while (cur.next()) {
                         CacheDataRow row = cur.get();
 
-                        if (row.mvccCoordinatorVersion() > ver.coordinatorVersion()
-                            || row.mvccCounter() > ver.counter())
+                        long rowCrdVerMasked = row.mvccCoordinatorVersion();
+
+                        long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
+
+                        if (rowCrdVer > ver.coordinatorVersion() || row.mvccCounter() > ver.counter())
                             continue;
 
                         MvccLongList txs = ver.activeTransactions();
 
-                        if (txs != null && row.mvccCoordinatorVersion() == ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
+                        if (txs != null && rowCrdVer == ver.coordinatorVersion() && txs.contains(row.mvccCounter()))
                             continue;
 
                         if (curKey != null && row.key().equals(curKey))
                             continue;
 
+                        if (CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked)) {
+                            curKey = row.key();
+
+                            continue;
+                        }
+
                         curRow = row;
 
                         break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index 0905917..357fef8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -31,9 +31,9 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
 import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.T3;
@@ -43,6 +43,7 @@ import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.IgniteSpiException;
 
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
 
 /**
  * Thread pool for supplying partitions to demanding nodes.
@@ -375,13 +376,24 @@ class GridDhtPartitionSupplier {
                             GridCacheEntryInfo info = grp.mvccEnabled() ?
                                 new GridCacheMvccEntryInfo() : new GridCacheEntryInfo();
 
+
                             info.key(row.key());
-                            info.expireTime(row.expireTime());
-                            info.version(row.version());
-                            info.value(row.value());
                             info.cacheId(row.cacheId());
-                            info.mvccCoordinatorVersion(row.mvccCoordinatorVersion());
-                            info.mvccCounter(row.mvccCounter());
+
+                            boolean rmvd = false;
+
+                            if (grp.mvccEnabled()) {
+                                info.mvccCoordinatorVersion(row.mvccCoordinatorVersion());
+                                info.mvccCounter(row.mvccCounter());
+
+                                rmvd = versionForRemovedValue(row.mvccCoordinatorVersion());
+                            }
+
+                            if (!rmvd) {
+                                info.value(row.value());
+                                info.version(row.version());
+                                info.expireTime(row.expireTime());
+                            }
 
                             if (preloadPred == null || preloadPred.apply(info))
                                 s.addEntry0(part, info, grp.shared(), grp.cacheObjectContext());

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 90d11f5..6675f8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -42,6 +42,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemTy
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
 /**
  * Partition supply message.
  */
@@ -217,7 +219,7 @@ public class GridDhtPartitionSupplyMessage extends GridCacheGroupIdMessage imple
     void addEntry0(int p, GridCacheEntryInfo info, GridCacheSharedContext ctx, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
         assert info != null;
         assert info.key() != null : info;
-        assert info.value() != null : info;
+        assert info.value() != null || versionForRemovedValue(info.coordinatorVersion()): info;
 
         // Need to call this method to initialize info properly.
         marshalInfo(info, ctx, cacheObjCtx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 54fb3c8..9f9a7a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -146,21 +146,30 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
         super(ctx);
     }
 
-    public static int compareCoordinatorVersions(long crdVer1, long crdVer2) {
-        crdVer1 = CRD_VER_MASK & crdVer1;
-        crdVer2 = CRD_VER_MASK & crdVer2;
-
-        return Long.compare(crdVer1, crdVer2);
-    }
-
-    public long createVersionForRemovedValue(long crdVer) {
+    /**
+     * @param crdVer Coordinator version.
+     * @return Coordinator version with removed value flag.
+     */
+    public static long createVersionForRemovedValue(long crdVer) {
         return crdVer | RMVD_VAL_VER_MASK;
     }
 
-    public boolean versionForRemovedValue(long crdVer) {
+    /**
+     * @param crdVer Coordinator version with flags.
+     * @return {@code True} if removed value flag is set.
+     */
+    public static boolean versionForRemovedValue(long crdVer) {
         return (crdVer & RMVD_VAL_VER_MASK) != 0;
     }
 
+    /**
+     * @param crdVer Coordinator version with flags.
+     * @return Coordinator version.
+     */
+    public static long unmaskCoordinatorVersion(long crdVer) {
+        return crdVer & CRD_VER_MASK;
+    }
+
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         statCntrs = new StatCounter[7];

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
index a0fd5ee..4003b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorVersion.java
@@ -44,7 +44,7 @@ public interface MvccCoordinatorVersion extends Message {
     public long counter();
 
     /**
-     *
+     * @return {@code True} if this version is for an initial load update.
      */
     public boolean initialLoad();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
index 57aeaef..b76826f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
@@ -54,4 +54,9 @@ public interface CacheDataRow extends CacheSearchRow {
      * @param key Key.
      */
     public void key(KeyCacheObject key);
+
+    /**
+     * @return {@code True} if this is a row for a cache remove operation (used only with mvcc).
+     */
+    public boolean removed();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 925431f..d0f2dab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -582,6 +582,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
         return 0;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean removed() {
+        return false;
+    }
+
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
index 5bf53d8..efdc08f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
@@ -43,7 +43,13 @@ public interface CacheSearchRow {
      */
     public int cacheId();
 
+    /**
+     * @return Mvcc coordinator version.
+     */
     public long mvccCoordinatorVersion();
 
+    /**
+     * @return Mvcc counter.
+     */
     public long mvccCounter();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 1f52309..ee651c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -837,6 +837,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         @Override public long mvccCoordinatorVersion() {
             return 0; // TODO IGNITE-3478.
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean removed() {
+            return false;  // TODO IGNITE-3478.
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
index 9cc5c62..41d2c4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
@@ -82,6 +82,8 @@ public class RowStore {
 
         try {
             freeList.insertDataRow(row);
+
+            assert row.link() != 0L;
         }
         finally {
             ctx.database().checkpointReadUnlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
index 3eb62ae..9bd27b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
@@ -590,12 +590,19 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
      */
     public static int getRowSize(CacheDataRow row, boolean withCacheId) throws IgniteCheckedException {
         KeyCacheObject key = row.key();
-        CacheObject val = row.value();
 
         int keyLen = key.valueBytesLength(null);
+
+        int len = keyLen + (withCacheId ? 4 : 0);
+
+        if (row.removed())
+            return len;
+
+        CacheObject val = row.value();
+
         int valLen = val.valueBytesLength(null);
 
-        return keyLen + valLen + CacheVersionIO.size(row.version(), false) + 8 + (withCacheId ? 4 : 0);
+        return len + valLen + CacheVersionIO.size(row.version(), false) + 8;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
index 628ff38..da012e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/DataPageIO.java
@@ -1040,13 +1040,19 @@ public class DataPageIO extends PageIO {
         final int payloadSize
     ) throws IgniteCheckedException {
         final int keySize = row.key().valueBytesLength(null);
-        final int valSize = row.value().valueBytesLength(null);
+
+        boolean rmvd = row.removed();
+
+        final int valSize = rmvd ? 0 : row.value().valueBytesLength(null);
 
         int written = writeFragment(row, buf, rowOff, payloadSize, EntryPart.CACHE_ID, keySize, valSize);
         written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.KEY, keySize, valSize);
-        written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.EXPIRE_TIME, keySize, valSize);
-        written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VALUE, keySize, valSize);
-        written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VERSION, keySize, valSize);
+
+        if (!rmvd) {
+            written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.EXPIRE_TIME, keySize, valSize);
+            written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VALUE, keySize, valSize);
+            written += writeFragment(row, buf, rowOff + written, payloadSize - written, EntryPart.VERSION, keySize, valSize);
+        }
 
         assert written == payloadSize;
     }
@@ -1414,9 +1420,15 @@ public class DataPageIO extends PageIO {
             }
 
             addr += row.key().putValue(addr);
+
+            if (row.removed())
+                return;
         }
-        else
+        else {
+            assert !row.removed() : row;
+
             addr += (2 + cacheIdSize + row.key().valueBytesLength(null));
+        }
 
         addr += row.value().putValue(addr);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
index a07d012..fc82cbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataInnerIO.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -27,6 +26,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInne
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteInClosure;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
 /**
  *
  */
@@ -59,8 +61,8 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
         }
 
         if (storeMvccVersion()) {
-            assert row.mvccCoordinatorVersion() > 0 : row;
-            assert row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
+            assert unmaskCoordinatorVersion(row.mvccCoordinatorVersion()) > 0 : row;
+            assert row.mvccCounter() != COUNTER_NA : row;
 
             PageUtils.putLong(pageAddr, off, row.mvccCoordinatorVersion());
             off += 8;
@@ -123,7 +125,7 @@ public abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> i
             long mvcCntr = rowIo.getMvccCounter(srcPageAddr, srcIdx);
 
             assert mvccTopVer > 0 : mvccTopVer;
-            assert mvcCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+            assert mvcCntr != COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
index ef08bec..c956d22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/AbstractDataLeafIO.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
@@ -27,6 +26,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeaf
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgniteInClosure;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
 /**
  *
  */
@@ -61,8 +63,8 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccCrdVer = row.mvccCoordinatorVersion();
             long mvccUpdateCntr = row.mvccCounter();
 
-            assert mvccCrdVer > 0 : mvccCrdVer;
-            assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+            assert unmaskCoordinatorVersion(mvccCrdVer) > 0 : mvccCrdVer;
+            assert mvccUpdateCntr != COUNTER_NA;
 
             PageUtils.putLong(pageAddr, off, mvccCrdVer);
             off += 8;
@@ -98,7 +100,7 @@ public abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> imp
             long mvccUpdateCntr = ((RowLinkIO)srcIo).getMvccCounter(srcPageAddr, srcIdx);
 
             assert mvccUpdateTopVer >=0 : mvccUpdateCntr;
-            assert mvccUpdateCntr != CacheCoordinatorsProcessor.COUNTER_NA;
+            assert mvccUpdateCntr != COUNTER_NA;
 
             PageUtils.putLong(dstPageAddr, off, mvccUpdateTopVer);
             off += 8;

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
index f9e1eb3..85624d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataRowStore.java
@@ -25,6 +25,8 @@ import org.apache.ignite.internal.processors.cache.persistence.RowStore;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
 /**
  *
  */
@@ -65,17 +67,25 @@ public class CacheDataRowStore extends RowStore {
      * @param cacheId Cache ID.
      * @param hash Hash code.
      * @param link Link.
-     * @param mvccTopVer
-     * @param mvccCntr
+     * @param rowData Required row data.
+     * @param crdVer Mvcc coordinator version.
+     * @param mvccCntr Mvcc counter.
      * @return Search row.
      */
-    MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long mvccTopVer, long mvccCntr) {
+    MvccDataRow mvccRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData, long crdVer, long mvccCntr) {
+        if (rowData != CacheDataRowAdapter.RowData.KEY_ONLY && versionForRemovedValue(crdVer)) {
+            if (rowData == CacheDataRowAdapter.RowData.NO_KEY)
+                return MvccDataRow.removedRowNoKey(partId, cacheId, crdVer, mvccCntr);
+            else
+                rowData = CacheDataRowAdapter.RowData.KEY_ONLY;
+        }
+
         MvccDataRow dataRow = new MvccDataRow(grp,
             hash,
             link,
             partId,
             rowData,
-            mvccTopVer,
+            crdVer,
             mvccCntr);
 
         initDataRow(dataRow, cacheId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index eaeefee..6309153 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 
 import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
 import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
 
 /**
  *
@@ -160,7 +161,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
 
         long mvccCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
 
-        cmp = Long.compare(row.mvccCoordinatorVersion(), mvccCrdVer);
+        cmp = Long.compare(unmaskCoordinatorVersion(row.mvccCoordinatorVersion()),
+            unmaskCoordinatorVersion(mvccCrdVer));
 
         if (cmp != 0)
             return cmp;
@@ -188,10 +190,10 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
             CacheDataRowAdapter.RowData.FULL;
 
         if (grp.mvccEnabled()) {
-            long mvccTopVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+            long mvccCrdVer = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
             long mvccCntr = rowIo.getMvccCounter(pageAddr, idx);
 
-            return rowStore.mvccRow(cacheId, hash, link, x, mvccTopVer, mvccCntr);
+            return rowStore.mvccRow(cacheId, hash, link, x, mvccCrdVer, mvccCntr);
         }
         else
             return rowStore.dataRow(cacheId, hash, link, x);

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
index 29bbaaf..d1e90d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/DataRow.java
@@ -81,6 +81,13 @@ public class DataRow extends CacheDataRowAdapter {
         this.cacheId = cacheId;
     }
 
+    /**
+     * Creates a row by passing {@code 0} to the superclass constructor; nothing else is initialized here.
+     */
+    protected DataRow() {
+        super(0);
+    }
+
     /** {@inheritDoc} */
     @Override public int partition() {
         return part;

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
index eb1ee10..916ea93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccDataRow.java
@@ -18,12 +18,11 @@
 package org.apache.ignite.internal.processors.cache.tree;
 
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
 /**
  *
  */
@@ -46,7 +45,7 @@ public class MvccDataRow extends DataRow {
     MvccDataRow(CacheGroupContext grp, int hash, long link, int part, RowData rowData, long crdVer, long mvccCntr) {
         super(grp, hash, link, part, rowData);
 
-        assert crdVer > 0 : crdVer;
+        assert unmaskCoordinatorVersion(crdVer) > 0 : crdVer;
         assert mvccCntr != CacheCoordinatorsProcessor.COUNTER_NA;
 
         this.crdVer = crdVer;
@@ -54,25 +53,32 @@ public class MvccDataRow extends DataRow {
     }
 
     /**
-     * @param key Key.
-     * @param val Value.
-     * @param ver Version.
+     *
+     */
+    private MvccDataRow() {
+        // No-op.
+    }
+
+    /**
      * @param part Partition.
      * @param cacheId Cache ID.
      * @param crdVer Mvcc coordinator version.
      * @param mvccCntr Mvcc counter.
+     * @return Row.
      */
-    public MvccDataRow(KeyCacheObject key,
-        CacheObject val,
-        GridCacheVersion ver,
+    static MvccDataRow removedRowNoKey(
         int part,
         int cacheId,
         long crdVer,
         long mvccCntr) {
-        super(key, val, ver, part, 0L, cacheId);
+        MvccDataRow row = new MvccDataRow();
 
-        this.mvccCntr = mvccCntr;
-        this.crdVer = crdVer;
+        row.cacheId = cacheId;
+        row.part = part;
+        row.crdVer = crdVer;
+        row.mvccCntr = mvccCntr;
+
+        return row;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
index aa9422d..007ac09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccKeyMaxVersionBound.java
@@ -27,6 +27,8 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
 /**
  *
  */
@@ -55,7 +57,12 @@ public class MvccKeyMaxVersionBound extends SearchRow implements BPlusTree.TreeR
         int idx)
         throws IgniteCheckedException
     {
-        resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+        RowLinkIO rowIo = (RowLinkIO)io;
+
+        if (versionForRemovedValue(rowIo.getMvccCoordinatorVersion(pageAddr, idx)))
+            resRow = null;
+        else
+            resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
 
         return false;  // Stop search.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
new file mode 100644
index 0000000..8fd8a6e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccRemoveRow.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.tree;
+
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * {@link MvccUpdateRow} used for removes: reports {@link #removed()} and returns a removed-value coordinator version.
+ */
+public class MvccRemoveRow extends MvccUpdateRow {
+    /**
+     * @param key Key.
+     * @param mvccVer Mvcc version.
+     * @param part Partition.
+     * @param cacheId Cache ID.
+     */
+    public MvccRemoveRow(
+        KeyCacheObject key,
+        MvccCoordinatorVersion mvccVer,
+        int part,
+        int cacheId) {
+        super(key, null, null, mvccVer, part, cacheId); // 2nd/3rd args are null (presumably value and version).
+    }
+
+    /** {@inheritDoc} Here the superclass value is passed through {@code createVersionForRemovedValue}. */
+    @Override public long mvccCoordinatorVersion() {
+        return CacheCoordinatorsProcessor.createVersionForRemovedValue(super.mvccCoordinatorVersion());
+    }
+
+    /** {@inheritDoc} Always {@code true} for this row type. */
+    @Override public boolean removed() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MvccRemoveRow.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
index d3303e8..794661d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccUpdateRow.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
@@ -32,12 +33,14 @@ import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+
 /**
  *
  */
 public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<CacheSearchRow, CacheDataRow> {
     /** */
-    private Boolean hasPrev;
+    private UpdateResult res;
 
     /** */
     private boolean canCleanup;
@@ -74,8 +77,8 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
     /**
      * @return {@code True} if previous value was non-null.
      */
-    public boolean previousNotNull() {
-        return hasPrev != null && hasPrev;
+    public UpdateResult updateResult() {
+        return res == null ? UpdateResult.PREV_NULL : res;
     }
 
     /**
@@ -98,17 +101,18 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
      * @param idx Item index.
      * @return Always {@code true}.
      */
-    private boolean assertVersionGreater(RowLinkIO io, long pageAddr, int idx) {
-        long rowCrdVer = io.getMvccCoordinatorVersion(pageAddr, idx);
+    private boolean assertVersion(RowLinkIO io, long pageAddr, int idx) {
+        long rowCrdVer = unmaskCoordinatorVersion(io.getMvccCoordinatorVersion(pageAddr, idx));
         long rowCntr = io.getMvccCounter(pageAddr, idx);
 
-        int cmp = Long.compare(mvccCoordinatorVersion(), rowCrdVer);
+        int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
 
         if (cmp == 0)
-            cmp = Long.compare(mvccCounter(), rowCntr);
+            cmp = Long.compare(mvccVer.counter(), rowCntr);
 
-        assert cmp > 0 : "[updCrd=" + mvccCoordinatorVersion() +
-            ", updCntr=" + mvccCounter() +
+        // Can be equals if backup rebalanced value updated on primary.
+        assert cmp >= 0 : "[updCrd=" + mvccVer.coordinatorVersion() +
+            ", updCntr=" + mvccVer.counter() +
             ", rowCrd=" + rowCrdVer +
             ", rowCntr=" + rowCntr + ']';
 
@@ -124,15 +128,31 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
     {
         RowLinkIO rowIo = (RowLinkIO)io;
 
-        // All previous versions should be less then new one.
-        assert assertVersionGreater(rowIo, pageAddr, idx);
+        // Assert version grows.
+        assert assertVersion(rowIo, pageAddr, idx);
 
         boolean checkActive = mvccVer.activeTransactions().size() > 0;
 
         boolean txActive = false;
 
+        long rowCrdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+        long rowCrdVer = unmaskCoordinatorVersion(rowCrdVerMasked);
+
+        if (res == null) { // Result is decided once, by the first row this closure is applied to.
+            int cmp = Long.compare(mvccVer.coordinatorVersion(), rowCrdVer);
+
+            if (cmp == 0) // Same coordinator version: tie-break on the counter (same order as assertVersion()).
+                cmp = Long.compare(mvccVer.counter(), rowIo.getMvccCounter(pageAddr, idx));
+
+            if (cmp == 0)
+                res = UpdateResult.VERSION_FOUND;
+            else
+                res = CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked) ?
+                    UpdateResult.PREV_NULL : UpdateResult.PREV_NOT_NULL;
+        }
+
         // Suppose transactions on previous coordinator versions are done.
-        if (checkActive && mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx)) {
+        if (checkActive && mvccVer.coordinatorVersion() == rowCrdVer) {
             long rowMvccCntr = rowIo.getMvccCounter(pageAddr, idx);
 
             if (mvccVer.activeTransactions().contains(rowMvccCntr)) {
@@ -145,15 +165,12 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
             }
         }
 
-        if (hasPrev == null)
-            hasPrev = Boolean.TRUE; // TODO IGNITE-3478 support removes.
-
         if (!txActive) {
-            assert Long.compare(mvccVer.coordinatorVersion(), rowIo.getMvccCoordinatorVersion(pageAddr, idx)) >= 0;
+            assert Long.compare(mvccVer.coordinatorVersion(), rowCrdVer) >= 0;
 
             int cmp;
 
-            if (mvccVer.coordinatorVersion() == rowIo.getMvccCoordinatorVersion(pageAddr, idx))
+            if (mvccVer.coordinatorVersion() == rowCrdVer)
                 cmp = Long.compare(mvccVer.cleanupVersion(), rowIo.getMvccCounter(pageAddr, idx));
             else
                 cmp = 1;
@@ -163,10 +180,10 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
                 if (canCleanup) {
                     CacheSearchRow row = io.getLookupRow(tree, pageAddr, idx);
 
-                    assert row.link() != 0 && row.mvccCoordinatorVersion() > 0 : row;
+                    assert row.link() != 0 && row.mvccCounter() != CacheCoordinatorsProcessor.COUNTER_NA : row;
 
                     // Should not be possible to cleanup active tx.
-                    assert row.mvccCoordinatorVersion() != mvccVer.coordinatorVersion()
+                    assert rowCrdVer != mvccVer.coordinatorVersion()
                         || !mvccVer.activeTransactions().contains(row.mvccCounter());
 
                     if (cleanupRows == null)
@@ -196,4 +213,16 @@ public class MvccUpdateRow extends DataRow implements BPlusTree.TreeRowClosure<C
     @Override public String toString() {
         return S.toString(MvccUpdateRow.class, this, "super", super.toString());
     }
+
+    /**
+     * Outcome of an update, derived from the first tree row the closure is applied to.
+     */
+    public enum UpdateResult {
+        /** The version comparison against the inspected row yielded 0. */
+        VERSION_FOUND,
+        /** Inspected row has a removed-value coordinator version; also the default when no row was inspected. */
+        PREV_NULL,
+        /** Inspected row has a different version that is not marked as a removed value. */
+        PREV_NOT_NULL
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
index c829afb..a1d0127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/MvccVersionBasedSearchRow.java
@@ -28,6 +28,9 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
+
 /**
  *
  */
@@ -66,16 +69,23 @@ public class MvccVersionBasedSearchRow extends SearchRow implements BPlusTree.Tr
     {
         boolean visible = true;
 
+        RowLinkIO rowIo = (RowLinkIO)io;
+
+        long crdVerMasked = rowIo.getMvccCoordinatorVersion(pageAddr, idx);
+
         if (ver.activeTransactions().size() > 0) {
-            RowLinkIO rowIo = (RowLinkIO)io;
+            long rowCrdVer = unmaskCoordinatorVersion(crdVerMasked);
 
             // TODO IGNITE-3478 sort active transactions?
-            if (rowIo.getMvccCoordinatorVersion(pageAddr, idx) == ver.coordinatorVersion())
+            if (rowCrdVer == ver.coordinatorVersion())
                 visible = !ver.activeTransactions().contains(rowIo.getMvccCounter(pageAddr, idx));
         }
 
         if (visible) {
-            resRow = ((CacheDataTree) tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
+            if (versionForRemovedValue(crdVerMasked))
+                resRow = null;
+            else
+                resRow = ((CacheDataTree)tree).getRow(io, pageAddr, idx, CacheDataRowAdapter.RowData.NO_KEY);
 
             return false; // Stop search.
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 115e8a2..d45afe7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
+import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -45,6 +47,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
@@ -119,6 +122,9 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /** */
     private String nodeAttr;
 
+    /** */
+    private static final int PAGE_SIZE = MemoryConfiguration.DFLT_PAGE_SIZE;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -137,6 +143,12 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
         if (nodeAttr != null)
             cfg.setUserAttributes(F.asMap(nodeAttr, true));
 
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+
+        memCfg.setPageSize(PAGE_SIZE);
+
+        cfg.setMemoryConfiguration(memCfg);
+
         return cfg;
     }
 
@@ -376,6 +388,109 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testSimplePutRemove() throws Exception {
+        simplePutRemove(false); // Plain integer keys.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSimplePutRemove_LargeKeys() throws Exception {
+        simplePutRemove(true); // Keys built with a payload size of at least one page.
+    }
+
+    /**
+     * @param largeKeys {@code True} to use large keys (not fitting in single page).
+     * @throws Exception If failed.
+     */
+    private void simplePutRemove(boolean largeKeys) throws Exception {
+        Ignite node = startGrid(0);
+
+        IgniteTransactions txs = node.transactions();
+
+        final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 1));
+
+        final int KEYS = 100;
+
+        checkValues(new HashMap<>(), cache); // Nothing has been stored yet.
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 0; k < KEYS; k++)
+                cache.remove(testKey(largeKeys, k)); // Removes of keys that were never put.
+
+            tx.commit();
+        }
+
+        checkValues(new HashMap<>(), cache); // Still empty after removing absent keys.
+
+        Map<Object, Object> expVals = new HashMap<>();
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 0; k < KEYS; k++) {
+                Object key = testKey(largeKeys, k);
+
+                expVals.put(key, k);
+
+                cache.put(key, k); // Value is the key index.
+            }
+
+            tx.commit();
+        }
+
+        checkValues(expVals, cache);
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 0; k < KEYS; k++) {
+                if (k % 2 == 0) { // Remove every even-indexed key.
+                    Object key = testKey(largeKeys, k);
+
+                    cache.remove(key);
+
+                    expVals.remove(key);
+                }
+            }
+
+            tx.commit();
+        }
+
+        checkValues(expVals, cache); // Only odd-indexed keys are expected to remain.
+    }
+
+    /**
+     * @param largeKeys {@code True} to use large keys (not fitting in single page).
+     * @param idx Index.
+     * @return {@code TestKey} for {@code idx} if {@code largeKeys} is set, otherwise the boxed {@code idx} itself.
+     */
+    private static Object testKey(boolean largeKeys, int idx) {
+        if (largeKeys) {
+            int payloadSize = PAGE_SIZE + ThreadLocalRandom.current().nextInt(PAGE_SIZE * 10); // [1, 11) pages.
+
+            return new TestKey(idx, payloadSize);
+        }
+        else
+            return idx;
+    }
+
+    /**
+     * @param expVals Expected cache content; compared against both {@code getAll} and a full cache iteration.
+     * @param cache Cache.
+     */
+    private void checkValues(Map<Object, Object> expVals, IgniteCache<Object, Object> cache) {
+        Map<Object, Object> res = cache.getAll(expVals.keySet()); // Check 1: lookup by the expected keys.
+
+        assertEquals(expVals, res);
+
+        res = new HashMap<>();
+
+        for (IgniteCache.Entry<Object, Object> e : cache) // Check 2: iteration must not return extra entries.
+            res.put(e.getKey(), e.getValue());
+
+        assertEquals(expVals, res);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testThreadUpdatesAreVisibleForThisThread() throws Exception {
         final Ignite ignite = startGrid(0);
 
@@ -1601,7 +1716,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testRebalance1() throws Exception {
+    public void testSimpleRebalance() throws Exception {
         Ignite srv0 = startGrid(0);
 
         IgniteCache<Integer, Integer> cache =  (IgniteCache)srv0.createCache(
@@ -1664,6 +1779,32 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testSimpleRebalanceWithRemovedValues() throws Exception {
+        Ignite node = startGrid(0);
+
+        IgniteTransactions txs = node.transactions();
+
+        final IgniteCache<Object, Object> cache = node.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, 64));
+
+        final int KEYS = 100;
+
+        checkValues(new HashMap<>(), cache); // Nothing has been stored yet.
+
+        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int k = 0; k < KEYS; k++)
+                cache.remove(k); // Removes of keys that were never put.
+
+            tx.commit();
+        }
+
+        startGrid(1); // Second node joins after the removes were committed.
+
+        awaitPartitionMapExchange(); // No content assertions follow; the test only requires this to complete.
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testCoordinatorFailurePessimisticTx() throws Exception {
         testSpi = true;
 
@@ -2722,9 +2863,55 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             assertEquals(KEYS, cache.size());
         }
 
-        // TODO IGNITE-3478: test removes.
-    }
+        int size = KEYS;
+
+        for (int i = 0; i < KEYS; i++) {
+            if (i % 2 == 0) {
+                final Integer key = i;
+
+                try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.remove(key);
 
+                    tx.commit();
+                }
+
+                size--;
+
+                assertEquals(size, cache.size());
+            }
+        }
+
+        // Check that size does not change when already removed keys are removed again.
+        for (int i = 0; i < KEYS; i++) {
+            if (i % 2 == 0) {
+                final Integer key = i;
+
+                try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.remove(key);
+
+                    tx.commit();
+                }
+
+                assertEquals(size, cache.size());
+            }
+        }
+
+        for (int i = 0; i < KEYS; i++) {
+            if (i % 2 == 0) {
+                final Integer key = i;
+
+                try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                    cache.put(key, i);
+
+                    tx.commit();
+                }
+
+                size++;
+
+                assertEquals(size, cache.size());
+            }
+        }
+    }
 
     /**
      * @throws IgniteCheckedException If failed.
@@ -2792,7 +2979,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
                 key0,
                 vers.get(0).get1());
 
-            MvccCoordinatorVersionResponse ver = version(crd.currentCoordinator().coordinatorVersion(), 100000);
+            MvccCoordinatorVersionResponse ver = version(vers.get(0).get2().coordinatorVersion(), 100000);
 
             for (int v = 0; v < vers.size(); v++) {
                 MvccCounter cntr = vers.get(v).get2();
@@ -3074,4 +3261,54 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             return null;
         }
     }
+
+    /**
+     * Test key made of an int id and a byte array payload of configurable length; both take part in equality.
+     */
+    static class TestKey implements Serializable {
+        /** Key id. */
+        private final int key;
+
+        /** Payload; allocated zero-filled in the constructor and not written by this class afterwards. */
+        private final byte[] payload;
+
+        /**
+         * @param key Key id.
+         * @param payloadSize Payload array length in bytes (a negative value fails array allocation).
+         */
+        public TestKey(int key, int payloadSize) {
+            this.key = key;
+            this.payload = new byte[payloadSize];
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey testKey = (TestKey)o;
+
+            if (key != testKey.key)
+                return false;
+
+            return Arrays.equals(payload, testKey.payload);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = key;
+
+            res = 31 * res + Arrays.hashCode(payload);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return "TestKey [k=" + key + ", payloadLen=" + payload.length + ']';
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
index d0897e1..600c8df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
@@ -435,6 +435,11 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest {
         @Override public long mvccCounter() {
             return 0;
         }
+
+        /** {@inheritDoc} This test row always reports {@code false}. */
+        @Override public boolean removed() {
+            return false;
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/21bc6338/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 392301c..1819cfe 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -174,4 +174,9 @@ public abstract class GridH2Row implements SearchRow, CacheDataRow, Row {
     @Override public long mvccCounter() {
         throw new UnsupportedOperationException();
     }
+
+    /** {@inheritDoc} Not supported by this class: always throws {@link UnsupportedOperationException}. */
+    @Override public boolean removed() {
+        throw new UnsupportedOperationException();
+    }
 }
\ No newline at end of file