You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2019/03/01 09:46:29 UTC

[ignite] 01/02: invokeAll

This is an automated email from the ASF dual-hosted git repository.

sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 2eddbf817fcc7360e58857a81ff26968e1ee783e
Author: sboikov <sb...@apache.org>
AuthorDate: Fri Mar 1 12:30:05 2019 +0300

    invokeAll
---
 .../cache/IgniteCacheOffheapManager.java           |   5 +-
 .../cache/IgniteCacheOffheapManagerImpl.java       |  31 ++-
 .../distributed/dht/atomic/GridDhtAtomicCache.java | 288 +++++++++++++++------
 .../atomic/GridNearAtomicAbstractUpdateFuture.java |   2 +
 .../GridNearAtomicAbstractUpdateRequest.java       |   5 +
 .../atomic/GridNearAtomicFullUpdateRequest.java    |  63 +++++
 .../dht/atomic/GridNearAtomicUpdateFuture.java     |   2 +
 .../cache/persistence/GridCacheOffheapManager.java |   3 +-
 .../processors/cache/tree/CacheDataTree.java       |   2 +-
 9 files changed, 310 insertions(+), 91 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 9455c2a..d187444 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPa
 import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.SearchRowEx;
 import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
 import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -152,6 +153,8 @@ public interface IgniteCacheOffheapManager {
      */
     public CacheDataStore dataStore(GridDhtLocalPartition part);
 
+    /** @return Comparator used to sort keys of near atomic update requests. */
+    public Comparator<KeyCacheObject> updateKeysComparator();
     /**
      * @param store Data store.
      * @throws IgniteCheckedException If failed.
@@ -900,7 +903,7 @@ public interface IgniteCacheOffheapManager {
          * @return Cache search row.
          * @throws IgniteCheckedException If failed.
          */
-        public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) throws IgniteCheckedException;
+        public SearchRowEx createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) throws IgniteCheckedException;
 
         /**
          * @return Rows comparator.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 0a621dc..a69157e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -188,6 +188,28 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
     /** */
     protected GridStripedLock partStoreLock = new GridStripedLock(Runtime.getRuntime().availableProcessors());
 
