You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by is...@apache.org on 2018/10/19 14:40:33 UTC

[1/2] ignite git commit: IGNITE-5935: MVCC TX: Tx recovery protocol

Repository: ignite
Updated Branches:
  refs/heads/master 82d2efe1a -> 5939a9476


http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java
new file mode 100644
index 0000000..f7fe9cb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+
+/**
+ * Represents partition update counters delivery to remote nodes.
+ */
+public class PartitionCountersNeighborcastFuture extends GridCacheCompoundIdentityFuture<Void> {
+    /** */
+    private final IgniteUuid futId = IgniteUuid.randomUuid();
+    /** */
+    private boolean trackable = true;
+    /** */
+    private final GridCacheSharedContext<?, ?> cctx;
+    /** */
+    private final IgniteInternalTx tx;
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    public PartitionCountersNeighborcastFuture(
+        IgniteInternalTx tx, GridCacheSharedContext<?, ?> cctx) {
+        super(null);
+
+        this.tx = tx;
+
+        this.cctx = cctx;
+
+        log = cctx.logger(CU.TX_MSG_RECOVERY_LOG_CATEGORY);
+    }
+
+    /**
+     * Starts processing.
+     */
+    public void init() {
+        if (log.isInfoEnabled()) {
+            log.info("Starting delivery partition countres to remote nodes [txId=" + tx.nearXidVersion() +
+                ", futId=" + futId);
+        }
+
+        HashSet<UUID> siblings = siblingBackups();
+
+        cctx.mvcc().addFuture(this, futId);
+
+        for (UUID peer : siblings) {
+            List<PartitionUpdateCountersMessage> cntrs = cctx.tm().txHandler()
+                .filterUpdateCountersForBackupNode(tx, cctx.node(peer));
+
+            if (F.isEmpty(cntrs))
+                continue;
+
+            MiniFuture miniFut = new MiniFuture(peer);
+
+            try {
+                cctx.io().send(peer, new PartitionCountersNeighborcastRequest(cntrs, futId), SYSTEM_POOL);
+
+                add(miniFut);
+            }
+            catch (IgniteCheckedException e) {
+                if (!(e instanceof ClusterTopologyCheckedException))
+                    log.warning("Failed to send partition counters to remote node [node=" + peer + ']', e);
+                else
+                    logNodeLeft(peer);
+
+                miniFut.onDone();
+            }
+        }
+
+        markInitialized();
+    }
+
+    /** */
+    private HashSet<UUID> siblingBackups() {
+        Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes();
+
+        assert txNodes != null;
+
+        UUID locNodeId = cctx.localNodeId();
+
+        HashSet<UUID> siblings = new HashSet<>();
+
+        txNodes.values().stream()
+            .filter(backups -> backups.contains(locNodeId))
+            .forEach(siblings::addAll);
+
+        siblings.remove(locNodeId);
+
+        return siblings;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+        boolean comp = super.onDone(res, err);
+
+        if (comp)
+            cctx.mvcc().removeFuture(futId);
+
+        return comp;
+    }
+
+    /**
+     * Processes a response from a remote peer. Completes a mini future for that peer.
+     *
+     * @param nodeId Remote peer node id.
+     */
+    public void onResult(UUID nodeId) {
+        if (log.isInfoEnabled())
+            log.info("Remote peer acked partition counters delivery [futId=" + futId +
+                ", node=" + nodeId + ']');
+
+        completeMini(nodeId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onNodeLeft(UUID nodeId) {
+        logNodeLeft(nodeId);
+
+        // if a left node is one of remote peers then a mini future for it is completed successfully
+        completeMini(nodeId);
+
+        return true;
+    }
+
+    /** */
+    private void completeMini(UUID nodeId) {
+        for (IgniteInternalFuture<?> fut : futures()) {
+            assert fut instanceof MiniFuture;
+
+            MiniFuture mini = (MiniFuture)fut;
+
+            if (mini.nodeId.equals(nodeId)) {
+                cctx.kernalContext().closure().runLocalSafe(mini::onDone);
+
+                break;
+            }
+        }
+    }
+
+    /** */
+    private void logNodeLeft(UUID nodeId) {
+        if (log.isInfoEnabled()) {
+            log.info("Failed during partition counters delivery to remote node. " +
+                "Node left cluster (will ignore) [futId=" + futId +
+                ", node=" + nodeId + ']');
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean trackable() {
+        return trackable;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markNotTrackable() {
+        trackable = false;
+    }
+
+    /**
+     * Component of compound parent future. Represents interaction with one of remote peers.
+     */
+    private static class MiniFuture extends GridFutureAdapter<Void> {
+        /** */
+        private final UUID nodeId;
+
+        /** */
+        private MiniFuture(UUID nodeId) {
+            this.nodeId = nodeId;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
index e1a0bd6..550ec09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCounters.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Values which should be tracked during transaction execution and applied on commit.
@@ -69,7 +70,7 @@ public class TxCounters {
     /**
      * @return Final update counters.
      */
-    public Collection<PartitionUpdateCountersMessage> updateCounters() {
+    @Nullable public Collection<PartitionUpdateCountersMessage> updateCounters() {
         return updCntrs;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 4569f65..fbfd99b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -1423,7 +1423,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                 if (cache != null && !cache.isLocal() && cache.context().userCache())
                     req.addUpdateCounters(ctx.localNodeId(),
-                        toCountersMap(cache.context().topology().localUpdateCounters(false)));
+                        toCountersMap(cache.context().topology().localUpdateCounters(false, false)));
             }
         }
 
@@ -1564,7 +1564,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                                 if (cache != null && !cache.isLocal() && cache.context().userCache()) {
                                     CachePartitionPartialCountersMap cntrsMap =
-                                        cache.context().topology().localUpdateCounters(false);
+                                        cache.context().topology().localUpdateCounters(false, false);
 
                                     cntrs = U.marshal(marsh, cntrsMap);
                                 }
@@ -2504,7 +2504,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
                         if (cctx != null && cntrsPerNode != null && !cctx.isLocal() && cctx.affinityNode())
                             cntrsPerNode.put(ctx.localNodeId(),
-                                toCountersMap(cctx.topology().localUpdateCounters(false)));
+                                toCountersMap(cctx.topology().localUpdateCounters(false, false)));
 
                         routine.handler().updateCounters(topVer, cntrsPerNode, cntrs);
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
index 8bdfafe..a7880a2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
@@ -314,4 +314,4 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
     private boolean ignoredMessage(GridIoMessage msg) {
         return ignoreMsgCls != null && ignoreMsgCls.isAssignableFrom(msg.message().getClass());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
index 7514555..3f55e9c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest.java
@@ -23,10 +23,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
@@ -40,6 +42,8 @@ import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
@@ -58,6 +62,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
 
 /**
@@ -268,8 +273,10 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
 
                         assertNotNull(cache);
 
-                        assertEquals("Failed to check entry value on node: " + checkNodeId,
-                            fullFailure ? initVal : val, cache.localPeek(key));
+                        if (atomicityMode() != TRANSACTIONAL_SNAPSHOT) {
+                            assertEquals("Failed to check entry value on node: " + checkNodeId,
+                                fullFailure ? initVal : val, cache.localPeek(key));
+                        }
 
                         return null;
                     }
@@ -278,8 +285,22 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
         }
 
         for (Map.Entry<Integer, String> e : map.entrySet()) {
-            for (Ignite g : G.allGrids())
-                assertEquals(fullFailure ? initVal : e.getValue(), g.cache(DEFAULT_CACHE_NAME).get(e.getKey()));
+            long cntr0 = -1;
+
+            for (Ignite g : G.allGrids()) {
+                Integer key = e.getKey();
+
+                assertEquals(fullFailure ? initVal : e.getValue(), g.cache(DEFAULT_CACHE_NAME).get(key));
+
+                if (g.affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(((IgniteEx)g).localNode(), key)) {
+                    long nodeCntr = updateCoutner(g, key);
+
+                    if (cntr0 == -1)
+                        cntr0 = nodeCntr;
+
+                    assertEquals(cntr0, nodeCntr);
+                }
+            }
         }
     }
 
@@ -402,6 +423,9 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
 
             assertFalse(e.getValue().isEmpty());
 
+            if (atomicityMode() == TRANSACTIONAL_SNAPSHOT)
+                continue;
+
             for (ClusterNode node : e.getValue()) {
                 final UUID checkNodeId = node.id();
 
@@ -425,8 +449,22 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
         }
 
         for (Map.Entry<Integer, String> e : map.entrySet()) {
-            for (Ignite g : G.allGrids())
-                assertEquals(!commmit ? initVal : e.getValue(), g.cache(DEFAULT_CACHE_NAME).get(e.getKey()));
+            long cntr0 = -1;
+
+            for (Ignite g : G.allGrids()) {
+                Integer key = e.getKey();
+
+                assertEquals(!commmit ? initVal : e.getValue(), g.cache(DEFAULT_CACHE_NAME).get(key));
+
+                if (g.affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(((IgniteEx)g).localNode(), key)) {
+                    long nodeCntr = updateCoutner(g, key);
+
+                    if (cntr0 == -1)
+                        cntr0 = nodeCntr;
+
+                    assertEquals(cntr0, nodeCntr);
+                }
+            }
         }
     }
 
@@ -529,4 +567,21 @@ public abstract class IgniteTxPessimisticOriginatingNodeFailureAbstractSelfTest
         else
             return false;
     }
-}
\ No newline at end of file
+
+    /** @return Update counter of the partition {@code key} maps to on {@code ign}, or 0 if no such local store. */
+    private static long updateCoutner(Ignite ign, Object key) {
+        GridCacheContext<?, ?> cacheCtx = ((IgniteEx)ign).cachex(DEFAULT_CACHE_NAME).context();
+
+        return dataStore(cacheCtx, key).map(store -> store.updateCounter()).orElse(0L);
+    }
+
+    /** */
+    private static Optional<IgniteCacheOffheapManager.CacheDataStore> dataStore(
+        GridCacheContext<?, ?> cctx, Object key) {
+        int p = cctx.affinity().partition(key);
+
+        return StreamSupport.stream(cctx.offheap().cacheDataStores().spliterator(), false)
+            .filter(ds -> ds.partId() == p)
+            .findFirst();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
index 07bbf6c..81d4796 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedTxOriginatingNodeFailureSelfTest.java
@@ -148,4 +148,4 @@ public class GridCachePartitionedTxOriginatingNodeFailureSelfTest extends
 
         testTxOriginatingNodeFails(keys, false);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
index 23304a4..bb3fff0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
@@ -34,4 +34,4 @@ public class IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest
 
         return ccfg;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
index 00f9729..b0d083d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -19,8 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.StreamSupport;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
@@ -33,10 +36,13 @@ import org.apache.ignite.cache.affinity.Affinity;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -55,6 +61,7 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.transactions.Transaction;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.internal.processors.cache.ExchangeContext.IGNITE_EXCHANGE_COMPATIBILITY_VER_1;
 import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
@@ -114,6 +121,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryNodeFailureRecovery1() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryNodeFailure(false, false, true);
     }
 
@@ -121,6 +130,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryNodeFailureRecovery2() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryNodeFailure(true, false, true);
     }
 
@@ -128,6 +139,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryNodeFailureRollback1() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryNodeFailure(false, true, true);
     }
 
@@ -135,8 +148,11 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryNodeFailureRollback2() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryNodeFailure(true, true, true);
     }
+
     /**
      * @throws Exception If failed.
      */
@@ -245,8 +261,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
         GridTestUtils.waitForCondition(new GridAbsPredicate() {
             @Override public boolean apply() {
                 try {
-                    checkKey(key1, rollback ? null : key1Nodes);
-                    checkKey(key2, rollback ? null : key2Nodes);
+                    checkKey(key1, rollback, key1Nodes, 0);
+                    checkKey(key2, rollback, key2Nodes, 0);
 
                     return true;
                 }
@@ -258,14 +274,16 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
             }
         }, 5000);
 
-        checkKey(key1, rollback ? null : key1Nodes);
-        checkKey(key2, rollback ? null : key2Nodes);
+        checkKey(key1, rollback, key1Nodes, 0);
+        checkKey(key2, rollback, key2Nodes, 0);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery1() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryAndOriginatingNodeFailure(false, false, true);
     }
 
@@ -273,6 +291,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery2() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryAndOriginatingNodeFailure(true, false, true);
     }
 
@@ -280,6 +300,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryAndOriginatingNodeFailureRollback1() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryAndOriginatingNodeFailure(false, true, true);
     }
 
@@ -287,6 +309,8 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
      * @throws Exception If failed.
      */
     public void testOptimisticPrimaryAndOriginatingNodeFailureRollback2() throws Exception {
+        if (atomicityMode() == TRANSACTIONAL_SNAPSHOT) return;
+
         primaryAndOriginatingNodeFailure(true, true, true);
     }
 
@@ -327,14 +351,14 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
     private void primaryAndOriginatingNodeFailure(final boolean locBackupKey,
         final boolean rollback,
         boolean optimistic)
-        throws Exception
-    {
+        throws Exception {
         // TODO IGNITE-6174: when exchanges can be merged test fails because of IGNITE-6174.
         System.setProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1, "true");
 
         try {
-            IgniteCache<Integer, Integer> cache0 = jcache(0);
-            IgniteCache<Integer, Integer> cache2 = jcache(2);
+            int orig = 0;
+
+            IgniteCache<Integer, Integer> origCache = jcache(orig);
 
             Affinity<Integer> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
 
@@ -342,7 +366,7 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
             for (int key = 0; key < 10_000; key++) {
                 if (aff.isPrimary(ignite(1).cluster().localNode(), key)) {
-                    if (locBackupKey == aff.isBackup(ignite(0).cluster().localNode(), key)) {
+                    if (locBackupKey == aff.isBackup(ignite(orig).cluster().localNode(), key)) {
                         key0 = key;
 
                         break;
@@ -353,27 +377,27 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
             assertNotNull(key0);
 
             final Integer key1 = key0;
-            final Integer key2 = primaryKey(cache2);
+            final Integer key2 = primaryKey(jcache(2));
 
-            int backups = cache0.getConfiguration(CacheConfiguration.class).getBackups();
+            int backups = origCache.getConfiguration(CacheConfiguration.class).getBackups();
 
             final Collection<ClusterNode> key1Nodes =
-                (locBackupKey && backups < 2) ? null : aff.mapKeyToPrimaryAndBackups(key1);
+                (locBackupKey && backups < 2) ? Collections.emptyList() : aff.mapKeyToPrimaryAndBackups(key1);
             final Collection<ClusterNode> key2Nodes = aff.mapKeyToPrimaryAndBackups(key2);
 
-            TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+            TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(orig).configuration().getCommunicationSpi();
 
-            IgniteTransactions txs = ignite(0).transactions();
+            IgniteTransactions txs = ignite(orig).transactions();
 
             Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ);
 
             log.info("Put key1 [key1=" + key1 + ", nodes=" + U.nodeIds(aff.mapKeyToPrimaryAndBackups(key1)) + ']');
 
-            cache0.put(key1, key1);
+            origCache.put(key1, key1);
 
             log.info("Put key2 [key2=" + key2 + ", nodes=" + U.nodeIds(aff.mapKeyToPrimaryAndBackups(key2)) + ']');
 
-            cache0.put(key2, key2);
+            origCache.put(key2, key2);
 
             log.info("Start prepare.");
 
@@ -399,13 +423,13 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
             log.info("Stop originating node.");
 
-            stopGrid(0);
+            stopGrid(orig);
 
             GridTestUtils.waitForCondition(new GridAbsPredicate() {
                 @Override public boolean apply() {
                     try {
-                        checkKey(key1, rollback ? null : key1Nodes);
-                        checkKey(key2, rollback ? null : key2Nodes);
+                        checkKey(key1, rollback, key1Nodes, 0);
+                        checkKey(key2, rollback, key2Nodes, 0);
 
                         return true;
                     } catch (AssertionError e) {
@@ -416,24 +440,23 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
                 }
             }, 5000);
 
-            checkKey(key1, rollback ? null : key1Nodes);
-            checkKey(key2, rollback ? null : key2Nodes);
+            checkKey(key1, rollback, key1Nodes, 0);
+            checkKey(key2, rollback, key2Nodes, 0);
         }
         finally {
             System.clearProperty(IGNITE_EXCHANGE_COMPATIBILITY_VER_1);
         }
     }
 
-    /**
-     * @param key Key.
-     * @param keyNodes Key nodes.
-     */
-    private void checkKey(Integer key, Collection<ClusterNode> keyNodes) {
-        if (keyNodes == null) {
-            for (Ignite ignite : G.allGrids()) {
-                IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
+    /** */
+    private void checkKey(Integer key, boolean rollback, Collection<ClusterNode> keyNodes, long initUpdCntr) {
+        if (rollback) {
+            if (atomicityMode() != TRANSACTIONAL_SNAPSHOT) {
+                for (Ignite ignite : G.allGrids()) {
+                    IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
-                assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
+                    assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
+                }
             }
 
             for (Ignite ignite : G.allGrids()) {
@@ -441,10 +464,34 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
 
                 assertNull("Unexpected value for: " + ignite.name(), cache.get(key));
             }
+
+            boolean found = keyNodes.isEmpty();
+
+            long cntr0 = -1;
+
+            for (ClusterNode node : keyNodes) {
+                try {
+                    long nodeCntr = updateCoutner(grid(node), key);
+
+                    found = true;
+
+                    if (cntr0 == -1)
+                        cntr0 = nodeCntr;
+
+                    assertEquals(cntr0, nodeCntr);
+                }
+                catch (IgniteIllegalStateException ignore) {
+                    // No-op.
+                }
+            }
+
+            assertTrue("Failed to find key node.", found);
         }
-        else {
+        else if (!keyNodes.isEmpty()) {
             boolean found = false;
 
+            long cntr0 = -1;
+
             for (ClusterNode node : keyNodes) {
                 try {
                     Ignite ignite = grid(node);
@@ -454,6 +501,13 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
                     ignite.cache(DEFAULT_CACHE_NAME);
 
                     assertEquals("Unexpected value for: " + ignite.name(), key, key);
+
+                    long nodeCntr = updateCoutner(ignite, key);
+
+                    if (cntr0 == -1)
+                        cntr0 = nodeCntr;
+
+                    assertTrue(nodeCntr == cntr0 && nodeCntr > initUpdCntr);
                 }
                 catch (IgniteIllegalStateException ignore) {
                     // No-op.
@@ -498,6 +552,23 @@ public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends
         assertTrue("Failed to wait for tx.", wait);
     }
 
+    /**
+     * Reads the update counter of the local partition the given key maps to.
+     *
+     * @param ign Node to read the counter on.
+     * @param key Key.
+     * @return Partition update counter, or {@code 0} if the node has no local data store for the key partition.
+     */
+    private static long updateCoutner(Ignite ign, Object key) {
+        GridCacheContext<?, ?> cctx = ((IgniteEx)ign).cachex(DEFAULT_CACHE_NAME).context();
+
+        return dataStore(cctx, key).map(store -> store.updateCounter()).orElse(0L);
+    }
+
+    /**
+     * Looks up the local cache data store of the partition the given key maps to.
+     *
+     * @param cctx Cache context.
+     * @param key Key.
+     * @return Data store of the key partition, or an empty optional if there is no such local data store.
+     */
+    private static Optional<IgniteCacheOffheapManager.CacheDataStore> dataStore(
+        GridCacheContext<?, ?> cctx, Object key) {
+        int part = cctx.affinity().partition(key);
+
+        IgniteCacheOffheapManager offheap = cctx.offheap();
+
+        return StreamSupport.stream(offheap.cacheDataStores().spliterator(), false)
+            .filter(store -> store.partId() == part)
+            .findFirst();
+    }
+
     /**
      *
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java
index 79308c8..8730c5c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedTxOriginatingNodeFailureSelfTest.java
@@ -35,4 +35,4 @@ public class GridCacheReplicatedTxOriginatingNodeFailureSelfTest extends
     @Override protected Class<?> ignoreMessageClass() {
         return GridDistributedTxPrepareRequest.class;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 35da7a4..ca3c09f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -74,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestQueryC
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTx;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridInClosure3;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -3181,7 +3181,7 @@ public class CacheMvccTransactionsTest extends CacheMvccAbstractTest {
         MvccProcessorImpl crd = mvccProcessor(node);
 
         // Start query to prevent cleanup.
-        IgniteInternalFuture<MvccSnapshot> fut = crd.requestSnapshotAsync();
+        IgniteInternalFuture<MvccSnapshot> fut = crd.requestSnapshotAsync((IgniteInternalTx)null);
 
         fut.get();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 91c702e..0fef7b2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -521,7 +521,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
             Affinity<Object> aff = grid(i).affinity(DEFAULT_CACHE_NAME);
 
             CachePartitionPartialCountersMap act = grid(i).cachex(DEFAULT_CACHE_NAME).context().topology()
-                .localUpdateCounters(false);
+                .localUpdateCounters(false, false);
 
             for (Map.Entry<Integer, Long> e : updCntrs.entrySet()) {
                 if (aff.mapPartitionToPrimaryAndBackups(e.getKey()).contains(grid(i).localNode())) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
index a2c6c83..0cdd0c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
@@ -61,4 +61,4 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite {
 
         return suite;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTxRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTxRecoveryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTxRecoveryTest.java
new file mode 100644
index 0000000..01f50cc
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTxRecoveryTest.java
@@ -0,0 +1,654 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.IntPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.NodeMode.CLIENT;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.NodeMode.SERVER;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.TxEndResult.COMMIT;
+import static org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest.TxEndResult.ROLLBAK;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionState.COMMITTED;
+import static org.apache.ignite.transactions.TransactionState.PREPARED;
+import static org.apache.ignite.transactions.TransactionState.PREPARING;
+import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
+
+/**
+ * Tests for the MVCC transaction recovery protocol. Each scenario fails a near or a primary node in the middle
+ * of a transaction and checks that the transaction is consistently committed or rolled back on the remaining
+ * nodes and that partition update counters stay consistent between partition owners.
+ */
+public class CacheMvccTxRecoveryTest extends CacheMvccAbstractTest {
+    /** Timeout for conditions which are expected to become satisfied eventually, in milliseconds. */
+    private static final long COND_TIMEOUT = 5_000;
+
+    /** SQL inserting a row with the given key and value 42. */
+    private static final String INSERT_SQL = "insert into Integer(_key, _val) values(?, 42)";
+
+    /** SQL selecting all rows. */
+    private static final String SELECT_ALL_SQL = "select * from Integer";
+
+    /** Expected outcome of a recovered transaction. */
+    public enum TxEndResult {
+        /** Transaction is expected to be committed. */ COMMIT,
+        /** Transaction is expected to be rolled back. */ ROLLBAK
+    }
+
+    /** Mode in which the near (transaction originating) node is started. */
+    public enum NodeMode {
+        /** Near node is a server. */ SERVER,
+        /** Near node is a client. */ CLIENT
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        // Every test configures its cache explicitly via basicCcfg().
+        throw new RuntimeException("Is not supposed to be used");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Recording SPI allows tests to block selected messages between nodes.
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        return cfg;
+    }
+
+    /**
+     * Checks commit recovery after failure of a client near node.
+     *
+     * @throws Exception if failed.
+     */
+    public void testRecoveryCommitNearFailure1() throws Exception {
+        checkRecoveryNearFailure(COMMIT, CLIENT);
+    }
+
+    /**
+     * Checks commit recovery after failure of a server near node.
+     *
+     * @throws Exception if failed.
+     */
+    public void testRecoveryCommitNearFailure2() throws Exception {
+        checkRecoveryNearFailure(COMMIT, SERVER);
+    }
+
+    /**
+     * Checks rollback recovery after failure of a client near node.
+     *
+     * @throws Exception if failed.
+     */
+    public void testRecoveryRollbackNearFailure1() throws Exception {
+        checkRecoveryNearFailure(ROLLBAK, CLIENT);
+    }
+
+    /**
+     * Checks rollback recovery after failure of a server near node.
+     *
+     * @throws Exception if failed.
+     */
+    public void testRecoveryRollbackNearFailure2() throws Exception {
+        checkRecoveryNearFailure(ROLLBAK, SERVER);
+    }
+
+    /**
+     * Checks commit recovery after failure of primary node grid(1).
+     *
+     * @throws Exception if failed.
+     */
+    public void testRecoveryCommitPrimaryFailure1() throws Exception {
+        checkRecoveryPrimaryFailure(COMMIT, false);
+    }
+
+    /**
+     * Checks rollback recovery after failure of primary node grid(1).
+     *
+     * @throws Exception if failed.
+     */
+    public void testRecoveryRollbackPrimaryFailure1() throws Exception {
+        checkRecoveryPrimaryFailure(ROLLBAK, false);
+    }
+
+    /**
+     * Checks commit recovery after failure of primary node grid(0).
+     *
+     * @throws Exception if failed.
+     */
+    public void testRecoveryCommitPrimaryFailure2() throws Exception {
+        checkRecoveryPrimaryFailure(COMMIT, true);
+    }
+
+    /**
+     * Checks rollback recovery after failure of primary node grid(0).
+     *
+     * @throws Exception if failed.
+     */
+    public void testRecoveryRollbackPrimaryFailure2() throws Exception {
+        checkRecoveryPrimaryFailure(ROLLBAK, true);
+    }
+
+    /**
+     * Starts a transaction from a near node, fails the near node after a full (commit case) or partial
+     * (rollback case) prepare and checks that the transaction is finished on server nodes with the expected outcome.
+     *
+     * @param endRes Expected transaction outcome.
+     * @param nearNodeMode Whether the near node is a client or a server.
+     * @throws Exception if failed.
+     */
+    private void checkRecoveryNearFailure(TxEndResult endRes, NodeMode nearNodeMode) throws Exception {
+        int gridCnt = 4;
+        int baseCnt = gridCnt - 1;
+
+        boolean commit = endRes == COMMIT;
+
+        startGridsMultiThreaded(baseCnt);
+
+        // Tweak client/server mode for the near node.
+        client = nearNodeMode == CLIENT;
+
+        IgniteEx nearNode = startGrid(baseCnt);
+
+        IgniteCache<Object, Object> cache = nearNode.getOrCreateCache(basicCcfg()
+            .setBackups(1));
+
+        List<Integer> keys = primaryBackupKeys(nearNode.affinity(DEFAULT_CACHE_NAME));
+
+        assertEquals(2, keys.size());
+
+        TestRecordingCommunicationSpi nearComm
+            = (TestRecordingCommunicationSpi)nearNode.configuration().getCommunicationSpi();
+
+        // Block the prepare request to grid(1), so that the transaction gets prepared only partially.
+        if (!commit)
+            nearComm.blockMessages(GridNearTxPrepareRequest.class, grid(1).name());
+
+        GridTestUtils.runAsync(() -> {
+            // Run in a separate thread to exclude the tx from the thread-local map.
+            GridNearTxLocal nearTx
+                = ((TransactionProxyImpl)nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx();
+
+            for (Integer k : keys)
+                insertKey(cache, k);
+
+            List<IgniteInternalTx> txs = IntStream.range(0, baseCnt)
+                .mapToObj(i -> txsOnNode(grid(i), nearTx.xidVersion()))
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+
+            IgniteInternalFuture<?> prepareFut = nearTx.prepareNearTxLocal();
+
+            if (commit)
+                prepareFut.get();
+            else
+                assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == PREPARED));
+
+            // Drop the near node.
+            nearNode.close();
+
+            assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == (commit ? COMMITTED : ROLLED_BACK)));
+
+            return null;
+        }).get();
+
+        if (commit)
+            assertConditionEventually(() -> rowsCount(grid(0)) == keys.size());
+        else
+            assertEquals(0, rowsCount(G.allGrids().get(0)));
+
+        assertPartitionCountersAreConsistent(keys, grids(baseCnt, i -> true));
+    }
+
+    /**
+     * Starts a transaction from a client node, fails a primary node of one of the transaction keys and checks
+     * that the transaction is finished on the remaining server nodes with the expected outcome.
+     *
+     * @param endRes Expected transaction outcome.
+     * @param mvccCrd If {@code true}, grid(0) is failed (presumably the MVCC coordinator as the first started
+     *      node — TODO confirm), otherwise grid(1) is failed.
+     * @throws Exception if failed.
+     */
+    private void checkRecoveryPrimaryFailure(TxEndResult endRes, boolean mvccCrd) throws Exception {
+        int gridCnt = 4;
+        int baseCnt = gridCnt - 1;
+
+        boolean commit = endRes == COMMIT;
+
+        startGridsMultiThreaded(baseCnt);
+
+        client = true;
+
+        IgniteEx nearNode = startGrid(baseCnt);
+
+        IgniteCache<Object, Object> cache = nearNode.getOrCreateCache(basicCcfg()
+            .setBackups(1));
+
+        List<Integer> keys = primaryBackupKeys(nearNode.affinity(DEFAULT_CACHE_NAME));
+
+        assertEquals(2, keys.size());
+
+        // The victim is the primary node for one of the keys, victimBackup is the backup node for that key.
+        int victim = mvccCrd ? 0 : 1;
+        int victimBackup = victim + 1;
+
+        TestRecordingCommunicationSpi victimComm =
+            (TestRecordingCommunicationSpi)grid(victim).configuration().getCommunicationSpi();
+
+        if (commit) {
+            // The near node does not receive the finish response from the victim.
+            victimComm.blockMessages(GridNearTxFinishResponse.class, nearNode.name());
+        }
+        else {
+            // The victim's backup does not receive the prepare request, so the tx is prepared only partially.
+            victimComm.blockMessages(GridDhtTxPrepareRequest.class, grid(victimBackup).name());
+        }
+
+        GridNearTxLocal nearTx
+            = ((TransactionProxyImpl)nearNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx();
+
+        for (Integer k : keys)
+            insertKey(cache, k);
+
+        List<IgniteInternalTx> txs = IntStream.range(0, baseCnt)
+            .filter(i -> i != victim)
+            .mapToObj(i -> txsOnNode(grid(i), nearTx.xidVersion()))
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
+
+        IgniteInternalFuture<IgniteInternalTx> commitFut = nearTx.commitAsync();
+
+        if (commit)
+            assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == COMMITTED));
+        else
+            assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == PREPARED));
+
+        // Drop the victim.
+        grid(victim).close();
+
+        awaitPartitionMapExchange();
+
+        assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == (commit ? COMMITTED : ROLLED_BACK)));
+
+        assertTrue("Expected blocked messages on the victim node.", victimComm.hasBlockedMessages());
+
+        if (commit)
+            assertConditionEventually(() -> rowsCount(G.allGrids().get(0)) == keys.size());
+        else
+            assertEquals(0, rowsCount(G.allGrids().get(0)));
+
+        assertTrue(commitFut.isDone());
+
+        assertPartitionCountersAreConsistent(keys, grids(baseCnt, i -> i != victim));
+    }
+
+    /**
+     * Checks that a transaction prepared from a client node is committed after the client node is stopped.
+     *
+     * @throws Exception if failed.
+     */
+    public void testRecoveryCommit() throws Exception {
+        startGridsMultiThreaded(2);
+
+        client = true;
+
+        IgniteEx ign = startGrid(2);
+
+        IgniteCache<Object, Object> cache = ign.getOrCreateCache(basicCcfg());
+
+        AtomicInteger keyCntr = new AtomicInteger();
+
+        ArrayList<Integer> keys = new ArrayList<>();
+
+        // Pick a key for each server node.
+        ign.cluster().forServers().nodes()
+            .forEach(node -> keys.add(keyForNode(ign.affinity(DEFAULT_CACHE_NAME), keyCntr, node)));
+
+        GridTestUtils.runAsync(() -> {
+            // Run in a separate thread to exclude the tx from the thread-local map.
+            Transaction tx = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+            for (Integer k : keys)
+                insertKey(cache, k);
+
+            ((TransactionProxyImpl)tx).tx().prepareNearTxLocal().get();
+
+            return null;
+        }).get();
+
+        // Drop the near node.
+        stopGrid(2, true);
+
+        IgniteEx srvNode = grid(0);
+
+        assertConditionEventually(() -> rowsCount(srvNode) == keys.size());
+
+        assertPartitionCountersAreConsistent(keys, G.allGrids());
+    }
+
+    /**
+     * Checks that when a primary node fails with a partially prepared transaction, the transaction is rolled back
+     * on the remaining nodes and partition update counters of the transaction keys stay consistent.
+     *
+     * @throws Exception if failed.
+     */
+    public void testCountersNeighborcastServerFailed() throws Exception {
+        int srvCnt = 4;
+
+        startGridsMultiThreaded(srvCnt);
+
+        client = true;
+
+        IgniteEx ign = startGrid(srvCnt);
+
+        IgniteCache<Object, Object> cache = ign.getOrCreateCache(basicCcfg()
+            .setBackups(2));
+
+        ArrayList<Integer> keys = new ArrayList<>();
+
+        int vid = 3;
+
+        IgniteEx victim = grid(vid);
+
+        Affinity<Object> aff = ign.affinity(DEFAULT_CACHE_NAME);
+
+        // Two keys with primary on the victim: one not backed up by grid(0), another not backed up by grid(1).
+        for (int i = 0; i < 100; i++) {
+            if (aff.isPrimary(victim.localNode(), i) && !aff.isBackup(grid(0).localNode(), i)) {
+                keys.add(i);
+                break;
+            }
+        }
+
+        for (int i = 0; i < 100; i++) {
+            if (aff.isPrimary(victim.localNode(), i) && !aff.isBackup(grid(1).localNode(), i)) {
+                keys.add(i);
+                break;
+            }
+        }
+
+        assertEquals(2, keys.size());
+        assertFalse(keys.contains(99));
+
+        // Prevent prepare on one of the backups.
+        ((TestRecordingCommunicationSpi)victim.configuration().getCommunicationSpi())
+            .blockMessages(GridDhtTxPrepareRequest.class, grid(0).name());
+
+        GridNearTxLocal nearTx = ((TransactionProxyImpl)ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)).tx();
+
+        for (Integer k : keys)
+            insertKey(cache, k);
+
+        List<IgniteInternalTx> txs = IntStream.range(0, srvCnt)
+            .mapToObj(this::grid)
+            .filter(g -> g != victim)
+            .map(g -> txsOnNode(g, nearTx.xidVersion()))
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
+
+        nearTx.commitAsync();
+
+        // Await the tx is partially prepared.
+        assertConditionEventually(() -> txs.stream().anyMatch(tx -> tx.state() == PREPARED));
+
+        // Pick the key for the concurrent transaction up front: a failed assertion inside the async task
+        // would otherwise leave latch1 untouched and hang the test thread.
+        int bgKey = IntStream.range(100, 200)
+            .filter(i -> !aff.isPrimary(victim.localNode(), i))
+            .findFirst()
+            .orElseThrow(() -> new AssertionError("Failed to find a key which is not primary on the victim."));
+
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CountDownLatch latch2 = new CountDownLatch(1);
+
+        // Keep a concurrent transaction open until the counters assertions below are done.
+        IgniteInternalFuture<Object> backgroundTxFut = GridTestUtils.runAsync(() -> {
+            try (Transaction ignored = ign.transactions().txStart()) {
+                cache.put(bgKey, 11);
+
+                latch1.countDown();
+
+                latch2.await();
+            }
+
+            return null;
+        });
+
+        latch1.await();
+
+        // Drop the primary.
+        victim.close();
+
+        // Do all assertions before rebalance.
+        assertConditionEventually(() -> txs.stream().allMatch(tx -> tx.state() == ROLLED_BACK));
+
+        List<IgniteEx> liveNodes = grids(srvCnt, i -> i != vid);
+
+        assertPartitionCountersAreConsistent(keys, liveNodes);
+
+        latch2.countDown();
+
+        backgroundTxFut.get();
+
+        assertTrue(liveNodes.stream().allMatch(node -> rowsCount(node) == 0));
+    }
+
+    /**
+     * Checks that after the primary fails with a transaction stuck in prepare, backups end up with the same
+     * partition update counter the primary had, even though a later transaction on the same partition committed.
+     *
+     * @throws Exception if failed.
+     */
+    public void testUpdateCountersGapIsClosed() throws Exception {
+        int srvCnt = 3;
+
+        startGridsMultiThreaded(srvCnt);
+
+        client = true;
+
+        IgniteEx ign = startGrid(srvCnt);
+
+        IgniteCache<Object, Object> cache = ign.getOrCreateCache(
+            basicCcfg().setBackups(2));
+
+        int vid = 1;
+
+        IgniteEx victim = grid(vid);
+
+        Affinity<Object> aff = ign.affinity(DEFAULT_CACHE_NAME);
+
+        // Two keys from the same partition with primary on the victim.
+        ArrayList<Integer> keys = new ArrayList<>();
+
+        int part = -1;
+
+        for (int i = 0; i < 2000 && keys.size() < 2; i++) {
+            if (!aff.isPrimary(victim.localNode(), i))
+                continue;
+
+            int p = aff.partition(i);
+
+            if (part == -1)
+                part = p;
+
+            if (p == part)
+                keys.add(i);
+        }
+
+        assertEquals(2, keys.size());
+
+        Transaction txA = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ);
+
+        // Prevent the first transaction prepare on backups: block the first two DHT prepare requests
+        // sent by the victim (presumably one per backup, the cache has two backups).
+        ((TestRecordingCommunicationSpi)victim.configuration().getCommunicationSpi())
+            .blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+                /** Number of DHT prepare requests seen so far. */
+                private final AtomicInteger limiter = new AtomicInteger();
+
+                @Override public boolean apply(ClusterNode node, Message msg) {
+                    if (msg instanceof GridDhtTxPrepareRequest)
+                        return limiter.getAndIncrement() < 2;
+
+                    return false;
+                }
+            });
+
+        insertKey(cache, keys.get(0));
+
+        txA.commitAsync();
+
+        GridCacheVersion aXidVer = ((TransactionProxyImpl)txA).tx().xidVersion();
+
+        assertConditionEventually(() -> txsOnNode(victim, aXidVer).stream()
+            .anyMatch(tx -> tx.state() == PREPARING));
+
+        // Run and commit a second transaction on the same partition.
+        GridTestUtils.runAsync(() -> {
+            try (Transaction txB = ign.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                insertKey(cache, keys.get(1));
+
+                txB.commit();
+            }
+        }).get();
+
+        long victimUpdCntr = updateCounter(victim.cachex(DEFAULT_CACHE_NAME).context(), keys.get(0));
+
+        List<IgniteEx> backupNodes = grids(srvCnt, i -> i != vid);
+
+        List<IgniteInternalTx> backupTxsA = backupNodes.stream()
+            .map(node -> txsOnNode(node, aXidVer))
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
+
+        // Drop the primary.
+        victim.close();
+
+        assertConditionEventually(() -> backupTxsA.stream().allMatch(tx -> tx.state() == ROLLED_BACK));
+
+        // Only the row of the second transaction is expected to survive.
+        for (IgniteEx node : backupNodes)
+            assertEquals(1, rowsCount(node));
+
+        // Backup counters must match the counter the failed primary had.
+        for (IgniteEx node : backupNodes) {
+            for (Integer k : keys)
+                assertEquals(victimUpdCntr, updateCounter(node.cachex(DEFAULT_CACHE_NAME).context(), k));
+        }
+    }
+
+    /**
+     * @return Configuration of the MVCC-enabled partitioned cache with SQL indexing over {@code Integer} pairs.
+     */
+    private static CacheConfiguration<Object, Object> basicCcfg() {
+        return new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(TRANSACTIONAL_SNAPSHOT)
+            .setCacheMode(PARTITIONED)
+            .setIndexedTypes(Integer.class, Integer.class);
+    }
+
+    /**
+     * Finds the first key in range [0, 100) with the given primary and backup nodes.
+     *
+     * @param aff Cache affinity.
+     * @param primary Expected primary node.
+     * @param backup Expected backup node.
+     * @return Found key or {@code null} if there is no such key in the range.
+     */
+    private static Integer findKey(Affinity<Object> aff, ClusterNode primary, ClusterNode backup) {
+        for (int i = 0; i < 100; i++) {
+            if (aff.isPrimary(primary, i) && aff.isBackup(backup, i))
+                return i;
+        }
+
+        return null;
+    }
+
+    /**
+     * @param aff Cache affinity.
+     * @return A key with primary on grid(0) and backup on grid(1) followed by a key with primary on grid(1)
+     *      and backup on grid(2); keys which are not found in range [0, 100) are omitted.
+     */
+    private List<Integer> primaryBackupKeys(Affinity<Object> aff) {
+        List<Integer> keys = new ArrayList<>();
+
+        Integer key1 = findKey(aff, grid(0).localNode(), grid(1).localNode());
+        Integer key2 = findKey(aff, grid(1).localNode(), grid(2).localNode());
+
+        if (key1 != null)
+            keys.add(key1);
+
+        if (key2 != null)
+            keys.add(key2);
+
+        return keys;
+    }
+
+    /**
+     * Inserts a row with the given key and value 42 via SQL.
+     *
+     * @param cache Cache to run the query on.
+     * @param key Key.
+     */
+    private static void insertKey(IgniteCache<Object, Object> cache, Integer key) {
+        cache.query(new SqlFieldsQuery(INSERT_SQL).setArgs(key));
+    }
+
+    /**
+     * @param node Node to run the query on.
+     * @return Number of rows visible to an SQL select on the default cache.
+     */
+    private static int rowsCount(Ignite node) {
+        return node.cache(DEFAULT_CACHE_NAME).query(new SqlFieldsQuery(SELECT_ALL_SQL)).getAll().size();
+    }
+
+    /**
+     * Collects active transactions on the node and checks that all of them belong to the given near transaction.
+     *
+     * @param node Node.
+     * @param xidVer Expected near transaction xid version.
+     * @return Active transactions on the node (never empty).
+     */
+    private static List<IgniteInternalTx> txsOnNode(IgniteEx node, GridCacheVersion xidVer) {
+        List<IgniteInternalTx> txs = new ArrayList<>(node.context().cache().context().tm().activeTransactions());
+
+        for (IgniteInternalTx tx : txs)
+            assertEquals(xidVer, tx.nearXidVersion());
+
+        assertFalse("No active transactions found on node: " + node.name(), txs.isEmpty());
+
+        return txs;
+    }
+
+    /**
+     * Fails the test if the condition is not satisfied within {@link #COND_TIMEOUT}.
+     *
+     * @param p Condition.
+     * @throws IgniteInterruptedCheckedException If interrupted while waiting.
+     */
+    private static void assertConditionEventually(GridAbsPredicate p)
+        throws IgniteInterruptedCheckedException {
+        if (!GridTestUtils.waitForCondition(p, COND_TIMEOUT))
+            fail("Condition is not satisfied within " + COND_TIMEOUT + " ms.");
+    }
+
+    /**
+     * @param cnt Number of grid indexes to consider, starting from 0.
+     * @param p Filter on grid indexes.
+     * @return Grids with indexes in [0, cnt) accepted by the filter.
+     */
+    private List<IgniteEx> grids(int cnt, IntPredicate p) {
+        return IntStream.range(0, cnt).filter(p).mapToObj(this::grid).collect(Collectors.toList());
+    }
+
+    /**
+     * Checks that, for each key, all given nodes owning the key partition (as primary or backup) have equal
+     * partition update counters, and that at least one owner is among the given nodes.
+     *
+     * @param keys Keys to check.
+     * @param nodes Nodes to check.
+     */
+    private void assertPartitionCountersAreConsistent(Iterable<Integer> keys, Iterable<? extends Ignite> nodes) {
+        for (Integer key : keys) {
+            boolean ownerFound = false;
+
+            long cntr0 = -1;
+
+            for (Ignite n : nodes) {
+                IgniteEx node = (IgniteEx)n;
+
+                if (!node.affinity(DEFAULT_CACHE_NAME).isPrimaryOrBackup(node.localNode(), key))
+                    continue;
+
+                long cntr = updateCounter(node.cachex(DEFAULT_CACHE_NAME).context(), key);
+
+                if (!ownerFound) {
+                    ownerFound = true;
+
+                    cntr0 = cntr;
+                }
+
+                assertEquals("Inconsistent update counter for key " + key + " on node " + node.name(), cntr0, cntr);
+            }
+
+            assertTrue("No owner found for key " + key, ownerFound);
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param key Key.
+     * @return Update counter of the local partition the key maps to.
+     * @throws AssertionError If there is no local data store for the key partition.
+     */
+    private static long updateCounter(GridCacheContext<?, ?> cctx, Object key) {
+        return dataStore(cctx, key)
+            .map(IgniteCacheOffheapManager.CacheDataStore::updateCounter)
+            .orElseThrow(() -> new AssertionError("No local data store for the partition of key: " + key));
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param key Key.
+     * @return Local data store of the partition the key maps to, or an empty optional if there is none.
+     */
+    private static Optional<IgniteCacheOffheapManager.CacheDataStore> dataStore(
+        GridCacheContext<?, ?> cctx, Object key) {
+        int p = cctx.affinity().partition(key);
+        IgniteCacheOffheapManager offheap = cctx.offheap();
+        return StreamSupport.stream(offheap.cacheDataStores().spliterator(), false)
+            .filter(ds -> ds.partId() == p)
+            .findFirst();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
index cf68546..a0d492c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexRebuildWithMvccEnabledSelfTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -84,7 +85,7 @@ public class GridIndexRebuildWithMvccEnabledSelfTest extends GridIndexRebuildSel
      * @throws IgniteCheckedException if failed.
      */
     private static void lockVersion(IgniteEx node) throws IgniteCheckedException {
-        node.context().coordinators().requestSnapshotAsync().get();
+        node.context().coordinators().requestSnapshotAsync((IgniteInternalTx)null).get();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
index ce2a130..15045c9 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccSqlTestSuite.java
@@ -18,6 +18,12 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest;
+import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest;
 import org.apache.ignite.internal.processors.cache.index.MvccEmptyTransactionSelfTest;
 import org.apache.ignite.internal.processors.cache.index.SqlTransactionsCommandsWithMvccEnabledSelfTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccBasicContinuousQueryTest;
@@ -60,10 +66,13 @@ import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlLockTimeoutT
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccSqlUpdateCountersTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccStreamingInsertTest;
 import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxNodeMappingTest;
+import org.apache.ignite.internal.processors.cache.mvcc.CacheMvccTxRecoveryTest;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadBulkOpsTest;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccRepeatableReadOperationsTest;
 import org.apache.ignite.internal.processors.query.h2.GridIndexRebuildWithMvccEnabledSelfTest;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+
 /**
  *
  */
@@ -140,6 +149,55 @@ public class IgniteCacheMvccSqlTestSuite extends TestSuite {
         suite.addTestSuite(CacheMvccContinuousWithTransformerPartitionedSelfTest.class);
         suite.addTestSuite(CacheMvccContinuousWithTransformerReplicatedSelfTest.class);
 
+        // Transaction recovery.
+        suite.addTestSuite(CacheMvccTxRecoveryTest.class);
+
+        suite.addTestSuite(MvccPartitionedPrimaryNodeFailureRecoveryTest.class);
+        suite.addTestSuite(MvccPartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.class);
+        suite.addTestSuite(MvccColocatedTxPessimisticOriginatingNodeFailureRecoveryTest.class);
+        suite.addTestSuite(MvccReplicatedTxPessimisticOriginatingNodeFailureRecoveryTest.class);
+
         return suite;
     }
+
+    /** Partitioned (near disabled) primary node failure recovery test run in TRANSACTIONAL_SNAPSHOT mode. */
+    public static class MvccPartitionedPrimaryNodeFailureRecoveryTest
+        extends IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest {
+        /** {@inheritDoc} */
+        @Override protected CacheAtomicityMode atomicityMode() {
+            return TRANSACTIONAL_SNAPSHOT;
+        }
+    }
+
+    /** Partitioned two-backups primary node failure recovery test in TRANSACTIONAL_SNAPSHOT mode, no near config. */
+    public static class MvccPartitionedTwoBackupsPrimaryNodeFailureRecoveryTest
+        extends IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest {
+        /** {@inheritDoc} */
+        @Override protected CacheAtomicityMode atomicityMode() {
+            return TRANSACTIONAL_SNAPSHOT;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected NearCacheConfiguration nearConfiguration() {
+            return null;
+        }
+    }
+
+    /** Colocated pessimistic tx originating node failure test run in TRANSACTIONAL_SNAPSHOT mode. */
+    public static class MvccColocatedTxPessimisticOriginatingNodeFailureRecoveryTest
+        extends GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest {
+        /** {@inheritDoc} */
+        @Override protected CacheAtomicityMode atomicityMode() {
+            return TRANSACTIONAL_SNAPSHOT;
+        }
+    }
+
+    /** Replicated pessimistic tx originating node failure test run in TRANSACTIONAL_SNAPSHOT mode. */
+    public static class MvccReplicatedTxPessimisticOriginatingNodeFailureRecoveryTest
+        extends GridCacheReplicatedTxPessimisticOriginatingNodeFailureSelfTest {
+        /** {@inheritDoc} */
+        @Override protected CacheAtomicityMode atomicityMode() {
+            return TRANSACTIONAL_SNAPSHOT;
+        }
+    }
 }


[2/2] ignite git commit: IGNITE-5935: MVCC TX: Tx recovery protocol

Posted by is...@apache.org.
IGNITE-5935: MVCC TX: Tx recovery protocol

This closes #4920


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5939a947
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5939a947
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5939a947

Branch: refs/heads/master
Commit: 5939a94763c8a3e92b66b3f591a816dd6c49f35a
Parents: 82d2efe
Author: ipavlukhin <vo...@gmail.com>
Authored: Fri Oct 19 17:40:12 2018 +0300
Committer: Igor Sapego <is...@apache.org>
Committed: Fri Oct 19 17:40:12 2018 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |   6 +-
 .../communication/GridIoMessageFactory.java     |  18 +
 .../GridCachePartitionExchangeManager.java      |   4 +-
 .../cache/IgniteCacheOffheapManager.java        |   6 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    |   5 +
 .../cache/PartitionUpdateCounter.java           |  30 +-
 .../distributed/GridCacheTxRecoveryFuture.java  |  11 -
 .../GridDistributedTxRemoteAdapter.java         |  52 +-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   4 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |  38 --
 .../distributed/dht/GridDhtTxPrepareFuture.java |   2 +-
 .../topology/GridClientPartitionTopology.java   |   3 +-
 .../dht/topology/GridDhtLocalPartition.java     |   7 +
 .../dht/topology/GridDhtPartitionTopology.java  |   5 +-
 .../topology/GridDhtPartitionTopologyImpl.java  |   6 +-
 .../processors/cache/mvcc/MvccProcessor.java    |   7 -
 .../cache/mvcc/MvccProcessorImpl.java           | 217 +++++-
 .../processors/cache/mvcc/MvccUtils.java        |   6 +-
 .../mvcc/msg/MvccRecoveryFinishedMessage.java   | 116 ++++
 .../PartitionCountersNeighborcastRequest.java   | 145 ++++
 .../PartitionCountersNeighborcastResponse.java  | 114 ++++
 .../persistence/GridCacheOffheapManager.java    |  13 +
 .../cache/transactions/IgniteTxAdapter.java     |   3 +-
 .../cache/transactions/IgniteTxHandler.java     | 125 ++++
 .../cache/transactions/IgniteTxManager.java     | 123 +++-
 .../PartitionCountersNeighborcastFuture.java    | 211 ++++++
 .../cache/transactions/TxCounters.java          |   3 +-
 .../continuous/GridContinuousProcessor.java     |   6 +-
 ...xOriginatingNodeFailureAbstractSelfTest.java |   2 +-
 ...cOriginatingNodeFailureAbstractSelfTest.java |  69 +-
 ...itionedTxOriginatingNodeFailureSelfTest.java |   2 +-
 ...woBackupsPrimaryNodeFailureRecoveryTest.java |   2 +-
 ...ePrimaryNodeFailureRecoveryAbstractTest.java | 133 +++-
 ...licatedTxOriginatingNodeFailureSelfTest.java |   2 +-
 .../cache/mvcc/CacheMvccTransactionsTest.java   |   4 +-
 ...ContinuousQueryFailoverAbstractSelfTest.java |   2 +-
 .../IgniteCacheTxRecoverySelfTestSuite.java     |   2 +-
 .../cache/mvcc/CacheMvccTxRecoveryTest.java     | 654 +++++++++++++++++++
 ...GridIndexRebuildWithMvccEnabledSelfTest.java |   3 +-
 .../testsuites/IgniteCacheMvccSqlTestSuite.java |  58 ++
 40 files changed, 1980 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 2599d7a..7492e51 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -43,9 +43,7 @@ import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistRequest;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxEnlistResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -171,7 +169,7 @@ public class MessageCodeGenerator {
 
 //        gen.generateAll(true);
 
-        gen.generateAndWrite(GridNearTxEnlistResponse.class);
+        gen.generateAndWrite(MvccRecoveryFinishedMessage.class);
 
 //        gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index e405d7d..3f4eb18 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -134,9 +134,12 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccAckRequestTxAndQ
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccActiveQueriesMessage;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccFutureResponse;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -1096,6 +1099,21 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
+            case 164:
+                msg = new MvccRecoveryFinishedMessage();
+
+                break;
+
+            case 165:
+                msg = new PartitionCountersNeighborcastRequest();
+
+                break;
+
+            case 166:
+                msg = new PartitionCountersNeighborcastResponse();
+
+                break;
+
                 // [-3..119] [124..129] [-23..-27] [-36..-55]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6af9678..0b8dd75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1405,7 +1405,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     grp.affinity().similarAffinityKey());
 
                 if (sndCounters) {
-                    CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true);
+                    CachePartitionPartialCountersMap cntrsMap = grp.topology().localUpdateCounters(true, true);
 
                     m.addPartitionUpdateCounters(grp.groupId(),
                         newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));
@@ -1429,7 +1429,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 top.similarAffinityKey());
 
             if (sndCounters) {
-                CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true);
+                CachePartitionPartialCountersMap cntrsMap = top.localUpdateCounters(true, true);
 
                 m.addPartitionUpdateCounters(top.groupId(),
                     newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap));

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 044830c..e9ec025 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -1067,8 +1067,12 @@ public interface IgniteCacheOffheapManager {
          * Return PendingTree for data store.
          *
          * @return PendingTree instance.
-         * @throws IgniteCheckedException
          */
         PendingEntriesTree pendingTree();
+
+        /**
+         * Flushes pending update counters closing all possible gaps.
+         */
+        void finalizeUpdateCountres();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index e40cc53..e547784 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -1556,6 +1556,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
         }
 
         /** {@inheritDoc} */
+        @Override public void finalizeUpdateCountres() {
+            pCntr.finalizeUpdateCountres();
+        }
+
+        /** {@inheritDoc} */
         @Override public String name() {
             return name;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
index b5960ab..fe44708 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/PartitionUpdateCounter.java
@@ -17,8 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteLogger;
 import org.jetbrains.annotations.NotNull;
@@ -31,7 +30,7 @@ public class PartitionUpdateCounter {
     private IgniteLogger log;
 
     /** Queue of counter update tasks*/
-    private final Queue<Item> queue = new PriorityQueue<>();
+    private final TreeSet<Item> queue = new TreeSet<>();
 
     /** Counter. */
     private final AtomicLong cntr = new AtomicLong();
@@ -161,21 +160,34 @@ public class PartitionUpdateCounter {
      * @return Retrieves the minimum update counter task from queue.
      */
     private Item poll() {
-        return queue.poll();
+        return queue.pollFirst();
     }
 
     /**
      * @return Checks the minimum update counter task from queue.
      */
     private Item peek() {
-        return queue.peek();
+        return queue.isEmpty() ? null : queue.first();
+
     }
 
     /**
      * @param item Adds update task to priority queue.
      */
     private void offer(Item item) {
-        queue.offer(item);
+        queue.add(item);
+    }
+
+    /**
+     * Flushes pending update counters, closing all possible gaps. NOTE(review): "Countres" is a typo.
+     */
+    public synchronized void finalizeUpdateCountres() {
+        Item last = queue.pollLast(); // Set is ordered by start, so this is the pending item with the highest start.
+
+        if (last != null)
+            update(last.start + last.delta); // End position of that item.
+
+        queue.clear();
     }
 
     /**
@@ -199,11 +211,7 @@ public class PartitionUpdateCounter {
 
         /** {@inheritDoc} */
         @Override public int compareTo(@NotNull Item o) {
-            int cmp = Long.compare(this.start, o.start);
-
-            assert cmp != 0;
-
-            return cmp;
+            return Long.compare(this.start, o.start); // NOTE(review): TreeSet.add drops items with an equal start.
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 3fb1e4f..5e0deb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -146,17 +146,6 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B
      */
     @SuppressWarnings("ConstantConditions")
     public void prepare() {
-        if (tx.txState().mvccEnabled()) { // TODO IGNITE-5935
-            U.error(log, "Cannot commit MVCC enabled transaction by recovery procedure. " +
-                "Operation is usupported at the moment [tx=" + CU.txString(tx) + ']');
-
-            onDone(false);
-
-            markInitialized();
-
-            return;
-        }
-
         if (nearTxCheck) {
             UUID nearNodeId = tx.eventNodeId();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 4db4685..3cabaec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -50,9 +50,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturnCompletableWra
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
@@ -770,15 +767,15 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                         // Apply update counters.
                         if (txCntrs != null)
-                            applyPartitionsUpdatesCounters(txCntrs.updateCounters());
+                            cctx.tm().txHandler().applyPartitionsUpdatesCounters(txCntrs.updateCounters());
 
-                            cctx.mvccCaching().onTxFinished(this, true);
+                        cctx.mvccCaching().onTxFinished(this, true);
 
-                            if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) {
-                                // Set new update counters for data entries received from persisted tx entries.
-                                List<DataEntry> entriesWithCounters = dataEntries.stream()
-                                        .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter()))
-                                        .collect(Collectors.toList());
+                        if (!near() && !F.isEmpty(dataEntries) && cctx.wal() != null) {
+                            // Set new update counters for data entries received from persisted tx entries.
+                            List<DataEntry> entriesWithCounters = dataEntries.stream()
+                                .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter()))
+                                .collect(Collectors.toList());
 
                             cctx.wal().log(new DataRecord(entriesWithCounters));
                         }
@@ -921,7 +918,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
                 TxCounters counters = txCounters(false);
 
                 if (counters != null)
-                    applyPartitionsUpdatesCounters(counters.updateCounters());
+                    cctx.tm().txHandler().applyPartitionsUpdatesCounters(counters.updateCounters());
 
                 state(ROLLED_BACK);
 
@@ -996,39 +993,6 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
         }
     }
 
-    /**
-     * Applies partition counters updates for mvcc transactions.
-     *
-     * @param counters Counters values to be updated.
-     */
-    private void applyPartitionsUpdatesCounters(Iterable<PartitionUpdateCountersMessage> counters) {
-        if (counters == null)
-            return;
-
-        int cacheId = CU.UNDEFINED_CACHE_ID;
-        GridDhtPartitionTopology top = null;
-
-        for (PartitionUpdateCountersMessage counter : counters) {
-            if (counter.cacheId() != cacheId) {
-                GridCacheContext ctx0 = cctx.cacheContext(cacheId = counter.cacheId());
-
-                assert ctx0.mvccEnabled();
-
-                top = ctx0.topology();
-            }
-
-            assert top != null;
-
-            for (int i = 0; i < counter.size(); i++) {
-                GridDhtLocalPartition part = top.localPartition(counter.partition(i));
-
-                assert part != null;
-
-                part.updateCounter(counter.initialCounter(i), counter.updatesCount(i));
-            }
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 9f96b46..d0fbd90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -375,7 +375,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                 false,
                 false,
                 tx.mvccSnapshot(),
-                tx.filterUpdateCountersForBackupNode(n));
+                cctx.tm().txHandler().filterUpdateCountersForBackupNode(tx, n));
 
             try {
                 cctx.io().send(n, req, tx.ioPolicy());
@@ -488,7 +488,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity
                 false,
                 false,
                 mvccSnapshot,
-                commit ? null : tx.filterUpdateCountersForBackupNode(n));
+                commit ? null : cctx.tm().txHandler().filterUpdateCountersForBackupNode(tx, n));
 
             req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 483990f..86f9c3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
 import java.io.Externalizable;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -34,7 +33,6 @@ import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -48,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
-import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridLeanMap;
@@ -944,41 +941,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
         return prepFut;
     }
 
-    /**
-     * @param node Backup node.
-     * @return Partition counters map for the given backup node.
-     */
-    public List<PartitionUpdateCountersMessage> filterUpdateCountersForBackupNode(ClusterNode node) {
-        TxCounters txCntrs = txCounters(false);
-
-        if (txCntrs == null || F.isEmpty(txCntrs.updateCounters()))
-            return null;
-
-        Collection<PartitionUpdateCountersMessage> updCntrs = txCntrs.updateCounters();
-
-        List<PartitionUpdateCountersMessage> res = new ArrayList<>(updCntrs.size());
-
-        AffinityTopologyVersion top = topologyVersionSnapshot();
-
-        for (PartitionUpdateCountersMessage partCntrs : updCntrs) {
-            GridCacheAffinityManager affinity = cctx.cacheContext(partCntrs.cacheId()).affinity();
-
-            PartitionUpdateCountersMessage resCntrs = new PartitionUpdateCountersMessage(partCntrs.cacheId(), partCntrs.size());
-
-            for (int i = 0; i < partCntrs.size(); i++) {
-                int part = partCntrs.partition(i);
-
-                if (affinity.backupByPartition(node, part, top))
-                    resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i));
-            }
-
-            if (resCntrs.size() > 0)
-                res.add(resCntrs);
-        }
-
-        return res;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index c505677..609bff8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1398,7 +1398,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                 tx.storeWriteThrough(),
                 retVal,
                 mvccSnapshot,
-                tx.filterUpdateCountersForBackupNode(n));
+                cctx.tm().txHandler().filterUpdateCountersForBackupNode(tx, n));
 
             req.queryUpdate(dhtMapping.queryUpdate());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
