You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2019/02/05 11:38:56 UTC

[ignite] 04/06: IGNITE-8841: MVCC TX: Read transactions remap when coordinator fails. This closes #5949.

This is an automated email from the ASF dual-hosted git repository.

dpavlov pushed a commit to branch ignite-11191
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit e239bb610215325505f8a85ec6893a99ca29894d
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Tue Feb 5 12:18:36 2019 +0300

    IGNITE-8841: MVCC TX: Read transactions remap when coordinator fails. This closes #5949.
---
 .../cache/distributed/near/GridNearTxLocal.java    | 118 ++++++++++-
 ...racker.java => MvccCoordinatorChangeAware.java} |  49 +----
 .../cache/mvcc/MvccPreviousCoordinatorQueries.java | 222 ---------------------
 .../processors/cache/mvcc/MvccProcessor.java       |  83 ++++----
 .../processors/cache/mvcc/MvccProcessorImpl.java   |  59 +++---
 .../processors/cache/mvcc/MvccQueryTracker.java    |  20 +-
 .../internal/processors/cache/mvcc/MvccUtils.java  |  21 +-
 .../processors/cache/mvcc/PreviousQueries.java     | 186 +++++++++++++++++
 .../cache/mvcc/StaticMvccQueryTracker.java         |  10 -
 .../cache/mvcc/CacheMvccAbstractTest.java          |  29 ++-
 ...acheMvccAbstractSqlCoordinatorFailoverTest.java |  81 ++++++++
 11 files changed, 495 insertions(+), 383 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 0ab8984..8ff0960 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -62,6 +62,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorChangeAware;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotFuture;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -113,6 +118,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
 import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
+import static org.apache.ignite.transactions.TransactionState.ACTIVE;
 import static org.apache.ignite.transactions.TransactionState.COMMITTED;
 import static org.apache.ignite.transactions.TransactionState.COMMITTING;
 import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
@@ -127,7 +133,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
  * Replicated user transaction.
  */
 @SuppressWarnings("unchecked")
-public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject, AutoCloseable {
+public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject, AutoCloseable, MvccCoordinatorChangeAware {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -192,6 +198,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
      */
     private Boolean mvccOp;
 
+    /** */
+    private long qryId = MVCC_TRACKER_ID_NA;
+
+    /** */
+    private long crdVer;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -3322,7 +3334,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         }
     }
 