+    /** Orders update keys by partition, then by hash code, then by key value bytes. */
+    private final Comparator<KeyCacheObject> updateKeysCmp = new Comparator<KeyCacheObject>() {
+        @Override public int compare(KeyCacheObject left, KeyCacheObject right) {
+            try {
+                int res = Integer.compare(left.partition(), right.partition());
+
+                if (res == 0)
+                    res = Integer.compare(left.hashCode(), right.hashCode());
+
+                if (res != 0)
+                    return res;
+
+                return CacheDataTree.compareKeyBytes(
+                    left.valueBytes(grp.cacheObjectContext()),
+                    right.valueBytes(grp.cacheObjectContext()));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+    };
+
     /** {@inheritDoc} */
     @Override public GridAtomicLong globalRemoveId() {
         return globalRmvId;
@@ -405,6 +427,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         return store == null ? 0 : store.cacheSize(cacheId);
     }
 
+    /** @return Comparator used to sort keys of near atomic update requests. */
+    @Override public Comparator<KeyCacheObject> updateKeysComparator() {
+        return updateKeysCmp;
+    }
+
     /**
      * @param primary Primary data flag.
      * @param backup Primary data flag.
@@ -1652,10 +1679,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
-        @Override public SearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) {
+        @Override public SearchRowEx createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) {
             int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
 
-            return data != null ? new SearchRowEx<>(cacheId, key, data) : new SearchRow(cacheId, key);
+            return new SearchRowEx<>(cacheId, key, data); // Extended row is created even for null data.
         }
 
         /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 5f58e71..769fbb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2543,6 +2543,49 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         }
     }
 
+    /**
+     * Creates the closure that updates the entry with the given request index on primary node:
+     * conflict resolution is enabled and version check is disabled for it.
+     *
+     * @param req Update request.
+     * @param i Index of the entry in the request.
+     * @param entries Cache entries, the entry at index {@code i} is updated.
+     * @param ver Update version.
+     * @param needVal If {@code false}, the value is never read from the store.
+     * @param expiry Expiry policy.
+     * @return Update closure.
+     */
+    private AtomicCacheUpdateClosure primaryUpdateClosure(
+        GridNearAtomicAbstractUpdateRequest req,
+        int i,
+        List<GridDhtCacheEntry> entries,
+        GridCacheVersion ver,
+        boolean needVal,
+        @Nullable IgniteCacheExpiryPolicy expiry
+    ) {
+        GridCacheOperation op = req.operation();
+        AffinityTopologyVersion topVer = req.topologyVersion();
+        boolean intercept = ctx.config().getInterceptor() != null;
+
+        // Possibly read value from store.
+        boolean readFromStore = !req.skipStore() && needVal && ctx.readThrough() &&
+            (op == GridCacheOperation.TRANSFORM || ctx.loadPreviousValue());
+
+        GridDhtCacheEntry entry = entries.get(i);
+        boolean storeWrite = writeThrough() && !req.skipStore();
+
+        return new AtomicCacheUpdateClosure(
+            i, entry, topVer, ver, op,
+            op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i),
+            req.invokeArguments(), readFromStore, storeWrite,
+            req.keepBinary(), expiry,
+            /*primary*/true, /*verCheck*/false, req.filter(),
+            req.conflictTtl(i), req.conflictExpireTime(i), req.conflictVersion(i),
+            /*conflictResolve*/true, intercept, null,
+            ctx.disableTriggeringCacheInterceptorOnConflict()
+        );
+    }
+
     /**
      * Updates locked entries one-by-one.
      *
@@ -2584,21 +2627,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         GridDrType drType = replicate ? DR_PRIMARY : DR_NONE;
 
-        boolean batchUpdate = req.size() > 1;
-
         Map<UUID, CacheContinuousQueryListener> lsnrs = ctx.continuousQueries().updateListeners(!ctx.userCache(), false);
 
         boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM ||
             !F.isEmptyOrNulls(req.filter());
 
-        Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheUpdateClosure>> byPart =
-            batchUpdate ? new HashMap<>() : null;
+        boolean batchUpdate = req.size() > 1;
 
         BitSet retryEntries = dhtUpdRes.retryEntries();
 
         if (retryEntries != null)
             dhtUpdRes.retryEntries(null);
 
+        int curPart = -1;
+
+        int batchStart = 0;
+        int batchSize = 0;
+
         for (int i = 0; i < req.size(); i++) {
             GridDhtCacheEntry entry = locked.get(i);
 
@@ -2606,111 +2651,182 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 if (retryEntries != null && !retryEntries.get(i))
                     continue;
 
-                // Possibly read value from store.
-                boolean readFromStore = !req.skipStore() && needVal && (ctx.readThrough() &&
-                    (op == GridCacheOperation.TRANSFORM || ctx.loadPreviousValue()));
-
-                AtomicCacheUpdateClosure c = new AtomicCacheUpdateClosure(
-                    i,
-                    entry,
-                    topVer,
-                    ver,
-                    req.operation(),
-                    op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i),
-                    req.invokeArguments(),
-                    readFromStore,
-                    writeThrough() && !req.skipStore(),
-                    req.keepBinary(),
-                    expiry,
-                    /*primary*/true,
-                    /*verCheck*/false,
-                    req.filter(),
-                    req.conflictTtl(i),
-                    req.conflictExpireTime(i),
-                    req.conflictVersion(i),
-                    /*conflictResolve*/true,
-                    intercept,
-                    null,
-                    ctx.disableTriggeringCacheInterceptorOnConflict()
-                );
+                if (batchUpdate) {
+                    KeyCacheObject key = req.key(i);
+
+                    int part = key.partition();
+
+                    assert part >= 0 : key;
+
+                    if (curPart >=0 && curPart != part) {
+                        updateBatch(
+                            batchStart,
+                            batchSize,
+                            nearNode,
+                            hasNear,
+                            req,
+                            res,
+                            locked,
+                            ver,
+                            needVal,
+                            drType,
+                            taskName,
+                            expiry,
+                            dhtUpdRes,
+                            affAssignment,
+                            sndPrevVal);
 
-                entry.key().valueBytes(ctx.cacheObjectContext());
+                        batchStart = i;
+                        batchSize = 1;
+                    }
+                    else
+                        batchSize++;
 
-                if (batchUpdate) {
-                    GridDhtLocalPartition part = entry.localPartition();
+                    curPart = part;
 
-                    TreeMap<CacheSearchRow, AtomicCacheUpdateClosure> map = byPart.get(part);
+                    continue;
+                }
 
-                    IgniteCacheOffheapManager.CacheDataStore dataStore = ctx.offheap().dataStore(part);
+                AtomicCacheUpdateClosure c = primaryUpdateClosure(req, i, locked, ver, needVal, expiry);
 
-                    if (map == null)
-                        byPart.put(part, map = new TreeMap<>(dataStore.rowsComparator()));
+                entry.key().valueBytes(ctx.cacheObjectContext());
 
-                    map.put(dataStore.createSearchRow(ctx, entry.key(), null), c);
-                }
-                else {
-                    updateSingleEntry(
-                        c,
-                        true,
-                        nearNode,
-                        hasNear,
-                        taskName,
-                        drType,
-                        req,
-                        res,
-                        dhtUpdRes,
-                        affAssignment,
-                        sndPrevVal);
-                }
+                updateSingleEntry(
+                    c,
+                    true,
+                    nearNode,
+                    hasNear,
+                    taskName,
+                    drType,
+                    req,
+                    res,
+                    dhtUpdRes,
+                    affAssignment,
+                    sndPrevVal);
             }
             catch (IgniteCheckedException e) {
                 res.addFailedKey(entry.key(), e);
             }
         }
 