index 9140322..cd6e254 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridClientPartitionTopology.java
@@ -1234,7 +1234,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
+    @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros,
+        boolean finalizeCntrsBeforeCollecting) {
         return CachePartitionPartialCountersMap.EMPTY;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
index 253a56a..2ddc0d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtLocalPartition.java
@@ -1371,6 +1371,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
     }
 
     /**
+     * Flushes pending update counters closing all possible gaps.
+     */
+    public void finalizeUpdateCountres() {
+        store.finalizeUpdateCountres();
+    }
+
+    /**
      * Removed entry holder.
      */
     private static class RemovedEntryHolder {

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
index b6cb5bb..25b284e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopology.java
@@ -350,9 +350,12 @@ public interface GridDhtPartitionTopology {
     public CachePartitionFullCountersMap fullUpdateCounters();
 
     /**
+     * @param skipZeros {@code True} to skip zero counters instead of adding them to the map.
+     * @param finalizeCntrsBeforeCollecting {@code True} indicates that partition counters should be finalized.
      * @return Partition update counters.
      */
-    public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros);
+    public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros,
+        boolean finalizeCntrsBeforeCollecting);
 
     /**
      * @return Partition cache sizes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
index 94bb7f1..1f338d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/GridDhtPartitionTopologyImpl.java
@@ -2657,7 +2657,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     }
 
     /** {@inheritDoc} */