-
     /** {@inheritDoc} */
     @Override public boolean queryEnlisted() {
         if (!txState.mvccEnabled())
@@ -3336,6 +3347,96 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
     }
 
     /**
+     * Requests version on coordinator.
+     *
+     * @return Future to wait for result.
+     */
+    public IgniteInternalFuture<MvccSnapshot> requestSnapshot() {
+        if (isRollbackOnly())
+            return new GridFinishedFuture<>(rollbackException());
+
+        MvccSnapshot mvccSnapshot0 = mvccSnapshot;
+
+        if (mvccSnapshot0 != null)
+            return new GridFinishedFuture<>(mvccSnapshot0);
+
+        MvccProcessor prc = cctx.coordinators();
+
+        MvccCoordinator crd = prc.currentCoordinator();
+
+        synchronized (this) {
+            this.crdVer = crd.version();
+        }
+
+        if (crd.local())
+            mvccSnapshot0 = prc.requestWriteSnapshotLocal();
+
+        if (mvccSnapshot0 == null) {
+            MvccSnapshotFuture fut = new MvccTxSnapshotFuture();
+
+            prc.requestWriteSnapshotAsync(crd, fut);
+
+            return fut;
+        }
+
+        GridFutureAdapter<MvccSnapshot> fut = new GridFutureAdapter<>();
+
+        onResponse0(mvccSnapshot0, fut);
+
+        return fut;
+    }
+
+    /** */
+    private synchronized void onResponse0(MvccSnapshot res, GridFutureAdapter<MvccSnapshot> fut) {
+        assert mvccSnapshot == null;
+
+        if (state() != ACTIVE) {
+            // The transaction were concurrently rolled back.
+            // We need to notify the coordinator about that.
+            assert isRollbackOnly();
+
+            cctx.coordinators().ackTxRollback(res);
+
+            fut.onDone(timedOut() ? timeoutException() : rollbackException());
+        }
+        else if (crdVer != res.coordinatorVersion()) {
+            setRollbackOnly();
+
+            fut.onDone(new IgniteTxRollbackCheckedException(
+                "Mvcc coordinator has been changed during request. " +
+                "Please retry on a stable topology."));
+        }
+        else
+            fut.onDone(mvccSnapshot = res);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long onMvccCoordinatorChange(MvccCoordinator newCrd) {
+        if (isDone // Already finished.
+            || crdVer == 0 // Mvcc snapshot has not been requested yet or it's an non-mvcc transaction.
+            || newCrd.version() == crdVer) // Acceptable operations reordering.
+            return MVCC_TRACKER_ID_NA;
+
+        crdVer = newCrd.version();
+
+        if (mvccSnapshot == null)
+            return MVCC_TRACKER_ID_NA;
+
+        if (qryId == MVCC_TRACKER_ID_NA) {
+            long qryId0 = qryId = ID_CNTR.incrementAndGet();
+
+            finishFuture().listen(f -> cctx.coordinators().ackQueryDone(mvccSnapshot, qryId0));
+        }
+
+        return qryId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mvccSnapshot(MvccSnapshot mvccSnapshot) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
      * Adds key mapping to dht mapping.
      *
      * @param key Key to add.
@@ -4796,6 +4897,19 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
         abstract T finish(T t) throws IgniteCheckedException;
     }
 
+    /** */
+    private class MvccTxSnapshotFuture extends MvccSnapshotFuture {
+        @Override public void onResponse(MvccSnapshot res) {
+            onResponse0(res, this);
+        }
+
+        @Override public void onError(IgniteCheckedException err) {
+            setRollbackOnly();
+
+            super.onError(err);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridNearTxLocal.class, this,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java
similarity index 51%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java
index c8ce98e..36e6a38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java
@@ -18,51 +18,20 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import java.util.function.LongPredicate;
 
 /**
- * Mvcc tracker.
+ *
  */
-public interface MvccQueryTracker {
+public interface MvccCoordinatorChangeAware {
     /** */
-    public static final AtomicLong ID_CNTR = new AtomicLong();
+    AtomicLong ID_CNTR = new AtomicLong();
 
     /** */
-    public static final long MVCC_TRACKER_ID_NA = -1;
-
-    /**
-     * @return Tracker id.
-     */
-    public long id();
-
-    /**
-     * @return Requested MVCC snapshot.
-     */
-    public MvccSnapshot snapshot();
-
-    /**
-     * @return Cache context.
-     */
-    public GridCacheContext context();
+    long MVCC_TRACKER_ID_NA = -1;
 
-    /**
-     * @return Topology version.
-     */
-    public AffinityTopologyVersion topologyVersion();
-
-    /**
-     * Requests version on coordinator.
-     *
-     * @return Future to wait for result.
-     */
-    public IgniteInternalFuture<MvccSnapshot> requestSnapshot();
-
-    /**
-     * Marks tracker as done.
-     */
-    public void onDone();
+    /** */
+    LongPredicate ID_FILTER = id -> id != MVCC_TRACKER_ID_NA;
 
     /**
      * Mvcc coordinator change callback.
@@ -70,5 +39,7 @@ public interface MvccQueryTracker {
      * @param newCrd New mvcc coordinator.
      * @return Query id if exists.
      */
-    long onMvccCoordinatorChange(MvccCoordinator newCrd);
+    default long onMvccCoordinatorChange(MvccCoordinator newCrd) {
+        return MVCC_TRACKER_ID_NA;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java
deleted file mode 100644
index 26e4574..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker.MVCC_TRACKER_ID_NA;
-
-/**
- *
- */
-class MvccPreviousCoordinatorQueries {
-    /** */
-    private volatile boolean prevQueriesDone;
-
-    /** Map of nodes to active {@link MvccQueryTracker} IDs list. */
-    private final ConcurrentHashMap<UUID, Set<Long>> activeQueries = new ConcurrentHashMap<>();
-
-    /** */
-    private final ConcurrentHashMap<UUID, Set<Long>> rcvdAcks = new ConcurrentHashMap<>();
-
-    /** */
-    private Set<UUID> rcvd;
-
-    /** */
-    private Set<UUID> waitNodes;
-
-    /** */
-    private boolean initDone;
-
-    /** */
-    void init() {
-        init(null, Collections.emptyList(), null);
-    }
-
-    /**
-     * @param nodeQueries Active queries map.
-     * @param nodes Cluster nodes.
-     * @param mgr Discovery manager.
-     */
-    void init(GridLongList nodeQueries, Collection<ClusterNode> nodes, GridDiscoveryManager mgr) {
-        synchronized (this) {
-            assert !initDone;
-            assert waitNodes == null;
-
-            waitNodes = new HashSet<>();
-
-            for (ClusterNode node : nodes) {
-                if (!node.isLocal() && mgr.alive(node) && !F.contains(rcvd, node.id()))
-                    waitNodes.add(node.id());
-            }
-
-            initDone = waitNodes.isEmpty();
-
-            if (nodeQueries != null)
-                mergeToActiveQueries(mgr.localNode().id(), nodeQueries);
-
-            if (initDone && !prevQueriesDone)
-                prevQueriesDone = activeQueries.isEmpty() && rcvdAcks.isEmpty();
-        }
-    }
-
-    /**
-     * @return {@code True} if all queries on
-     */
-    boolean previousQueriesDone() {
-        return prevQueriesDone;
-    }
-
-    /**
-     * Merges current node active queries with the given ones.
-     *
-     * @param nodeId Node ID.
-     * @param nodeTrackers Active query trackers started on node.
-     */
-    private void mergeToActiveQueries(UUID nodeId, GridLongList nodeTrackers) {
-        if (nodeTrackers == null || nodeTrackers.isEmpty() || prevQueriesDone)
-            return;
-
-        Set<Long> currTrackers = activeQueries.get(nodeId);
-
-        if (currTrackers == null)
-            activeQueries.put(nodeId, currTrackers = addAll(nodeTrackers, null));
-        else
-            addAll(nodeTrackers, currTrackers);
-
-        // Check if there were any acks had been arrived before.
-        Set<Long> currAcks = rcvdAcks.get(nodeId);
-
-        if (!currTrackers.isEmpty() && currAcks != null && !currAcks.isEmpty()) {
-            Collection<Long> intersection =  new HashSet<>(currAcks);
-
-            intersection.retainAll(currTrackers);
-
-            currAcks.removeAll(intersection);
-            currTrackers.removeAll(intersection);
-
-            if (currTrackers.isEmpty())
-                activeQueries.remove(nodeId);
-
-            if (currAcks.isEmpty())
-                rcvdAcks.remove(nodeId);
-        }
-
-        if (initDone && !prevQueriesDone)
-            prevQueriesDone = activeQueries.isEmpty() && rcvdAcks.isEmpty();
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param nodeTrackers  Active query trackers started on node.
-     */
-    void addNodeActiveQueries(UUID nodeId, @Nullable GridLongList nodeTrackers) {
-        synchronized (this) {
-            if (initDone)
-                return;
-
-            if (waitNodes == null) {
-                if (rcvd == null)
-                    rcvd = new HashSet<>();
-
-                rcvd.add(nodeId);
-            }
-            else {
-                waitNodes.remove(nodeId);
-
-                initDone = waitNodes.isEmpty();
-            }
-
-            mergeToActiveQueries(nodeId, nodeTrackers);
-
-            if (initDone && !prevQueriesDone)
-                prevQueriesDone = activeQueries.isEmpty() && rcvdAcks.isEmpty();
-        }
-    }
-
-    /**
-     * @param nodeId Failed node ID.
-     */
-    void onNodeFailed(UUID nodeId) {
-        synchronized (this) {
-            if (waitNodes != null) {
-                waitNodes.remove(nodeId);
-
-                initDone = waitNodes.isEmpty();
-            }
-
-            if (initDone && !prevQueriesDone && activeQueries.remove(nodeId) != null)
-                prevQueriesDone = activeQueries.isEmpty() && rcvdAcks.isEmpty();
-        }
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param qryTrackerId Query tracker Id.
-     */
-    void onQueryDone(UUID nodeId, long qryTrackerId) {
-        if (qryTrackerId == MVCC_TRACKER_ID_NA)
-            return;
-
-        synchronized (this) {
-            Set<Long> nodeTrackers = activeQueries.get(nodeId);
-
-            if (nodeTrackers == null || !nodeTrackers.remove(qryTrackerId)) {
-                Set<Long> nodeAcks = rcvdAcks.get(nodeId);
-
-                if (nodeAcks == null)
-                    rcvdAcks.put(nodeId, nodeAcks = new HashSet<>());
-
-                // We received qry done ack before the active qry message. Need to save it.
-                nodeAcks.add(qryTrackerId);
-            }
-
-            if (nodeTrackers != null && nodeTrackers.isEmpty())
-                activeQueries.remove(nodeId);
-
-            if (initDone && !prevQueriesDone)
-                prevQueriesDone = activeQueries.isEmpty() && rcvdAcks.isEmpty();
-        }
-    }
-
-    /**
-     * @param from Long list.
-     * @param to Set.
-     */
-    private Set<Long> addAll(GridLongList from, Set<Long> to) {
-        assert from != null;
-
-        if (to == null)
-            to = new HashSet<>(from.size());
-
-        for (int i = 0; i < from.size(); i++)
-            to.add(from.get(i));
-
-        return to;
-    }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
index 05fd9cf..f41d615 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
@@ -101,8 +101,7 @@ public interface MvccProcessor extends GridProcessor {
      * @param blockerVer Version the entry is locked by.
      * @return Future, which is completed as soon as the lock is released.
      */
-    IgniteInternalFuture<Void> waitForLock(GridCacheContext cctx, MvccVersion waiterVer,
-        MvccVersion blockerVer);
+    IgniteInternalFuture<Void> waitForLock(GridCacheContext cctx, MvccVersion waiterVer, MvccVersion blockerVer);
 
     /**
      * @param locked Version the entry is locked by.
@@ -110,6 +109,23 @@ public interface MvccProcessor extends GridProcessor {
     void releaseWaiters(MvccVersion locked);
 
     /**
+     * Checks whether one tx is waiting for another tx.
+     * It is assumed that locks on data nodes are requested one by one, so tx can wait only for one another tx here.
+     *
+     * @param mvccVer Version of transaction which is checked for being waiting.
+     * @return Version of tx which blocks checked tx.
+     */
+    Optional<? extends MvccVersion> checkWaiting(MvccVersion mvccVer);
+
+    /**
+     * Unfreezes waiter for specific version failing it with passed exception.
+     *
+     * @param mvccVer Version of a waiter to fail.
+     * @param e Exception reflecting failure reason.
+     */
+    void failWaiter(MvccVersion mvccVer, Exception e);
+
+    /**
      * @param tracker Query tracker.
      */
     void addQueryTracker(MvccQueryTracker tracker);
@@ -123,12 +139,6 @@ public interface MvccProcessor extends GridProcessor {
      * @return {@link MvccSnapshot} if this is a coordinator node and coordinator is initialized.
      * {@code Null} in other cases.
      */
-    MvccSnapshot requestWriteSnapshotLocal();
-
-    /**
-     * @return {@link MvccSnapshot} if this is a coordinator node and coordinator is initialized.
-     * {@code Null} in other cases.
-     */
     MvccSnapshot requestReadSnapshotLocal();
 
     /**
@@ -141,17 +151,23 @@ public interface MvccProcessor extends GridProcessor {
     /**
      * Requests snapshot on Mvcc coordinator.
      *
-     * @return Result future.
+     * @param crd Expected coordinator.
+     * @param lsnr Request listener.
      */
-    IgniteInternalFuture<MvccSnapshot> requestWriteSnapshotAsync();
+    void requestReadSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr);
+
+    /**
+     * @return {@link MvccSnapshot} if this is a coordinator node and coordinator is initialized.
+     * {@code Null} in other cases.
+     */
+    MvccSnapshot requestWriteSnapshotLocal();
 
     /**
      * Requests snapshot on Mvcc coordinator.
      *
-     * @param crd Expected coordinator.
-     * @param lsnr Request listener.
+     * @return Result future.
      */
-    void requestReadSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr);
+    IgniteInternalFuture<MvccSnapshot> requestWriteSnapshotAsync();
 
     /**
      * Requests snapshot on Mvcc coordinator.
@@ -162,6 +178,12 @@ public interface MvccProcessor extends GridProcessor {
     void requestWriteSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr);
 
     /**
+     * @param snapshot Query version.
+     * @param qryId Query tracker ID.
+     */
+    void ackQueryDone(MvccSnapshot snapshot, long qryId);
+
+    /**
      * @param updateVer Transaction update version.
      * @return Acknowledge future.
      */
@@ -173,18 +195,6 @@ public interface MvccProcessor extends GridProcessor {
     void ackTxRollback(MvccVersion updateVer);
 
     /**
-     * @param snapshot Query version.
-     * @param qryId Query tracker ID.
-     */
-    void ackQueryDone(MvccSnapshot snapshot, long qryId);
-
-    /**
-     * @param log Logger.
-     * @param diagCtx Diagnostic request.
-     */
-    void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx);
-
-    /**
      * @return {@code True} if at least one cache with
      * {@code CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT} mode is registered.
      */
@@ -213,23 +223,6 @@ public interface MvccProcessor extends GridProcessor {
     void ensureStarted() throws IgniteCheckedException;
 
     /**
-     * Checks whether one tx is waiting for another tx.
-     * It is assumed that locks on data nodes are requested one by one, so tx can wait only for one another tx here.
-     *
-     * @param mvccVer Version of transaction which is checked for being waiting.
-     * @return Version of tx which blocks checked tx.
-     */
-    Optional<? extends MvccVersion> checkWaiting(MvccVersion mvccVer);
-
-    /**
-     * Unfreezes waiter for specific version failing it with passed exception.
-     *
-     * @param mvccVer Version of a waiter to fail.
-     * @param e Exception reflecting failure reason.
-     */
-    void failWaiter(MvccVersion mvccVer, Exception e);
-
-    /**
      * Cache stop callback.
      * @param cctx Cache context.
      *
@@ -240,4 +233,10 @@ public interface MvccProcessor extends GridProcessor {
      * Force txLog stop.
      */
     void stopTxLog();
+
+    /**
+     * @param log Logger.
+     * @param diagCtx Diagnostic request.
+     */
+    void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 0fa8226..dfe4f75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -112,6 +113,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorChangeAware.ID_FILTER;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker.MVCC_TRACKER_ID_NA;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
 import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
@@ -204,7 +206,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
     private final ActiveQueries activeQueries = new ActiveQueries();
 
     /** */
-    private final MvccPreviousCoordinatorQueries prevCrdQueries = new MvccPreviousCoordinatorQueries();
+    private final PreviousQueries prevQueries = new PreviousQueries();
 
     /** */
     private final GridFutureAdapter<Void> initFut = new GridFutureAdapter<>();
@@ -444,7 +446,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
 
         if (evt.type() == EVT_NODE_JOINED) {
             if (curCrd0.disconnected()) // Handle join event only if coordinator has not been elected yet.
-                onCoordinatorChanged(topVer, nodes, false);
+                onCoordinatorChanged(topVer, nodes, true);
         }
         else if (Objects.equals(nodeId, curCrd0.nodeId())) {
             // 1. Notify all listeners waiting for a snapshot.
@@ -459,7 +461,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
             activeQueries.onNodeFailed(nodeId);
 
             // 2. Notify previous queries.
-            prevCrdQueries.onNodeFailed(nodeId);
+            prevQueries.onNodeFailed(nodeId);
 
             // 3. Recover transactions started by the failed node.
             recoveryBallotBoxes.forEach((nearNodeId, ballotBox) -> {
@@ -509,7 +511,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
     /**
      * Coordinator change callback. Performs all needed actions for handling new coordinator assignment.
      *
-     * @param sndQrys {@code True} if it is need to send an active queries list to the new coordinator.
+     * @param sndQrys {@code True} if it is need to collect/send an active queries list.
      */
     private void onCoordinatorChanged(AffinityTopologyVersion topVer, Collection<ClusterNode> nodes, boolean sndQrys) {
         MvccCoordinator newCrd = pickMvccCoordinator(nodes, topVer);
@@ -524,29 +526,28 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
 
         curCrd = newCrd;
 
-        if (newCrd.local() && !sndQrys)
-            // Coordinator was assigned on local join. There was no coordinator before.
-            prevCrdQueries.init();
+        processActiveQueries(nodes, newCrd, sndQrys);
+    }
 
-        if (sndQrys) {
-            GridLongList activeQryTrackers = new GridLongList();
+    /** */
+    private void processActiveQueries(Collection<ClusterNode> nodes, MvccCoordinator newCrd, boolean sndQrys) {
+        GridLongList qryIds = sndQrys ? new GridLongList(Stream.concat(activeTrackers.values().stream(),
+            ctx.cache().context().tm().activeTransactions().stream()
+                .filter(tx -> tx.near() && tx.local()))
+            .mapToLong(q -> ((MvccCoordinatorChangeAware)q).onMvccCoordinatorChange(newCrd))
+            .filter(ID_FILTER).toArray()) : new GridLongList();
 
-            for (MvccQueryTracker tracker : activeTrackers.values()) {
-                long trackerId = tracker.onMvccCoordinatorChange(newCrd);
+        if (newCrd.local()) {
+            prevQueries.addActiveQueries(ctx.localNodeId(), qryIds);
 
-                if (trackerId != MVCC_TRACKER_ID_NA)
-                    activeQryTrackers.add(trackerId);
+            prevQueries.init(nodes, ctx.discovery()::alive);
+        }
+        else if (sndQrys) {
+            try {
+                sendMessage(newCrd.nodeId(), new MvccActiveQueriesMessage(qryIds));
             }
-
-            if (newCrd.local())
-                prevCrdQueries.init(activeQryTrackers, nodes, ctx.discovery());
-            else {
-                try {
-                    sendMessage(newCrd.nodeId(), new MvccActiveQueriesMessage(activeQryTrackers));
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to send active queries to mvcc coordinator: " + e);
-                }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send active queries to mvcc coordinator: " + e);
             }
         }
     }
@@ -1036,7 +1037,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
         if (minQry != -1)
             cleanup = Math.min(cleanup, minQry);
 
-        cleanup = prevCrdQueries.previousQueriesDone() ? cleanup - 1 : MVCC_COUNTER_NA;
+        cleanup = prevQueries.done() ? cleanup - 1 : MVCC_COUNTER_NA;
 
         res.init(futId, curCrd.version(), ver, MVCC_START_OP_CNTR, cleanup, tracking);
 
@@ -1464,7 +1465,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
      * @param msg Message.
      */
     private void processNewCoordinatorQueryAckRequest(UUID nodeId, MvccAckRequestQueryId msg) {
-        prevCrdQueries.onQueryDone(nodeId, msg.queryTrackerId());
+        prevQueries.onQueryDone(nodeId, msg.queryTrackerId());
     }
 
     /**
@@ -1477,7 +1478,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
         if (msg.queryCounter() != MVCC_COUNTER_NA)
             onQueryDone(nodeId, msg.queryCounter());
         else if (msg.queryTrackerId() != MVCC_TRACKER_ID_NA)
-            prevCrdQueries.onQueryDone(nodeId, msg.queryTrackerId());
+            prevQueries.onQueryDone(nodeId, msg.queryTrackerId());
 
         if (!msg.skipResponse()) {
             try {
@@ -1515,7 +1516,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
      * @param msg Message.
      */
     private void processActiveQueriesMessage(UUID nodeId, MvccActiveQueriesMessage msg) {
-        prevCrdQueries.addNodeActiveQueries(nodeId, msg.activeQueries());
+        GridLongList queryIds = msg.activeQueries();
+
+        assert queryIds != null;
+
+        prevQueries.addActiveQueries(nodeId, queryIds);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index c8ce98e..c362e9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -25,17 +24,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
 /**
  * Mvcc tracker.
  */
-public interface MvccQueryTracker {
-    /** */
-    public static final AtomicLong ID_CNTR = new AtomicLong();
-
-    /** */
-    public static final long MVCC_TRACKER_ID_NA = -1;
-
+public interface MvccQueryTracker extends MvccCoordinatorChangeAware {
     /**
      * @return Tracker id.
      */
-    public long id();
+    default long id() {
+        return MVCC_TRACKER_ID_NA;
+    }
 
     /**
      * @return Requested MVCC snapshot.
@@ -64,11 +59,4 @@ public interface MvccQueryTracker {
      */
     public void onDone();
 
-    /**
-     * Mvcc coordinator change callback.
-     *
-     * @param newCrd New mvcc coordinator.
-     * @return Query id if exists.
-     */
-    long onMvccCoordinatorChange(MvccCoordinator newCrd);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index 725f123..f34416f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -829,23 +829,12 @@ public class MvccUtils {
      * @return Mvcc snapshot.
      */
     public static MvccSnapshot requestSnapshot(GridCacheContext cctx,
-        GridNearTxLocal tx) throws IgniteCheckedException {
-        assert tx != null;
-
-        MvccSnapshot snapshot;
-
-        tx = checkActive(tx);
-
-        if ((snapshot = tx.mvccSnapshot()) == null) {
-            MvccProcessor prc = cctx.shared().coordinators();
+        @NotNull GridNearTxLocal tx) throws IgniteCheckedException {
+        MvccSnapshot snapshot = tx.mvccSnapshot();
 
-            snapshot = prc.requestWriteSnapshotLocal();
-
-            if (snapshot == null)
-                snapshot = prc.requestWriteSnapshotAsync().get();
-
-            tx.mvccSnapshot(snapshot);
-        }
+        if (snapshot == null)
+            // TODO IGNITE-7388
+            return tx.requestSnapshot().get();
 
         return snapshot;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousQueries.java
new file mode 100644
index 0000000..c6005ef
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousQueries.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ *
+ */
+class PreviousQueries {
+    /** */
+    private static class Node {
+        /** */
+        @GridToStringInclude
+        boolean init;
+
+        /** */
+        @GridToStringInclude
+        Set<Long> cntrs;
+
+        /** */
+        boolean isDone() {
+            return init && (cntrs == null || cntrs.stream().allMatch(l -> l < 0));
+        }
+
+        @Override public String toString() {
+            return S.toString(Node.class, this);
+        }
+    }
+
+    /** */
+    private Map<UUID, Node> active = new HashMap<>();
+
+    /** */
+    private boolean init;
+
+    /** */
+    private volatile boolean done;
+
+    /**
+     * @param nodes Waiting nodes.
+     * @param alivePredicate Alive nodes filter.
+     */
+    synchronized void init(Collection<ClusterNode> nodes, Predicate<UUID> alivePredicate) {
+        assert !init && !done;
+
+        nodes.stream().map(ClusterNode::id).forEach(uuid -> active.putIfAbsent(uuid, new Node()));
+
+        active.entrySet().removeIf(e -> !alivePredicate.test(e.getKey()) || e.getValue().isDone());
+
+        if (active.isEmpty())
+            done = true;
+
+        init = true;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     */
+    void onNodeFailed(@NotNull UUID nodeId) {
+        if (done())
+            return;
+
+        synchronized (this) {
+            if (init)
+                removeAndCheckDone(nodeId);
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param qryId Query tracker Id.
+     */
+    void onQueryDone(@NotNull UUID nodeId, long qryId) {
+        if (!done())
+            onQueryDone0(nodeId, qryId);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param queryIds Query tracker Ids.
+     */
+    void addActiveQueries(@NotNull UUID nodeId, @NotNull GridLongList queryIds) {
+        if (!done())
+            addActiveQueries0(nodeId, queryIds);
+    }
+
+    /**
+     * @return {@code True} if all queries mapped on previous coordinator are done.
+     */
+    boolean done() {
+        return done;
+    }
+
+    /** */
+    private synchronized void onQueryDone0(@NotNull UUID nodeId, long qryId) {
+        assert qryId > 0;
+
+        Node node = active.get(nodeId);
+
+        if (node == null && !init)
+            active.put(nodeId, node = new Node());
+
+        if (node != null) {
+            Set<Long> cntrs = node.cntrs;
+
+            boolean wasNull = cntrs == null;
+
+            if (cntrs == null)
+                cntrs = node.cntrs = new HashSet<>();
+
+            if (wasNull || !cntrs.remove(qryId))
+                cntrs.add(-qryId);
+
+            if (init && node.isDone())
+                removeAndCheckDone(nodeId);
+        }
+    }
+
+    /** */
+    private synchronized void addActiveQueries0(@NotNull UUID nodeId, @NotNull GridLongList queryIds) {
+        Node node = active.get(nodeId);
+
+        if (node == null && !init)
+            active.put(nodeId, node = new Node());
+
+        if (node != null) {
+            Set<Long> cntrs = node.cntrs;
+
+            boolean wasNull = cntrs == null, hasQueries = false;
+
+            for (int i = 0; i < queryIds.size(); i++) {
+                long qryId = queryIds.get(i);
+
+                assert qryId > 0;
+
+                if (cntrs == null)
+                    cntrs = node.cntrs = new HashSet<>();
+
+                if (wasNull || !cntrs.remove(-qryId))
+                    hasQueries |= cntrs.add(qryId);
+            }
+
+            if (init && !hasQueries)
+                removeAndCheckDone(nodeId);
+            else
+                node.init = true;
+        }
+    }
+
+    /** */
+    private void removeAndCheckDone(@NotNull UUID nodeId) {
+        assert init;
+
+        active.remove(nodeId);
+
+        if (active.isEmpty())
+            done = true;
+    }
+}
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java
index 95a1664..552a16c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java
@@ -66,14 +66,4 @@ public class StaticMvccQueryTracker implements MvccQueryTracker {
     @Override public void onDone() {
         // No-op.
     }
-
-    /** {@inheritDoc} */
-    @Override public long onMvccCoordinatorChange(MvccCoordinator newCrd) {
-        return MVCC_TRACKER_ID_NA;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long id() {
-        return MVCC_TRACKER_ID_NA;
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index a026849..1459f0a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -1723,12 +1723,19 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     protected final void checkActiveQueriesCleanup(Ignite node) throws Exception {
-        final MvccProcessorImpl crd = mvccProcessor(node);
+        final MvccProcessorImpl prc = mvccProcessor(node);
 
-        assertTrue("Active queries not cleared: " + node.name(), GridTestUtils.waitForCondition(
+        MvccCoordinator crd = prc.currentCoordinator();
+
+        if (!crd.local())
+            return;
+
+        assertTrue("Coordinator is not initialized: " + prc, GridTestUtils.waitForCondition(crd::initialized, 8_000));
+
+        assertTrue("Active queries are not cleared: " + node.name(), GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                 @Override public boolean apply() {
-                    Object activeQueries = GridTestUtils.getFieldValue(crd, "activeQueries");
+                    Object activeQueries = GridTestUtils.getFieldValue(prc, "activeQueries");
 
                     synchronized (activeQueries) {
                         Long minQry = GridTestUtils.getFieldValue(activeQueries, "minQry");
@@ -1754,16 +1761,20 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
             }, 8_000)
         );
 
-        assertTrue("Previous coordinator queries not empty: " + node.name(), GridTestUtils.waitForCondition(
+        assertTrue("Previous coordinator queries are not empty: " + node.name(), GridTestUtils.waitForCondition(
             new GridAbsPredicate() {
                 @Override public boolean apply() {
-                    Map queries = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "activeQueries");
-                    Boolean prevDone = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "prevQueriesDone");
+                    PreviousQueries prevQueries = GridTestUtils.getFieldValue(prc, "prevQueries");
+
+                    synchronized (prevQueries) {
+                        Map queries = GridTestUtils.getFieldValue(prevQueries, "active");
+                        Boolean prevDone = GridTestUtils.getFieldValue(prevQueries, "done");
 
-                    if (!queries.isEmpty() || !prevDone)
-                        log.info("Previous coordinator state [prevDone=" + prevDone + ", queries=" + queries + ']');
+                        if (!queries.isEmpty() || !prevDone)
+                            log.info("Previous coordinator state [prevDone=" + prevDone + ", queries=" + queries + ']');
 
-                    return queries.isEmpty();
+                        return queries.isEmpty();
+                    }
                 }
             }, 8_000)
         );
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java
index 2111fcb..6ad2ea5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java
@@ -18,12 +18,15 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheServerNotFoundException;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
@@ -31,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -171,6 +175,83 @@ public abstract class CacheMvccAbstractSqlCoordinatorFailoverTest extends CacheM
      * @throws Exception If failed.
      */
     @Test
+    public void testTxReadAfterCoordinatorChangeDirectOrder() throws Exception {
+        testTxReadAfterCoordinatorChange(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTxReadAfterCoordinatorChangeReverseOrder() throws Exception {
+        testTxReadAfterCoordinatorChange(false);
+    }
+
+    /** */
+    private void testTxReadAfterCoordinatorChange(boolean directOrder) throws Exception {
+        ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 0, DFLT_PARTITION_COUNT)
+            .setIndexedTypes(Integer.class, Integer.class).setNodeFilter(new CoordinatorNodeFilter());
+
+        MvccProcessorImpl.coordinatorAssignClosure(new CoordinatorAssignClosure());
+
+        IgniteEx node = startGrid(0);
+
+        nodeAttr = CRD_ATTR;
+
+        startGrid(1);
+
+        IgniteCache<Integer, Integer> cache = node.cache(DEFAULT_CACHE_NAME);
+
+        cache.put(1,1);
+
+        Semaphore sem = new Semaphore(0);
+
+        IgniteInternalFuture future = GridTestUtils.runAsync(() -> {
+            IgniteCache<Integer, Integer> cache0 = node.cache(DEFAULT_CACHE_NAME);
+
+            try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                Integer res = cache0.get(1);
+
+                sem.release();
+
+                // wait for coordinator change.
+                assertTrue(sem.tryAcquire(2, getTestTimeout(), TimeUnit.MILLISECONDS));
+
+                assertEquals(res, cache0.get(1));
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        assertTrue(sem.tryAcquire(getTestTimeout(), TimeUnit.MILLISECONDS));
+
+        if (directOrder)
+            stopGrid(1);
+
+        MvccProcessorImpl prc = mvccProcessor(startGrid(2));
+
+        if (!directOrder)
+            stopGrid(1);
+
+        awaitPartitionMapExchange();
+
+        MvccCoordinator crd = prc.currentCoordinator();
+
+        assert crd.local() && crd.initialized();
+
+        cache.put(1,2);
+        cache.put(1,3);
+
+        sem.release(2);
+
+        future.get(getTestTimeout());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
     public void testStartLastServerFails() throws Exception {
         testSpi = true;