You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by gv...@apache.org on 2018/11/21 12:18:01 UTC

ignite git commit: IGNITE-9928: MVCC TX: Late affinity assignment support. This closes #5057.

Repository: ignite
Updated Branches:
  refs/heads/master 529b6de6a -> a8fc7af01


IGNITE-9928: MVCC TX: Late affinity assignment support. This closes #5057.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a8fc7af0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a8fc7af0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a8fc7af0

Branch: refs/heads/master
Commit: a8fc7af015febee09b70846277f1170b64bce74e
Parents: 529b6de
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Wed Nov 21 15:17:52 2018 +0300
Committer: Igor Seliverstov <gv...@gmail.com>
Committed: Wed Nov 21 15:17:52 2018 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManager.java        |   5 +
 .../cache/IgniteCacheOffheapManagerImpl.java    |  18 ++
 .../distributed/dht/GridDhtCacheEntry.java      |   8 +-
 .../dht/GridDhtTxAbstractEnlistFuture.java      | 169 +++++++------
 .../GridDhtPartitionsExchangeFuture.java        |  33 ++-
 .../dht/topology/GridDhtLocalPartition.java     |   3 +-
 .../near/GridNearTxEnlistFuture.java            |  22 +-
 .../GridNearTxQueryResultsEnlistFuture.java     |  21 +-
 .../persistence/GridCacheOffheapManager.java    |  12 +
 .../cache/persistence/tree/BPlusTree.java       |  31 +++
 .../cache/transactions/IgniteTxAdapter.java     |  46 +++-
 .../cache/transactions/IgniteTxHandler.java     | 239 +++++++++++--------
 .../cache/mvcc/CacheMvccAbstractTest.java       |  19 +-
 .../testframework/junits/GridAbstractTest.java  |   2 +-
 .../mvcc/CacheMvccBackupsAbstractTest.java      |   2 +
 ...ccPartitionedSqlCoordinatorFailoverTest.java |  49 +++-
 16 files changed, 439 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index db09a89..cdc44a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -675,6 +675,11 @@ public interface IgniteCacheOffheapManager {
         long fullSize();
 
         /**
+         * @return {@code True} if there are no items in the store.
+         */
+        boolean isEmpty();
+
+        /**
          * Updates size metric for particular cache.
          *
          * @param cacheId Cache ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 6835795..e15009e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1533,6 +1533,24 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
             return storageSize.get();
         }
 
+        /**
+         * @return {@code True} if there are no items in the store.
+         */
+        @Override public boolean isEmpty() {
+            try {
+                /*
+                 * TODO https://issues.apache.org/jira/browse/IGNITE-10082
+                 * Using of counters is cheaper than tree operations. Return size checking after the ticked is resolved.
+                 */
+                return grp.mvccEnabled() ? dataTree.isEmpty() : storageSize.get() == 0;
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to perform operation.", e);
+
+                return false;
+            }
+        }
+
         /** {@inheritDoc} */
         @Override public void updateSize(int cacheId, long delta) {
             storageSize.addAndGet(delta);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index e2ad525..8eea5a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -648,10 +648,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
                     ']');
             }
 