-    @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
+    @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros,
+        boolean finalizeCntrsBeforeCollecting) {
         lock.readLock().lock();
 
         try {
@@ -2678,6 +2679,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
                 if (part == null)
                     continue;
 
+                if (finalizeCntrsBeforeCollecting)
+                    part.finalizeUpdateCountres();
+
                 long updCntr = part.updateCounter();
                 long initCntr = part.initialUpdateCounter();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
index a09468f..a926acf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
@@ -180,13 +180,6 @@ public interface MvccProcessor extends GridProcessor {
     /**
      * Requests snapshot on Mvcc coordinator.
      *
-     * @return Snapshot future.
-     */
-    IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync();
-
-    /**
-     * Requests snapshot on Mvcc coordinator.
-     *
      * @param tx Transaction.
      * @return Snapshot future.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index f17c137..9fcafb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -20,14 +20,17 @@ package org.apache.ignite.internal.processors.cache.mvcc;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -68,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccActiveQueriesMes
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccFutureResponse;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccMessage;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
 import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccWaitTxsRequest;
@@ -189,8 +193,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
     /** */
     private final GridAtomicLong committedCntr = new GridAtomicLong(MVCC_INITIAL_CNTR);
 
-    /** */
-    private final Map<Long, Long> activeTxs = new HashMap<>();
+    /**
+     * Contains active transactions on mvcc coordinator. Key is mvcc counter.
+     * Access is protected by "this" monitor.
+     */
+    private final Map<Long, ActiveTx> activeTxs = new HashMap<>();
 
     /** Active query trackers. */
     private final Map<Long, MvccQueryTracker> activeTrackers = new ConcurrentHashMap<>();
@@ -223,6 +230,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
     private volatile boolean mvccSupported = true;
 
     /**
+     * Maps failed node id to votes accumulator for that node.
+     */
+    private final ConcurrentHashMap<UUID, RecoveryBallotBox> recoveryBallotBoxes = new ConcurrentHashMap<>();
+
+    /**
      * @param ctx Context.
      */
     public MvccProcessorImpl(GridKernalContext ctx) {
@@ -363,8 +375,12 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
 
     /** {@inheritDoc} */
     @Override public void onExchangeDone(boolean newCrd, DiscoCache discoCache, Map<UUID, GridLongList> activeQueries) {
-        if (!newCrd)
+        if (!newCrd) {
+            if (curCrd != null && ctx.localNodeId().equals(curCrd.nodeId()) && discoCache != null)
+                cleanupOrphanedServerTransactions(discoCache.serverNodes());
+
             return;
+        }
 
         ctx.cache().context().tm().rollbackMvccTxOnCoordinatorChange();
 
@@ -391,6 +407,33 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
         }
     }
 
+    /**
+     * Cleans up active transactions whose near node is a server that is no longer alive. Executed on coordinator.
+     * @param liveSrvs Live server nodes at the moment of cleanup.
+     */
+    private void cleanupOrphanedServerTransactions(Collection<ClusterNode> liveSrvs) {
+        Set<UUID> ids = liveSrvs.stream()
+            .map(ClusterNode::id)
+            .collect(Collectors.toSet());
+
+        List<Long> forRmv = new ArrayList<>();
+
+        synchronized (this) {
+            for (Map.Entry<Long, ActiveTx> entry : activeTxs.entrySet()) {
+                // A server-started tx whose near node is not among the live servers is collected for removal.
+                ActiveTx activeTx = entry.getValue();
+
+                if (activeTx.getClass() == ActiveServerTx.class && !ids.contains(activeTx.nearNodeId))
+                    forRmv.add(entry.getKey());
+            }
+        }
+        // Removal happens outside the monitor; onTxDone synchronizes on "this" by itself.
+        for (Long txCntr : forRmv)
+            // Committed counter is increased because it is not known if transaction was committed or not and we must
+            // bump committed counter for committed transaction as it is used in (read-only) query snapshot.
+            onTxDone(txCntr, true);
+    }
+
     /** {@inheritDoc} */
     @Override public void processClientActiveQueries(UUID nodeId, @Nullable GridLongList activeQueries) {
         prevCrdQueries.addNodeActiveQueries(nodeId, activeQueries);
@@ -530,17 +573,12 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
         if (!ctx.localNodeId().equals(crd.nodeId()) || !initFut.isDone())
             return null;
         else if (tx != null)
-            return assignTxSnapshot(0L);
+            return assignTxSnapshot(0L, ctx.localNodeId(), false);
         else
             return activeQueries.assignQueryCounter(ctx.localNodeId(), 0L);
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync() {
-        return requestSnapshotAsync((IgniteInternalTx)null);
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteInternalFuture<MvccSnapshot> requestSnapshotAsync(IgniteInternalTx tx) {
         MvccSnapshotFuture fut = new MvccSnapshotFuture();
 
@@ -585,7 +623,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
                 });
             }
             else if (tx != null)
-                lsnr.onResponse(assignTxSnapshot(0L));
+                lsnr.onResponse(assignTxSnapshot(0L, ctx.localNodeId(), false));
             else
                 lsnr.onResponse(activeQueries.assignQueryCounter(ctx.localNodeId(), 0L));
 
@@ -741,9 +779,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
                 first = false;
             }
 
-            for (MvccSnapshotResponseListener lsnr : map.values()) {
+            for (MvccSnapshotResponseListener lsnr : map.values())
                 U.warn(log, ">>> " + lsnr.toString());
-            }
         }
 
         first = true;
@@ -909,10 +946,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
         return activeQryTrackers;
     }
 