-        if (!batchUpdate)
+        if (batchSize > 0) {
+            updateBatch(
+                batchStart,
+                batchSize,
+                nearNode,
+                hasNear,
+                req,
+                res,
+                locked,
+                ver,
+                needVal,
+                drType,
+                taskName,
+                expiry,
+                dhtUpdRes,
+                affAssignment,
+                sndPrevVal);
+        }
+    }
+
+    private void updateBatch(
+        int batchStart,
+        int batchSize,
+        ClusterNode nearNode,
+        boolean hasNear,
+        GridNearAtomicAbstractUpdateRequest req,
+        GridNearAtomicUpdateResponse res,
+        List<GridDhtCacheEntry> locked,
+        GridCacheVersion ver,
+        boolean needVal,
+        GridDrType drType,
+        String taskName,
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        DhtAtomicUpdateResult dhtUpdRes,
+        AffinityAssignment affAssignment,
+        boolean sndPrevVal
+    ) throws GridCacheEntryRemovedException {
+        if (batchSize == 1) {
+            try {
+                updateSingleEntry(
+                    primaryUpdateClosure(req, batchStart, locked, ver, needVal, expiry),
+                    true,
+                    nearNode,
+                    hasNear,
+                    taskName,
+                    drType,
+                    req,
+                    res,
+                    dhtUpdRes,
+                    affAssignment,
+                    sndPrevVal);
+            }
+            catch (IgniteCheckedException e) {
+                res.addFailedKey(req.key(batchStart), e);
+            }
+
             return;
+        }
 
         RuntimeException err = null;
 
-        for (Map.Entry<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheUpdateClosure>> e0 : byPart.entrySet()) {
-            try {
-                Map<CacheSearchRow, AtomicCacheUpdateClosure> map = e0.getValue();
+        try {
+            List<SearchRowEx<AtomicCacheUpdateClosure>> rows = new ArrayList<>(batchSize);
 
-                try {
-                    ctx.offheap().invokeAll(ctx, e0.getKey(), map.keySet(), map::get);
-                }
-                catch (UnregisteredClassException | UnregisteredBinaryTypeException e) {
-                    err = e;
-                }
+            GridDhtLocalPartition part = locked.get(batchStart).localPartition();
 
-                for (Map.Entry<CacheSearchRow, AtomicCacheUpdateClosure> e : map.entrySet()) {
-                    AtomicCacheUpdateClosure c = e.getValue();
+            IgniteCacheOffheapManager.CacheDataStore dataStore = ctx.offheap().dataStore(part);
 
-                    if (c.operationType() == null) {
-                        dhtUpdRes.addRetryEntry(c.reqIdx);
+            SearchRowEx<AtomicCacheUpdateClosure> prev = null;
 
-                        continue;
-                    }
+            for (int i = 0; i < batchSize; i++) {
+                int idx = batchStart + i;
 
-                    updateSingleEntry(
-                        c,
-                        false,
-                        nearNode,
-                        hasNear,
-                        taskName,
-                        drType,
-                        req,
-                        res,
-                        dhtUpdRes,
-                        affAssignment,
-                        sndPrevVal);
-                }
+                AtomicCacheUpdateClosure c = primaryUpdateClosure(req, idx, locked, ver, needVal, expiry);
+
+                SearchRowEx<AtomicCacheUpdateClosure> row = dataStore.createSearchRow(ctx, c.entry().key(), c);
+
+                rows.add(row);
+
+                // Expect keys in request are already sorted.
+                assert prev == null || part.dataStore().rowsComparator().compare(row, prev) >= 0;
+
+                prev = row;
             }
-            catch (IgniteCheckedException e) {
-                for (CacheSearchRow row : e0.getValue().keySet())
-                    res.addFailedKey(row.key(), e);
+
+            try {
+                ctx.offheap().invokeAll(ctx, part, rows, GridDhtAtomicCache::rowClosure);
+            }
+            catch (UnregisteredClassException | UnregisteredBinaryTypeException e) {
+                err = e;
             }
+
+            for (int i = 0; i < rows.size(); i++) {
+                AtomicCacheUpdateClosure c = rows.get(i).data();
+
+                if (c.operationType() == null) {
+                    dhtUpdRes.addRetryEntry(c.reqIdx);
+
+                    continue;
+                }
+
+                updateSingleEntry(
+                    c,
+                    false,
+                    nearNode,
+                    hasNear,
+                    taskName,
+                    drType,
+                    req,
+                    res,
+                    dhtUpdRes,
+                    affAssignment,
+                    sndPrevVal);
+            }
+        }
+        catch (IgniteCheckedException e) {
+            for (int i = 0; i < batchSize; i++)
+                res.addFailedKey(req.key(batchStart + i), e);
         }
 
         if (err != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 983b094..fdaaae3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -296,6 +296,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
      * @param req Request.
      */
     final void sendSingleRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
+        req.sort(cctx.offheap().updateKeysComparator());
+
         if (cctx.localNodeId().equals(nodeId)) {
             cache.updateAllAsyncInternal(cctx.localNode(), req,
                 new GridDhtAtomicCache.UpdateReplyClosure() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index f0d89bf..5ee7f8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 
 import java.nio.ByteBuffer;
+import java.util.Comparator;
 import java.util.List;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
@@ -197,6 +198,10 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMes
         return flags;
     }
 
+    public void sort(Comparator<KeyCacheObject> cmp) {
+        // No-op by default: GridNearAtomicFullUpdateRequest overrides this to reorder its keys.
+    }
+
     /**
      * @return {@code True} if originating node detected that rebalancing finished and
      *    expects that update is mapped using current affinity.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 08aa469..93ac45d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.UUID;
 import javax.cache.expiry.ExpiryPolicy;
@@ -175,6 +176,68 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
+    @Override public void sort(Comparator<KeyCacheObject> cmp) {
+        sort0(cmp, 0, keys.size() - 1); // Whole key range, both bounds are inclusive.
+    }
+
+    /** Quicksorts keys in {@code [low, high]} (both inclusive), mirroring each swap in the per-key lists. */
+    private void sort0(Comparator<KeyCacheObject> cmp, int low, int high) {
+        if (keys.size() < 2 || low >= high)
+            return;
+
+        KeyCacheObject pivot = keys.get(low + (high - low) / 2);
+
+        int left = low;
+        int right = high;
+
+        do {
+            while (cmp.compare(keys.get(left), pivot) < 0)
+                left++;
+
+            while (cmp.compare(keys.get(right), pivot) > 0)
+                right--;
+
+            if (left > right)
+                break;
+
+            swap((ArrayList)keys, left, right);
+            swap((ArrayList)vals, left, right);
+            swap((ArrayList)entryProcessors, left, right);
+            swap((ArrayList)conflictVers, left, right);
+            swap(conflictTtls, left, right);
+            swap(conflictExpireTimes, left, right);
+
+            left++;
+            right--;
+        }
+        while (left <= right);
+
+        if (low < right)
+            sort0(cmp, low, right);
+
+        if (left < high)
+            sort0(cmp, left, high);
+    }
+
+    /** Exchanges elements {@code i} and {@code j} of the list, {@code null} list is ignored. */
+    private static void swap(@Nullable ArrayList list, int i, int j) {
+        if (list != null) {
+            // ArrayList.set() returns the element previously stored at the position.
+            Object replaced = list.set(j, list.get(i));
+            list.set(i, replaced);
+        }
+    }
+
+    /** Exchanges values at {@code i} and {@code j}, {@code null} list is ignored. */
+    private static void swap(@Nullable GridLongList list, int i, int j) {
+        if (list != null) {
+            long movedToI = list.get(j);
+            list.set(j, list.get(i));
+            list.set(i, movedToI);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void addUpdateEntry(KeyCacheObject key,
         @Nullable Object val,
         long conflictTtl,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 3835d6a..e5d1304 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -680,6 +680,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         for (PrimaryRequestState reqState : mappings.values()) {
             GridNearAtomicAbstractUpdateRequest req = reqState.req;
 
+            req.sort(cctx.offheap().updateKeysComparator());
+
             if (locNodeId.equals(req.nodeId())) {
                 assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
                     ", req=" + req + ']';
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 57d3fd2..6050367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
 import org.apache.ignite.internal.processors.cache.tree.PendingRow;
+import org.apache.ignite.internal.processors.cache.tree.SearchRowEx;
 import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
 import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -2104,7 +2105,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
-        @Override public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) throws IgniteCheckedException {
+        @Override public SearchRowEx createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) throws IgniteCheckedException {
             CacheDataStore delegate = init0(false);
 
             return delegate.createSearchRow(cctx, key, data);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index 95290dc..552c95a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -530,7 +530,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
      * @param bytes2 Second key bytes.
      * @return Comparison result.
      */
-    private static int compareKeyBytes(byte[] bytes1, byte[] bytes2) {
+    public static int compareKeyBytes(byte[] bytes1, byte[] bytes2) {
         int lenCmp = Integer.compare(bytes1.length, bytes2.length);
 
         if (lenCmp != 0)