You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/22 06:56:00 UTC

[1/3] ignite git commit: ignite-4652 Implemented BPlusTree.invoke

Repository: ignite
Updated Branches:
  refs/heads/ignite-3477 40f015d18 -> ee28b9cb8


http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 0de3754..22bc17a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -50,17 +50,22 @@ import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.GridRandom;
 import org.apache.ignite.internal.util.GridStripedLock;
+import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 import org.jsr166.ConcurrentLinkedHashMap;
 
 import static org.apache.ignite.internal.pagemem.PageIdUtils.effectivePageId;
 import static org.apache.ignite.internal.processors.cache.database.tree.BPlusTree.rnd;
+import static org.apache.ignite.internal.util.IgniteTree.OperationType.NOOP;
+import static org.apache.ignite.internal.util.IgniteTree.OperationType.PUT;
+import static org.apache.ignite.internal.util.IgniteTree.OperationType.REMOVE;
 
 /**
  */
@@ -189,6 +194,40 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void _testBenchInvoke() throws IgniteCheckedException {
+        MAX_PER_PAGE = 10;
+
+        TestTree tree = createTestTree(true);
+
+        long start = System.nanoTime();
+
+        for (int i = 0; i < 10_000_000; i++) {
+            final long key = BPlusTree.randomInt(1000);
+
+//            tree.findOne(key); // 39
+//            tree.putx(key); // 22
+
+            tree.invoke(key, null, new IgniteTree.InvokeClosure<Long>() { // 25
+                @Override public void call(@Nullable Long row) throws IgniteCheckedException {
+                    // No-op.
+                }
+
+                @Override public Long newRow() {
+                    return key;
+                }
+
+                @Override public IgniteTree.OperationType operationType() {
+                    return PUT;
+                }
+            });
+        }
+
+        X.println("   __ time: " + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+    }
+
+    /**
      * @param cursor cursor to check.
      * @param iterator iterator with expected result.
      * @throws IgniteCheckedException If failed
@@ -561,6 +600,127 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
     /**
      * @throws IgniteCheckedException If failed.
      */
+    public void testRandomInvoke_1_30_1() throws IgniteCheckedException {
+        MAX_PER_PAGE = 1;
+        CNT = 30;
+
+        doTestRandomInvoke(true);
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    public void testRandomInvoke_1_30_0() throws IgniteCheckedException {
+        MAX_PER_PAGE = 1;
+        CNT = 30;
+
+        doTestRandomInvoke(false);
+    }
+
+    /**
+     * @param canGetRow Can get row from inner page.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void doTestRandomInvoke(boolean canGetRow) throws IgniteCheckedException {
+        TestTree tree = createTestTree(canGetRow);
+
+        Map<Long,Long> map = new HashMap<>();
+
+        int loops = reuseList == null ? 100_000 : 300_000;
+
+        for (int i = 0 ; i < loops; i++) {
+            final Long x = (long)BPlusTree.randomInt(CNT);
+            final int rnd = BPlusTree.randomInt(11);
+
+            if (i % 10_000 == 0) {
+//                X.println(tree.printTree());
+                X.println(" --> " + i + "  ++> " + x);
+            }
+
+            // Update map.
+            if (!map.containsKey(x)) {
+                if (rnd % 2 == 0) {
+                    map.put(x, x);
+
+//                    X.println("put0: " + x);
+                }
+                else {
+//                    X.println("noop0: " + x);
+                }
+            }
+            else {
+                if (rnd % 2 == 0) {
+//                    X.println("put1: " + x);
+                }
+                else if (rnd % 3 == 0) {
+                    map.remove(x);
+
+//                    X.println("rmv1: " + x);
+                }
+                else {
+//                    X.println("noop1: " + x);
+                }
+            }
+
+            // Consistently update tree.
+            tree.invoke(x, null, new IgniteTree.InvokeClosure<Long>() {
+
+                IgniteTree.OperationType op;
+
+                Long newRow;
+
+                @Override public void call(@Nullable Long row) throws IgniteCheckedException {
+                    if (row == null) {
+                        if (rnd % 2 == 0) {
+                            op = PUT;
+                            newRow = x;
+                        }
+                        else {
+                            op = NOOP;
+                            newRow = null;
+                        }
+                    }
+                    else {
+                        assertEquals(x, row);
+
+                        if (rnd % 2 == 0) {
+                            op = PUT;
+                            newRow = x; // We can not replace x with y here, because keys must be equal.
+                        }
+                        else if (rnd % 3 == 0) {
+                            op = REMOVE;
+                            newRow = null;
+                        }
+                        else {
+                            op = NOOP;
+                            newRow = null;
+                        }
+                    }
+                }
+
+                @Override public Long newRow() {
+                    return newRow;
+                }
+
+                @Override public IgniteTree.OperationType operationType() {
+                    return op;
+                }
+            });
+
+            assertNoLocks();
+
+//            X.println(tree.printTree());
+
+            tree.validateTree();
+
+            if (i % 100 == 0)
+                assertEqualContents(tree, map);
+        }
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
     public void testRandomPutRemove_1_30_0() throws IgniteCheckedException {
         MAX_PER_PAGE = 1;
         CNT = 30;
@@ -840,24 +1000,32 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 //            X.println(tree.printTree());
             tree.validateTree();
 
-            if (i % 100 == 0) {
-                GridCursor<Long> cursor = tree.find(null, null);
-
-                while (cursor.next()) {
-                    x = cursor.get();
+            if (i % 100 == 0)
+                assertEqualContents(tree, map);
+        }
+    }
 
-                    assert x != null;
+    /**
+     * @param tree Tree.
+     * @param map Map.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void assertEqualContents(IgniteTree<Long, Long> tree, Map<Long,Long> map) throws IgniteCheckedException {
+        GridCursor<Long> cursor = tree.find(null, null);
 
-                    assertEquals(map.get(x), x);
+        while (cursor.next()) {
+            Long x = cursor.get();
 
-                    assertNoLocks();
-                }
+            assert x != null;
 
-                assertEquals(map.size(), tree.size());
+            assertEquals(map.get(x), x);
 
-                assertNoLocks();
-            }
+            assertNoLocks();
         }
+
+        assertEquals(map.size(), tree.size());
+
+        assertNoLocks();
     }
 
     /**
@@ -1039,27 +1207,28 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         final GridStripedLock lock = new GridStripedLock(256);
 
+        final String[] ops = {"put", "rmv", "inv_put", "inv_rmv"};
+
         IgniteInternalFuture<?> fut = multithreadedAsync(new Callable<Object>() {
             @Override public Object call() throws Exception {
                 for (int i = 0; i < loops; i++) {
-                    Long x = (long)DataStructure.randomInt(CNT);
-
-                    boolean put = DataStructure.randomInt(2) == 0;
+                    final Long x = (long)DataStructure.randomInt(CNT);
+                    final int op = DataStructure.randomInt(4);
 
                     if (i % 10000 == 0)
-                        X.println(" --> " + (put ? "put " : "rmv ") + i + "  " + x);
+                        X.println(" --> " + ops[op] + "_" + i + "  " + x);
 
                     Lock l = lock.getLock(x.longValue());
 
                     l.lock();
 
                     try {
-                        if (put) {
+                        if (op == 0) { // Put.
                             assertEquals(map.put(x, x), tree.put(x));
 
                             assertNoLocks();
                         }
-                        else {
+                        else if (op == 1) { // Remove.
                             if (map.remove(x) != null) {
                                 assertEquals(x, tree.remove(x));
 
@@ -1070,6 +1239,54 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
                             assertNoLocks();
                         }
+                        else if (op == 2) {
+                            tree.invoke(x, null, new IgniteTree.InvokeClosure<Long>() {
+                                IgniteTree.OperationType opType;
+
+                                @Override public void call(@Nullable Long row) throws IgniteCheckedException {
+                                    opType = PUT;
+
+                                    if (row != null)
+                                        assertEquals(x, row);
+                                }
+
+                                @Override public Long newRow() {
+                                    return x;
+                                }
+
+                                @Override public IgniteTree.OperationType operationType() {
+                                    return opType;
+                                }
+                            });
+
+                            map.put(x,x);
+                        }
+                        else if (op == 3) {
+                            tree.invoke(x, null, new IgniteTree.InvokeClosure<Long>() {
+                                IgniteTree.OperationType opType;
+
+                                @Override public void call(@Nullable Long row) throws IgniteCheckedException {
+                                    if (row != null) {
+                                        assertEquals(x, row);
+                                        opType = REMOVE;
+                                    }
+                                    else
+                                        opType = NOOP;
+                                }
+
+                                @Override public Long newRow() {
+                                    return null;
+                                }
+
+                                @Override public IgniteTree.OperationType operationType() {
+                                    return opType;
+                                }
+                            });
+
+                            map.remove(x);
+                        }
+                        else
+                            fail();
                     }
                     finally {
                         l.unlock();
@@ -1240,7 +1457,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override protected Long getRow(BPlusIO<Long> io, long pageAddr, int idx) throws IgniteCheckedException {
+        @Override protected Long getRow(BPlusIO<Long> io, long pageAddr, int idx, Object ignore)
+            throws IgniteCheckedException {
             assert io.canGetRow() : io;
 
             return io.getLookupRow(this, pageAddr, idx);
@@ -1276,11 +1494,17 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public void onBeforeReadLock(Page page) {
+//            X.println("  onBeforeReadLock: " + U.hexLong(page.id()));
+//
+//            U.dumpStack();
+
             assertNull(beforeReadLock.put(threadId(), page.id()));
         }
 
         /** {@inheritDoc} */
         @Override public void onReadLock(Page page, long pageAddr) {
+//            X.println("  onReadLock: " + U.hexLong(page.id()));
+
             if (pageAddr != 0L) {
                 long pageId = PageIO.getPageId(pageAddr);
 
@@ -1294,6 +1518,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public void onReadUnlock(Page page, long pageAddr) {
+//            X.println("  onReadUnlock: " + U.hexLong(page.id()));
+
             checkPageId(page, pageAddr);
 
             long pageId = PageIO.getPageId(pageAddr);
@@ -1303,11 +1529,17 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public void onBeforeWriteLock(Page page) {
+//            X.println("  onBeforeWriteLock: " + U.hexLong(page.id()));
+
             assertNull(beforeWriteLock.put(threadId(), page.id()));
         }
 
         /** {@inheritDoc} */
         @Override public void onWriteLock(Page page, long pageAddr) {
+//            X.println("  onWriteLock: " + U.hexLong(page.id()));
+//
+//            U.dumpStack();
+
             if (pageAddr != 0L) {
                 checkPageId(page, pageAddr);
 
@@ -1324,6 +1556,8 @@ public class BPlusTreeSelfTest extends GridCommonAbstractTest {
 
         /** {@inheritDoc} */
         @Override public void onWriteUnlock(Page page, long pageAddr) {
+//            X.println("  onWriteUnlock: " + U.hexLong(page.id()));
+
             assertEquals(effectivePageId(page.id()), effectivePageId(PageIO.getPageId(pageAddr)));
 
             assertEquals(Long.valueOf(page.id()), locks(false).remove(page.id()));

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
index d0d495e..ebf4aee 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
@@ -368,6 +368,11 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
+        @Override public void key(KeyCacheObject key) {
+            this.key = (TestCacheObject)key;
+        }
+
+        /** {@inheritDoc} */
         @Override public CacheObject value() {
             return val;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
index 2442722..d9b820a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
@@ -90,7 +90,7 @@ public abstract class H2Tree extends BPlusTree<SearchRow, GridH2Row> {
     }
 
     /** {@inheritDoc} */
-    @Override protected GridH2Row getRow(BPlusIO<SearchRow> io, long pageAddr, int idx)
+    @Override protected GridH2Row getRow(BPlusIO<SearchRow> io, long pageAddr, int idx, Object ignore)
         throws IgniteCheckedException {
         return (GridH2Row)io.getLookupRow(this, pageAddr, idx);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
index 6d4e97a..ce10cdb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Row.java
@@ -66,6 +66,11 @@ public abstract class GridH2Row extends Row implements GridSearchRowPointer, Cac
     }
 
     /** {@inheritDoc} */
+    @Override public void key(KeyCacheObject key) {
+        this.key = key;
+    }
+
+    /** {@inheritDoc} */
     @Override public CacheObject value() {
         return val;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 729309a..9c59faf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -499,6 +499,11 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
         }
 
         /** {@inheritDoc} */
+        @Override public void invoke(GridSearchRowPointer key, Object x, InvokeClosure<GridH2Row> c) {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
         @Override public GridH2Row put(GridH2Row val) {
             return tree.put(val, val);
         }


[2/3] ignite git commit: ignite-4652 Implemented BPlusTree.invoke

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index a869b21..4a98f6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridAtomicLong;
+import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.lang.GridIterator;
@@ -82,6 +83,12 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
     public Iterable<CacheDataStore> cacheDataStores();
 
     /**
+     * @param part Partition.
+     * @return Data store.
+     */
+    public CacheDataStore dataStore(GridDhtLocalPartition part);
+
+    /**
      * @param p Partition ID.
      * @param store Data store.
      */
@@ -107,6 +114,15 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
     public long expiredSize() throws IgniteCheckedException;
 
     /**
+     * @param key Key.
+     * @param part Partition.
+     * @param c Tree update closure.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void invoke(KeyCacheObject key, GridDhtLocalPartition part, OffheapInvokeClosure c)
+        throws IgniteCheckedException;
+
+    /**
      * @param key  Key.
      * @param val  Value.
      * @param ver  Version.
@@ -253,6 +269,16 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
     /**
      *
      */
+    interface OffheapInvokeClosure extends IgniteTree.InvokeClosure<CacheDataRow> {
+        /**
+         * @return Old row.
+         */
+        @Nullable public CacheDataRow oldRow();
+    }
+
+    /**
+     *
+     */
     interface CacheDataStore {
         /**
          * @return Partition ID.
@@ -297,6 +323,21 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
 
         /**
          * @param key Key.
+         * @param val Value.
+         * @param ver Version.
+         * @param expireTime Expire time.
+         * @param oldRow Old row.
+         * @return New row.
+         * @throws IgniteCheckedException If failed.
+         */
+        CacheDataRow createRow(KeyCacheObject key,
+            CacheObject val,
+            GridCacheVersion ver,
+            long expireTime,
+            @Nullable CacheDataRow oldRow) throws IgniteCheckedException;
+
+        /**
+         * @param key Key.
          * @param part Partition.
          * @param val Value.
          * @param ver Version.
@@ -313,6 +354,13 @@ public interface IgniteCacheOffheapManager extends GridCacheManager {
 
         /**
          * @param key Key.
+         * @param c Closure.
+         * @throws IgniteCheckedException If failed.
+         */
+        public void invoke(KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException;
+
+        /**
+         * @param key Key.
          * @param partId Partition number.
          * @throws IgniteCheckedException If failed.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 5df99b6..b863edd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -207,7 +207,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
      * @param part Partition.
      * @return Data store for given entry.
      */
-    private CacheDataStore dataStore(GridDhtLocalPartition part) {
+    public CacheDataStore dataStore(GridDhtLocalPartition part) {
         if (cctx.isLocal())
             return locCacheDataStore;
         else {
@@ -329,6 +329,14 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
     }
 
     /** {@inheritDoc} */
+    @Override public void invoke(KeyCacheObject key,
+        GridDhtLocalPartition part,
+        OffheapInvokeClosure c)
+        throws IgniteCheckedException {
+        dataStore(part).invoke(key, c);
+    }
+
+    /** {@inheritDoc} */
     @Override public void update(
         KeyCacheObject key,
         CacheObject val,
@@ -838,6 +846,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
         protected Long initCntr = 0L;
 
         /**
+         * @param partId Partition number.
          * @param name Name.
          * @param rowStore Row store.
          * @param dataTree Data tree.
@@ -900,6 +909,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
             if (oldRow == null || indexingEnabled)
                 return false;
 
+            if (oldRow.expireTime() != dataRow.expireTime())
+                return false;
+
             CacheObjectContext coCtx = cctx.cacheObjectContext();
 
             int oldLen = oldRow.key().valueBytesLength(coCtx) + oldRow.value().valueBytesLength(coCtx);
@@ -913,6 +925,71 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
         }
 
         /** {@inheritDoc} */
+        @Override public void invoke(KeyCacheObject key, OffheapInvokeClosure c)
+            throws IgniteCheckedException {
+            if (!busyLock.enterBusy())
+                throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+            try {
+                dataTree.invoke(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY, c);
+
+                switch (c.operationType()) {
+                    case PUT: {
+                        assert c.newRow() != null : c;
+
+                        CacheDataRow oldRow = c.oldRow();
+
+                        finishUpdate(c.newRow(), oldRow);
+
+                        break;
+                    }
+
+                    case REMOVE: {
+                        CacheDataRow oldRow = c.oldRow();
+
+                        finishRemove(key, oldRow);
+
+                        break;
+                    }
+
+                    case NOOP:
+                        break;
+
+                    default:
+                        assert false : c.operationType();
+                }
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheDataRow createRow(KeyCacheObject key,
+            CacheObject val,
+            GridCacheVersion ver,
+            long expireTime,
+            @Nullable CacheDataRow oldRow) throws IgniteCheckedException
+        {
+            DataRow dataRow = new DataRow(key, val, ver, partId, expireTime);
+
+            if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow))
+                dataRow.link(oldRow.link());
+            else {
+                CacheObjectContext coCtx = cctx.cacheObjectContext();
+
+                key.valueBytes(coCtx);
+                val.valueBytes(coCtx);
+
+                rowStore.addRow(dataRow);
+            }
+
+            assert dataRow.link() != 0 : dataRow;
+
+            return dataRow;
+        }
+
+        /** {@inheritDoc} */
         @Override public void update(KeyCacheObject key,
             int p,
             CacheObject val,
@@ -935,14 +1012,10 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
 
                 CacheDataRow old;
 
-                boolean rmvOld = true;
-
                 if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) {
                     old = oldRow;
 
                     dataRow.link(oldRow.link());
-
-                    rmvOld = false;
                 }
                 else {
                     rowStore.addRow(dataRow);
@@ -956,43 +1029,68 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
                     }
                     else
                         old = dataTree.put(dataRow);
-
-                    if (old == null)
-                        storageSize.incrementAndGet();
                 }
 
-                if (indexingEnabled) {
-                    GridCacheQueryManager qryMgr = cctx.queries();
+                finishUpdate(dataRow, old);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
 
-                    assert qryMgr.enabled();
+        /**
+         * @param newRow New row.
+         * @param oldRow Old row if available.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void finishUpdate(CacheDataRow newRow, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+            if (oldRow == null)
+                storageSize.incrementAndGet();
 
-                    if (old != null)
-                        qryMgr.store(key, p, old.value(), old.version(), val, ver, expireTime, dataRow.link());
-                    else
-                        qryMgr.store(key, p, null, null, val, ver, expireTime, dataRow.link());
-                }
+            KeyCacheObject key = newRow.key();
+
+            long expireTime = newRow.expireTime();
 
-                if (old != null) {
-                    assert old.link() != 0 : old;
+            if (indexingEnabled) {
+                GridCacheQueryManager qryMgr = cctx.queries();
 
-                    if (pendingEntries != null && old.expireTime() != 0)
-                        pendingEntries.removex(new PendingRow(old.expireTime(), old.link()));
+                assert qryMgr.enabled();
 
-                    if (rmvOld)
-                        rowStore.removeRow(old.link());
+                if (oldRow != null) {
+                    qryMgr.store(key,
+                        partId,
+                        oldRow.value(), oldRow.version(),
+                        newRow.value(), newRow.version(),
+                        expireTime,
+                        newRow.link());
+                }
+                else {
+                    qryMgr.store(key,
+                        partId,
+                        null, null,
+                        newRow.value(), newRow.version(),
+                        expireTime,
+                        newRow.link());
                 }
+            }
 
-                if (pendingEntries != null && expireTime != 0) {
-                    pendingEntries.putx(new PendingRow(expireTime, dataRow.link()));
+            if (oldRow != null) {
+                assert oldRow.link() != 0 : oldRow;
 
-                    hasPendingEntries = true;
-                }
+                if (pendingEntries != null && oldRow.expireTime() != 0)
+                    pendingEntries.removex(new PendingRow(oldRow.expireTime(), oldRow.link()));
 
-                updateIgfsMetrics(key, (old != null ? old.value() : null), val);
+                if (newRow.link() != oldRow.link())
+                    rowStore.removeRow(oldRow.link());
             }
-            finally {
-                busyLock.leaveBusy();
+
+            if (pendingEntries != null && expireTime != 0) {
+                pendingEntries.putx(new PendingRow(expireTime, newRow.link()));
+
+                hasPendingEntries = true;
             }
+
+            updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), newRow.value());
         }
 
         /** {@inheritDoc} */
@@ -1001,50 +1099,59 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
                 throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
 
             try {
-                CacheDataRow dataRow = dataTree.remove(new SearchRow(key));
-
-                CacheObject val = null;
-                GridCacheVersion ver = null;
+                CacheDataRow oldRow = dataTree.remove(new SearchRow(key));
 
-                if (dataRow != null) {
-                    assert dataRow.link() != 0 : dataRow;
+                finishRemove(key, oldRow);
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
+        }
 
-                    if (pendingEntries != null && dataRow.expireTime() != 0)
-                        pendingEntries.removex(new PendingRow(dataRow.expireTime(), dataRow.link()));
+        /**
+         * @param key Key.
+         * @param oldRow Removed row.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void finishRemove(KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+            CacheObject val = null;
+            GridCacheVersion ver = null;
 
-                    storageSize.decrementAndGet();
+            if (oldRow != null) {
+                assert oldRow.link() != 0 : oldRow;
 
-                    val = dataRow.value();
+                if (pendingEntries != null && oldRow.expireTime() != 0)
+                    pendingEntries.removex(new PendingRow(oldRow.expireTime(), oldRow.link()));
 
-                    ver = dataRow.version();
-                }
+                storageSize.decrementAndGet();
 
-                if (indexingEnabled) {
-                    GridCacheQueryManager qryMgr = cctx.queries();
+                val = oldRow.value();
 
-                    assert qryMgr.enabled();
+                ver = oldRow.version();
+            }
 
-                    qryMgr.remove(key, partId, val, ver);
-                }
+            if (indexingEnabled) {
+                GridCacheQueryManager qryMgr = cctx.queries();
 
-                if (dataRow != null)
-                    rowStore.removeRow(dataRow.link());
+                assert qryMgr.enabled();
 
-                updateIgfsMetrics(key, (dataRow != null ? dataRow.value() : null), null);
-            }
-            finally {
-                busyLock.leaveBusy();
+                qryMgr.remove(key, partId, val, ver);
             }
+
+            if (oldRow != null)
+                rowStore.removeRow(oldRow.link());
+
+            updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), null);
         }
 
         /** {@inheritDoc} */
         @Override public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException {
             key.valueBytes(cctx.cacheObjectContext());
 
-            CacheDataRow row = dataTree.findOne(new SearchRow(key), dataTree.noKeyC);
+            CacheDataRow row = dataTree.findOne(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY);
 
             if (row != null)
-                ((CacheDataRowAdapter)row).key(key);
+                row.key(key);
 
             return row;
         }
@@ -1261,17 +1368,6 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
         /** */
         private final GridCacheContext cctx;
 
-        /** */
-        private final RowClosure<CacheSearchRow, CacheDataRow> noKeyC = new RowClosure<CacheSearchRow, CacheDataRow>() {
-            @Override public CacheDataRow row(BPlusIO<CacheSearchRow> io, long pageAddr, int idx)
-                throws IgniteCheckedException {
-                int hash = ((RowLinkIO)io).getHash(pageAddr, idx);
-                long link = ((RowLinkIO)io).getLink(pageAddr, idx);
-
-                return rowStore.dataRow(hash, link, CacheDataRowAdapter.RowData.NO_KEY);
-            }
-        };
-
         /**
          * @param name Tree name.
          * @param reuseList Reuse list.
@@ -1320,12 +1416,16 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
         }
 
         /** {@inheritDoc} */
-        @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx)
+        @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, Object flags)
             throws IgniteCheckedException {
             int hash = ((RowLinkIO)io).getHash(pageAddr, idx);
             long link = ((RowLinkIO)io).getLink(pageAddr, idx);
 
-            return rowStore.dataRow(hash, link, CacheDataRowAdapter.RowData.FULL);
+            CacheDataRowAdapter.RowData x = flags != null ?
+                (CacheDataRowAdapter.RowData)flags :
+                CacheDataRowAdapter.RowData.FULL;
+
+            return rowStore.dataRow(hash, link, x);
         }
 
         /**
@@ -1705,7 +1805,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
         }
 
         /** {@inheritDoc} */
-        @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx)
+        @Override protected PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, int idx, Object ignore)
             throws IgniteCheckedException {
             return io.getLookupRow(this, pageAddr, idx);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
index 75ab8e4..cc26b21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.database;
 
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
 /**
@@ -48,4 +49,9 @@ public interface CacheDataRow extends CacheSearchRow {
      * @param link Link for this row.
      */
     public void link(long link);
+
+    /**
+     * @param key Key.
+     */
+    public void key(KeyCacheObject key);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index 4bfdd99..5a62e75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -73,6 +73,19 @@ public class CacheDataRowAdapter implements CacheDataRow {
     }
 
     /**
+     * @param key Key.
+     * @param val Value.
+     * @param expireTime Expire time.
+     * @param ver Version.
+     */
+    public CacheDataRowAdapter(KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime) {
+        this.key = key;
+        this.val = val;
+        this.ver = ver;
+        this.expireTime = expireTime;
+    }
+
+    /**
      * Read row from data pages.
      *
      * @param cctx Cache context.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
index 47c3254..ca4ad05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
@@ -213,7 +213,7 @@ public class MetadataStorage implements MetaStore {
 
         /** {@inheritDoc} */
         @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final long pageAddr,
-            final int idx) throws IgniteCheckedException {
+            final int idx, Object ignore) throws IgniteCheckedException {
             return readRow(pageAddr, ((IndexIO)io).getOffset(pageAddr, idx));
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
index aa61fbd..8827407 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
@@ -120,20 +120,6 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     /** */
     private volatile TreeMetaData treeMeta;
 
-    /**
-     *
-     */
-    public static interface RowClosure<L, R> {
-        /**
-         * @param io IO.
-         * @param pageAddr Page address.
-         * @param idx Index.
-         * @return Result.
-         * @throws IgniteCheckedException If failed.
-         */
-        public R row(BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException;
-    }
-
     /** */
     private final GridTreePrinter<Long> treePrinter = new GridTreePrinter<Long>() {
         /** */
@@ -224,22 +210,20 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             long res = doAskNeighbor(io, pageAddr, back);
 
             if (back) {
-                assert g.getClass() == Remove.class;
-
                 if (io.getForward(pageAddr) != g.backId) // See how g.backId is setup in removeDown for this check.
                     return RETRY;
 
-                g.backId = res;
+                g.backId(res);
             }
             else {
                 assert isBack == FALSE.ordinal() : isBack;
 
-                g.fwdId = res;
+                g.fwdId(res);
             }
 
             return FOUND;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Get> search = new Search();
@@ -257,7 +241,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             boolean needBackIfRouting = g.backId != 0;
 
-            g.backId = 0; // Usually we'll go left down and don't need it.
+            g.backId(0L); // Usually we'll go left down and don't need it.
 
             int cnt = io.getCount(pageAddr);
             int idx = findInsertionPoint(io, pageAddr, 0, cnt, g.row, g.shift);
@@ -282,13 +266,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             assert !io.isLeaf() : io;
 
             // If idx == cnt then we go right down, else left down: getLeft(cnt) == getRight(cnt - 1).
-            g.pageId = inner(io).getLeft(pageAddr, idx);
+            g.pageId(inner(io).getLeft(pageAddr, idx));
 
             // If we see the tree in consistent state, then our right down page must be forward for our left down page,
             // we need to setup fwdId and/or backId to be able to check this invariant on lower level.
             if (idx < cnt) {
                 // Go left down here.
-                g.fwdId = inner(io).getRight(pageAddr, idx);
+                g.fwdId(inner(io).getRight(pageAddr, idx));
             }
             else {
                 // Go right down here or it is an empty branch.
@@ -301,7 +285,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
                 // Setup fwdId.
                 if (fwdId == 0)
-                    g.fwdId = 0;
+                    g.fwdId(0L);
                 else {
                     // We can do askNeighbor on forward page here because we always take locks in forward direction.
                     Result res = askNeighbor(fwdId, g, false);
@@ -312,7 +296,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
                 // Setup backId.
                 if (cnt != 0) // It is not a routing page and we are going to the right, can get backId here.
-                    g.backId = inner(io).getLeft(pageAddr, cnt - 1);
+                    g.backId(inner(io).getLeft(pageAddr, cnt - 1));
                 else if (needBackIfRouting) {
                     // Can't get backId here because of possible deadlock and it is only needed for remove operation.
                     return GO_DOWN_X;
@@ -321,7 +305,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             return GO_DOWN;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Put> replace = new Replace();
@@ -331,6 +315,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      */
     private class Replace extends GetPageHandler<Put> {
         /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
         @Override public Result run0(Page page, long pageAddr, BPlusIO<L> io, Put p, int lvl)
             throws IgniteCheckedException  {
             // Check the triangle invariant.
@@ -351,15 +336,26 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             // Detach the old row if we are on a leaf page.
             if (lvl == 0) {
-                assert p.oldRow == null;
+                assert p.oldRow == null; // The old row must be set only once.
+
+                // Inner replace state must be consistent by the end of the operation.
+                assert p.needReplaceInner == FALSE || p.needReplaceInner == DONE : p.needReplaceInner;
+
+                // Need to replace inner key if now we are replacing the rightmost row and have a forward page.
+                if (canGetRowFromInner && idx + 1 == cnt && p.fwdId != 0L && p.needReplaceInner == FALSE) {
+                    // Can happen only for invoke, otherwise inner key must be replaced on the way down.
+                    assert p.invoke != null;
+
+                    // We need to restart the operation from root to perform inner replace.
+                    // On the second pass we will not get here (will avoid infinite loop) because
+                    // needReplaceInner will be DONE or our key will not be the rightmost anymore.
+                    return RETRY_ROOT;
+                }
 
                 // Get old row in leaf page to reduce contention at upper level.
                 p.oldRow = p.needOld ? getRow(io, pageAddr, idx) : (T)Boolean.TRUE;
 
                 p.finish();
-
-                // Inner replace state must be consistent by the end of the operation.
-                assert p.needReplaceInner == FALSE || p.needReplaceInner == DONE : p.needReplaceInner;
             }
 
             boolean needWal = needWalDeltaRecord(page);
@@ -371,7 +367,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             return FOUND;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Put> insert = new Insert();
@@ -405,6 +401,9 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                 p.btmLvl++; // Get high.
                 p.row = moveUpRow;
 
+                if (p.invoke != null)
+                    p.invoke.row = moveUpRow;
+
                 // Here forward page can't be concurrently removed because we keep write lock on tail which is the only
                 // page who knows about the forward page, because it was just produced by split.
                 p.rightId = io.getForward(pageAddr);
@@ -417,7 +416,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             return FOUND;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Remove> rmvFromLeaf = new RemoveFromLeaf();
@@ -437,15 +436,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             int idx = findInsertionPoint(io, pageAddr, 0, cnt, r.row, 0);
 
-            if (idx < 0) {
-                if (!r.ceil) // We've found exact match on search but now it's gone.
-                    return RETRY;
-
-                idx = fix(idx);
-
-                if (idx == cnt) // We can not remove ceiling row here. Bad luck.
-                    return NOT_FOUND;
-            }
+            if (idx < 0)
+                return RETRY; // We've found exact match on search but now it's gone.
 
             assert idx >= 0 && idx < cnt: idx;
 
@@ -495,7 +487,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             return FOUND;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Remove> lockBackAndRmvFromLeaf = new LockBackAndRmvFromLeaf();
@@ -520,7 +512,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             return res;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Remove> lockBackAndTail = new LockBackAndTail();
@@ -544,7 +536,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             return res;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Remove> lockTailForward = new LockTailForward();
@@ -560,7 +552,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             return FOUND;
         }
-    };
+    }
 
     /** */
     private final GetPageHandler<Remove> lockTail = new LockTail();
@@ -590,7 +582,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             return FOUND;
         }
-    };
+    }
 
     /** */
     private final PageHandler<Void, Bool> cutRoot = new CutRoot();
@@ -620,7 +612,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             return TRUE;
         }
-    };
+    }
 
     /** */
     private final PageHandler<Long, Bool> addRoot = new AddRoot();
@@ -651,7 +643,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             return TRUE;
         }
-    };
+    }
 
     /** */
     private final PageHandler<Long, Bool> initRoot = new InitRoot();
@@ -681,7 +673,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
             return TRUE;
         }
-    };
+    }
 
     /**
      * @param name Tree name.
@@ -947,15 +939,16 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
     /**
      * @param row Lookup row for exact match.
-     * @param c Found row closure.
-     * @return Found result.
+     * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+     * @return Found result or {@code null}.
      * @throws IgniteCheckedException If failed.
      */
-    public final <R> R findOne(L row, RowClosure<L, R> c) throws IgniteCheckedException {
+    @SuppressWarnings("unchecked")
+    public final <R> R findOne(L row, Object x) throws IgniteCheckedException {
         checkDestroyed();
 
         try {
-            GetOne g = new GetOne(row, c);
+            GetOne g = new GetOne(row, x);
 
             doFind(g);
 
@@ -1036,7 +1029,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                             case RETRY:
                                 checkInterrupted();
 
-                                continue; // The child page got splitted, need to reread our page.
+                                continue; // The child page got split, need to reread our page.
 
                             default:
                                 return res;
@@ -1437,48 +1430,183 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      * @return Removed row.
      * @throws IgniteCheckedException If failed.
      */
-    @SuppressWarnings("unused")
-    public final T removeCeil(L row) throws IgniteCheckedException {
-        return doRemove(row, true, true);
+    @Override public final T remove(L row) throws IgniteCheckedException {
+        return doRemove(row, true);
     }
 
     /**
      * @param row Lookup row.
-     * @return Removed row.
      * @throws IgniteCheckedException If failed.
+     * @return {@code True} if removed row.
      */
-    @Override public final T remove(L row) throws IgniteCheckedException {
-        return doRemove(row, false, true);
+    public final boolean removex(L row) throws IgniteCheckedException {
+        Boolean res = (Boolean)doRemove(row, false);
+
+        return res != null ? res : false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void invoke(L row, Object z, InvokeClosure<T> c) throws IgniteCheckedException {
+        checkDestroyed();
+
+        Invoke x = new Invoke(row, z, c);
+
+        try {
+            for (;;) {
+                x.init();
+
+                Result res = invokeDown(x, x.rootId, 0L, 0L, x.rootLvl);
+
+                switch (res) {
+                    case RETRY:
+                    case RETRY_ROOT:
+                        checkInterrupted();
+
+                        continue;
+
+                    default:
+                        if (!x.isFinished()) {
+                            res = x.tryFinish();
+
+                            if (res == RETRY || res == RETRY_ROOT) {
+                                checkInterrupted();
+
+                                continue;
+                            }
+
+                            assert x.isFinished(): res;
+                        }
+
+                        return;
+                }
+            }
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteCheckedException("Runtime failure on search row: " + row, e);
+        }
+        catch (RuntimeException e) {
+            throw new IgniteException("Runtime failure on search row: " + row, e);
+        }
+        catch (AssertionError e) {
+            throw new AssertionError("Assertion error on search row: " + row, e);
+        }
+        finally {
+            x.releaseAll();
+        }
     }
 
     /**
-     * @param row Lookup row.
+     * @param x Invoke operation.
+     * @param pageId Page ID.
+     * @param backId Expected backward page ID if we are going to the right.
+     * @param fwdId Expected forward page ID.
+     * @param lvl Level.
+     * @return Result code.
      * @throws IgniteCheckedException If failed.
-     * @return {@code True} if removed row.
      */
-    public final boolean removex(L row) throws IgniteCheckedException {
-        Boolean res = (Boolean)doRemove(row, false, false);
+    private Result invokeDown(final Invoke x, final long pageId, final long backId, final long fwdId, final int lvl)
+        throws IgniteCheckedException {
+        assert lvl >= 0 : lvl;
 
-        return res != null ? res : false;
+        if (x.isTail(pageId, lvl))
+            return FOUND; // We've already locked this page, so return that we are ok.
+
+        final Page page = page(pageId);
+
+        try {
+            for (;;) {
+                // Init args.
+                x.pageId(pageId);
+                x.fwdId(fwdId);
+                x.backId(backId);
+
+                Result res = readPage(page, this, search, x, lvl, RETRY);
+
+                switch (res) {
+                    case GO_DOWN_X:
+                        assert backId != 0;
+                        assert x.backId == 0; // We did not setup it yet.
+
+                        x.backId(pageId); // Dirty hack to setup a check inside of askNeighbor.
+
+                        // We need to get backId here for our child page, it must be the last child of our back.
+                        res = askNeighbor(backId, x, true);
+
+                        if (res != FOUND)
+                            return res; // Retry.
+
+                        assert x.backId != pageId; // It must be updated in askNeighbor.
+
+                        // Intentional fallthrough.
+                    case GO_DOWN:
+                        res = x.tryReplaceInner(page, pageId, fwdId, lvl);
+
+                        if (res != RETRY)
+                            res = invokeDown(x, x.pageId, x.backId, x.fwdId, lvl - 1);
+
+                        if (res == RETRY_ROOT || x.isFinished())
+                            return res;
+
+                        if (res == RETRY) {
+                            checkInterrupted();
+
+                            continue;
+                        }
+
+                        // Unfinished Put does insertion on the same level.
+                        if (x.isPut())
+                            continue;
+
+                        assert x.isRemove(); // Guarded by isFinished.
+
+                        res = x.finishOrLockTail(page, pageId, backId, fwdId, lvl);
+
+                        return res;
+
+                    case NOT_FOUND:
+                        if (lvl == 0)
+                            x.invokeClosure();
+
+                        return x.onNotFound(page, pageId, fwdId, lvl);
+
+                    case FOUND:
+                        if (lvl == 0)
+                            x.invokeClosure();
+
+                        return x.onFound(page, pageId, backId, fwdId, lvl);
+
+                    default:
+                        return res;
+                }
+            }
+        }
+        finally {
+            x.levelExit();
+
+            if (x.canRelease(page, lvl))
+                page.close();
+        }
     }
 
+
     /**
      * @param row Lookup row.
-     * @param ceil If we can remove ceil row when we can not find exact.
      * @param needOld {@code True} if need return removed row.
      * @return Removed row.
      * @throws IgniteCheckedException If failed.
      */
-    private T doRemove(L row, boolean ceil, boolean needOld) throws IgniteCheckedException {
+    private T doRemove(L row, boolean needOld) throws IgniteCheckedException {
         checkDestroyed();
 
-        Remove r = new Remove(row, ceil, needOld);
+        Remove r = new Remove(row, needOld);
 
         try {
             for (;;) {
                 r.init();
 
-                switch (removeDown(r, r.rootId, 0L, 0L, r.rootLvl)) {
+                Result res = removeDown(r, r.rootId, 0L, 0L, r.rootLvl);
+
+                switch (res) {
                     case RETRY:
                     case RETRY_ROOT:
                         checkInterrupted();
@@ -1487,15 +1615,11 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
 
                     default:
                         if (!r.isFinished()) {
-                            Result res = r.finishTail();
+                            res = r.finishTail();
 
                             // If not found, then the tree grew beyond our call stack -> retry from the actual root.
                             if (res == RETRY || res == NOT_FOUND) {
-                                int root = getRootLevel();
-
-                                boolean checkRes = r.checkTailLevel(root);
-
-                                assert checkRes : "tail=" + r.tail + ", root=" + root + ", res=" + res;
+                                assert r.checkTailLevel(getRootLevel()) : "tail=" + r.tail + ", res=" + res;
 
                                 checkInterrupted();
 
@@ -1521,9 +1645,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             throw new AssertionError("Assertion error on search row: " + row, e);
         }
         finally {
-            r.releaseTail();
-
-            r.reuseFreePages();
+            r.releaseAll();
         }
     }
 
@@ -1579,12 +1701,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                             continue;
                         }
 
-                        if (res != RETRY_ROOT && !r.isFinished()) {
-                            res = r.finishTail();
+                        if (res == RETRY_ROOT || r.isFinished())
+                            return res;
 
-                            if (res == NOT_FOUND)
-                                res = r.lockTail(pageId, page, backId, fwdId, lvl);
-                        }
+                        res = r.finishOrLockTail(page, pageId, backId, fwdId, lvl);
 
                         return res;
 
@@ -1592,31 +1712,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                         // We are at the bottom.
                         assert lvl == 0 : lvl;
 
-                        if (!r.ceil) {
-                            r.finish();
-
-                            return res;
-                        }
+                        r.finish();
 
-                        // Intentional fallthrough for ceiling remove.
+                        return res;
 
                     case FOUND:
-                        // We must be at the bottom here, just need to remove row from the current page.
-                        assert lvl == 0 : lvl;
-
-                        res = r.removeFromLeaf(pageId, page, backId, fwdId);
-
-                        if (res == NOT_FOUND) {
-                            assert r.ceil : "must be a retry if not a ceiling remove";
-
-                            r.finish();
-                        }
-                        else if (res == FOUND && r.tail == null) {
-                            // Finish if we don't need to do any merges.
-                            r.finish();
-                        }
-
-                        return res;
+                        return r.tryRemoveFromLeaf(page, pageId, backId, fwdId, lvl);
 
                     default:
                         return res;
@@ -1716,7 +1817,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      * {@inheritDoc}
      */
     @Override public final T put(T row) throws IgniteCheckedException {
-        return put(row, true);
+        return doPut(row, true);
     }
 
     /**
@@ -1725,7 +1826,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      * @return {@code True} if replaced existing row.
      */
     public boolean putx(T row) throws IgniteCheckedException {
-        Boolean res = (Boolean)put(row, false);
+        Boolean res = (Boolean)doPut(row, false);
 
         return res != null ? res : false;
     }
@@ -1736,7 +1837,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      * @return Old row.
      * @throws IgniteCheckedException If failed.
      */
-    private T put(T row, boolean needOld) throws IgniteCheckedException {
+    private T doPut(T row, boolean needOld) throws IgniteCheckedException {
         checkDestroyed();
 
         Put p = new Put(row, needOld);
@@ -1858,7 +1959,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     /**
      * @return {@code True} if state was changed.
      */
-    private final boolean markDestroyed() {
+    private boolean markDestroyed() {
         return destroyed.compareAndSet(false, true);
     }
 
@@ -1997,31 +2098,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                         assert p.pageId != pageId;
                         assert p.fwdId != fwdId || fwdId == 0;
 
-                        // Need to replace key in inner page. There is no race because we keep tail lock after split.
-                        if (p.needReplaceInner == TRUE) {
-                            p.needReplaceInner = FALSE; // Protect from retries.
-
-                            long oldFwdId = p.fwdId;
-                            long oldPageId = p.pageId;
-
-                            // Set old args.
-                            p.fwdId = fwdId;
-                            p.pageId = pageId;
-
-                            res = writePage(pageMem, page, this, replace, p, lvl, RETRY);
+                        res = p.tryReplaceInner(page, pageId, fwdId, lvl);
 
-                            // Restore args.
-                            p.pageId = oldPageId;
-                            p.fwdId = oldFwdId;
-
-                            if (res != FOUND)
-                                return res; // Need to retry.
-
-                            p.needReplaceInner = DONE; // We can have only single matching inner key.
-                        }
-
-                        // Go down recursively.
-                        res = putDown(p, p.pageId, p.fwdId, lvl - 1);
+                        if (res != RETRY) // Go down recursively.
+                            res = putDown(p, p.pageId, p.fwdId, lvl - 1);
 
                         if (res == RETRY_ROOT || p.isFinished())
                             return res;
@@ -2034,21 +2114,13 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                     case FOUND: // Do replace.
                         assert lvl == 0 : "This replace can happen only at the bottom level.";
 
-                        // Init args.
-                        p.pageId = pageId;
-                        p.fwdId = fwdId;
-
-                        return writePage(pageMem, page, this, replace, p, lvl, RETRY);
+                        return p.tryReplace(page, pageId, fwdId, lvl);
 
                     case NOT_FOUND: // Do insert.
                         assert lvl == p.btmLvl : "must insert at the bottom level";
                         assert p.needReplaceInner == FALSE : p.needReplaceInner + " " + lvl;
 
-                        // Init args.
-                        p.pageId = pageId;
-                        p.fwdId = fwdId;
-
-                        return writePage(pageMem, page, this, insert, p, lvl, RETRY);
+                        return p.tryInsert(page, pageId, fwdId, lvl);
 
                     default:
                         return res;
@@ -2096,29 +2168,32 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      */
     private abstract class Get {
         /** */
-        protected long rmvId;
+        long rmvId;
 
         /** Starting point root level. May be outdated. Must be modified only in {@link Get#init()}. */
-        protected int rootLvl;
+        int rootLvl;
 
         /** Starting point root ID. May be outdated. Must be modified only in {@link Get#init()}. */
-        protected long rootId;
+        long rootId;
 
         /** */
-        protected L row;
+        L row;
 
         /** In/Out parameter: Page ID. */
-        protected long pageId;
+        long pageId;
 
         /** In/Out parameter: expected forward page ID. */
-        protected long fwdId;
+        long fwdId;
 
         /** In/Out parameter: in case of right turn this field will contain backward page ID for the child. */
-        protected long backId;
+        long backId;
 
         /** */
         int shift;
 
+        /** If this operation is a part of invoke. */
+        Invoke invoke;
+
         /**
          * @param row Row.
          */
@@ -2129,6 +2204,21 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         }
 
         /**
+         * @param g Other operation to copy from.
+         * @return {@code this}.
+         */
+        final Get copyFrom(Get g) {
+            rmvId = g.rmvId;
+            rootLvl = g.rootLvl;
+            pageId = g.pageId;
+            fwdId = g.fwdId;
+            backId = g.backId;
+            shift = g.shift;
+
+            return this;
+        }
+
+        /**
          * Initialize operation.
          *
          * @throws IgniteCheckedException If failed.
@@ -2146,7 +2236,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
          * @param rootLvl Root level.
          * @param rmvId Remove ID to be afraid of.
          */
-        final void restartFromRoot(long rootId, int rootLvl, long rmvId) {
+        void restartFromRoot(long rootId, int rootLvl, long rmvId) {
             this.rootId = rootId;
             this.rootLvl = rootLvl;
             this.rmvId = rmvId;
@@ -2188,6 +2278,34 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         boolean canRelease(Page page, int lvl) {
             return page != null;
         }
+
+        /**
+         * @param backId Back page ID.
+         */
+        void backId(long backId) {
+            this.backId = backId;
+        }
+
+        /**
+         * @param pageId Page ID.
+         */
+        void pageId(long pageId) {
+            this.pageId = pageId;
+        }
+
+        /**
+         * @param fwdId Forward page ID.
+         */
+        void fwdId(long fwdId) {
+            this.fwdId = fwdId;
+        }
+
+        /**
+         * @return {@code true} If the operation is finished.
+         */
+        boolean isFinished() {
+            throw new IllegalStateException();
+        }
     }
 
     /**
@@ -2195,25 +2313,26 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      */
     private final class GetOne extends Get {
         /** */
-        private final RowClosure<L, ?> c;
+        Object x;
 
         /**
          * @param row Row.
-         * @param c Row closure.
+         * @param x Implementation specific argument.
          */
-        private GetOne(L row, RowClosure<L, ?> c) {
+        private GetOne(L row, Object x) {
             super(row);
 
-            this.c = c;
+            this.x = x;
         }
 
         /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
         @Override boolean found(BPlusIO<L> io, long pageAddr, int idx, int lvl) throws IgniteCheckedException {
             // Check if we are on an inner page and can't get row from it.
             if (lvl != 0 && !canGetRowFromInner)
                 return false;
 
-            row = c != null ? (L)c.row(io, pageAddr, idx) : getRow(io, pageAddr, idx);
+            row = getRow(io, pageAddr, idx, x);
 
             return true;
         }
@@ -2224,7 +2343,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      */
     private final class GetCursor extends Get {
         /** */
-        private ForwardCursor cursor;
+        ForwardCursor cursor;
 
         /**
          * @param lower Lower bound.
@@ -2261,31 +2380,31 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      */
     private final class Put extends Get {
         /** Right child page ID for split row. */
-        private long rightId;
+        long rightId;
 
         /** Replaced row if any. */
-        private T oldRow;
+        T oldRow;
 
         /**
          * This page is kept locked after split until insert to the upper level will not be finished.
          * It is needed because split row will be "in flight" and if we'll release tail, remove on
          * split row may fail.
          */
-        private Page tail;
+        Page tail;
 
         /** */
-        private long tailPageAddr;
+        long tailPageAddr;
 
         /**
          * Bottom level for insertion (insert can't go deeper). Will be incremented on split on each level.
          */
-        private short btmLvl;
+        short btmLvl;
 
         /** */
         Bool needReplaceInner = FALSE;
 
         /** */
-        private final boolean needOld;
+        final boolean needOld;
 
         /**
          * @param row Row.
@@ -2302,6 +2421,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             if (lvl == 0) // Leaf: need to stop.
                 return true;
 
+            assert btmLvl == 0; // It can not be insert.
+
             // If we can get full row from the inner page, we have to replace it with the new one. On the way down
             // we can not miss inner key even in presence of concurrent operations because of `triangle` invariant +
             // concurrent inner replace handling by retrying from root.
@@ -2348,10 +2469,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             tail(null, 0L);
         }
 
-        /**
-         * @return {@code true} If finished.
-         */
-        private boolean isFinished() {
+        /** {@inheritDoc} */
+        @Override boolean isFinished() {
             return row == null;
         }
 
@@ -2505,45 +2624,404 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
                 }
             }
         }
-    }
 
-    /**
-     * Remove operation.
-     */
-    private final class Remove extends Get implements ReuseBag {
-        /** */
-        private boolean ceil;
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result tryReplaceInner(Page page, long pageId, long fwdId, int lvl)
+            throws IgniteCheckedException {
+            // Need to replace key in inner page. There is no race because we keep tail lock after split.
+            if (needReplaceInner == TRUE) {
+                needReplaceInner = FALSE; // Protect from retries.
 
-        /** We may need to lock part of the tree branch from the bottom to up for multiple levels. */
-        private Tail<L> tail;
+                long oldFwdId = this.fwdId;
+                long oldPageId = this.pageId;
 
-        /** */
-        Bool needReplaceInner = FALSE;
+                // Set old args.
+                this.fwdId = fwdId;
+                this.pageId = pageId;
 
-        /** */
-        Bool needMergeEmptyBranch = FALSE;
+                Result res = writePage(pageMem, page, BPlusTree.this, replace, this, lvl, RETRY);
 
-        /** Removed row. */
-        private T rmvd;
+                // Restore args.
+                this.pageId = oldPageId;
+                this.fwdId = oldFwdId;
 
-        /** Current page. */
-        private Page page;
+                if (res == RETRY)
+                    return RETRY;
+
+                needReplaceInner = DONE; // We can have only a single matching inner key.
+
+                return FOUND;
+            }
+
+            return NOT_FOUND;
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result tryInsert(Page page, long pageId, long fwdId, int lvl) throws IgniteCheckedException {
+            // Init args.
+            this.pageId = pageId;
+            this.fwdId = fwdId;
+
+            return writePage(pageMem, page, BPlusTree.this, insert, this, lvl, RETRY);
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        public Result tryReplace(Page page, long pageId, long fwdId, int lvl) throws IgniteCheckedException {
+            // Init args.
+            this.pageId = pageId;
+            this.fwdId = fwdId;
+
+            return writePage(pageMem, page, BPlusTree.this, replace, this, lvl, RETRY);
+        }
+    }
+
+    /**
+     * Invoke operation.
+     */
+    private final class Invoke extends Get {
+        /** */
+        Object x;
+
+        /** */
+        InvokeClosure<T> clo;
+
+        /** */
+        Bool closureInvoked = FALSE;
+
+        /** */
+        T foundRow;
+
+        /** */
+        Get op;
+
+        /**
+         * @param row Row.
+         * @param x Implementation specific argument.
+         * @param clo Closure.
+         */
+        private Invoke(L row, Object x, final InvokeClosure<T> clo) {
+            super(row);
+
+            assert clo != null;
+
+            this.clo = clo;
+            this.x = x;
+        }
+
+        /** {@inheritDoc} */
+        @Override void pageId(long pageId) {
+            this.pageId = pageId;
+
+            if (op != null)
+                op.pageId = pageId;
+        }
+
+        /** {@inheritDoc} */
+        @Override void fwdId(long fwdId) {
+            this.fwdId = fwdId;
+
+            if (op != null)
+                op.fwdId = fwdId;
+        }
+
+        /** {@inheritDoc} */
+        @Override void backId(long backId) {
+            this.backId = backId;
+
+            if (op != null)
+                op.backId = backId;
+        }
+
+        /** {@inheritDoc} */
+        @Override void restartFromRoot(long rootId, int rootLvl, long rmvId) {
+            super.restartFromRoot(rootId, rootLvl, rmvId);
+
+            if (op != null)
+                op.restartFromRoot(rootId, rootLvl, rmvId);
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean found(BPlusIO<L> io, long pageAddr, int idx, int lvl) throws IgniteCheckedException {
+            // If the operation is initialized, then the closure has been called already.
+            if (op != null)
+                return op.found(io, pageAddr, idx, lvl);
+
+            if (lvl == 0) {
+                if (closureInvoked == FALSE) {
+                    closureInvoked = READY;
+
+                    foundRow = getRow(io, pageAddr, idx, x);
+                }
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean notFound(BPlusIO<L> io, long pageAddr, int idx, int lvl) throws IgniteCheckedException {
+            // If the operation is initialized, then the closure has been called already.
+            if (op != null)
+                return op.notFound(io, pageAddr, idx, lvl);
+
+            if (lvl == 0) {
+                if (closureInvoked == FALSE)
+                    closureInvoked = READY;
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * @throws IgniteCheckedException If failed.
+         */
+        private void invokeClosure() throws IgniteCheckedException {
+            if (closureInvoked != READY)
+                return;
+
+            closureInvoked = DONE;
+
+            clo.call(foundRow);
+
+            switch (clo.operationType()) {
+                case PUT:
+                    T newRow = clo.newRow();
+
+                    assert newRow != null;
+
+                    op = new Put(newRow, false);
+
+                    break;
+
+                case REMOVE:
+                    assert foundRow != null;
+
+                    op = new Remove(row, false);
+
+                    break;
+
+                case NOOP:
+                    return;
+
+                default:
+                    throw new IllegalStateException();
+            }
+
+            op.copyFrom(this);
+
+            op.invoke = this;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean canRelease(Page page, int lvl) {
+            if (page == null)
+                return false;
+
+            if (op == null)
+                return true;
+
+            return op.canRelease(page, lvl);
+        }
+
+        /**
+         * @return {@code true} If it is a {@link Put} operation internally.
+         */
+        private boolean isPut() {
+            return op != null && op.getClass() == Put.class;
+        }
+
+        /**
+         * @return {@code true} If it is a {@link Remove} operation internally.
+         */
+        private boolean isRemove() {
+            return op != null && op.getClass() == Remove.class;
+        }
+
+        /**
+         * @param pageId Page ID.
+         * @param lvl Level.
+         * @return {@code true} If it is a {@link Remove} and the page is in tail.
+         */
+        private boolean isTail(long pageId, int lvl) {
+            return isRemove() && ((Remove)op).isTail(pageId, lvl);
+        }
+
+        /**
+         */
+        private void levelExit() {
+            if (isRemove())
+                ((Remove)op).page = null;
+        }
+
+        /**
+         * Release all the resources by the end of operation.
+         */
+        private void releaseAll() throws IgniteCheckedException {
+            if (isRemove())
+                ((Remove)op).releaseAll();
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result onNotFound(Page page, long pageId, long fwdId, int lvl)
+            throws IgniteCheckedException {
+            if (op == null)
+                return NOT_FOUND;
+
+            if (isRemove()) {
+                assert lvl == 0;
+
+                ((Remove)op).finish();
+
+                return NOT_FOUND;
+            }
+
+            return ((Put)op).tryInsert(page, pageId, fwdId, lvl);
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param backId Back page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result onFound(Page page, long pageId, long backId, long fwdId, int lvl)
+            throws IgniteCheckedException {
+            if (op == null)
+                return FOUND;
+
+            if (isRemove())
+                return ((Remove)op).tryRemoveFromLeaf(page, pageId, backId, fwdId, lvl);
+
+            return  ((Put)op).tryReplace(page, pageId, fwdId, lvl);
+        }
+
+        /**
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result tryFinish() throws IgniteCheckedException {
+            assert op != null; // Must be guarded by isFinished.
+
+            if (isPut())
+                return RETRY;
+
+            Result res = ((Remove)op).finishTail();
+
+            if (res == NOT_FOUND)
+                res = RETRY;
+
+            assert res == FOUND || res == RETRY: res;
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean isFinished() {
+            if (closureInvoked != DONE)
+                return false;
+
+            if (op == null)
+                return true;
+
+            return op.isFinished();
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        Result tryReplaceInner(Page page, long pageId, long fwdId, int lvl) throws IgniteCheckedException {
+            if (!isPut())
+                return NOT_FOUND;
+
+            return ((Put)op).tryReplaceInner(page, pageId, fwdId, lvl);
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param backId Back page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        public Result finishOrLockTail(Page page, long pageId, long backId, long fwdId, int lvl)
+            throws IgniteCheckedException {
+            return ((Remove)op).finishOrLockTail(page, pageId, backId, fwdId, lvl);
+        }
+    }
+
+    /**
+     * Remove operation.
+     */
+    private final class Remove extends Get implements ReuseBag {
+        /** We may need to lock part of the tree branch from the bottom to up for multiple levels. */
+        Tail<L> tail;
+
+        /** */
+        Bool needReplaceInner = FALSE;
+
+        /** */
+        Bool needMergeEmptyBranch = FALSE;
+
+        /** Removed row. */
+        T rmvd;
+
+        /** Current page. */
+        Page page;
 
         /** */
-        private Object freePages;
+        Object freePages;
 
         /** */
-        private final boolean needOld;
+        final boolean needOld;
 
         /**
          * @param row Row.
-         * @param ceil If we can remove ceil row when we can not find exact.
          * @param needOld {@code True} If need return old value.
          */
-        private Remove(L row, boolean ceil, boolean needOld) {
+        private Remove(L row, boolean needOld) {
             super(row);
 
-            this.ceil = ceil;
             this.needOld = needOld;
         }
 
@@ -2551,12 +3029,12 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         @SuppressWarnings("unchecked")
         @Override public long pollFreePage() {
             if (freePages == null)
-                return 0;
+                return 0L;
 
             if (freePages.getClass() == GridLongList.class) {
                 GridLongList list = ((GridLongList)freePages);
 
-                return list.isEmpty() ? 0 : list.remove();
+                return list.isEmpty() ? 0L : list.remove();
             }
 
             long res = (long)freePages;
@@ -2569,7 +3047,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
         /** {@inheritDoc} */
         @SuppressWarnings("unchecked")
         @Override public void addFreePage(long pageId) {
-            assert pageId != 0;
+            assert pageId != 0L;
 
             if (freePages == null)
                 freePages = pageId;
@@ -2957,6 +3435,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
          * @param idx Index to remove.
          * @throws IgniteCheckedException If failed.
          */
+        @SuppressWarnings("unchecked")
         private void removeDataRowFromLeaf(Page page, BPlusIO<L> io, long pageAddr, int cnt, int idx)
             throws IgniteCheckedException {
             assert idx >= 0 && idx < cnt: idx;
@@ -3280,10 +3759,8 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
             return true;
         }
 
-        /**
-         * @return {@code true} If finished.
-         */
-        private boolean isFinished() {
+        /** {@inheritDoc} */
+        @Override boolean isFinished() {
             return row == null;
         }
 
@@ -3438,9 +3915,58 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
          * @param rootLvl Actual root level.
          * @return {@code true} If tail level is correct.
          */
-        public boolean checkTailLevel(int rootLvl) {
+        private boolean checkTailLevel(int rootLvl) {
             return tail == null || tail.lvl < rootLvl;
         }
+
+        /**
+         * @throws IgniteCheckedException If failed.
+         */
+        private void releaseAll() throws IgniteCheckedException {
+            releaseTail();
+            reuseFreePages();
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param backId Back page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result finishOrLockTail(Page page, long pageId, long backId, long fwdId, int lvl)
+            throws IgniteCheckedException {
+            Result res = finishTail();
+
+            if (res == NOT_FOUND)
+                res = lockTail(pageId, page, backId, fwdId, lvl);
+
+            return res;
+        }
+
+        /**
+         * @param page Page.
+         * @param pageId Page ID.
+         * @param backId Back page ID.
+         * @param fwdId Forward ID.
+         * @param lvl Level.
+         * @return Result.
+         * @throws IgniteCheckedException If failed.
+         */
+        private Result tryRemoveFromLeaf(Page page, long pageId, long backId, long fwdId, int lvl)
+            throws IgniteCheckedException {
+            // We must be at the bottom here, just need to remove row from the current page.
+            assert lvl == 0 : lvl;
+
+            Result res = removeFromLeaf(pageId, page, backId, fwdId);
+
+            if (res == FOUND && tail == null) // Finish if we don't need to do any merges.
+                finish();
+
+            return res;
+        }
     }
 
     /**
@@ -3619,7 +4145,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     protected abstract int compare(BPlusIO<L> io, long pageAddr, int idx, L row) throws IgniteCheckedException;
 
     /**
-     * Get the full detached row. Can be called on inner page only if {@link #canGetRowFromInner} is {@code true}.
+     * Get a full detached data row.
      *
      * @param io IO.
      * @param pageAddr Page address.
@@ -3627,7 +4153,21 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
      * @return Full detached data row.
      * @throws IgniteCheckedException If failed.
      */
-    protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException;
+    protected final T getRow(BPlusIO<L> io, long pageAddr, int idx) throws IgniteCheckedException {
+        return getRow(io, pageAddr, idx, null);
+    }
+
+    /**
+     * Get data row. Can be called on inner page only if {@link #canGetRowFromInner} is {@code true}.
+     *
+     * @param io IO.
+     * @param pageAddr Page address.
+     * @param idx Index.
+     * @param x Implementation specific argument, {@code null} always means that we need to return full detached data row.
+     * @return Data row.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract T getRow(BPlusIO<L> io, long pageAddr, int idx, Object x) throws IgniteCheckedException;
 
     /**
      * Forward cursor.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index fc78f69..0c71731 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -97,6 +97,11 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
     }
 
     /** {@inheritDoc} */
+    @Override protected long nextPartCounter() {
+        return locPart.nextUpdateCounter();
+    }
+
+    /** {@inheritDoc} */
     @Override public int memorySize() throws IgniteCheckedException {
         int rdrsOverhead;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
index 8dcd205..7eae0d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteTree.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.util.lang.GridCursor;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Interface for ignite internal tree.
@@ -34,6 +35,14 @@ public interface IgniteTree<L, T> {
     public T put(T val) throws IgniteCheckedException;
 
     /**
+     * @param key Key.
+     * @param x Implementation specific argument, {@code null} always means that we need a full detached data row.
+     * @param c Closure.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void invoke(L key, Object x, InvokeClosure<T> c) throws IgniteCheckedException;
+
+    /**
      * Returns the value to which the specified key is mapped, or {@code null} if this tree contains no mapping for the
      * key.
      *
@@ -70,4 +79,42 @@ public interface IgniteTree<L, T> {
      * @throws IgniteCheckedException If failed.
      */
     public long size() throws IgniteCheckedException;
+
+    /**
+     *
+     */
+    interface InvokeClosure<T> {
+        /**
+         *
+         * @param row Old row or {@code null} if old row not found.
+         * @throws IgniteCheckedException If failed.
+         */
+        void call(@Nullable T row) throws IgniteCheckedException;
+
+        /**
+         * @return New row for {@link OperationType#PUT} operation.
+         */
+        T newRow();
+
+        /**
+         * @return Operation type for this closure or {@code null} if it is unknown.
+         *      After method {@link #call(Object)} has been called, operation type must
+         *      be know and this method can not return {@code null}.
+         */
+        OperationType operationType();
+    }
+
+    /**
+     *
+     */
+    enum OperationType {
+        /** */
+        NOOP,
+
+        /** */
+        REMOVE,
+
+        /** */
+        PUT
+    }
 }


[3/3] ignite git commit: ignite-4652 Implemented BPlusTree.invoke

Posted by sb...@apache.org.
ignite-4652 Implemented BPlusTree.invoke


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee28b9cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee28b9cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee28b9cb

Branch: refs/heads/ignite-3477
Commit: ee28b9cb89400af6fcddd89a52fcd1adbcd5d4ff
Parents: 40f015d
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 22 09:55:50 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 22 09:55:50 2017 +0300

----------------------------------------------------------------------
 .../benchmarks/jmh/tree/BPlusTreeBenchmark.java |    3 +-
 .../internal/pagemem/wal/record/DataRecord.java |   10 +-
 .../processors/cache/GridCacheMapEntry.java     | 1729 +++++++++++-------
 .../cache/GridCacheUpdateAtomicResult.java      |   96 +-
 .../cache/IgniteCacheOffheapManager.java        |   48 +
 .../cache/IgniteCacheOffheapManagerImpl.java    |  236 ++-
 .../processors/cache/database/CacheDataRow.java |    6 +
 .../cache/database/CacheDataRowAdapter.java     |   13 +
 .../cache/database/MetadataStorage.java         |    2 +-
 .../cache/database/tree/BPlusTree.java          |  936 ++++++++--
 .../distributed/dht/GridDhtCacheEntry.java      |    5 +
 .../apache/ignite/internal/util/IgniteTree.java |   47 +
 .../processors/database/BPlusTreeSelfTest.java  |  272 ++-
 .../database/FreeListImplSelfTest.java          |    5 +
 .../processors/query/h2/database/H2Tree.java    |    2 +-
 .../processors/query/h2/opt/GridH2Row.java      |    5 +
 .../query/h2/opt/GridH2TreeIndex.java           |    5 +
 17 files changed, 2462 insertions(+), 958 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java
index 7355850..dc74363 100644
--- a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/tree/BPlusTreeBenchmark.java
@@ -190,7 +190,8 @@ public class BPlusTreeBenchmark extends JmhAbstractBenchmark {
         }
 
         /** {@inheritDoc} */
-        @Override protected Long getRow(BPlusIO<Long> io, long pageAddr, int idx) throws IgniteCheckedException {
+        @Override protected Long getRow(BPlusIO<Long> io, long pageAddr, int idx, Object ignore)
+            throws IgniteCheckedException {
             assert io.canGetRow() : io;
 
             return io.getLookupRow(this, pageAddr, idx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
index 6592852..d2747f1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
@@ -17,15 +17,10 @@
 
 package org.apache.ignite.internal.pagemem.wal.record;
 
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  *
@@ -68,6 +63,7 @@ public class DataRecord extends WALRecord {
         return writeEntries == null ? Collections.<DataEntry>emptyList() : writeEntries;
     }
 
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DataRecord.class, this, super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 6dc1d04..d28ea25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -40,7 +40,9 @@ import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult.UpdateOutcome;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRowAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
@@ -60,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
 import org.apache.ignite.internal.processors.dr.GridDrType;
+import org.apache.ignite.internal.util.IgniteTree;
 import org.apache.ignite.internal.util.lang.GridClosureException;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple;
@@ -83,6 +86,8 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_UNLOCKED;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
 import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
 
 /**
@@ -1535,11 +1540,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public GridCacheUpdateAtomicResult innerUpdate(
-        GridCacheVersion newVer,
+        final GridCacheVersion newVer,
         final UUID evtNodeId,
         final UUID affNodeId,
-        GridCacheOperation op,
-        @Nullable Object writeObj,
+        final GridCacheOperation op,
+        @Nullable final Object writeObj,
         @Nullable final Object[] invokeArgs,
         final boolean writeThrough,
         final boolean readThrough,
@@ -1555,42 +1560,22 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         final GridDrType drType,
         final long explicitTtl,
         final long explicitExpireTime,
-        @Nullable GridCacheVersion conflictVer,
+        @Nullable final GridCacheVersion conflictVer,
         final boolean conflictResolve,
         final boolean intercept,
         @Nullable final UUID subjId,
         final String taskName,
         @Nullable final CacheObject prevVal,
         @Nullable final Long updateCntr,
-        @Nullable GridDhtAtomicAbstractUpdateFuture fut
+        @Nullable final GridDhtAtomicAbstractUpdateFuture fut
     ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
-        assert cctx.atomic();
-
-        boolean res = true;
-
-        CacheObject oldVal;
-        CacheObject updated;
-
-        GridCacheVersion enqueueVer = null;
+        assert cctx.atomic() && !detached();
 
-        GridCacheVersionConflictContext<?, ?> conflictCtx = null;
-
-        IgniteBiTuple<Object, Exception> invokeRes = null;
-
-        // System TTL/ET which may have special values.
-        long newSysTtl;
-        long newSysExpireTime;
-
-        // TTL/ET which will be passed to entry on update.
-        long newTtl;
-        long newExpireTime;
-
-        Object key0 = null;
-        Object updated0 = null;
-
-        Long updateCntr0 = null;
+        AtomicCacheUpdateClosure c;
 
         synchronized (this) {
+            checkObsolete();
+
             boolean internal = isInternal() || !context().userCache();
 
             Map<UUID, CacheContinuousQueryListener> lsnrs = cctx.continuousQueries().updateListeners(internal, false);
@@ -1598,679 +1583,270 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM
                 || !F.isEmptyOrNulls(filter);
 
-            checkObsolete();
-
-            CacheDataRow oldRow = null;
-
-            // Load and remove from swap if it is new.
-            if (isStartVersion())
-                oldRow = unswap(retval, false);
-
-            // Prepare old value.
-            oldVal = val;
-
             // Possibly read value from store.
-            boolean readFromStore = false;
-
-            Object old0 = null;
-
-            if (readThrough && needVal && oldVal == null && (cctx.readThrough() &&
-                (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) {
-                old0 = readThrough(null, key, false, subjId, taskName);
-
-                oldVal = cctx.toCacheObject(old0);
-
-                readFromStore = true;
-
-                // Detach value before index update.
-                oldVal = cctx.kernalContext().cacheObjects().prepareForCache(oldVal, cctx);
-
-                // Calculate initial TTL and expire time.
-                long initTtl;
-                long initExpireTime;
-
-                if (expiryPlc != null && oldVal != null) {
-                    IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
-
-                    initTtl = initTtlAndExpireTime.get1();
-                    initExpireTime = initTtlAndExpireTime.get2();
-                }
-                else {
-                    initTtl = CU.TTL_ETERNAL;
-                    initExpireTime = CU.EXPIRE_TIME_ETERNAL;
-                }
+            boolean readFromStore = readThrough && needVal && (cctx.readThrough() &&
+                (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()));
+
+            c = new AtomicCacheUpdateClosure(this,
+                newVer,
+                op,
+                writeObj,
+                invokeArgs,
+                readFromStore,
+                writeThrough,
+                keepBinary,
+                expiryPlc,
+                primary,
+                verCheck,
+                filter,
+                explicitTtl,
+                explicitExpireTime,
+                conflictVer,
+                conflictResolve,
+                intercept,
+                updateCntr);
+
+            key.valueBytes(cctx.cacheObjectContext());
 
-                if (oldVal != null)
-                    storeValue(oldVal, initExpireTime, ver, oldRow);
-                // else nothing to do, real old value was null.
-
-                update(oldVal, initExpireTime, initTtl, ver, true);
+            if (isNear()) {
+                CacheDataRow dataRow = val != null ? new CacheDataRowAdapter(key, val, ver, expireTimeExtras()) : null;
 
-                if (deletedUnlocked() && oldVal != null && !isInternal())
-                    deletedUnlocked(false);
+                c.call(dataRow);
             }
+            else
+                cctx.offheap().invoke(key, localPartition(), c);
 
-            Object transformClo = null;
-
-            // Request-level conflict resolution is needed, i.e. we do not know who will win in advance.
-            if (conflictResolve) {
-                GridCacheVersion oldConflictVer = version().conflictVersion();
-
-                // Cache is conflict-enabled.
-                if (cctx.conflictNeedResolve()) {
-                    GridCacheVersionedEntryEx newEntry;
-
-                    GridTuple3<Long, Long, Boolean> expiration = ttlAndExpireTime(expiryPlc,
-                        explicitTtl,
-                        explicitExpireTime);
-
-                    // Prepare old and new entries for conflict resolution.
-                    GridCacheVersionedEntryEx oldEntry = versionedEntry(keepBinary);
-
-                    if (op == GridCacheOperation.TRANSFORM) {
-                        transformClo = writeObj;
+            GridCacheUpdateAtomicResult updateRes = c.updateRes;
 
-                        EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
+            assert updateRes != null : c;
 
-                        oldVal = this.val;
+            CacheObject oldVal = c.oldRow != null ? c.oldRow.value() : null;
+            CacheObject updateVal = null;
+            GridCacheVersion updateVer = c.newVer;
 
-                        CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(),
-                            keepBinary, this);
+            // Apply metrics.
+            if (metrics &&
+                updateRes.outcome().updateReadMetrics() &&
+                cctx.cache().configuration().isStatisticsEnabled() &&
+                needVal) {
+                // PutIfAbsent methods must not update hit/miss statistics.
+                if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter))
+                    cctx.cache().metrics0().onRead(oldVal != null);
+            }
 
-                        try {
-                            Object computed = entryProcessor.process(entry, invokeArgs);
+            switch (updateRes.outcome()) {
+                case VERSION_CHECK_FAILED: {
+                    if (!cctx.isNear()) {
+                        CacheObject evtVal;
 
-                            if (entry.modified())
-                                writeObj = cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue()));
-                            else
-                                writeObj = oldVal;
+                        if (op == GridCacheOperation.TRANSFORM) {
+                            EntryProcessor<Object, Object, ?> entryProcessor =
+                                (EntryProcessor<Object, Object, ?>)writeObj;
 
-                            key0 = entry.key();
+                            CacheInvokeEntry<Object, Object> entry =
+                                new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this);
 
-                            if (computed != null)
-                                invokeRes = new IgniteBiTuple(cctx.unwrapTemporary(computed), null);
-                        }
-                        catch (Exception e) {
-                            invokeRes = new IgniteBiTuple(null, e);
+                            try {
+                                entryProcessor.process(entry, invokeArgs);
 
-                            writeObj = oldVal;
+                                evtVal = entry.modified() ?
+                                    cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
+                            }
+                            catch (Exception ignore) {
+                                evtVal = prevVal;
+                            }
                         }
-                    }
-
-                    newEntry = new GridCacheLazyPlainVersionedEntry<>(
-                        cctx,
-                        key,
-                        (CacheObject)writeObj,
-                        expiration.get1(),
-                        expiration.get2(),
-                        conflictVer != null ? conflictVer : newVer,
-                        keepBinary);
-
-                    // Resolve conflict.
-                    conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
-
-                    assert conflictCtx != null;
-
-                    boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
-
-                    // Use old value?
-                    if (conflictCtx.isUseOld()) {
-                        GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
+                        else
+                            evtVal = (CacheObject)writeObj;
 
-                        // Handle special case with atomic comparator.
-                        if (!isNew() &&                                                                       // Not initial value,
-                            verCheck &&                                                                       // and atomic version check,
-                            oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() &&                 // and data centers are equal,
-                            ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
-                            cctx.writeThrough() &&                                                            // and store is enabled,
-                            primary)                                                                          // and we are primary.
-                        {
-                            CacheObject val = this.val;
+                        long updateCntr0 = nextPartCounter();
 
-                            if (val == null) {
-                                assert deletedUnlocked();
+                        if (updateCntr != null)
+                            updateCntr0 = updateCntr;
 
-                                cctx.store().remove(null, key);
-                            }
-                            else
-                                cctx.store().put(null, key, val, ver);
-                        }
+                        onUpdateFinished(updateCntr0);
 
-                        return new GridCacheUpdateAtomicResult(false,
-                            retval ? this.val : null,
-                            null,
-                            invokeRes,
-                            CU.TTL_ETERNAL,
-                            CU.EXPIRE_TIME_ETERNAL,
-                            null,
-                            null,
+                        cctx.continuousQueries().onEntryUpdated(
+                            key,
+                            evtVal,
+                            prevVal,
+                            isInternal() || !context().userCache(),
+                            partition(),
+                            primary,
                             false,
-                            updateCntr0 == null ? 0 : updateCntr0);
+                            updateCntr0,
+                            null,
+                            topVer);
                     }
-                    // Will update something.
-                    else {
-                        // Merge is a local update which override passed value bytes.
-                        if (conflictCtx.isMerge()) {
-                            writeObj = cctx.toCacheObject(conflictCtx.mergeValue());
-
-                            conflictVer = null;
-                        }
-                        else
-                            assert conflictCtx.isUseNew();
 
-                        // Update value is known at this point, so update operation type.
-                        op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
-                    }
+                    return updateRes;
                 }
-                else
-                    // Nullify conflict version on this update, so that we will use regular version during next updates.
-                    conflictVer = null;
-            }
-
-            boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
-
-            // Perform version check only in case there was no explicit conflict resolution.
-            if (conflictCtx == null) {
-                if (verCheck) {
-                    if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) >= 0) {
-                        if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
-                            if (log.isDebugEnabled())
-                                log.debug("Received entry update with same version as current (will update store) " +
-                                    "[entry=" + this + ", newVer=" + newVer + ']');
 
-                            CacheObject val = this.val;
+                case CONFLICT_USE_OLD:
+                case FILTER_FAILED:
+                case INVOKE_NO_OP:
+                case INTERCEPTOR_CANCEL:
+                    return updateRes;
+            }
 
-                            if (val == null) {
-                                assert deletedUnlocked();
+            assert updateRes.outcome() == UpdateOutcome.SUCCESS || updateRes.outcome() == UpdateOutcome.REMOVE_NO_VAL;
 
-                                cctx.store().remove(null, key);
-                            }
-                            else
-                                cctx.store().put(null, key, val, ver);
-                        }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Received entry update with smaller version than current (will ignore) " +
-                                    "[entry=" + this + ", newVer=" + newVer + ']');
-                        }
+            CacheObject evtOld = null;
 
-                        if (!cctx.isNear()) {
-                            CacheObject evtVal;
+            if (evt && op == TRANSFORM && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
+                assert writeObj instanceof EntryProcessor : writeObj;
 
-                            if (op == GridCacheOperation.TRANSFORM) {
-                                EntryProcessor<Object, Object, ?> entryProcessor =
-                                    (EntryProcessor<Object, Object, ?>)writeObj;
+                evtOld = cctx.unwrapTemporary(oldVal);
 
-                                CacheInvokeEntry<Object, Object> entry =
-                                    new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this);
+                Object transformClo = EntryProcessorResourceInjectorProxy.unwrap(writeObj);
 
-                                try {
-                                    entryProcessor.process(entry, invokeArgs);
+                cctx.events().addEvent(partition(),
+                    key,
+                    evtNodeId,
+                    null,
+                    newVer,
+                    EVT_CACHE_OBJECT_READ,
+                    evtOld, evtOld != null,
+                    evtOld, evtOld != null,
+                    subjId,
+                    transformClo.getClass().getName(),
+                    taskName,
+                    keepBinary);
+            }
 
-                                    evtVal = entry.modified() ?
-                                        cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal;
-                                }
-                                catch (Exception ignore) {
-                                    evtVal = prevVal;
-                                }
-                            }
-                            else
-                                evtVal = (CacheObject)writeObj;
-
-                            updateCntr0 = nextPartCounter(topVer);
-
-                            if (updateCntr != null)
-                                updateCntr0 = updateCntr;
-
-                            onUpdateFinished(updateCntr0);
-
-                            cctx.continuousQueries().onEntryUpdated(
-                                key,
-                                evtVal,
-                                prevVal,
-                                isInternal() || !context().userCache(),
-                                partition(),
-                                primary,
-                                false,
-                                updateCntr0,
-                                null,
-                                topVer);
-                        }
+            if (c.op == GridCacheOperation.UPDATE) {
+                updateVal = val;
 
-                        return new GridCacheUpdateAtomicResult(false,
-                            retval ? this.val : null,
-                            null,
-                            invokeRes,
-                            CU.TTL_ETERNAL,
-                            CU.EXPIRE_TIME_ETERNAL,
-                            null,
-                            null,
-                            false,
-                            0);
-                    }
-                }
-                else
-                    assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 :
-                        "Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
-            }
+                assert updateVal != null : c;
 
-            // Apply metrics.
-            if (metrics && cctx.cache().configuration().isStatisticsEnabled() && needVal) {
-                // PutIfAbsent methods mustn't update hit/miss statistics
-                if (op != GridCacheOperation.UPDATE || F.isEmpty(filter) || !cctx.putIfAbsentFilter(filter))
-                    cctx.cache().metrics0().onRead(oldVal != null);
-            }
+                drReplicate(drType, updateVal, updateVer, topVer);
 
-            // Check filter inside of synchronization.
-            if (!F.isEmptyOrNulls(filter)) {
-                boolean pass = cctx.isAllLocked(this, filter);
+                recordNodeId(affNodeId, topVer);
 
-                if (!pass) {
-                    if (expiryPlc != null && !readFromStore && hasValueUnlocked() && !cctx.putIfAbsentFilter(filter))
-                        updateTtl(expiryPlc);
+                if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
+                    if (evtOld == null)
+                        evtOld = cctx.unwrapTemporary(oldVal);
 
-                    return new GridCacheUpdateAtomicResult(false,
-                        retval ? oldVal : null,
-                        null,
-                        invokeRes,
-                        CU.TTL_ETERNAL,
-                        CU.EXPIRE_TIME_ETERNAL,
+                    cctx.events().addEvent(partition(),
+                        key,
+                        evtNodeId,
                         null,
+                        newVer,
+                        EVT_CACHE_OBJECT_PUT,
+                        updateVal,
+                        true,
+                        evtOld,
+                        evtOld != null,
+                        subjId,
                         null,
-                        false,
-                        updateCntr0 == null ? 0 : updateCntr0);
+                        taskName,
+                        keepBinary);
                 }
             }
+            else {
+                assert c.op == GridCacheOperation.DELETE : c.op;
 
-            // Calculate new value in case we met transform.
-            if (op == GridCacheOperation.TRANSFORM) {
-                assert conflictCtx == null : "Cannot be TRANSFORM here if conflict resolution was performed earlier.";
-
-                transformClo = writeObj;
-
-                EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
-
-                CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(), keepBinary, this);
-
-                try {
-                    Object computed = entryProcessor.process(entry, invokeArgs);
-
-                    if (entry.modified()) {
-                        updated0 = cctx.unwrapTemporary(entry.getValue());
-                        updated = cctx.toCacheObject(updated0);
-                    }
-                    else
-                        updated = oldVal;
-
-                    key0 = entry.key();
+                clearReaders();
 
-                    if (computed != null)
-                        invokeRes = new IgniteBiTuple(cctx.unwrapTemporary(computed), null);
-                }
-                catch (Exception e) {
-                    invokeRes = new IgniteBiTuple(null, e);
+                drReplicate(drType, null, newVer, topVer);
 
-                    updated = oldVal;
-                }
+                recordNodeId(affNodeId, topVer);
 
-                if (!entry.modified()) {
-                    if (expiryPlc != null && !readFromStore && hasValueUnlocked())
-                        updateTtl(expiryPlc);
+                if (evt && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
+                    if (evtOld == null)
+                        evtOld = cctx.unwrapTemporary(oldVal);
 
-                    return new GridCacheUpdateAtomicResult(false,
-                        retval ? oldVal : null,
-                        null,
-                        invokeRes,
-                        CU.TTL_ETERNAL,
-                        CU.EXPIRE_TIME_ETERNAL,
-                        null,
+                    cctx.events().addEvent(partition(),
+                        key,
+                        evtNodeId,
+                        null, newVer,
+                        EVT_CACHE_OBJECT_REMOVED,
+                        null, false,
+                        evtOld, evtOld != null,
+                        subjId,
                         null,
-                        false,
-                        updateCntr0 == null ? 0 : updateCntr0);
+                        taskName,
+                        keepBinary);
                 }
             }
-            else
-                updated = (CacheObject)writeObj;
-
-            op = updated == null ? GridCacheOperation.DELETE : GridCacheOperation.UPDATE;
-
-            assert op == GridCacheOperation.UPDATE || (op == GridCacheOperation.DELETE && updated == null);
-
-            boolean hadVal = hasValueUnlocked();
-
-            // Incorporate conflict version into new version if needed.
-            if (conflictVer != null && conflictVer != newVer)
-                newVer = new GridCacheVersionEx(newVer.topologyVersion(),
-                    newVer.globalTime(),
-                    newVer.order(),
-                    newVer.nodeOrder(),
-                    newVer.dataCenterId(),
-                    conflictVer);
 
-            if (op == GridCacheOperation.UPDATE) {
-                // Conflict context is null if there were no explicit conflict resolution.
-                if (conflictCtx == null) {
-                    // Calculate TTL and expire time for local update.
-                    if (explicitTtl != CU.TTL_NOT_CHANGED) {
-                        // If conflict existed, expire time must be explicit.
-                        assert conflictVer == null || explicitExpireTime != CU.EXPIRE_TIME_CALCULATE;
-
-                        newSysTtl = newTtl = explicitTtl;
-                        newSysExpireTime = explicitExpireTime;
-
-                        newExpireTime = explicitExpireTime != CU.EXPIRE_TIME_CALCULATE ?
-                            explicitExpireTime : CU.toExpireTime(explicitTtl);
-                    }
-                    else {
-                        newSysTtl = expiryPlc == null ? CU.TTL_NOT_CHANGED :
-                            hadVal ? expiryPlc.forUpdate() : expiryPlc.forCreate();
+            if (updateRes.success())
+                updateMetrics(c.op, metrics);
 
-                        if (newSysTtl == CU.TTL_NOT_CHANGED) {
-                            newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
-                            newTtl = ttlExtras();
-                            newExpireTime = expireTimeExtras();
-                        }
-                        else if (newSysTtl == CU.TTL_ZERO) {
-                            op = GridCacheOperation.DELETE;
+            // Continuous query filter should be perform under lock.
+            if (lsnrs != null) {
+                CacheObject evtVal = cctx.unwrapTemporary(updateVal);
+                CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
 
-                            newSysTtl = CU.TTL_NOT_CHANGED;
-                            newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+                cctx.continuousQueries().onEntryUpdated(lsnrs,
+                    key,
+                    evtVal,
+                    evtOldVal,
+                    internal,
+                    partition(),
+                    primary,
+                    false,
+                    c.updateRes.updateCounter(),
+                    fut,
+                    topVer);
+            }
 
-                            newTtl = CU.TTL_ETERNAL;
-                            newExpireTime = CU.EXPIRE_TIME_ETERNAL;
+            cctx.dataStructures().onEntryUpdated(key, c.op == GridCacheOperation.DELETE, keepBinary);
 
-                            updated = null;
-                        }
-                        else {
-                            newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
-                            newTtl = newSysTtl;
-                            newExpireTime = CU.toExpireTime(newTtl);
-                        }
-                    }
+            if (intercept) {
+                if (c.op == GridCacheOperation.UPDATE) {
+                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
+                        cctx,
+                        key,
+                        null,
+                        updateVal,
+                        null,
+                        keepBinary,
+                        c.updateRes.updateCounter()));
                 }
                 else {
-                    newSysTtl = newTtl = conflictCtx.ttl();
-                    newSysExpireTime = newExpireTime = conflictCtx.expireTime();
+                    assert c.op == GridCacheOperation.DELETE : c.op;
+
+                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
+                        cctx,
+                        key,
+                        null,
+                        oldVal,
+                        null,
+                        keepBinary,
+                        c.updateRes.updateCounter()));
                 }
             }
-            else {
-                assert op == GridCacheOperation.DELETE;
-
-                newSysTtl = CU.TTL_NOT_CHANGED;
-                newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+        }
 
-                newTtl = CU.TTL_ETERNAL;
-                newExpireTime = CU.EXPIRE_TIME_ETERNAL;
-            }
+        onUpdateFinished(c.updateRes.updateCounter());
 
-            // TTL and expire time must be resolved at this point.
-            assert newTtl != CU.TTL_NOT_CHANGED && newTtl != CU.TTL_ZERO && newTtl >= 0;
-            assert newExpireTime != CU.EXPIRE_TIME_CALCULATE && newExpireTime >= 0;
+        return c.updateRes;
+    }
 
-            IgniteBiTuple<Boolean, Object> interceptRes = null;
+    /**
+     * @param val Value.
+     * @param cacheObj Cache object.
+     * @param keepBinary Keep binary flag.
+     * @param cpy Copy flag.
+     * @return Cache object value.
+     */
+    @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) {
+        if (val != null)
+            return val;
 
-            // Actual update.
-            if (op == GridCacheOperation.UPDATE) {
-                if (log.isTraceEnabled()) {
-                    log.trace("innerUpdate [key=" + key +
-                        ", entry=" + System.identityHashCode(this) + ']');
-                }
+        return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy);
+    }
 
-                if (intercept) {
-                    updated0 = value(updated0, updated, keepBinary, false);
+    /**
+     * @param expiry Expiration policy.
+     * @return Tuple holding initial TTL and expire time with the given expiry.
+     */
+    private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) {
+        assert expiry != null;
 
-                    Object interceptorVal = cctx.config().getInterceptor()
-                        .onBeforePut(new CacheLazyEntry(cctx, key, key0, oldVal, old0, keepBinary), updated0);
-
-                    if (interceptorVal == null)
-                        return new GridCacheUpdateAtomicResult(false,
-                            retval ? oldVal : null,
-                            null,
-                            invokeRes,
-                            CU.TTL_ETERNAL,
-                            CU.EXPIRE_TIME_ETERNAL,
-                            null,
-                            null,
-                            false,
-                            updateCntr0 == null ? 0 : updateCntr0);
-                    else if (interceptorVal != updated0) {
-                        updated0 = cctx.unwrapTemporary(interceptorVal);
-
-                        updated = cctx.toCacheObject(updated0);
-                    }
-                }
-
-                // Try write-through.
-                if (writeThrough)
-                    // Must persist inside synchronization in non-tx mode.
-                    cctx.store().put(null, key, updated, newVer);
-
-                if (!hadVal) {
-                    boolean new0 = isNew();
-
-                    assert deletedUnlocked() || new0 || isInternal(): "Invalid entry [entry=" + this + ", locNodeId=" +
-                        cctx.localNodeId() + ']';
-
-                    if (!new0 && !isInternal())
-                        deletedUnlocked(false);
-                }
-                else {
-                    assert !deletedUnlocked() : "Invalid entry [entry=" + this +
-                        ", locNodeId=" + cctx.localNodeId() + ']';
-
-                    // Do not change size.
-                }
-
-                updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx);
-
-                updateCntr0 = nextPartCounter(topVer);
-
-                if (updateCntr != null)
-                    updateCntr0 = updateCntr;
-
-                logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
-
-                storeValue(updated, newExpireTime, newVer, oldRow);
-
-                update(updated, newExpireTime, newTtl, newVer, true);
-
-                drReplicate(drType, updated, newVer, topVer);
-
-                recordNodeId(affNodeId, topVer);
-
-                if (evt) {
-                    CacheObject evtOld = null;
-
-                    if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
-                        evtOld = cctx.unwrapTemporary(oldVal);
-
-                        transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
-
-                        cctx.events().addEvent(partition(), key, evtNodeId, null,
-                            newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
-                            evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName,
-                            keepBinary);
-                    }
-
-                    if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
-                        if (evtOld == null)
-                            evtOld = cctx.unwrapTemporary(oldVal);
-
-                        cctx.events().addEvent(partition(), key, evtNodeId, null,
-                            newVer, EVT_CACHE_OBJECT_PUT, updated, updated != null, evtOld,
-                            evtOld != null || hadVal, subjId, null, taskName, keepBinary);
-                    }
-                }
-            }
-            else {
-                if (intercept) {
-                    interceptRes = cctx.config().getInterceptor().onBeforeRemove(new CacheLazyEntry(cctx, key, key0,
-                        oldVal, old0, keepBinary, updateCntr0));
-
-                    if (cctx.cancelRemove(interceptRes))
-                        return new GridCacheUpdateAtomicResult(false,
-                            cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())),
-                            null,
-                            invokeRes,
-                            CU.TTL_ETERNAL,
-                            CU.EXPIRE_TIME_ETERNAL,
-                            null,
-                            null,
-                            false,
-                            updateCntr0 == null ? 0 : updateCntr0);
-                }
-
-                if (writeThrough)
-                    // Must persist inside synchronization in non-tx mode.
-                    cctx.store().remove(null, key);
-
-                updateCntr0 = nextPartCounter(topVer);
-
-                if (updateCntr != null)
-                    updateCntr0 = updateCntr;
-
-                logUpdate(op, null, newVer, 0, updateCntr0);
-
-                removeValue();
-
-                if (hadVal) {
-                    assert !deletedUnlocked();
-
-                    if (!isInternal())
-                        deletedUnlocked(true);
-                }
-                else {
-                    boolean new0 = isNew();
-
-                    assert deletedUnlocked() || new0 || isInternal() : "Invalid entry [entry=" + this + ", locNodeId=" +
-                        cctx.localNodeId() + ']';
-
-                    if (new0) {
-                        if (!isInternal())
-                            deletedUnlocked(true);
-                    }
-                }
-
-                enqueueVer = newVer;
-
-                // Clear value on backup. Entry will be removed from cache when it got evicted from queue.
-                update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true);
-
-                assert newSysTtl == CU.TTL_NOT_CHANGED;
-                assert newSysExpireTime == CU.EXPIRE_TIME_CALCULATE;
-
-                clearReaders();
-
-                recordNodeId(affNodeId, topVer);
-
-                drReplicate(drType, null, newVer, topVer);
-
-                if (evt) {
-                    CacheObject evtOld = null;
-
-                    if (transformClo != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_READ)) {
-                        evtOld = cctx.unwrapTemporary(oldVal);
-
-                        transformClo = EntryProcessorResourceInjectorProxy.unwrap(transformClo);
-
-                        cctx.events().addEvent(partition(), key, evtNodeId, null,
-                            newVer, EVT_CACHE_OBJECT_READ, evtOld, evtOld != null || hadVal, evtOld,
-                            evtOld != null || hadVal, subjId, transformClo.getClass().getName(), taskName,
-                            keepBinary);
-                    }
-
-                    if (newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED)) {
-                        if (evtOld == null)
-                            evtOld = cctx.unwrapTemporary(oldVal);
-
-                        cctx.events().addEvent(partition(), key, evtNodeId, null, newVer,
-                            EVT_CACHE_OBJECT_REMOVED, null, false, evtOld, evtOld != null || hadVal,
-                            subjId, null, taskName, keepBinary);
-                    }
-                }
-
-                res = hadVal;
-            }
-
-            if (res)
-                updateMetrics(op, metrics);
-
-            // Continuous query filter should be perform under lock.
-            if (lsnrs != null) {
-                CacheObject evtVal = cctx.unwrapTemporary(updated);
-                CacheObject evtOldVal = cctx.unwrapTemporary(oldVal);
-
-                cctx.continuousQueries().onEntryUpdated(lsnrs, key, evtVal, evtOldVal, internal,
-                    partition(), primary, false, updateCntr0, fut, topVer);
-            }
-
-            cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary);
-
-            if (intercept) {
-                if (op == GridCacheOperation.UPDATE)
-                    cctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(
-                        cctx,
-                        key,
-                        key0,
-                        updated,
-                        updated0,
-                        keepBinary,
-                        updateCntr0));
-                else
-                    cctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(
-                        cctx,
-                        key,
-                        key0,
-                        oldVal,
-                        old0,
-                        keepBinary,
-                        updateCntr0));
-
-                if (interceptRes != null)
-                    oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
-            }
-        }
-
-        onUpdateFinished(updateCntr0);
-
-        if (log.isDebugEnabled())
-            log.debug("Updated cache entry [val=" + val + ", old=" + oldVal + ", entry=" + this + ']');
-
-        return new GridCacheUpdateAtomicResult(res,
-            oldVal,
-            updated,
-            invokeRes,
-            newSysTtl,
-            newSysExpireTime,
-            enqueueVer,
-            conflictCtx,
-            true,
-            updateCntr0 == null ? 0 : updateCntr0);
-    }
-
-    /**
-     * @param val Value.
-     * @param cacheObj Cache object.
-     * @param keepBinary Keep binary flag.
-     * @param cpy Copy flag.
-     * @return Cache object value.
-     */
-    @Nullable private Object value(@Nullable Object val, @Nullable CacheObject cacheObj, boolean keepBinary, boolean cpy) {
-        if (val != null)
-            return val;
-
-        return cctx.unwrapBinaryIfNeeded(cacheObj, keepBinary, cpy);
-    }
-
-    /**
-     * @param expiry Expiration policy.
-     * @return Tuple holding initial TTL and expire time with the given expiry.
-     */
-    private static IgniteBiTuple<Long, Long> initialTtlAndExpireTime(IgniteCacheExpiryPolicy expiry) {
-        assert expiry != null;
-
-        long initTtl = expiry.forCreate();
-        long initExpireTime;
+        long initTtl = expiry.forCreate();
+        long initExpireTime;
 
         if (initTtl == CU.TTL_ZERO) {
             initTtl = CU.TTL_MINIMUM;
@@ -2294,8 +1870,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @param expireTime Explicit expire time.
      * @return Result.
      */
-    private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime)
-        throws GridCacheEntryRemovedException {
+    private GridTuple3<Long, Long, Boolean> ttlAndExpireTime(IgniteCacheExpiryPolicy expiry, long ttl, long expireTime) {
+        assert !obsolete();
+
         boolean rmv = false;
 
         // 1. If TTL is not changed, then calculate it based on expiry.
@@ -2313,7 +1890,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         // 3. If TTL is still not changed, then either use old entry TTL or set it to "ETERNAL".
         if (ttl == CU.TTL_NOT_CHANGED) {
-            if (isNew())
+            if (isStartVersion())
                 ttl = CU.TTL_ETERNAL;
             else {
                 ttl = ttlExtras();
@@ -3027,6 +2604,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     }
 
     /**
+     * @return Update counter.
+     */
+    protected long nextPartCounter() {
+        return 0;
+    }
+
+    /**
      * @param topVer Topology version.
      * @return Update counter.
      */
@@ -3566,13 +3150,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         assert Thread.holdsLock(this);
         assert val != null : "null values in update for key: " + key;
 
-        cctx.offheap().update(key,
-            val,
-            ver,
-            expireTime,
-            partition(),
-            localPartition(),
-            oldRow);
+        cctx.offheap().invoke(key,  localPartition(), new UpdateClosure(this, val, ver, expireTime));
     }
 
     /**
@@ -4177,6 +3755,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         /**
          * @param key Key.
+         * @param keepBinary Keep binary flag.
          */
         private LazyValueEntry(KeyCacheObject key, boolean keepBinary) {
             this.key = key;
@@ -4223,4 +3802,854 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             return "IteratorEntry [key=" + key + ']';
         }
     }
+
+    /**
+     *
+     */
+    private static class UpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure {
+        /** */
+        private final GridCacheMapEntry entry;
+
+        /** */
+        private final CacheObject val;
+
+        /** */
+        private final GridCacheVersion ver;
+
+        /** */
+        private final long expireTime;
+
+        /** */
+        private CacheDataRow newRow;
+
+        /** */
+        private CacheDataRow oldRow;
+
+        /** */
+        private IgniteTree.OperationType treeOp = IgniteTree.OperationType.PUT;
+
+        /**
+         * @param entry Entry.
+         * @param val New value.
+         * @param ver New version.
+         * @param expireTime New expire time.
+         */
+        UpdateClosure(GridCacheMapEntry entry, CacheObject val, GridCacheVersion ver, long expireTime) {
+            this.entry = entry;
+            this.val = val;
+            this.ver = ver;
+            this.expireTime = expireTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+            this.oldRow = oldRow;
+
+            if (oldRow != null)
+                oldRow.key(entry.key);
+
+            newRow = entry.cctx.offheap().dataStore(entry.localPartition()).createRow(entry.key,
+                val,
+                ver,
+                expireTime,
+                oldRow);
+
+            treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+                IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheDataRow newRow() {
+            return newRow;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteTree.OperationType operationType() {
+            return treeOp;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public CacheDataRow oldRow() {
+            return oldRow;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class AtomicCacheUpdateClosure implements IgniteCacheOffheapManager.OffheapInvokeClosure {
+        /** */
+        private final GridCacheMapEntry entry;
+
+        /** */
+        private GridCacheVersion newVer;
+
+        /** */
+        private GridCacheOperation op;
+
+        /** */
+        private Object writeObj;
+
+        /** */
+        private Object[] invokeArgs;
+
+        /** */
+        private final boolean readThrough;
+
+        /** */
+        private final boolean writeThrough;
+
+        /** */
+        private final boolean keepBinary;
+
+        /** */
+        private final IgniteCacheExpiryPolicy expiryPlc;
+
+        /** */
+        private final boolean primary;
+
+        /** */
+        private final boolean verCheck;
+
+        /** */
+        private final CacheEntryPredicate[] filter;
+
+        /** */
+        private final long explicitTtl;
+
+        /** */
+        private final long explicitExpireTime;
+
+        /** */
+        private GridCacheVersion conflictVer;
+
+        /** */
+        private final boolean conflictResolve;
+
+        /** */
+        private final boolean intercept;
+
+        /** */
+        private final Long updateCntr;
+
+        /** */
+        private GridCacheUpdateAtomicResult updateRes;
+
+        /** */
+        private IgniteTree.OperationType treeOp;
+
+        /** */
+        private CacheDataRow newRow;
+
+        /** */
+        private CacheDataRow oldRow;
+
+        AtomicCacheUpdateClosure(GridCacheMapEntry entry,
+            GridCacheVersion newVer,
+            GridCacheOperation op,
+            Object writeObj,
+            Object[] invokeArgs,
+            boolean readThrough,
+            boolean writeThrough,
+            boolean keepBinary,
+            @Nullable IgniteCacheExpiryPolicy expiryPlc,
+            boolean primary,
+            boolean verCheck,
+            @Nullable CacheEntryPredicate[] filter,
+            long explicitTtl,
+            long explicitExpireTime,
+            @Nullable GridCacheVersion conflictVer,
+            boolean conflictResolve,
+            boolean intercept,
+            @Nullable Long updateCntr) {
+            assert op == UPDATE || op == DELETE || op == TRANSFORM : op;
+
+            this.entry = entry;
+            this.newVer = newVer;
+            this.op = op;
+            this.writeObj = writeObj;
+            this.invokeArgs = invokeArgs;
+            this.readThrough = readThrough;
+            this.writeThrough = writeThrough;
+            this.keepBinary = keepBinary;
+            this.expiryPlc = expiryPlc;
+            this.primary = primary;
+            this.verCheck = verCheck;
+            this.filter = filter;
+            this.explicitTtl = explicitTtl;
+            this.explicitExpireTime = explicitExpireTime;
+            this.conflictVer = conflictVer;
+            this.conflictResolve = conflictResolve;
+            this.intercept = intercept;
+            this.updateCntr = updateCntr;
+
+            switch (op) {
+                case UPDATE:
+                    treeOp = IgniteTree.OperationType.PUT;
+
+                    break;
+
+                case DELETE:
+                    treeOp = IgniteTree.OperationType.REMOVE;
+
+                    break;
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public CacheDataRow oldRow() {
+            return oldRow;
+        }
+
+        /** {@inheritDoc} */
+        @Override public CacheDataRow newRow() {
+            return newRow;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteTree.OperationType operationType() {
+            return treeOp;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void call(@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+            assert entry.isNear() || oldRow == null || oldRow.link() != 0 : oldRow;
+
+            if (oldRow != null)
+                oldRow.key(entry.key());
+
+            this.oldRow = oldRow;
+
+            GridCacheContext cctx = entry.context();
+
+            CacheObject oldVal;
+            CacheObject storeLoadedVal = null;
+
+            if (oldRow != null) {
+                oldVal = oldRow.value();
+
+                entry.update(oldVal, oldRow.expireTime(), 0, oldRow.version(), false);
+            }
+            else
+                oldVal = null;
+
+            if (oldVal == null && readThrough) {
+                storeLoadedVal = cctx.toCacheObject(cctx.store().load(null, entry.key));
+
+                if (storeLoadedVal != null) {
+                    oldVal = cctx.kernalContext().cacheObjects().prepareForCache(storeLoadedVal, cctx);
+
+                    entry.val = oldVal;
+
+                    if (entry.deletedUnlocked())
+                        entry.deletedUnlocked(false);
+                }
+            }
+
+            CacheInvokeEntry<Object, Object> invokeEntry = null;
+            IgniteBiTuple<Object, Exception> invokeRes = null;
+
+            boolean invoke = op == TRANSFORM;
+
+            if (invoke) {
+                invokeEntry = new CacheInvokeEntry<>(entry.key, oldVal, entry.ver, keepBinary, entry);
+
+                invokeRes = runEntryProcessor(invokeEntry);
+
+                op = writeObj == null ? DELETE : UPDATE;
+            }
+
+            CacheObject newVal = (CacheObject)writeObj;
+
+            GridCacheVersionConflictContext<?, ?> conflictCtx = null;
+
+            if (conflictResolve) {
+                conflictCtx = resolveConflict(newVal, invokeRes);
+
+                if (updateRes != null) {
+                    assert conflictCtx != null && conflictCtx.isUseOld() : conflictCtx;
+                    assert treeOp == IgniteTree.OperationType.NOOP : treeOp;
+
+                    return;
+                }
+            }
+
+            if (conflictCtx == null) {
+                // Perform version check only in case there was no explicit conflict resolution.
+                versionCheck(invokeRes);
+
+                if (updateRes != null) {
+                    assert treeOp == IgniteTree.OperationType.NOOP : treeOp;
+
+                    return;
+                }
+            }
+
+            if (!F.isEmptyOrNulls(filter)) {
+                boolean pass = cctx.isAllLocked(entry, filter);
+
+                if (!pass) {
+                    initResultOnCancelUpdate(storeLoadedVal, !cctx.putIfAbsentFilter(filter));
+
+                    updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.FILTER_FAILED,
+                        oldVal,
+                        null,
+                        invokeRes,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
+                        null,
+                        null,
+                        0);
+
+                    return;
+                }
+            }
+
+            if (invoke) {
+                if (!invokeEntry.modified()) {
+                    initResultOnCancelUpdate(storeLoadedVal, true);
+
+                    updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INVOKE_NO_OP,
+                        oldVal,
+                        null,
+                        invokeRes,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
+                        null,
+                        null,
+                        0);
+
+                    return;
+                }
+
+                op = writeObj == null ? DELETE : UPDATE;
+            }
+
+            // Incorporate conflict version into new version if needed.
+            if (conflictVer != null && conflictVer != newVer) {
+                newVer = new GridCacheVersionEx(newVer.topologyVersion(),
+                    newVer.globalTime(),
+                    newVer.order(),
+                    newVer.nodeOrder(),
+                    newVer.dataCenterId(),
+                    conflictVer);
+            }
+
+            if (op == UPDATE) {
+                assert writeObj != null;
+
+                update(conflictCtx, invokeRes, storeLoadedVal != null);
+            }
+            else {
+                assert op == DELETE && writeObj == null : op;
+
+                remove(conflictCtx, invokeRes, storeLoadedVal != null);
+            }
+
+            assert updateRes != null && treeOp != null;
+        }
+
+        /**
+         * @param storeLoadedVal Value loaded from store.
+         * @param updateExpireTime {@code True} if need update expire time.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void initResultOnCancelUpdate(@Nullable CacheObject storeLoadedVal, boolean updateExpireTime)
+            throws IgniteCheckedException {
+            boolean needUpdate = false;
+
+            if (storeLoadedVal != null) {
+                long initTtl;
+                long initExpireTime;
+
+                if (expiryPlc != null) {
+                    IgniteBiTuple<Long, Long> initTtlAndExpireTime = initialTtlAndExpireTime(expiryPlc);
+
+                    initTtl = initTtlAndExpireTime.get1();
+                    initExpireTime = initTtlAndExpireTime.get2();
+                }
+                else {
+                    initTtl = CU.TTL_ETERNAL;
+                    initExpireTime = CU.EXPIRE_TIME_ETERNAL;
+                }
+
+                entry.update(storeLoadedVal, initExpireTime, initTtl, entry.ver, true);
+
+                needUpdate = true;
+            }
+            else if (updateExpireTime && expiryPlc != null && entry.val != null){
+                long ttl = expiryPlc.forAccess();
+
+                if (ttl != CU.TTL_NOT_CHANGED) {
+                    long expireTime;
+
+                    if (ttl == CU.TTL_ZERO) {
+                        ttl = CU.TTL_MINIMUM;
+                        expireTime = CU.expireTimeInPast();
+                    }
+                    else
+                        expireTime = CU.toExpireTime(ttl);
+
+                    if (entry.expireTimeExtras() != expireTime) {
+                        entry.update(entry.val, expireTime, ttl, entry.ver, true);
+
+                        expiryPlc.ttlUpdated(entry.key, entry.ver, null);
+
+                        needUpdate = true;
+                    }
+                }
+            }
+
+            if (needUpdate) {
+                newRow = entry.localPartition().dataStore().createRow(entry.key,
+                    storeLoadedVal,
+                    newVer,
+                    entry.expireTimeExtras(),
+                    oldRow);
+
+                treeOp = IgniteTree.OperationType.PUT;
+            }
+            else
+                treeOp = IgniteTree.OperationType.NOOP;
+        }
+
+        /**
+         * @param conflictCtx Conflict context.
+         * @param invokeRes Entry processor result (for invoke operation).
+         * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store.
+         * @throws IgniteCheckedException If failed.
+         */
+        private void update(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,
+            @Nullable IgniteBiTuple<Object, Exception> invokeRes,
+            boolean readFromStore)
+            throws IgniteCheckedException
+        {
+            GridCacheContext cctx = entry.context();
+
+            final CacheObject oldVal = entry.val;
+            CacheObject updated = (CacheObject)writeObj;
+
+            long newSysTtl;
+            long newSysExpireTime;
+
+            long newTtl;
+            long newExpireTime;
+
+            // Conflict context is null if there were no explicit conflict resolution.
+            if (conflictCtx == null) {
+                // Calculate TTL and expire time for local update.
+                if (explicitTtl != CU.TTL_NOT_CHANGED) {
+                    // If conflict existed, expire time must be explicit.
+                    assert conflictVer == null || explicitExpireTime != CU.EXPIRE_TIME_CALCULATE;
+
+                    newSysTtl = newTtl = explicitTtl;
+                    newSysExpireTime = explicitExpireTime;
+
+                    newExpireTime = explicitExpireTime != CU.EXPIRE_TIME_CALCULATE ?
+                        explicitExpireTime : CU.toExpireTime(explicitTtl);
+                }
+                else {
+                    newSysTtl = expiryPlc == null ? CU.TTL_NOT_CHANGED :
+                        entry.val != null ? expiryPlc.forUpdate() : expiryPlc.forCreate();
+
+                    if (newSysTtl == CU.TTL_NOT_CHANGED) {
+                        newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+                        newTtl = entry.ttlExtras();
+                        newExpireTime = entry.expireTimeExtras();
+                    }
+                    else if (newSysTtl == CU.TTL_ZERO) {
+                        op = GridCacheOperation.DELETE;
+
+                        writeObj = null;
+
+                        remove(conflictCtx, invokeRes, readFromStore);
+
+                        return;
+                    }
+                    else {
+                        newSysExpireTime = CU.EXPIRE_TIME_CALCULATE;
+                        newTtl = newSysTtl;
+                        newExpireTime = CU.toExpireTime(newTtl);
+                    }
+                }
+            }
+            else {
+                newSysTtl = newTtl = conflictCtx.ttl();
+                newSysExpireTime = newExpireTime = conflictCtx.expireTime();
+            }
+
+            if (intercept) {
+                Object updated0 = cctx.unwrapBinaryIfNeeded(updated, keepBinary, false);
+
+                CacheLazyEntry<Object, Object> interceptEntry = new CacheLazyEntry<>(cctx,
+                    entry.key,
+                    null,
+                    oldVal,
+                    null,
+                    keepBinary);
+
+                Object interceptorVal = cctx.config().getInterceptor().onBeforePut(interceptEntry, updated0);
+
+                if (interceptorVal == null) {
+                    treeOp = IgniteTree.OperationType.NOOP;
+
+                    updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INTERCEPTOR_CANCEL,
+                        oldVal,
+                        null,
+                        invokeRes,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
+                        null,
+                        null,
+                        0);
+
+                    return;
+                }
+                else if (interceptorVal != updated0) {
+                    updated0 = cctx.unwrapTemporary(interceptorVal);
+
+                    updated = cctx.toCacheObject(updated0);
+                }
+            }
+
+            updated = cctx.kernalContext().cacheObjects().prepareForCache(updated, cctx);
+
+            if (writeThrough)
+                // Must persist inside synchronization in non-tx mode.
+                cctx.store().put(null, entry.key, updated, newVer);
+
+            if (entry.val == null) {
+                boolean new0 = entry.isStartVersion();
+
+                assert entry.deletedUnlocked() || new0 || entry.isInternal(): "Invalid entry [entry=" + entry +
+                    ", locNodeId=" + cctx.localNodeId() + ']';
+
+                if (!new0 && !entry.isInternal())
+                    entry.deletedUnlocked(false);
+            }
+            else {
+                assert !entry.deletedUnlocked() : "Invalid entry [entry=" + this +
+                    ", locNodeId=" + cctx.localNodeId() + ']';
+            }
+
+            long updateCntr0 = entry.nextPartCounter();
+
+            if (updateCntr != null)
+                updateCntr0 = updateCntr;
+
+            entry.logUpdate(op, updated, newVer, newExpireTime, updateCntr0);
+
+            if (!entry.isNear()) {
+                newRow = entry.localPartition().dataStore().createRow(entry.key,
+                    updated,
+                    newVer,
+                    newExpireTime,
+                    oldRow);
+
+                treeOp = oldRow != null && oldRow.link() == newRow.link() ?
+                    IgniteTree.OperationType.NOOP : IgniteTree.OperationType.PUT;
+            }
+            else
+                treeOp = IgniteTree.OperationType.PUT;
+
+            entry.update(updated, newExpireTime, newTtl, newVer, true);
+
+            updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.SUCCESS,
+                oldVal,
+                updated,
+                invokeRes,
+                newSysTtl,
+                newSysExpireTime,
+                null,
+                conflictCtx,
+                updateCntr0);
+        }
+
+        /**
+         * @param conflictCtx Conflict context.
+         * @param invokeRes Entry processor result (for invoke operation).
+         * @param readFromStore {@code True} if initial entry value was {@code null} and it was read from store.
+         * @throws IgniteCheckedException If failed.
+         */
+        @SuppressWarnings("unchecked")
+        private void remove(@Nullable GridCacheVersionConflictContext<?, ?> conflictCtx,
+            @Nullable IgniteBiTuple<Object, Exception> invokeRes,
+            boolean readFromStore)
+            throws IgniteCheckedException
+        {
+            GridCacheContext cctx = entry.context();
+
+            CacheObject oldVal = entry.val;
+
+            IgniteBiTuple<Boolean, Object> interceptRes = null;
+
+            if (intercept) {
+                CacheLazyEntry<Object, Object> intercepEntry = new CacheLazyEntry<>(cctx,
+                    entry.key,
+                    null,
+                    oldVal,
+                    null,
+                    keepBinary);
+
+                interceptRes = cctx.config().getInterceptor().onBeforeRemove(intercepEntry);
+
+                if (cctx.cancelRemove(interceptRes)) {
+                    treeOp = IgniteTree.OperationType.NOOP;
+
+                    updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.INTERCEPTOR_CANCEL,
+                        cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2())),
+                        null,
+                        invokeRes,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
+                        null,
+                        null,
+                        0);
+
+                    return;
+                }
+            }
+
+            if (writeThrough)
+                // Must persist inside synchronization in non-tx mode.
+                cctx.store().remove(null, entry.key);
+
+            long updateCntr0 = entry.nextPartCounter();
+
+            if (updateCntr != null)
+                updateCntr0 = updateCntr;
+
+            if (oldVal != null) {
+                assert !entry.deletedUnlocked();
+
+                if (!entry.isInternal())
+                    entry.deletedUnlocked(true);
+            }
+            else {
+                boolean new0 = entry.isStartVersion();
+
+                assert entry.deletedUnlocked() || new0 || entry.isInternal() : "Invalid entry [entry=" + this +
+                    ", locNodeId=" + cctx.localNodeId() + ']';
+
+                if (new0) {
+                    if (!entry.isInternal())
+                        entry.deletedUnlocked(true);
+                }
+            }
+
+            GridCacheVersion enqueueVer = newVer;
+
+            entry.update(null, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, newVer, true);
+
+            treeOp = (oldVal == null || readFromStore) ? IgniteTree.OperationType.NOOP :
+                IgniteTree.OperationType.REMOVE;
+
+            UpdateOutcome outcome = oldVal != null ? UpdateOutcome.SUCCESS : UpdateOutcome.REMOVE_NO_VAL;
+
+            if (interceptRes != null)
+                oldVal = cctx.toCacheObject(cctx.unwrapTemporary(interceptRes.get2()));
+
+            updateRes = new GridCacheUpdateAtomicResult(outcome,
+                oldVal,
+                null,
+                invokeRes,
+                CU.TTL_NOT_CHANGED,
+                CU.EXPIRE_TIME_CALCULATE,
+                enqueueVer,
+                conflictCtx,
+                updateCntr0);
+        }
+
+        /**
+         * @param newVal New entry value.
+         * @param invokeRes Entry processor result (for invoke operation).
+         * @return Conflict context.
+         * @throws IgniteCheckedException If failed.
+         */
+        private GridCacheVersionConflictContext<?, ?> resolveConflict(
+            CacheObject newVal,
+            @Nullable IgniteBiTuple<Object, Exception> invokeRes)
+            throws IgniteCheckedException
+        {
+            GridCacheContext cctx = entry.context();
+
+            // Cache is conflict-enabled.
+            if (cctx.conflictNeedResolve()) {
+                GridCacheVersion oldConflictVer = entry.ver.conflictVersion();
+
+                // Prepare old and new entries for conflict resolution.
+                GridCacheVersionedEntryEx oldEntry = new GridCacheLazyPlainVersionedEntry<>(cctx,
+                    entry.key,
+                    entry.val,
+                    entry.ttlExtras(),
+                    entry.expireTimeExtras(),
+                    entry.ver.conflictVersion(),
+                    entry.isStartVersion(),
+                    keepBinary);
+
+                GridTuple3<Long, Long, Boolean> expiration = entry.ttlAndExpireTime(expiryPlc,
+                    explicitTtl,
+                    explicitExpireTime);
+
+                GridCacheVersionedEntryEx newEntry = new GridCacheLazyPlainVersionedEntry<>(
+                    cctx,
+                    entry.key,
+                    newVal,
+                    expiration.get1(),
+                    expiration.get2(),
+                    conflictVer != null ? conflictVer : newVer,
+                    keepBinary);
+
+                // Resolve conflict.
+                GridCacheVersionConflictContext<?, ?> conflictCtx = cctx.conflictResolve(oldEntry, newEntry, verCheck);
+
+                assert conflictCtx != null;
+
+                boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
+                // Use old value?
+                if (conflictCtx.isUseOld()) {
+                    GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
+
+                    // Handle special case with atomic comparator.
+                    if (!entry.isStartVersion() &&                                                        // Not initial value,
+                        verCheck &&                                                                       // and atomic version check,
+                        oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() &&                 // and data centers are equal,
+                        ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
+                        cctx.writeThrough() &&                                                            // and store is enabled,
+                        primary)                                                                          // and we are primary.
+                    {
+                        CacheObject val = entry.val;
+
+                        if (val == null) {
+                            assert entry.deletedUnlocked();
+
+                            cctx.store().remove(null, entry.key);
+                        }
+                        else
+                            cctx.store().put(null, entry.key, val, entry.ver);
+                    }
+
+                    treeOp = IgniteTree.OperationType.NOOP;
+
+                    updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.CONFLICT_USE_OLD,
+                        entry.val,
+                        null,
+                        invokeRes,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
+                        null,
+                        null,
+                        0);
+                }
+                // Will update something.
+                else {
+                    // Merge is a local update which override passed value bytes.
+                    if (conflictCtx.isMerge()) {
+                        writeObj = cctx.toCacheObject(conflictCtx.mergeValue());
+
+                        conflictVer = null;
+                    }
+                    else
+                        assert conflictCtx.isUseNew();
+
+                    // Update value is known at this point, so update operation type.
+                    op = writeObj != null ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE;
+                }
+
+                return conflictCtx;
+            }
+            else
+                // Nullify conflict version on this update, so that we will use regular version during next updates.
+                conflictVer = null;
+
+            return null;
+        }
+
+        /**
+         * @param invokeRes Entry processor result (for invoke operation).
+         * @throws IgniteCheckedException If failed.
+         */
+        private void versionCheck(@Nullable IgniteBiTuple<Object, Exception> invokeRes) throws IgniteCheckedException {
+            GridCacheContext cctx = entry.context();
+
+            boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
+            if (verCheck) {
+                if (!entry.isStartVersion() && ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) >= 0) {
+                    if (ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
+                        if (log.isDebugEnabled())
+                            log.debug("Received entry update with same version as current (will update store) " +
+                                "[entry=" + this + ", newVer=" + newVer + ']');
+
+                        CacheObject val = entry.val;
+
+                        if (val == null) {
+                            assert entry.deletedUnlocked();
+
+                            cctx.store().remove(null, entry.key);
+                        }
+                        else
+                            cctx.store().put(null, entry.key, val, entry.ver);
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Received entry update with smaller version than current (will ignore) " +
+                                "[entry=" + this + ", newVer=" + newVer + ']');
+                    }
+
+                    treeOp = IgniteTree.OperationType.NOOP;
+
+                    updateRes = new GridCacheUpdateAtomicResult(UpdateOutcome.VERSION_CHECK_FAILED,
+                        entry.val,
+                        null,
+                        invokeRes,
+                        CU.TTL_ETERNAL,
+                        CU.EXPIRE_TIME_ETERNAL,
+                        null,
+                        null,
+                        0);
+                }
+            }
+            else
+                assert entry.isStartVersion() || ATOMIC_VER_COMPARATOR.compare(entry.ver, newVer, ignoreTime) <= 0 :
+                    "Invalid version for inner update [isNew=" + entry.isStartVersion() + ", entry=" + this + ", newVer=" + newVer + ']';
+        }
+
+        /**
+         * @param invokeEntry Entry for {@link EntryProcessor}.
+         * @return Entry processor return value.
+         */
+        @SuppressWarnings("unchecked")
+        private IgniteBiTuple<Object, Exception> runEntryProcessor(CacheInvokeEntry<Object, Object> invokeEntry) {
+            EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
+
+            try {
+                Object computed = entryProcessor.process(invokeEntry, invokeArgs);
+
+                if (invokeEntry.modified()) {
+                    GridCacheContext cctx = entry.context();
+
+                    writeObj = cctx.toCacheObject(cctx.unwrapTemporary(invokeEntry.getValue()));
+                }
+                else
+                    writeObj = invokeEntry.valObj;
+
+                if (computed != null)
+                    return new IgniteBiTuple<>(entry.cctx.unwrapTemporary(computed), null);
+
+                return null;
+            }
+            catch (Exception e) {
+                writeObj = invokeEntry.valObj;
+
+                return new IgniteBiTuple<>(null, e);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(AtomicCacheUpdateClosure.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ee28b9cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 2355b7c..97cb534 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -29,8 +29,8 @@ import org.jetbrains.annotations.Nullable;
  * Cache entry atomic update result.
  */
 public class GridCacheUpdateAtomicResult {
-    /** Success flag.*/
-    private final boolean success;
+    /** Update operation outcome. */
+    private final UpdateOutcome outcome;
 
     /** Old value. */
     @GridToStringInclude
@@ -54,9 +54,6 @@ public class GridCacheUpdateAtomicResult {
     @GridToStringInclude
     private final GridCacheVersionConflictContext<?, ?> conflictRes;
 
-    /** Whether update should be propagated to DHT node. */
-    private final boolean sndToDht;
-
     /** */
     private final long updateCntr;
 
@@ -66,7 +63,7 @@ public class GridCacheUpdateAtomicResult {
     /**
      * Constructor.
      *
-     * @param success Success flag.
+     * @param outcome Update outcome.
      * @param oldVal Old value.
      * @param newVal New value.
      * @param res Value computed by the {@link EntryProcessor}.
@@ -74,10 +71,9 @@ public class GridCacheUpdateAtomicResult {
      * @param conflictExpireTime Explicit DR expire time (if any).
      * @param rmvVer Version for deferred delete.
      * @param conflictRes DR resolution result.
-     * @param sndToDht Whether update should be propagated to DHT node.
      * @param updateCntr Partition update counter.
      */
-    public GridCacheUpdateAtomicResult(boolean success,
+    GridCacheUpdateAtomicResult(UpdateOutcome outcome,
         @Nullable CacheObject oldVal,
         @Nullable CacheObject newVal,
         @Nullable IgniteBiTuple<Object, Exception> res,
@@ -85,9 +81,10 @@ public class GridCacheUpdateAtomicResult {
         long conflictExpireTime,
         @Nullable GridCacheVersion rmvVer,
         @Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
-        boolean sndToDht,
         long updateCntr) {
-        this.success = success;
+        assert outcome != null;
+
+        this.outcome = outcome;
         this.oldVal = oldVal;
         this.newVal = newVal;
         this.res = res;
@@ -95,11 +92,17 @@ public class GridCacheUpdateAtomicResult {
         this.conflictExpireTime = conflictExpireTime;
         this.rmvVer = rmvVer;
         this.conflictRes = conflictRes;
-        this.sndToDht = sndToDht;
         this.updateCntr = updateCntr;
     }
 
     /**
+     * @return Update operation outcome.
+     */
+    UpdateOutcome outcome() {
+        return outcome;
+    }
+
+    /**
      * @return Value computed by the {@link EntryProcessor}.
      */
     @Nullable public IgniteBiTuple<Object, Exception> computedResult() {
@@ -110,7 +113,7 @@ public class GridCacheUpdateAtomicResult {
      * @return Success flag.
      */
     public boolean success() {
-        return success;
+        return outcome.success();
     }
 
     /**
@@ -167,7 +170,74 @@ public class GridCacheUpdateAtomicResult {
      * @return Whether update should be propagated to DHT node.
      */
     public boolean sendToDht() {
-        return sndToDht;
+        return outcome.sendToDht();
+    }
+
+    /**
+     *
+     */
+    public enum UpdateOutcome {
+        /** */
+        CONFLICT_USE_OLD(false, false, false),
+
+        /** */
+        VERSION_CHECK_FAILED(false, false, false),
+
+        /** */
+        FILTER_FAILED(false, false, true),
+
+        /** */
+        INVOKE_NO_OP(false, false, true),
+
+        /** */
+        INTERCEPTOR_CANCEL(false, false, true),
+
+        /** */
+        REMOVE_NO_VAL(false, true, true),
+
+        /** */
+        SUCCESS(true, true, true);
+
+        /** */
+        private final boolean success;
+
+        /** */
+        private final boolean sndToDht;
+
+        /** */
+        private final boolean updateReadMetrics;
+
+        /**
+         * @param success Success flag.
+         * @param sndToDht Whether update should be propagated to DHT node.
+         * @param updateReadMetrics Metrics update flag.
+         */
+        UpdateOutcome(boolean success, boolean sndToDht, boolean updateReadMetrics) {
+            this.success = success;
+            this.sndToDht = sndToDht;
+            this.updateReadMetrics = updateReadMetrics;
+        }
+
+        /**
+         * @return Success flag.
+         */
+        public boolean success() {
+            return success;
+        }
+
+        /**
+         * @return Whether update should be propagated to DHT node.
+         */
+        public boolean sendToDht() {
+            return sndToDht;
+        }
+
+        /**
+         * @return Metrics update flag.
+         */
+        public boolean updateReadMetrics() {
+            return updateReadMetrics;
+        }
     }
 
     /** {@inheritDoc} */