-    /**
-     * @return Counter.
-     */
-    private MvccSnapshotResponse assignTxSnapshot(long futId) {
+    /** */
+    private MvccSnapshotResponse assignTxSnapshot(long futId, UUID nearId, boolean client) {
         assert initFut.isDone();
         assert crdVer != 0;
         assert ctx.localNodeId().equals(currentCoordinatorId());
@@ -926,14 +961,16 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             tracking = ver;
             cleanup = committedCntr.get() + 1;
 
-            for (Map.Entry<Long, Long> txVer : activeTxs.entrySet()) {
-                cleanup = Math.min(txVer.getValue(), cleanup);
-                tracking = Math.min(txVer.getKey(), tracking);
+            for (Map.Entry<Long, ActiveTx> entry : activeTxs.entrySet()) {
+                cleanup = Math.min(entry.getValue().tracking, cleanup);
+                tracking = Math.min(entry.getKey(), tracking);
 
-                res.addTx(txVer.getKey());
+                res.addTx(entry.getKey());
             }
 
-            boolean add = activeTxs.put(ver, tracking) == null;
+            ActiveTx activeTx = client ? new ActiveTx(tracking, nearId) : new ActiveServerTx(tracking, nearId);
+
+            boolean add = activeTxs.put(ver, activeTx) == null;
 
             assert add : ver;
         }
@@ -950,10 +987,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
         return res;
     }
 
-    /**
-     * @param txCntr Counter assigned to transaction.
-     */
-    private void onTxDone(Long txCntr, boolean committed) {
+    /** */
+    private void onTxDone(Long txCntr, boolean increaseCommittedCntr) {
         assert initFut.isDone();
 
         GridFutureAdapter fut;
@@ -961,7 +996,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
         synchronized (this) {
             activeTxs.remove(txCntr);
 
-            if (committed)
+            if (increaseCommittedCntr)
                 committedCntr.setIfGreater(txCntr);
         }
 
@@ -1352,10 +1387,14 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             return;
         }
 
-        MvccSnapshotResponse res = assignTxSnapshot(msg.futureId());
+        MvccSnapshotResponse res = assignTxSnapshot(msg.futureId(), nodeId, node.isClient());
+
+        boolean finishFailed = true;
 
         try {
             sendMessage(node.id(), res);
+
+            finishFailed = false;
         }
         catch (ClusterTopologyCheckedException e) {
             if (log.isDebugEnabled())
@@ -1364,6 +1403,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to send tx snapshot response [msg=" + msg + ", node=" + nodeId + ']', e);
         }
+
+        if (finishFailed)
+            onTxDone(res.counter(), false);
     }
 
     /**
@@ -1390,9 +1432,9 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
                 log.debug("Failed to send query counter response, node left [msg=" + msg + ", node=" + nodeId + ']');
         }
         catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e);
-
             onQueryDone(nodeId, res.tracking());
+
+            U.error(log, "Failed to send query counter response [msg=" + msg + ", node=" + nodeId + ']', e);
         }
     }
 
@@ -1713,6 +1755,23 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             activeQueries.onNodeFailed(nodeId);
 
             prevCrdQueries.onNodeFailed(nodeId);
+
+            recoveryBallotBoxes.forEach((nearNodeId, ballotBox) -> {
+                // Put synthetic vote from another failed node
+                ballotBox.vote(nodeId);
+
+                tryFinishRecoveryVoting(nearNodeId, ballotBox);
+            });
+
+            if (discoEvt.eventNode().isClient()) {
+                RecoveryBallotBox ballotBox = recoveryBallotBoxes
+                    .computeIfAbsent(nodeId, uuid -> new RecoveryBallotBox());
+
+                ballotBox
+                    .voters(discoEvt.topologyNodes().stream().map(ClusterNode::id).collect(Collectors.toList()));
+
+                tryFinishRecoveryVoting(nodeId, ballotBox);
+            }
         }
 
         /** {@inheritDoc} */
@@ -1767,6 +1826,8 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
                 processNewCoordinatorQueryAckRequest(nodeId, (MvccAckRequestQueryId)msg);
             else if (msg instanceof MvccActiveQueriesMessage)
                 processCoordinatorActiveQueriesMessage(nodeId, (MvccActiveQueriesMessage)msg);
+            else if (msg instanceof MvccRecoveryFinishedMessage)
+                processRecoveryFinishedMessage(nodeId, ((MvccRecoveryFinishedMessage)msg));
             else
                 U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
         }
@@ -1777,6 +1838,82 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
         }
     }
 