-                if (cctx.mvccEnabled())
-                    cctx.offheap().mvccRemoveAll(this);
-                else
-                    removeValue();
+            if (cctx.mvccEnabled())
+                cctx.offheap().mvccRemoveAll(this);
+            else
+                removeValue();
 
             // Give to GC.
             update(null, 0L, 0L, ver, true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
index 9949930..62084dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxAbstractEnlistFuture.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -386,12 +385,20 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
 
         try {
             while (true) {
+                int curPart = -1;
+                List<ClusterNode> backups = null;
+
                 while (hasNext0()) {
                     Object cur = next0();
 
-                    KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey());
+                    KeyCacheObject key = toKey(op, cur);
+
+                    if (curPart != key.partition())
+                        backups = backupNodes(curPart = key.partition());
+
+                    assert backups != null;
 
-                    if (!ensureFreeSlot(key)) {
+                    if (!ensureFreeSlot(key, backups)) {
                         // Can't advance further at the moment.
                         peek = cur;
 
@@ -442,7 +449,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
                                         cctx.localNodeId(),
                                         topVer,
                                         mvccSnapshot,
-                                        isMoving(key.partition()),
+                                        isMoving(key.partition(), backups),
                                         needOldVal,
                                         filter,
                                         needResult());
@@ -463,7 +470,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
                                         topVer,
                                         mvccSnapshot,
                                         op.cacheOperation(),
-                                        isMoving(key.partition()),
+                                        isMoving(key.partition(), backups),
                                         op.noCreate(),
                                         needOldVal,
                                         filter,
@@ -502,13 +509,14 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
                             res = updateFut.get();
                         else {
                             GridDhtCacheEntry entry0 = entry;
+                            List<ClusterNode> backups0 = backups;
 
                             it.beforeDetach();
 
                             updateFut.listen(new CI1<IgniteInternalFuture<GridCacheUpdateTxResult>>() {
                                 @Override public void apply(IgniteInternalFuture<GridCacheUpdateTxResult> fut) {
                                     try {
-                                        processEntry(entry0, op, fut.get(), val0);
+                                        processEntry(entry0, op, fut.get(), val0, backups0);
 
                                         continueLoop(true);
                                     }
@@ -523,7 +531,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
                         }
                     }
 
-                    processEntry(entry, op, res, val0);
+                    processEntry(entry, op, res, val0, backups);
                 }
 
                 if (!hasNext0()) {
@@ -591,6 +599,16 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
         return peek != FINISHED;
     }
 
+    /** */
+    private KeyCacheObject toKey(EnlistOperation op, Object cur) {
+        KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey());
+
+        if (key.partition() == -1)
+            key.partition(cctx.affinity().partition(key));
+
+        return key;
+    }
+
     /**
      * @return {@code True} if in-flight batches map is empty.
      */
@@ -611,41 +629,40 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
      * @param op Operation.
      * @param updRes Update result.
      * @param val New value.
+     * @param backups Backup nodes
      * @throws IgniteCheckedException If failed.
      */
     private void processEntry(GridDhtCacheEntry entry, EnlistOperation op,
-        GridCacheUpdateTxResult updRes, Message val) throws IgniteCheckedException {
+        GridCacheUpdateTxResult updRes, Message val, List<ClusterNode> backups) throws IgniteCheckedException {
         checkCompleted();
 
         assert updRes != null && updRes.updateFuture() == null;
 
         onEntryProcessed(entry.key(), updRes);
 
-        if (!updRes.success())
+        if (!updRes.success()
+            || updRes.filtered()
+            || op == EnlistOperation.LOCK)
             return;
 
-        if (!updRes.filtered())
-            cctx.shared().mvccCaching().addEnlisted(entry.key(), updRes.newValue(), 0, 0, lockVer,
-                updRes.oldValue(), tx.local(), tx.topologyVersion(), mvccSnapshot, cctx.cacheId(), tx, null, -1);
+        cctx.shared().mvccCaching().addEnlisted(entry.key(), updRes.newValue(), 0, 0, lockVer,
+            updRes.oldValue(), tx.local(), tx.topologyVersion(), mvccSnapshot, cctx.cacheId(), tx, null, -1);
 
-        if (op != EnlistOperation.LOCK)
-            addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId(), op);
+        addToBatch(entry.key(), val, updRes.mvccHistory(), entry.context().cacheId(), backups);
     }
 
     /**
      * Adds row to batch.
      * <b>IMPORTANT:</b> This method should be called from the critical section in {@link this.sendNextBatches()}
-     *
      * @param key Key.
      * @param val Value.
      * @param hist History rows.
      * @param cacheId Cache Id.
+     * @param backups Backup nodes
      */
     private void addToBatch(KeyCacheObject key, Message val, List<MvccLinkAwareSearchRow> hist,
-        int cacheId, EnlistOperation op) throws IgniteCheckedException {
-        List<ClusterNode> backups = backupNodes(key);
-
-        int part = cctx.affinity().partition(key);
+        int cacheId, List<ClusterNode> backups) throws IgniteCheckedException {
+        int part = key.partition();
 
         tx.touchPartition(cacheId, part);
 
@@ -659,7 +676,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
 
             boolean moving = isMoving(node, part);
 
-            if (skipNearNodeUpdates && node.id().equals(nearNodeId) && !moving) {
+            if (skipNearLocalUpdate(node, moving)) {
                 updateMappings(node);
 
                 if (newRemoteTx(node))
@@ -680,15 +697,13 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
             if (batch == null)
                 batches.put(node.id(), batch = new Batch(node));
 
-            if (moving && hist0 == null && !op.isInvoke()) {
+            if (moving && hist0 == null) {
                 assert !F.isEmpty(hist);
 
                 hist0 = fetchHistoryInfo(key, hist);
             }
 
-            Message m = moving && !op.isInvoke() ? hist0 : val;
-
-            batch.add(key, m);
+            batch.add(key, moving ? hist0 : val);
 
             if (batch.size() == BATCH_SIZE) {
                 assert batches != null;
@@ -770,16 +785,19 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
      * Checks if there free space in batches or free slot in in-flight batches is available for the given key.
      *
      * @param key Key.
+     * @param backups Backup nodes.
      * @return {@code True} if there is possible to add this key to batch or send ready batch.
      */
     @SuppressWarnings("ForLoopReplaceableByForEach")
-    private boolean ensureFreeSlot(KeyCacheObject key) {
+    private boolean ensureFreeSlot(KeyCacheObject key, List<ClusterNode> backups) {
         if (F.isEmpty(batches) || F.isEmpty(pending))
             return true;
 
+        int part = key.partition();
+
         // Check possibility of adding to batch and sending.
-        for (ClusterNode node : backupNodes(key)) {
-            if (skipNearNodeUpdates && node.id().equals(nearNodeId) && !isMoving(node, key.partition()))
+        for (ClusterNode node : backups) {
+            if (skipNearLocalUpdate(node, isMoving(node, part)))
                 continue;
 
             Batch batch = batches.get(node.id());
@@ -816,16 +834,14 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
         if (newRemoteTx(node))
             addNewRemoteTxNode(node);
 
-        if (!firstReqSent.contains(node)) {
-            firstReqSent.add(node);
-
+        if (firstReqSent.add(node)) {
             // If this is a first request to this node, send full info.
             req = new GridDhtTxQueryFirstEnlistRequest(cctx.cacheId(),
                 futId,
                 cctx.localNodeId(),
                 tx.topologyVersionSnapshot(),
                 lockVer,
-                mvccSnapshot,
+                mvccSnapshot.withoutActiveTransactions(),
                 tx.remainingTime(),
                 tx.taskNameHash(),
                 nearNodeId,
@@ -863,7 +879,13 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
 
         assert prev == null;
 
-        cctx.io().send(node, req, cctx.ioPolicy());
+        try {
+            cctx.io().send(node, req, cctx.ioPolicy());
+        }
+        catch (ClusterTopologyCheckedException e) {
+            // backup node left the grid, will continue.
+            onNodeLeft(node.id());
+        }
     }
 
     /** */
@@ -880,20 +902,21 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
         mapping.markQueryUpdate();
     }
 
+    /** */
+    private boolean skipNearLocalUpdate(ClusterNode node, boolean moving) {
+        return skipNearNodeUpdates && node.id().equals(nearNodeId) && !moving;
+    }
+
     /**
-     * @param key Key.
-     * @return Backup nodes for the given key.
+     * @param part Partition.
+     * @return Backup nodes for the given partition.
      */
-    @NotNull private List<ClusterNode> backupNodes(KeyCacheObject key) {
-        List<ClusterNode> dhtNodes = cctx.affinity().nodesByKey(key, tx.topologyVersion());
-
-        assert !dhtNodes.isEmpty() && dhtNodes.get(0).id().equals(cctx.localNodeId()) :
-            "localNode = " + cctx.localNodeId() + ", dhtNodes = " + dhtNodes;
+    @NotNull private List<ClusterNode> backupNodes(int part) {
+        List<ClusterNode> nodes = cctx.topology().nodes(part, tx.topologyVersion());
 
-        if (dhtNodes.size() == 1)
-            return Collections.emptyList();
+        assert nodes.size() > 0 && nodes.get(0).isLocal();
 
-        return dhtNodes.subList(1, dhtNodes.size());
+        return nodes.subList(1, nodes.size());
     }
 
     /**
@@ -933,31 +956,33 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
 
     /**
      * @param part Partition.
+     * @param backups Backup nodes.
      * @return {@code true} if the given partition is rebalancing to any backup node.
      */
-    private boolean isMoving(int part) {
+    private boolean isMoving(int part, List<ClusterNode> backups) {
+        Boolean res;
+
         if (movingParts == null)
             movingParts = new HashMap<>();
 
-        Boolean res = movingParts.get(part);
+        if ((res = movingParts.get(part)) == null)
+            movingParts.put(part, res = isMoving0(part, backups));
 
-        if (res != null)
-            return res;
-
-        List<ClusterNode> dhtNodes = cctx.affinity().nodesByPartition(part, tx.topologyVersion());
-
-        for (int i = 1; i < dhtNodes.size(); i++) {
-            ClusterNode node = dhtNodes.get(i);
-            if (isMoving(node, part)) {
-                movingParts.put(part, Boolean.TRUE);
+        return res == Boolean.TRUE;
+    }
 
-                return true;
-            }
+    /**
+     * @param part Partition.
+     * @param backups Backup nodes.
+     * @return {@code true} if the given partition is rebalancing to any backup node.
+     */
+    private Boolean isMoving0(int part, List<ClusterNode> backups) {
+        for (ClusterNode node : backups) {
+            if (isMoving(node, part))
+                return Boolean.TRUE;
         }
 
-        movingParts.put(part, Boolean.FALSE);
-
-        return false;
+        return Boolean.FALSE;
     }
 
     /**
@@ -966,9 +991,7 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
      * @return {@code true} if the given partition is rebalancing to the given node.
      */
     private boolean isMoving(ClusterNode node, int part) {
-        GridDhtPartitionState partState = cctx.topology().partitionState(node.id(), part);
-
-        return partState != GridDhtPartitionState.OWNING && partState != GridDhtPartitionState.EVICTED;
+        return cctx.topology().partitionState(node.id(), part) == GridDhtPartitionState.MOVING;
     }
 
     /** */
@@ -1021,23 +1044,17 @@ public abstract class GridDhtTxAbstractEnlistFuture<T> extends GridCacheFutureAd
 
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
-        boolean backupLeft = false;
-
-        Set<ClusterNode> nodes = tx.lockTransactionNodes();
-
-        if (!F.isEmpty(nodes)) {
-            for (ClusterNode node : nodes) {
-                if (node.id().equals(nodeId)) {
-                    backupLeft = true;
-
-                    break;
-                }
-            }
+        try {
+            if (nearNodeId.equals(nodeId))
+                onDone(new ClusterTopologyCheckedException("Requesting node left the grid [nodeId=" + nodeId + ']'));
+            else if (pending != null && pending.remove(nodeId) != null)
+                cctx.kernalContext().closure().runLocalSafe(() -> continueLoop(false));
+        }
+        catch (Exception e) {
+            onDone(e);
         }
 
-        return (backupLeft || nearNodeId.equals(nodeId)) && onDone(
-            new ClusterTopologyCheckedException((backupLeft ? "Backup" : "Requesting") +
-                " node left the grid [nodeId=" + nodeId + ']'));
+        return false;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 86000cd..e9d9442 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -593,7 +593,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
-     * @return First event discovery event.1
+     * @return First event discovery event.
      *
      */
     public DiscoveryEvent firstEvent() {
@@ -3566,27 +3566,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
      * failed to send update counter deltas to backup.
      */
     private void finalizePartitionCounters() {
-        long time = System.currentTimeMillis();
+        int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
 
-        try {
-            int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
+        // Reserve at least 2 threads for system operations.
+        parallelismLvl = Math.max(1, parallelismLvl - 2);
 
-            // Reserve at least 2 threads for system operations.
-            parallelismLvl = Math.max(1, parallelismLvl - 2);
+        long time = System.currentTimeMillis();
 
-            if (parallelismLvl > 1) {
-                U.doInParallel(parallelismLvl,
-                    cctx.kernalContext().getSystemExecutorService(),
-                    nonLocalCacheGroups(),
-                    grp -> {
-                        grp.topology().finalizeUpdateCounters();
+        try {
+            U.<CacheGroupContext, Void>doInParallel(
+                parallelismLvl,
+                cctx.kernalContext().getSystemExecutorService(),
+                nonLocalCacheGroups(),
+                grp -> {
+                    grp.topology().finalizeUpdateCounters();
 
-                        return null;
-                    }
-                );
-            }
-            else
-                nonLocalCacheGroups().forEach(grp -> grp.topology().finalizeUpdateCounters());
+                    return null;
+                }
+            );
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException("Failed to finalize partition counters", e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index f3516a6..833a2b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.util.deque.FastSizeDeque;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -350,7 +351,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @return {@code True} if partition is empty.
      */
     public boolean isEmpty() {
-        return store.fullSize() == 0 && internalSize() == 0;
+        return store.isEmpty() && internalSize() == 0;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
index 2a7c2e4..35a8cfa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java
@@ -209,11 +209,9 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
 
                 KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey());
 
-                List<ClusterNode> nodes = cctx.affinity().nodesByKey(key, topVer);
+                ClusterNode node = cctx.affinity().primaryByKey(key, topVer);
 
-                ClusterNode node;
-
-                if (F.isEmpty(nodes) || ((node = nodes.get(0)) == null))
+                if (node == null)
                     throw new ClusterTopologyCheckedException("Failed to get primary node " +
                         "[topVer=" + topVer + ", key=" + key + ']');
 
@@ -240,8 +238,7 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
                     break;
                 }
 
-                batch.add(op.isDeleteOrLock() ? key : cur,
-                    op != EnlistOperation.LOCK && cctx.affinityNode() && (cctx.isReplicated() || nodes.indexOf(cctx.localNode()) > 0));
+                batch.add(op.isDeleteOrLock() ? key : cur, !node.isLocal() && isLocalBackup(op, key));
 
                 if (batch.size() == batchSize)
                     res = markReady(res, batch);
@@ -297,6 +294,16 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
         return peek != FINISHED;
     }
 
+    /** */
+    private boolean isLocalBackup(EnlistOperation op, KeyCacheObject key) {
+        if (!cctx.affinityNode() || op == EnlistOperation.LOCK)
+            return false;
+        else if (cctx.isReplicated())
+            return true;
+
+        return cctx.topology().nodes(key.partition(), tx.topologyVersion()).indexOf(cctx.localNode()) > 0;
+    }
+
     /**
      * Add batch to batch collection if it is ready.
      *
@@ -604,6 +611,9 @@ public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridC
 
         assert this.res != null && (this.res.emptyResult() || needRes || this.res.invokeResult() || !this.res.success());
 
+
+        tx.hasRemoteLocks(true);
+
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
index d6deed1..316aae5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxQueryResultsEnlistFuture.java
@@ -200,11 +200,9 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
 
                 KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey());
 
-                List<ClusterNode> nodes = cctx.affinity().nodesByKey(key, topVer);
+                ClusterNode node = cctx.affinity().primaryByPartition(key.partition(), topVer);
 
-                ClusterNode node;
-
-                if (F.isEmpty(nodes) || ((node = nodes.get(0)) == null))
+                if (node == null)
                     throw new ClusterTopologyCheckedException("Failed to get primary node " +
                         "[topVer=" + topVer + ", key=" + key + ']');
 
@@ -229,8 +227,7 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
                     break;
                 }
 
-                batch.add(op.isDeleteOrLock() ? key : cur,
-                    op != EnlistOperation.LOCK && cctx.affinityNode() && (cctx.isReplicated() || nodes.indexOf(cctx.localNode()) > 0));
+                batch.add(op.isDeleteOrLock() ? key : cur, !node.isLocal() && isLocalBackup(op, key));
 
                 if (batch.size() == batchSize)
                     res = markReady(res, batch);
@@ -287,6 +284,16 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
     }
 
     /** */
+    private boolean isLocalBackup(EnlistOperation op, KeyCacheObject key) {
+        if (!cctx.affinityNode() || op == EnlistOperation.LOCK)
+            return false;
+        else if (cctx.isReplicated())
+            return true;
+
+        return cctx.topology().nodes(key.partition(), tx.topologyVersion()).contains(cctx.localNode());
+    }
+
+    /** */
     private ArrayList<Batch> markReady(ArrayList<Batch> batches, Batch batch) {
         if (!batch.ready()) {
             batch.ready(true);
@@ -571,6 +578,8 @@ public class GridNearTxQueryResultsEnlistFuture extends GridNearTxQueryAbstractE
 
         RES_UPD.getAndAdd(this, res.result());
 
+        tx.hasRemoteLocks(true);
+
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 73cb878..fb287bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1770,6 +1770,18 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
+        @Override public boolean isEmpty() {
+            try {
+                CacheDataStore delegate0 = init0(true);
+
+                return delegate0 == null || delegate0.isEmpty();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
         @Override public long cacheSize(int cacheId) {
             try {
                 CacheDataStore delegate0 = init0(true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 17799d3..7852008 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -2098,6 +2098,37 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements
     }
 
     /**
+     * @return {@code True} in case the tree is empty.
+     * @throws IgniteCheckedException If failed.
+     */
+    public final boolean isEmpty() throws IgniteCheckedException {
+        checkDestroyed();
+
+        for (;;) {
+            TreeMetaData treeMeta = treeMeta();
+
+            long rootId, rootPage = acquirePage(rootId = treeMeta.rootId);
+
+            long rootAddr = readLock(rootId, rootPage);
+
+            if (rootAddr == 0) {
+                checkDestroyed();
+
+                continue;
+            }
+
+            try {
+                BPlusIO<L> io = io(rootAddr);
+
+                return io.getCount(rootAddr) == 0;
+            }
+            finally {
+                readUnlock(rootId, rootPage, rootAddr);
+            }
+        }
+    }
+
+    /**
      * Returns number of elements in the tree by scanning pages of the bottom (leaf) level.
      * Since a concurrent access is permitted, there is no guarantee about
      * momentary consistency: the method may miss updates made in already scanned pages.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 52659e3..ec1b646 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -62,7 +62,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
@@ -1983,21 +1985,51 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
 
         for (Map.Entry<Integer, ? extends Map<Integer, AtomicLong>> entry : sizeDeltas.entrySet()) {
             Integer cacheId = entry.getKey();
-            Map<Integer, AtomicLong> partDeltas = entry.getValue();
+            Map<Integer, AtomicLong> deltas = entry.getValue();
 
-            assert !F.isEmpty(partDeltas);
+            assert !F.isEmpty(deltas);
 
             GridDhtPartitionTopology top = cctx.cacheContext(cacheId).topology();
 
-            for (Map.Entry<Integer, AtomicLong> e : partDeltas.entrySet()) {
-                Integer p = e.getKey();
+            // Need to reserve on backups only
+            boolean reserve = dht() && remote();
+
+            for (Map.Entry<Integer, AtomicLong> e : deltas.entrySet()) {
+                boolean invalid = false;
+                int p = e.getKey();
                 long delta = e.getValue().get();
 
-                GridDhtLocalPartition dhtPart = top.localPartition(p);
+                try {
+                    GridDhtLocalPartition part = top.localPartition(p);
+
+                    if (!reserve || part != null && part.reserve()) {
+                        assert part != null;
+
+                        try {
+                            if (part.state() != GridDhtPartitionState.RENTING)
+                                part.dataStore().updateSize(cacheId, delta);
+                            else
+                                invalid = true;
+                        }
+                        finally {
+                            if (reserve)
+                                part.release();
+                        }
+                    }
+                    else
+                        invalid = true;
+                }
+                catch (GridDhtInvalidPartitionException e1) {
+                    invalid = true;
+                }
 
-                assert dhtPart != null;
+                if (invalid) {
+                    assert reserve;
 
-                dhtPart.dataStore().updateSize(cacheId, delta);
+                    if (log.isDebugEnabled())
+                        log.debug("Trying to apply size delta for invalid partition: " +
+                            "[cacheId=" + cacheId + ", part=" + p + "]");
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 3685f7a..aefd54c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -25,7 +25,6 @@ import javax.cache.processor.EntryProcessor;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
@@ -34,7 +33,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
 import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -64,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeVal
 import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture;
@@ -1664,8 +1663,6 @@ public class IgniteTxHandler {
             GridDhtTxRemote tx = ctx.tm().tx(req.version());
 
             if (tx == null) {
-                assert !req.queryUpdate();
-
                 boolean single = req.last() && req.writes().size() == 1;
 
                 tx = new GridDhtTxRemote(
@@ -1866,117 +1863,132 @@ public class IgniteTxHandler {
 
             int part = ctx.affinity().partition(key);
 
-            GridDhtLocalPartition locPart = ctx.topology().localPartition(part, tx.topologyVersion(), false);
+            try {
+                GridDhtLocalPartition locPart = ctx.topology().localPartition(part, tx.topologyVersion(), false);
 
-            if (locPart == null || !locPart.reserve())
-                throw new ClusterTopologyException("Can not reserve partition. Please retry on stable topology.");
+                if (locPart != null && locPart.reserve()) {
+                    try {
+                        // do not process renting partitions.
+                        if (locPart.state() == GridDhtPartitionState.RENTING) {
+                            tx.addInvalidPartition(ctx, part);
 
-            try {
-                CacheObject val = null;
-                EntryProcessor entryProc = null;
-                Object[] invokeArgs = null;
+                            continue;
+                        }
 
-                boolean needOldVal = ctx.shared().mvccCaching().continuousQueryListeners(ctx, tx, key) != null;
+                        CacheObject val = null;
+                        EntryProcessor entryProc = null;
+                        Object[] invokeArgs = null;
 
-                Message val0 = vals != null ? vals.get(i) : null;
+                        boolean needOldVal = ctx.shared().mvccCaching().continuousQueryListeners(ctx, tx, key) != null;
 
-                CacheEntryInfoCollection entries =
-                    val0 instanceof CacheEntryInfoCollection ? (CacheEntryInfoCollection)val0 : null;
+                        Message val0 = vals != null ? vals.get(i) : null;
 
-                if (entries == null && !op.isDeleteOrLock() && !op.isInvoke())
-                    val = (val0 instanceof CacheObject) ? (CacheObject)val0 : null;
+                        CacheEntryInfoCollection entries =
+                            val0 instanceof CacheEntryInfoCollection ? (CacheEntryInfoCollection)val0 : null;
 
-                if(entries == null && op.isInvoke()) {
-                    assert val0 instanceof GridInvokeValue;
+                        if (entries == null && !op.isDeleteOrLock() && !op.isInvoke())
+                            val = (val0 instanceof CacheObject) ? (CacheObject)val0 : null;
 
-                    GridInvokeValue invokeVal = (GridInvokeValue)val0;
+                        if (entries == null && op.isInvoke()) {
+                            assert val0 instanceof GridInvokeValue;
 
-                    entryProc = invokeVal.entryProcessor();
-                    invokeArgs = invokeVal.invokeArgs();
-                }
+                            GridInvokeValue invokeVal = (GridInvokeValue)val0;
 
-                assert entryProc != null || !op.isInvoke() : "entryProc=" + entryProc + ", op=" + op;
+                            entryProc = invokeVal.entryProcessor();
+                            invokeArgs = invokeVal.invokeArgs();
+                        }
 
-                GridDhtCacheEntry entry = dht.entryExx(key, tx.topologyVersion());
+                        assert entryProc != null || !op.isInvoke() : "entryProc=" + entryProc + ", op=" + op;
 
-                GridCacheUpdateTxResult updRes;
+                        GridDhtCacheEntry entry = dht.entryExx(key, tx.topologyVersion());
 
-                while (true) {
-                    ctx.shared().database().checkpointReadLock();
+                        GridCacheUpdateTxResult updRes;
 
-                    try {
-                        if (entries == null) {
-                            switch (op) {
-                                case DELETE:
-                                    updRes = entry.mvccRemove(
-                                        tx,
-                                        ctx.localNodeId(),
-                                        tx.topologyVersion(),
-                                        snapshot,
-                                        false,
-                                        needOldVal,
-                                        null,
-                                        false);
-
-                                    break;
-
-                                case INSERT:
-                                case TRANSFORM:
-                                case UPSERT:
-                                case UPDATE:
-                                    updRes = entry.mvccSet(
-                                        tx,
+                        while (true) {
+                            ctx.shared().database().checkpointReadLock();
+
+                            try {
+                                if (entries == null) {
+                                    switch (op) {
+                                        case DELETE:
+                                            updRes = entry.mvccRemove(
+                                                tx,
+                                                ctx.localNodeId(),
+                                                tx.topologyVersion(),
+                                                snapshot,
+                                                false,
+                                                needOldVal,
+                                                null,
+                                                false);
+
+                                            break;
+
+                                        case INSERT:
+                                        case TRANSFORM:
+                                        case UPSERT:
+                                        case UPDATE:
+                                            updRes = entry.mvccSet(
+                                                tx,
+                                                ctx.localNodeId(),
+                                                val,
+                                                entryProc,
+                                                invokeArgs,
+                                                0,
+                                                tx.topologyVersion(),
+                                                snapshot,
+                                                op.cacheOperation(),
+                                                false,
+                                                false,
+                                                needOldVal,
+                                                null,
+                                                false);
+
+                                            break;
+
+                                        default:
+                                            throw new IgniteSQLException("Cannot acquire lock for operation [op= "
+                                                + op + "]" + "Operation is unsupported at the moment ",
+                                                IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+                                    }
+                                }
+                                else {
+                                    updRes = entry.mvccUpdateRowsWithPreloadInfo(tx,
                                         ctx.localNodeId(),
-                                        val,
-                                        entryProc,
-                                        invokeArgs,
-                                        0,
                                         tx.topologyVersion(),
-                                        snapshot,
+                                        entries.infos(),
                                         op.cacheOperation(),
-                                        false,
-                                        false,
-                                        needOldVal,
-                                        null,
-                                        false);
-
-                                    break;
-
-                                default:
-                                    throw new IgniteSQLException("Cannot acquire lock for operation [op= "
-                                        + op + "]" + "Operation is unsupported at the moment ",
-                                        IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
+                                        snapshot,
+                                        futId,
+                                        batchNum);
+                                }
+
+                                break;
+                            }
+                            catch (GridCacheEntryRemovedException ignore) {
+                                entry = dht.entryExx(key);
+                            }
+                            finally {
+                                ctx.shared().database().checkpointReadUnlock();
                             }
-                        }
-                        else {
-                            updRes = entry.mvccUpdateRowsWithPreloadInfo(tx,
-                                ctx.localNodeId(),
-                                tx.topologyVersion(),
-                                entries.infos(),
-                                op.cacheOperation(),
-                                snapshot,
-                                futId,
-                                batchNum);
                         }
 
-                        break;
-                    }
-                    catch (GridCacheEntryRemovedException ignore) {
-                        entry = dht.entryExx(key);
+                        if (!updRes.filtered())
+                            ctx.shared().mvccCaching().addEnlisted(key, updRes.newValue(), 0, 0, tx.xidVersion(),
+                                updRes.oldValue(), tx.local(), tx.topologyVersion(), snapshot, ctx.cacheId(), tx, futId, batchNum);
+
+                        assert updRes.updateFuture() == null : "Entry should not be locked on the backup";
                     }
+
                     finally {
-                        ctx.shared().database().checkpointReadUnlock();
+                        locPart.release();
                     }
                 }
-
-                if (!updRes.filtered())
-                    ctx.shared().mvccCaching().addEnlisted(key, updRes.newValue(), 0, 0, tx.xidVersion(),
-                        updRes.oldValue(), tx.local(), tx.topologyVersion(), snapshot, ctx.cacheId(), tx, futId, batchNum);
-
-                assert updRes.updateFuture() == null : "Entry should not be locked on the backup";
+                else {
+                    tx.addInvalidPartition(ctx, part);
+                }
             }
-            finally {
-                locPart.release();
+            catch (GridDhtInvalidPartitionException e) {
+                tx.addInvalidPartition(ctx, e.partition());
             }
         }
     }
@@ -2218,26 +2230,42 @@ public class IgniteTxHandler {
         if (counters == null)
             return;
 
-        int cacheId = CU.UNDEFINED_CACHE_ID;
-        GridDhtPartitionTopology top = null;
-
         for (PartitionUpdateCountersMessage counter : counters) {
-            if (counter.cacheId() != cacheId) {
-                GridCacheContext ctx0 = ctx.cacheContext(cacheId = counter.cacheId());
+            GridCacheContext ctx0 = ctx.cacheContext(counter.cacheId());
 
-                assert ctx0.mvccEnabled();
+            assert ctx0.mvccEnabled();
 
-                top = ctx0.topology();
-            }
+            GridDhtPartitionTopology top = ctx0.topology();
 
             assert top != null;
 
             for (int i = 0; i < counter.size(); i++) {
-                GridDhtLocalPartition part = top.localPartition(counter.partition(i));
+                boolean invalid = false;
+
+                try {
+                    GridDhtLocalPartition part = top.localPartition(counter.partition(i));
 
-                assert part != null;
+                    if (part != null && part.reserve()) {
+                        try {
+                            if (part.state() != GridDhtPartitionState.RENTING)
+                                part.updateCounter(counter.initialCounter(i), counter.updatesCount(i));
+                            else
+                                invalid = true;
+                        }
+                        finally {
+                            part.release();
+                        }
+                    }
+                    else
+                        invalid = true;
+                }
+                catch (GridDhtInvalidPartitionException e) {
+                    invalid = true;
+                }
 
-                part.updateCounter(counter.initialCounter(i), counter.updatesCount(i));
+                if (invalid && log.isDebugEnabled())
+                    log.debug("Received partition update counters message for invalid partition: " +
+                        "[cacheId=" + counter.cacheId() + ", part=" + counter.partition(i) + "]");
             }
         }
     }
@@ -2250,25 +2278,24 @@ public class IgniteTxHandler {
     @Nullable public List<PartitionUpdateCountersMessage> filterUpdateCountersForBackupNode(
         IgniteInternalTx tx, ClusterNode node) {
         TxCounters txCntrs = tx.txCounters(false);
+        Collection<PartitionUpdateCountersMessage> updCntrs;
 
-        if (txCntrs == null || F.isEmpty(txCntrs.updateCounters()))
+        if (txCntrs == null || F.isEmpty(updCntrs = txCntrs.updateCounters()))
             return null;
 
-        Collection<PartitionUpdateCountersMessage> updCntrs = txCntrs.updateCounters();
-
         List<PartitionUpdateCountersMessage> res = new ArrayList<>(updCntrs.size());
 
         AffinityTopologyVersion top = tx.topologyVersionSnapshot();
 
         for (PartitionUpdateCountersMessage partCntrs : updCntrs) {
-            GridCacheAffinityManager affinity = ctx.cacheContext(partCntrs.cacheId()).affinity();
+            GridDhtPartitionTopology topology = ctx.cacheContext(partCntrs.cacheId()).topology();
 
             PartitionUpdateCountersMessage resCntrs = new PartitionUpdateCountersMessage(partCntrs.cacheId(), partCntrs.size());
 
             for (int i = 0; i < partCntrs.size(); i++) {
                 int part = partCntrs.partition(i);
 
-                if (affinity.backupByPartition(node, part, top))
+                if (topology.nodes(part, top).indexOf(node) > 0)
                     resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i));
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index 7869e13..4283d1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -944,6 +944,8 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
 
                                 tx.commit();
 
+                                v++;
+
                                 first = false;
                             }
 
@@ -951,10 +953,8 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                                 Map<Integer, Integer> res = readAllByMode(cache.cache, keys, readMode, INTEGER_CODEC);
 
                                 for (Integer k : keys)
-                                    assertEquals("key=" + k, v, (Object)res.get(k));
+                                    assertEquals("key=" + k, v - 1, (Object)res.get(k));
                             }
-
-                            v++;
                         }
                         catch (Exception e) {
                             handleTxException(e);
@@ -1366,6 +1366,13 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                 }
             }, readers, "reader");
 
+            GridTestUtils.runAsync(() -> {
+                while (System.currentTimeMillis() < stopTime)
+                    doSleep(1000);
+
+                stop.set(true);
+            });
+
             while (System.currentTimeMillis() < stopTime && !stop.get()) {
                 Thread.sleep(1000);
 
@@ -1407,8 +1414,10 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
 
                             Ignite srv = startGrid(idx);
 
+                            cache0 = new TestCache(srv.cache(DEFAULT_CACHE_NAME));
+
                             synchronized (caches) {
-                                caches.set(idx, new TestCache(srv.cache(DEFAULT_CACHE_NAME)));
+                                caches.set(idx, cache0);
                             }
 
                             awaitPartitionMapExchange();
@@ -1422,8 +1431,6 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
                 }
             }
 
-            stop.set(true);
-
             Exception ex = null;
 
             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 817c56b..1f42694 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -2344,7 +2344,7 @@ public abstract class GridAbstractTest extends TestCase {
             U.sleep(millis);
         }
         catch (Exception e) {
-            throw new IgniteException();
+            throw new IgniteException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
index 998cb76..894e4bb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccBackupsAbstractTest.java
@@ -182,6 +182,8 @@ public abstract class CacheMvccBackupsAbstractTest extends CacheMvccAbstractTest
      * @throws Exception If failed.
      */
     public void testBackupsCoherenceWithLargeOperations() throws Exception {
+        fail("https://issues.apache.org/jira/browse/IGNITE-10104");
+
         disableScheduledVacuum = true;
 
         ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 1, 10)

http://git-wip-us.apache.org/repos/asf/ignite/blob/a8fc7af0/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
index b347497..eee270c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccPartitionedSqlCoordinatorFailoverTest.java
@@ -22,7 +22,6 @@ import org.apache.ignite.cache.CacheMode;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SCAN;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.ReadMode.SQL;
 import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.DML;
-import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccAbstractTest.WriteMode.PUT;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
 
@@ -64,14 +63,6 @@ public class CacheMvccPartitionedSqlCoordinatorFailoverTest extends CacheMvccAbs
     /**
      * @throws Exception If failed.
      */
-    public void testPutAllGetAll_ClientServer_Backups1_Restart_Scan() throws Exception {
-        // TODO add tests with RESTART_RND_SRV https://issues.apache.org/jira/browse/IGNITE-9928
-        putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 2, 2, 64, /*new InitIndexing(Integer.class, Integer.class)*/ null, SCAN, PUT);
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testPutAllGetAll_ClientServer_Backups1_RestartCoordinator_ScanDml_Persistence() throws Exception {
         persistence = true;
 
@@ -100,6 +91,46 @@ public class CacheMvccPartitionedSqlCoordinatorFailoverTest extends CacheMvccAbs
     /**
      * @throws Exception If failed.
      */
+    public void testPutAllGetAll_ClientServer_Backups1_RestartRandomSrv_SqlDml() throws Exception {
+        putAllGetAll(RestartMode.RESTART_RND_SRV, 3, 1, 1, DFLT_PARTITION_COUNT,
+            new InitIndexing(Integer.class, Integer.class), SQL, DML);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllGetAll_ClientServer_Backups2_RestartRandomSrv_SqlDml() throws Exception {
+        putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 1, 2, DFLT_PARTITION_COUNT,
+            new InitIndexing(Integer.class, Integer.class), SQL, DML);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllGetAll_Server_Backups2_RestartRandomSrv_SqlDml() throws Exception {
+        putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 0, 2, DFLT_PARTITION_COUNT,
+            new InitIndexing(Integer.class, Integer.class), SQL, DML);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllGetAll_Server_Backups1_SinglePartition_RestartRandomSrv_SqlDml() throws Exception {
+        putAllGetAll(RestartMode.RESTART_RND_SRV, 4, 0, 1, 1,
+            new InitIndexing(Integer.class, Integer.class), SQL, DML);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllGetAll_ClientServer_Backups1_SinglePartition_RestartRandomSrv_SqlDml() throws Exception {
+        putAllGetAll(RestartMode.RESTART_RND_SRV, 3, 1, 1, 1,
+            new InitIndexing(Integer.class, Integer.class), SQL, DML);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testUpdate_N_Objects_ClientServer_Backups2_Sql() throws Exception {
         updateNObjectsTest(7, 3, 2, 2, DFLT_PARTITION_COUNT, DFLT_TEST_TIME,
             new InitIndexing(Integer.class, Integer.class), SQL, DML, RestartMode.RESTART_CRD);