You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2019/03/01 09:46:29 UTC
[ignite] 01/02: invokeAll
This is an automated email from the ASF dual-hosted git repository.
sboikov pushed a commit to branch ignite-invokeAll
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 2eddbf817fcc7360e58857a81ff26968e1ee783e
Author: sboikov <sb...@apache.org>
AuthorDate: Fri Mar 1 12:30:05 2019 +0300
invokeAll
---
.../cache/IgniteCacheOffheapManager.java | 5 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 31 ++-
.../distributed/dht/atomic/GridDhtAtomicCache.java | 288 +++++++++++++++------
.../atomic/GridNearAtomicAbstractUpdateFuture.java | 2 +
.../GridNearAtomicAbstractUpdateRequest.java | 5 +
.../atomic/GridNearAtomicFullUpdateRequest.java | 63 +++++
.../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +
.../cache/persistence/GridCacheOffheapManager.java | 3 +-
.../processors/cache/tree/CacheDataTree.java | 2 +-
9 files changed, 310 insertions(+), 91 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 9455c2a..d187444 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPa
import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionRecoverState;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.SearchRowEx;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -152,6 +153,8 @@ public interface IgniteCacheOffheapManager {
*/
public CacheDataStore dataStore(GridDhtLocalPartition part);
+ public Comparator<KeyCacheObject> updateKeysComparator();
+
/**
* @param store Data store.
* @throws IgniteCheckedException If failed.
@@ -900,7 +903,7 @@ public interface IgniteCacheOffheapManager {
* @return Cache search row.
* @throws IgniteCheckedException If failed.
*/
- public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) throws IgniteCheckedException;
+ public SearchRowEx createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) throws IgniteCheckedException;
/**
* @return Rows comparator.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 0a621dc..a69157e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -188,6 +188,28 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** */
protected GridStripedLock partStoreLock = new GridStripedLock(Runtime.getRuntime().availableProcessors());
+ /** */
+ private final Comparator<KeyCacheObject> updateKeysCmp = new Comparator<KeyCacheObject>() {
+ @Override public int compare(KeyCacheObject key1, KeyCacheObject key2) {
+ try {
+ int cmp = Integer.compare(key1.partition(), key2.partition());
+
+ if (cmp != 0)
+ return cmp;
+
+ cmp = Integer.compare(key1.hashCode(), key2.hashCode());
+
+ if (cmp != 0)
+ return cmp;
+
+ return CacheDataTree.compareKeyBytes(key1.valueBytes(grp.cacheObjectContext()), key2.valueBytes(grp.cacheObjectContext()));
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ };
+ };
+
/** {@inheritDoc} */
@Override public GridAtomicLong globalRemoveId() {
return globalRmvId;
@@ -405,6 +427,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return store == null ? 0 : store.cacheSize(cacheId);
}
+ /** {@inheritDoc} */
+ @Override public Comparator<KeyCacheObject> updateKeysComparator() {
+ return updateKeysCmp;
+ }
+
/**
* @param primary Primary data flag.
* @param backup Primary data flag.
@@ -1652,10 +1679,10 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override public SearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) {
+ @Override public SearchRowEx createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) {
int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID;
- return data != null ? new SearchRowEx<>(cacheId, key, data) : new SearchRow(cacheId, key);
+ return new SearchRowEx<>(cacheId, key, data);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 5f58e71..769fbb7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2543,6 +2543,49 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
+ private AtomicCacheUpdateClosure primaryUpdateClosure(
+ GridNearAtomicAbstractUpdateRequest req,
+ int i,
+ List<GridDhtCacheEntry> entries,
+ GridCacheVersion ver,
+ boolean needVal,
+ @Nullable IgniteCacheExpiryPolicy expiry
+ ) {
+ GridCacheOperation op = req.operation();
+
+ AffinityTopologyVersion topVer = req.topologyVersion();
+
+ boolean intercept = ctx.config().getInterceptor() != null;
+
+ // Possibly read value from store.
+ boolean readFromStore = !req.skipStore() && needVal && (ctx.readThrough() &&
+ (op == GridCacheOperation.TRANSFORM || ctx.loadPreviousValue()));
+
+ return new AtomicCacheUpdateClosure(
+ i,
+ entries.get(i),
+ topVer,
+ ver,
+ req.operation(),
+ op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i),
+ req.invokeArguments(),
+ readFromStore,
+ writeThrough() && !req.skipStore(),
+ req.keepBinary(),
+ expiry,
+ /*primary*/true,
+ /*verCheck*/false,
+ req.filter(),
+ req.conflictTtl(i),
+ req.conflictExpireTime(i),
+ req.conflictVersion(i),
+ /*conflictResolve*/true,
+ intercept,
+ null,
+ ctx.disableTriggeringCacheInterceptorOnConflict()
+ );
+ }
+
/**
* Updates locked entries one-by-one.
*
@@ -2584,21 +2627,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridDrType drType = replicate ? DR_PRIMARY : DR_NONE;
- boolean batchUpdate = req.size() > 1;
-
Map<UUID, CacheContinuousQueryListener> lsnrs = ctx.continuousQueries().updateListeners(!ctx.userCache(), false);
boolean needVal = lsnrs != null || intercept || retval || op == GridCacheOperation.TRANSFORM ||
!F.isEmptyOrNulls(req.filter());
- Map<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheUpdateClosure>> byPart =
- batchUpdate ? new HashMap<>() : null;
+ boolean batchUpdate = req.size() > 1;
BitSet retryEntries = dhtUpdRes.retryEntries();
if (retryEntries != null)
dhtUpdRes.retryEntries(null);
+ int curPart = -1;
+
+ int batchStart = 0;
+ int batchSize = 0;
+
for (int i = 0; i < req.size(); i++) {
GridDhtCacheEntry entry = locked.get(i);
@@ -2606,111 +2651,182 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (retryEntries != null && !retryEntries.get(i))
continue;
- // Possibly read value from store.
- boolean readFromStore = !req.skipStore() && needVal && (ctx.readThrough() &&
- (op == GridCacheOperation.TRANSFORM || ctx.loadPreviousValue()));
-
- AtomicCacheUpdateClosure c = new AtomicCacheUpdateClosure(
- i,
- entry,
- topVer,
- ver,
- req.operation(),
- op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i),
- req.invokeArguments(),
- readFromStore,
- writeThrough() && !req.skipStore(),
- req.keepBinary(),
- expiry,
- /*primary*/true,
- /*verCheck*/false,
- req.filter(),
- req.conflictTtl(i),
- req.conflictExpireTime(i),
- req.conflictVersion(i),
- /*conflictResolve*/true,
- intercept,
- null,
- ctx.disableTriggeringCacheInterceptorOnConflict()
- );
+ if (batchUpdate) {
+ KeyCacheObject key = req.key(i);
+
+ int part = key.partition();
+
+ assert part >= 0 : key;
+
+ if (curPart >=0 && curPart != part) {
+ updateBatch(
+ batchStart,
+ batchSize,
+ nearNode,
+ hasNear,
+ req,
+ res,
+ locked,
+ ver,
+ needVal,
+ drType,
+ taskName,
+ expiry,
+ dhtUpdRes,
+ affAssignment,
+ sndPrevVal);
- entry.key().valueBytes(ctx.cacheObjectContext());
+ batchStart = i;
+ batchSize = 1;
+ }
+ else
+ batchSize++;
- if (batchUpdate) {
- GridDhtLocalPartition part = entry.localPartition();
+ curPart = part;
- TreeMap<CacheSearchRow, AtomicCacheUpdateClosure> map = byPart.get(part);
+ continue;
+ }
- IgniteCacheOffheapManager.CacheDataStore dataStore = ctx.offheap().dataStore(part);
+ AtomicCacheUpdateClosure c = primaryUpdateClosure(req, i, locked, ver, needVal, expiry);
- if (map == null)
- byPart.put(part, map = new TreeMap<>(dataStore.rowsComparator()));
+ entry.key().valueBytes(ctx.cacheObjectContext());
- map.put(dataStore.createSearchRow(ctx, entry.key(), null), c);
- }
- else {
- updateSingleEntry(
- c,
- true,
- nearNode,
- hasNear,
- taskName,
- drType,
- req,
- res,
- dhtUpdRes,
- affAssignment,
- sndPrevVal);
- }
+ updateSingleEntry(
+ c,
+ true,
+ nearNode,
+ hasNear,
+ taskName,
+ drType,
+ req,
+ res,
+ dhtUpdRes,
+ affAssignment,
+ sndPrevVal);
}
catch (IgniteCheckedException e) {
res.addFailedKey(entry.key(), e);
}
}
- if (!batchUpdate)
+ if (batchSize > 0) {
+ updateBatch(
+ batchStart,
+ batchSize,
+ nearNode,
+ hasNear,
+ req,
+ res,
+ locked,
+ ver,
+ needVal,
+ drType,
+ taskName,
+ expiry,
+ dhtUpdRes,
+ affAssignment,
+ sndPrevVal);
+ }
+ }
+
+ private void updateBatch(
+ int batchStart,
+ int batchSize,
+ ClusterNode nearNode,
+ boolean hasNear,
+ GridNearAtomicAbstractUpdateRequest req,
+ GridNearAtomicUpdateResponse res,
+ List<GridDhtCacheEntry> locked,
+ GridCacheVersion ver,
+ boolean needVal,
+ GridDrType drType,
+ String taskName,
+ @Nullable IgniteCacheExpiryPolicy expiry,
+ DhtAtomicUpdateResult dhtUpdRes,
+ AffinityAssignment affAssignment,
+ boolean sndPrevVal
+ ) throws GridCacheEntryRemovedException {
+ if (batchSize == 1) {
+ try {
+ updateSingleEntry(
+ primaryUpdateClosure(req, batchStart, locked, ver, needVal, expiry),
+ true,
+ nearNode,
+ hasNear,
+ taskName,
+ drType,
+ req,
+ res,
+ dhtUpdRes,
+ affAssignment,
+ sndPrevVal);
+ }
+ catch (IgniteCheckedException e) {
+ res.addFailedKey(req.key(batchStart), e);
+ }
+
return;
+ }
RuntimeException err = null;
- for (Map.Entry<GridDhtLocalPartition, TreeMap<CacheSearchRow, AtomicCacheUpdateClosure>> e0 : byPart.entrySet()) {
- try {
- Map<CacheSearchRow, AtomicCacheUpdateClosure> map = e0.getValue();
+ try {
+ List<SearchRowEx<AtomicCacheUpdateClosure>> rows = new ArrayList<>(batchSize);
- try {
- ctx.offheap().invokeAll(ctx, e0.getKey(), map.keySet(), map::get);
- }
- catch (UnregisteredClassException | UnregisteredBinaryTypeException e) {
- err = e;
- }
+ GridDhtLocalPartition part = locked.get(batchStart).localPartition();
- for (Map.Entry<CacheSearchRow, AtomicCacheUpdateClosure> e : map.entrySet()) {
- AtomicCacheUpdateClosure c = e.getValue();
+ IgniteCacheOffheapManager.CacheDataStore dataStore = ctx.offheap().dataStore(part);
- if (c.operationType() == null) {
- dhtUpdRes.addRetryEntry(c.reqIdx);
+ SearchRowEx<AtomicCacheUpdateClosure> prev = null;
- continue;
- }
+ for (int i = 0; i < batchSize; i++) {
+ int idx = batchStart + i;
- updateSingleEntry(
- c,
- false,
- nearNode,
- hasNear,
- taskName,
- drType,
- req,
- res,
- dhtUpdRes,
- affAssignment,
- sndPrevVal);
- }
+ AtomicCacheUpdateClosure c = primaryUpdateClosure(req, idx, locked, ver, needVal, expiry);
+
+ SearchRowEx<AtomicCacheUpdateClosure> row = dataStore.createSearchRow(ctx, c.entry().key(), c);
+
+ rows.add(row);
+
+ // Expect keys in request are already sorted.
+ assert prev == null || part.dataStore().rowsComparator().compare(row, prev) >= 0;
+
+ prev = row;
}
- catch (IgniteCheckedException e) {
- for (CacheSearchRow row : e0.getValue().keySet())
- res.addFailedKey(row.key(), e);
+
+ try {
+ ctx.offheap().invokeAll(ctx, part, rows, GridDhtAtomicCache::rowClosure);
+ }
+ catch (UnregisteredClassException | UnregisteredBinaryTypeException e) {
+ err = e;
}
+
+ for (int i = 0; i < rows.size(); i++) {
+ AtomicCacheUpdateClosure c = rows.get(i).data();
+
+ if (c.operationType() == null) {
+ dhtUpdRes.addRetryEntry(c.reqIdx);
+
+ continue;
+ }
+
+ updateSingleEntry(
+ c,
+ false,
+ nearNode,
+ hasNear,
+ taskName,
+ drType,
+ req,
+ res,
+ dhtUpdRes,
+ affAssignment,
+ sndPrevVal);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ for (int i = 0; i < batchSize; i++)
+ res.addFailedKey(req.key(batchStart + i), e);
}
if (err != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 983b094..fdaaae3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -296,6 +296,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFuture
* @param req Request.
*/
final void sendSingleRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
+ req.sort(cctx.offheap().updateKeysComparator());
+
if (cctx.localNodeId().equals(nodeId)) {
cache.updateAllAsyncInternal(cctx.localNode(), req,
new GridDhtAtomicCache.UpdateReplyClosure() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index f0d89bf..5ee7f8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.nio.ByteBuffer;
+import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
@@ -197,6 +198,10 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheIdMes
return flags;
}
+ public void sort(Comparator<KeyCacheObject> cmp) {
+ // No-op.
+ }
+
/**
* @return {@code True} if originating node detected that rebalancing finished and
* expects that update is mapped using current affinity.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 08aa469..93ac45d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
@@ -175,6 +176,68 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
/** {@inheritDoc} */
+ @Override public void sort(Comparator<KeyCacheObject> cmp) {
+ sort0(cmp, 0, keys.size() - 1);
+ }
+
+ private void sort0(Comparator<KeyCacheObject> cmp, int low, int high) {
+ if (keys.size() < 2)
+ return;
+
+ if (low >= high)
+ return;
+
+ KeyCacheObject pivot = keys.get(low + (high - low) / 2);
+
+ int i = low;
+ int j = high;
+
+ while (i <= j) {
+ while (cmp.compare(keys.get(i), pivot) < 0)
+ i++;
+
+ while (cmp.compare(keys.get(j), pivot) > 0)
+ j--;
+
+ if (i <= j) {
+ swap((ArrayList)keys, i, j);
+ swap((ArrayList)vals, i, j);
+ swap((ArrayList)entryProcessors, i, j);
+ swap((ArrayList)conflictVers, i, j);
+ swap(conflictTtls, i, j);
+ swap(conflictExpireTimes, i, j);
+
+ i++;
+ j--;
+ }
+ }
+
+ if (low < j)
+ sort0(cmp, low, j);
+
+ if (high > i)
+ sort0(cmp, i, high);
+ }
+
+ private static void swap(@Nullable ArrayList list, int i, int j) {
+ if (list == null)
+ return;
+
+ Object tmp = list.get(i);
+ list.set(i, list.get(j));
+ list.set(j, tmp);
+ }
+
+ private static void swap(@Nullable GridLongList list, int i, int j) {
+ if (list == null)
+ return;
+
+ long tmp = list.get(i);
+ list.set(i, list.get(j));
+ list.set(j, tmp);
+ }
+
+ /** {@inheritDoc} */
@Override public void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 3835d6a..e5d1304 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -680,6 +680,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
for (PrimaryRequestState reqState : mappings.values()) {
GridNearAtomicAbstractUpdateRequest req = reqState.req;
+ req.sort(cctx.offheap().updateKeysComparator());
+
if (locNodeId.equals(req.nodeId())) {
assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
", req=" + req + ']';
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 57d3fd2..6050367 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
import org.apache.ignite.internal.processors.cache.tree.PendingRow;
+import org.apache.ignite.internal.processors.cache.tree.SearchRowEx;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -2104,7 +2105,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public CacheSearchRow createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) throws IgniteCheckedException {
+ @Override public SearchRowEx createSearchRow(GridCacheContext cctx, KeyCacheObject key, Object data) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
return delegate.createSearchRow(cctx, key, data);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
index 95290dc..552c95a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java
@@ -530,7 +530,7 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
* @param bytes2 Second key bytes.
* @return Comparison result.
*/
- private static int compareKeyBytes(byte[] bytes1, byte[] bytes2) {
+ public static int compareKeyBytes(byte[] bytes1, byte[] bytes2) {
int lenCmp = Integer.compare(bytes1.length, bytes2.length);
if (lenCmp != 0)