+    /**
+     * Accumulates transaction recovery votes for a node that left the cluster.
+     * Transactions started by the left node are considered not active
+     * when every node registered as a voter acknowledges that it has finished transactions for the left node.
+     */
+    private static class RecoveryBallotBox {
+        /** Nodes expected to vote; {@code null} until registered, in which case voting is never done. */
+        private List<UUID> voters;
+        /** Nodes that have voted so far (real votes or synthetic votes for failed nodes). */
+        private final Set<UUID> ballots = new HashSet<>();
+
+        /**
+         * @param voters Nodes which can have transactions started by the left node.
+         */
+        private synchronized void voters(List<UUID> voters) {
+            this.voters = voters;
+        }
+
+        /**
+         * Records a vote; repeated votes from the same node have no additional effect.
+         * @param nodeId Voting node id.
+         */
+        private synchronized void vote(UUID nodeId) {
+            ballots.add(nodeId);
+        }
+
+        /**
+         * @return {@code True} if voters are registered and every one of them has voted.
+         */
+        private synchronized boolean isVotingDone() {
+            if (voters == null)
+                return false;
+
+            return ballots.containsAll(voters);
+        }
+    }
+
+    /**
+     * Processes a message from a node reporting that it has finished recovering transactions of the left node.
+     * @param nodeId Id of the node that sent the message; counted as that node's vote.
+     * @param msg Message carrying the id of the left (near) node.
+     */
+    private void processRecoveryFinishedMessage(UUID nodeId, MvccRecoveryFinishedMessage msg) {
+        UUID nearNodeId = msg.nearNodeId();
+        // The box may be created here before voters are registered; voting cannot finish until they are.
+        RecoveryBallotBox ballotBox = recoveryBallotBoxes.computeIfAbsent(nearNodeId, uuid -> new RecoveryBallotBox());
+
+        ballotBox.vote(nodeId);
+
+        tryFinishRecoveryVoting(nearNodeId, ballotBox);
+    }
+
+    /**
+     * If all expected votes are collected, finishes recovery on coordinator by removing transactions started by the left node.
+     * @param nearNodeId Left node.
+     * @param ballotBox Votes accumulator for the left node.
+     */
+    private void tryFinishRecoveryVoting(UUID nearNodeId, RecoveryBallotBox ballotBox) {
+        if (ballotBox.isVotingDone()) {
+            List<Long> recoveredTxs;
+            // NOTE(review): the check above and the remove below are not atomic; concurrent callers may both get here.
+            synchronized (this) {
+                recoveredTxs = activeTxs.entrySet().stream()
+                    .filter(e -> e.getValue().nearNodeId.equals(nearNodeId))
+                    .map(Map.Entry::getKey)
+                    .collect(Collectors.toList());
+            }
+
+            // Committed counter is increased because it is not known if transaction was committed or not and we must
+            // bump committed counter for committed transaction as it is used in (read-only) query snapshot.
+            recoveredTxs.forEach(txCntr -> onTxDone(txCntr, true));
+
+            recoveryBallotBoxes.remove(nearNodeId);
+        }
+    }
+
     /** */
     private interface Waiter {
         /**
@@ -2324,4 +2461,26 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             }
         }
     }
+
+    /** Transaction registered as active on the mvcc coordinator (value type of {@code activeTxs}). */
+    private static class ActiveTx {
+        /** Tracking counter computed when the tx snapshot was assigned; lower-bounds the cleanup version. */
+        private final long tracking;
+        /** Id of the node that requested the tx snapshot (near node). */
+        private final UUID nearNodeId;
+
+        /** Creates an active tx entry. */
+        private ActiveTx(long tracking, UUID nearNodeId) {
+            this.tracking = tracking;
+            this.nearNodeId = nearNodeId;
+        }
+    }
+
+    /** Active transaction started by a server (non-client) node; dropped when that node is no longer among live servers. */
+    private static class ActiveServerTx extends ActiveTx {
+        /** Creates an active server tx entry. */
+        private ActiveServerTx(long tracking, UUID nearNodeId) {
+            super(tracking, nearNodeId);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index 9441c17..972d4d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -242,7 +242,11 @@ public class MvccUtils {
         if (mvccCntr > snapshotCntr) // we don't see future updates
             return false;
 
-        if (mvccCntr == snapshotCntr) {
+        // Basically we can make fast decision about visibility if found rows from the same transaction.
+        // But we can't make such decision for read-only queries,
+        // because read-only queries use last committed version in it's snapshot which could be actually aborted
+        // (during transaction recovery we do not know whether recovered transaction was committed or aborted).
+        if (mvccCntr == snapshotCntr && snapshotOpCntr != MVCC_READ_OP_CNTR) {
             assert opCntr <= snapshotOpCntr : "rowVer=" + mvccVersion(mvccCrd, mvccCntr, opCntr) + ", snapshot=" + snapshot;
 
             return opCntr < snapshotOpCntr; // we don't see own pending updates

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccRecoveryFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccRecoveryFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccRecoveryFinishedMessage.java
new file mode 100644
index 0000000..a4ea103
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/MvccRecoveryFinishedMessage.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/** Notifies the mvcc coordinator that the sender has finished recovering transactions of a left node. */
+public class MvccRecoveryFinishedMessage implements MvccMessage {
+    /** */
+    private static final long serialVersionUID = -505062368078979867L;
+
+    /** Left node id for which transactions were recovered. */
+    private UUID nearNodeId;
+
+    /** Empty constructor; {@code nearNodeId} is populated by {@code readFrom}. */
+    public MvccRecoveryFinishedMessage() {
+    }
+
+    /** @param nearNodeId Left node id for which transactions were recovered. */
+    public MvccRecoveryFinishedMessage(UUID nearNodeId) {
+        this.nearNodeId = nearNodeId;
+    }
+
+    /**
+     * @return Left node id for which transactions were recovered.
+     */
+    public UUID nearNodeId() {
+        return nearNodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean waitForCoordinatorInit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean processedFromNioThread() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeUuid("nearNodeId", nearNodeId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                nearNodeId = reader.readUuid("nearNodeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(MvccRecoveryFinishedMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 164;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java
new file mode 100644
index 0000000..ffd9a67
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/** Request sending partition update counters to a remote node, identified by the sending future id. */
+public class PartitionCountersNeighborcastRequest extends GridCacheIdMessage {
+    /** */
+    private static final long serialVersionUID = -1893577108462486998L;
+
+    /** Partition update counters for remote node. */
+    @GridDirectCollection(PartitionUpdateCountersMessage.class)
+    private Collection<PartitionUpdateCountersMessage> updCntrs;
+
+    /** Sending future id. */
+    private IgniteUuid futId;
+
+    /** Empty constructor; fields are populated by {@code readFrom}. */
+    public PartitionCountersNeighborcastRequest() {
+    }
+
+    /** Creates a request with the given partition update counters and sending future id. */
+    public PartitionCountersNeighborcastRequest(
+        Collection<PartitionUpdateCountersMessage> updCntrs, IgniteUuid futId) {
+        this.updCntrs = updCntrs;
+        this.futId = futId;
+    }
+
+    /**
+     * @return Partition update counters for remote node.
+     */
+    public Collection<PartitionUpdateCountersMessage> updateCounters() {
+        return updCntrs;
+    }
+
+    /**
+     * @return Sending future id.
+     */
+    public IgniteUuid futId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+        // Own fields use states 3-4 (fieldsCount() = 5, 2 declared here); presumably 0-2 belong to the superclass.
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeCollection("updCntrs", updCntrs, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                updCntrs = reader.readCollection("updCntrs", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(PartitionCountersNeighborcastRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 165;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 5;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java
new file mode 100644
index 0000000..547539d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.msg;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Reply to a {@code PartitionCountersNeighborcastRequest}. It carries only the id of the future that sent the
+ * request ({@link #futId()}); the receiving side looks that future up by this id and notifies it.
+ */
+public class PartitionCountersNeighborcastResponse extends GridCacheIdMessage {
+    /** Serialization version UID. */
+    private static final long serialVersionUID = -8731050539139260521L;
+
+    /** Id of the future that sent the request being answered. */
+    private IgniteUuid futId;
+
+    /**
+     * No-arg constructor; {@code futId} is filled in later by {@link #readFrom(ByteBuffer, MessageReader)}.
+     * Presumably required for instantiation during message deserialization — TODO confirm.
+     */
+    public PartitionCountersNeighborcastResponse() {
+    }
+
+    /**
+     * @param futId Id of the future that sent the request being answered.
+     */
+    public PartitionCountersNeighborcastResponse(IgniteUuid futId) {
+        this.futId = futId;
+    }
+
+    /**
+     * @return Sending future id.
+     */
+    public IgniteUuid futId() {
+        return futId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        // Parent fields are written first.
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        // futId is this class's only field, at state 3 (lower states presumably belong to the parent).
+        // On a partial write the state is not incremented, so the next call retries the same field.
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeIgniteUuid("futId", futId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        // Mirror of writeTo: parent fields first, then futId at state 3.
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                futId = reader.readIgniteUuid("futId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(PartitionCountersNeighborcastResponse.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        // Message type code; the matching request uses 165.
+        return 166;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        // Highest state used above is 3 (futId), hence 4 fields in total.
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index cb682f6..240fbbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1681,6 +1681,19 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
         }
 
         /** {@inheritDoc} */
+        @Override public void finalizeUpdateCountres() {
+            try {
+                // Unlike nextUpdateCounter() this passes 'true' to init0; presumably that means "do not create
+                // a store that does not exist yet" — TODO confirm. A null store leaves nothing to finalize.
+                CacheDataStore store = init0(true);
+
+                if (store == null)
+                    return;
+
+                store.finalizeUpdateCountres();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        /** {@inheritDoc} */
         @Override public long nextUpdateCounter() {
             try {
                 CacheDataStore delegate0 = init0(false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 0d3ba75..399359b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -91,7 +91,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.transactions.TransactionConcurrency;
@@ -283,7 +282,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     private volatile IgniteInternalFuture rollbackFut;
 
     /** */
-    private volatile TxCounters txCounters = new TxCounters();
+    private volatile TxCounters txCounters;
 
     /**
      * Empty constructor required for {@link Externalizable}.

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 895a9d1..75e2087 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -47,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecove
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
@@ -74,6 +77,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.EnlistOperation;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
@@ -257,6 +262,20 @@ public class IgniteTxHandler {
                     processCheckPreparedTxResponse(nodeId, res);
                 }
             });
+
+        ctx.io().addCacheHandler(0, PartitionCountersNeighborcastRequest.class,
+            new CI2<UUID, PartitionCountersNeighborcastRequest>() {
+                @Override public void apply(UUID nodeId, PartitionCountersNeighborcastRequest req) {
+                    processPartitionCountersRequest(nodeId, req);
+                }
+            });
+
+        ctx.io().addCacheHandler(0, PartitionCountersNeighborcastResponse.class,
+            new CI2<UUID, PartitionCountersNeighborcastResponse>() {
+                @Override public void apply(UUID nodeId, PartitionCountersNeighborcastResponse res) {
+                    processPartitionCountersResponse(nodeId, res);
+                }
+            });
     }
 
     /**
@@ -2152,4 +2171,110 @@ public class IgniteTxHandler {
 
         fut.onResult(nodeId, res);
     }
+
+    /**
+     * Handles a partition counters neighborcast request: applies the received update counters locally and then
+     * acknowledges the sender with a response carrying the request's future id.
+     *
+     * @param nodeId Id of the node that sent the request.
+     * @param req Request with partition update counters.
+     */
+    private void processPartitionCountersRequest(UUID nodeId, PartitionCountersNeighborcastRequest req) {
+        applyPartitionsUpdatesCounters(req.updateCounters());
+
+        PartitionCountersNeighborcastResponse ack = new PartitionCountersNeighborcastResponse(req.futId());
+
+        try {
+            ctx.io().send(nodeId, ack, SYSTEM_POOL);
+        }
+        catch (ClusterTopologyCheckedException ignored) {
+            // The requesting node has left the cluster; this is only reported at debug level.
+            if (txRecoveryMsgLog.isDebugEnabled())
+                txRecoveryMsgLog.debug("Failed to send partition counters response, node left [node=" + nodeId + ']');
+        }
+        catch (IgniteCheckedException e) {
+            U.error(txRecoveryMsgLog, "Failed to send partition counters response [node=" + nodeId + ']', e);
+        }
+    }
+
+    /**
+     * Handles an acknowledgement of a partition counters neighborcast request by notifying the
+     * {@code PartitionCountersNeighborcastFuture} registered under the response's future id.
+     * If no such future is registered, a warning is logged and the response is dropped.
+     *
+     * @param nodeId Id of the responding node.
+     * @param res Response.
+     */
+    private void processPartitionCountersResponse(UUID nodeId, PartitionCountersNeighborcastResponse res) {
+        PartitionCountersNeighborcastFuture fut =
+            (PartitionCountersNeighborcastFuture)ctx.mvcc().future(res.futId());
+
+        if (fut != null) {
+            fut.onResult(nodeId);
+
+            return;
+        }
+
+        log.warning("Failed to find future for partition counters response [futId=" + res.futId() +
+            ", node=" + nodeId + ']');
+    }
+
+    /**
+     * Applies partition counter updates for mvcc transactions.
+     * <p>
+     * Updates are applied only to partitions currently present on this node. Counters of a cache that is not
+     * found in the shared context, or of a partition that is missing locally or cannot be reserved, are skipped
+     * (and logged at debug level) instead of failing the whole call, so the remaining counters are still applied.
+     *
+     * @param counters Counter values to be updated; {@code null} means there is nothing to apply.
+     */
+    public void applyPartitionsUpdatesCounters(Iterable<PartitionUpdateCountersMessage> counters) {
+        if (counters == null)
+            return;
+
+        int cacheId = CU.UNDEFINED_CACHE_ID;
+        GridDhtPartitionTopology top = null;
+
+        for (PartitionUpdateCountersMessage counter : counters) {
+            // Re-resolve the topology only when the cache id changes between consecutive messages.
+            if (counter.cacheId() != cacheId) {
+                cacheId = counter.cacheId();
+
+                GridCacheContext ctx0 = ctx.cacheContext(cacheId);
+
+                if (ctx0 == null) {
+                    // Cache is not available locally (e.g. stopped concurrently): its counters are ignored.
+                    top = null;
+
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping partition update counters for unknown cache [cacheId=" + cacheId + ']');
+                }
+                else {
+                    assert ctx0.mvccEnabled();
+
+                    top = ctx0.topology();
+                }
+            }
+
+            if (top == null)
+                continue;
+
+            for (int i = 0; i < counter.size(); i++) {
+                int p = counter.partition(i);
+
+                GridDhtLocalPartition part = top.localPartition(p);
+
+                // The partition may be absent on this node. reserve()/release() are assumed to pin it against
+                // eviction while its counter is updated — TODO confirm.
+                if (part == null || !part.reserve()) {
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping partition update counters for missing local partition [cacheId=" +
+                            cacheId + ", part=" + p + ']');
+
+                    continue;
+                }
+
+                try {
+                    part.updateCounter(counter.initialCounter(i), counter.updatesCount(i));
+                }
+                finally {
+                    part.release();
+                }
+            }
+        }
+    }
+
+    /**
+     * Selects, from the update counters collected by the transaction, those of partitions for which {@code node}
+     * is a backup on the transaction's topology version snapshot.
+     *
+     * @param tx Transaction.
+     * @param node Backup node.
+     * @return {@code null} if the transaction has no update counters; otherwise the matching counters grouped per
+     *      cache. Caches without matching partitions are omitted, so the returned list may be empty.
+     */
+    @Nullable public List<PartitionUpdateCountersMessage> filterUpdateCountersForBackupNode(
+        IgniteInternalTx tx, ClusterNode node) {
+        TxCounters txCntrs = tx.txCounters(false);
+
+        if (txCntrs == null)
+            return null;
+
+        if (F.isEmpty(txCntrs.updateCounters()))
+            return null;
+
+        Collection<PartitionUpdateCountersMessage> src = txCntrs.updateCounters();
+
+        AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
+
+        List<PartitionUpdateCountersMessage> filtered = new ArrayList<>(src.size());
+
+        for (PartitionUpdateCountersMessage cacheCntrs : src) {
+            int cacheId = cacheCntrs.cacheId();
+            int cnt = cacheCntrs.size();
+
+            GridCacheAffinityManager aff = ctx.cacheContext(cacheId).affinity();
+
+            PartitionUpdateCountersMessage backupCntrs = new PartitionUpdateCountersMessage(cacheId, cnt);
+
+            for (int idx = 0; idx < cnt; idx++) {
+                int p = cacheCntrs.partition(idx);
+
+                if (!aff.backupByPartition(node, p, topVer))
+                    continue;
+
+                backupCntrs.add(p, cacheCntrs.initialCounter(idx), cacheCntrs.updatesCount(idx));
+            }
+
+            if (backupCntrs.size() > 0)
+                filtered.add(backupCntrs);
+        }
+
+        return filtered;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5939a947/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 27b1522..0c2ca34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -36,12 +36,12 @@ import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheObjectsReleaseFuture;
@@ -59,19 +59,18 @@ import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVe
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync;
 import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearOptimisticTxPrepareFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
 import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
 import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -106,6 +105,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_TX_STARTED;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
 import static org.apache.ignite.internal.GridTopic.TOPIC_TX;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ;
@@ -254,18 +254,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
             }
         };
 
-        cctx.gridEvents().addLocalEventListener(
-            new GridLocalEventListener() {
-                @Override public void onEvent(Event evt) {
-                    assert evt instanceof DiscoveryEvent;
+        cctx.gridEvents().addDiscoveryEventListener(
+            new DiscoveryEventListener() {
+                @Override public void onEvent(DiscoveryEvent evt, DiscoCache discoCache) {
                     assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
 
-                    DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
-                    UUID nodeId = discoEvt.eventNode().id();
+                    UUID nodeId = evt.eventNode().id();
 
                     // Wait some time in case there are some unprocessed messages from failed node.
-                    cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId));
+                    cctx.time().addTimeoutObject(
+                        new NodeFailureTimeoutObject(evt.eventNode(), discoCache.mvccCoordinator()));
 
                     if (txFinishSync != null)
                         txFinishSync.onNodeLeft(nodeId);
@@ -2026,7 +2024,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      */
     public void finishTxOnRecovery(final IgniteInternalTx tx, boolean commit) {
         if (log.isInfoEnabled())
-            log.info("Finishing prepared transaction [tx=" + tx + ", commit=" + commit + ']');
+            log.info("Finishing prepared transaction [commit=" + commit + ", tx=" + tx + ']');
 
         if (!tx.markFinalizing(RECOVERY_FINISH)) {
             if (log.isInfoEnabled())
@@ -2046,10 +2044,28 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
 
         if (commit)
             tx.commitAsync().listen(new CommitListener(tx));
+        else if (tx.mvccSnapshot() != null && !tx.local())
+            // remote (backup) mvcc transaction sends partition counters to other backup transaction
+            // in order to keep counters consistent
+            neighborcastPartitionCountersAndRollback(tx);
         else
             tx.rollbackAsync();
     }
 
+    /**
+     * Rolls back a remote (backup) mvcc transaction during recovery. If the transaction has update counters, they
+     * are first sent to the other backups through a {@link PartitionCountersNeighborcastFuture} so that counters
+     * stay consistent; the rollback is started once that future completes.
+     *
+     * @param tx Transaction to roll back.
+     */
+    private void neighborcastPartitionCountersAndRollback(IgniteInternalTx tx) {
+        TxCounters txCounters = tx.txCounters(false);
+
+        // Nothing to propagate: roll back right away and stop here, otherwise the neighborcast future below
+        // would be started anyway and its listener would roll the transaction back a second time.
+        if (txCounters == null || txCounters.updateCounters() == null) {
+            tx.rollbackAsync();
+
+            return;
+        }
+
+        PartitionCountersNeighborcastFuture fut = new PartitionCountersNeighborcastFuture(tx, cctx);
+
+        fut.listen(fut0 -> tx.rollbackAsync());
+
+        fut.init();
+    }
+
     /**
      * Commits transaction in case when node started transaction failed, but all related
      * transactions were prepared (invalidates transaction if it is not fully prepared).
@@ -2427,16 +2443,20 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
      * Timeout object for node failure handler.
      */
     private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
-        /** Left or failed node. */
-        private final UUID evtNodeId;
+        /** */
+        private final ClusterNode node;
+        /** */
+        private final MvccCoordinator mvccCrd;
 
         /**
-         * @param evtNodeId Event node ID.
+         * @param node Failed node.
+         * @param mvccCrd Mvcc coordinator at time of node failure.
          */
-        private NodeFailureTimeoutObject(UUID evtNodeId) {
+        private NodeFailureTimeoutObject(ClusterNode node, MvccCoordinator mvccCrd) {
             super(IgniteUuid.fromUuid(cctx.localNodeId()), TX_SALVAGE_TIMEOUT);
 
-            this.evtNodeId = evtNodeId;
+            this.node = node;
+            this.mvccCrd = mvccCrd;
         }
 
         /**
@@ -2453,11 +2473,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                 return;
             }
 
+            UUID evtNodeId = node.id();
+
             try {
                 if (log.isDebugEnabled())
                     log.debug("Processing node failed event [locNodeId=" + cctx.localNodeId() +
                         ", failedNodeId=" + evtNodeId + ']');
 
+                // Null means that recovery voting is not needed.
+                GridCompoundFuture<IgniteInternalTx, Void> allTxFinFut = node.isClient() && mvccCrd != null
+                    ? new GridCompoundFuture<>() : null;
+
                 for (final IgniteInternalTx tx : activeTransactions()) {
                     if ((tx.near() && !tx.local()) || (tx.storeWriteThrough() && tx.masterNodeIds().contains(evtNodeId))) {
                         // Invalidate transactions.
@@ -2472,24 +2498,57 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                                 IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
 
                                 if (prepFut != null) {
-                                    prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
-                                        @Override public void apply(IgniteInternalFuture<?> fut) {
-                                            if (tx.state() == PREPARED)
-                                                commitIfPrepared(tx, Collections.singleton(evtNodeId));
-                                            else if (tx.setRollbackOnly())
-                                                tx.rollbackAsync();
-                                        }
+                                    prepFut.listen(fut -> {
+                                        if (tx.state() == PREPARED)
+                                            commitIfPrepared(tx, Collections.singleton(evtNodeId));
+                                            // If we could not mark tx as rollback, it means that transaction is being committed.
+                                        else if (tx.setRollbackOnly())
+                                            tx.rollbackAsync();
                                     });
                                 }
-                                else {
-                                    // If we could not mark tx as rollback, it means that transaction is being committed.
-                                    if (tx.setRollbackOnly())
-                                        tx.rollbackAsync();
-                                }
+                                // If we could not mark tx as rollback, it means that transaction is being committed.
+                                else if (tx.setRollbackOnly())
+                                    tx.rollbackAsync();
                             }
                         }
+
+                        // Await only mvcc transactions initiated by failed client node.
+                        if (allTxFinFut != null && tx.eventNodeId().equals(evtNodeId)
+                            && tx.mvccSnapshot() != null)
+                            allTxFinFut.add(tx.finishFuture());
                     }
                 }
+
+                if (allTxFinFut == null)
+                    return;
+
+                allTxFinFut.markInitialized();
+
+                // Send vote to mvcc coordinator when all recovering transactions have finished.
+                allTxFinFut.listen(fut -> {
+                    // If mvcc coordinator issued snapshot for recovering transaction has failed during recovery,
+                    // then there is no need to send messages to new coordinator.
+                    try {
+                        cctx.kernalContext().io().sendToGridTopic(
+                            mvccCrd.nodeId(),
+                            TOPIC_CACHE_COORDINATOR,
+                            new MvccRecoveryFinishedMessage(evtNodeId),
+                            SYSTEM_POOL);
+                    }
+                    catch (ClusterTopologyCheckedException e) {
+                        if (log.isInfoEnabled())
+                            log.info("Mvcc coordinator issued snapshots for recovering transactions " +
+                                "has left the cluster (will ignore) [locNodeId=" + cctx.localNodeId() +
+                                    ", failedNodeId=" + evtNodeId +
+                                    ", mvccCrdNodeId=" + mvccCrd.nodeId() + ']');
+                    }
+                    catch (IgniteCheckedException e) {
+                        log.warning("Failed to notify mvcc coordinator that all recovering transactions were " +
+                            "finished [locNodeId=" + cctx.localNodeId() +
+                            ", failedNodeId=" + evtNodeId +
+                            ", mvccCrdNodeId=" + mvccCrd.nodeId() + ']', e);
+                    }
+                });
             }
             finally {
                 cctx.kernalContext().gateway().readUnlock();