You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2019/02/05 11:38:56 UTC
[ignite] 04/06: IGNITE-8841: MVCC TX: Read transactions remap when
coordinator fails. This closes #5949.
This is an automated email from the ASF dual-hosted git repository.
dpavlov pushed a commit to branch ignite-11191
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit e239bb610215325505f8a85ec6893a99ca29894d
Author: Igor Seliverstov <gv...@gmail.com>
AuthorDate: Tue Feb 5 12:18:36 2019 +0300
IGNITE-8841: MVCC TX: Read transactions remap when coordinator fails. This closes #5949.
---
.../cache/distributed/near/GridNearTxLocal.java | 118 ++++++++++-
...racker.java => MvccCoordinatorChangeAware.java} | 49 +----
.../cache/mvcc/MvccPreviousCoordinatorQueries.java | 222 ---------------------
.../processors/cache/mvcc/MvccProcessor.java | 83 ++++----
.../processors/cache/mvcc/MvccProcessorImpl.java | 59 +++---
.../processors/cache/mvcc/MvccQueryTracker.java | 20 +-
.../internal/processors/cache/mvcc/MvccUtils.java | 21 +-
.../processors/cache/mvcc/PreviousQueries.java | 186 +++++++++++++++++
.../cache/mvcc/StaticMvccQueryTracker.java | 10 -
.../cache/mvcc/CacheMvccAbstractTest.java | 29 ++-
...acheMvccAbstractSqlCoordinatorFailoverTest.java | 81 ++++++++
11 files changed, 495 insertions(+), 383 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 0ab8984..8ff0960 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -62,6 +62,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.GridInvokeValue;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorChangeAware;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotFuture;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -113,6 +118,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
+import static org.apache.ignite.transactions.TransactionState.ACTIVE;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
@@ -127,7 +133,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
* Replicated user transaction.
*/
@SuppressWarnings("unchecked")
-public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject, AutoCloseable {
+public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject, AutoCloseable, MvccCoordinatorChangeAware {
/** */
private static final long serialVersionUID = 0L;
@@ -192,6 +198,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
*/
private Boolean mvccOp;
+ /** */
+ private long qryId = MVCC_TRACKER_ID_NA;
+
+ /** */
+ private long crdVer;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -3322,7 +3334,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
}
-
/** {@inheritDoc} */
@Override public boolean queryEnlisted() {
if (!txState.mvccEnabled())
@@ -3336,6 +3347,96 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
/**
+ * Requests version on coordinator.
+ *
+ * @return Future to wait for result.
+ */
+ public IgniteInternalFuture<MvccSnapshot> requestSnapshot() {
+ if (isRollbackOnly())
+ return new GridFinishedFuture<>(rollbackException());
+
+ MvccSnapshot mvccSnapshot0 = mvccSnapshot;
+
+ if (mvccSnapshot0 != null)
+ return new GridFinishedFuture<>(mvccSnapshot0);
+
+ MvccProcessor prc = cctx.coordinators();
+
+ MvccCoordinator crd = prc.currentCoordinator();
+
+ synchronized (this) {
+ this.crdVer = crd.version();
+ }
+
+ if (crd.local())
+ mvccSnapshot0 = prc.requestWriteSnapshotLocal();
+
+ if (mvccSnapshot0 == null) {
+ MvccSnapshotFuture fut = new MvccTxSnapshotFuture();
+
+ prc.requestWriteSnapshotAsync(crd, fut);
+
+ return fut;
+ }
+
+ GridFutureAdapter<MvccSnapshot> fut = new GridFutureAdapter<>();
+
+ onResponse0(mvccSnapshot0, fut);
+
+ return fut;
+ }
+
+ /** */
+ private synchronized void onResponse0(MvccSnapshot res, GridFutureAdapter<MvccSnapshot> fut) {
+ assert mvccSnapshot == null;
+
+ if (state() != ACTIVE) {
+ // The transaction were concurrently rolled back.
+ // We need to notify the coordinator about that.
+ assert isRollbackOnly();
+
+ cctx.coordinators().ackTxRollback(res);
+
+ fut.onDone(timedOut() ? timeoutException() : rollbackException());
+ }
+ else if (crdVer != res.coordinatorVersion()) {
+ setRollbackOnly();
+
+ fut.onDone(new IgniteTxRollbackCheckedException(
+ "Mvcc coordinator has been changed during request. " +
+ "Please retry on a stable topology."));
+ }
+ else
+ fut.onDone(mvccSnapshot = res);
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized long onMvccCoordinatorChange(MvccCoordinator newCrd) {
+ if (isDone // Already finished.
+ || crdVer == 0 // Mvcc snapshot has not been requested yet or it's an non-mvcc transaction.
+ || newCrd.version() == crdVer) // Acceptable operations reordering.
+ return MVCC_TRACKER_ID_NA;
+
+ crdVer = newCrd.version();
+
+ if (mvccSnapshot == null)
+ return MVCC_TRACKER_ID_NA;
+
+ if (qryId == MVCC_TRACKER_ID_NA) {
+ long qryId0 = qryId = ID_CNTR.incrementAndGet();
+
+ finishFuture().listen(f -> cctx.coordinators().ackQueryDone(mvccSnapshot, qryId0));
+ }
+
+ return qryId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void mvccSnapshot(MvccSnapshot mvccSnapshot) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
* Adds key mapping to dht mapping.
*
* @param key Key to add.
@@ -4796,6 +4897,19 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
abstract T finish(T t) throws IgniteCheckedException;
}
+ /** */
+ private class MvccTxSnapshotFuture extends MvccSnapshotFuture {
+ @Override public void onResponse(MvccSnapshot res) {
+ onResponse0(res, this);
+ }
+
+ @Override public void onError(IgniteCheckedException err) {
+ setRollbackOnly();
+
+ super.onError(err);
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridNearTxLocal.class, this,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java
similarity index 51%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java
index c8ce98e..36e6a38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccCoordinatorChangeAware.java
@@ -18,51 +18,20 @@
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import java.util.function.LongPredicate;
/**
- * Mvcc tracker.
+ *
*/
-public interface MvccQueryTracker {
+public interface MvccCoordinatorChangeAware {
/** */
- public static final AtomicLong ID_CNTR = new AtomicLong();
+ AtomicLong ID_CNTR = new AtomicLong();
/** */
- public static final long MVCC_TRACKER_ID_NA = -1;
-
- /**
- * @return Tracker id.
- */
- public long id();
-
- /**
- * @return Requested MVCC snapshot.
- */
- public MvccSnapshot snapshot();
-
- /**
- * @return Cache context.
- */
- public GridCacheContext context();
+ long MVCC_TRACKER_ID_NA = -1;
- /**
- * @return Topology version.
- */
- public AffinityTopologyVersion topologyVersion();
-
- /**
- * Requests version on coordinator.
- *
- * @return Future to wait for result.
- */
- public IgniteInternalFuture<MvccSnapshot> requestSnapshot();
-
- /**
- * Marks tracker as done.
- */
- public void onDone();
+ /** */
+ LongPredicate ID_FILTER = id -> id != MVCC_TRACKER_ID_NA;
/**
* Mvcc coordinator change callback.
@@ -70,5 +39,7 @@ public interface MvccQueryTracker {
* @param newCrd New mvcc coordinator.
* @return Query id if exists.
*/
- long onMvccCoordinatorChange(MvccCoordinator newCrd);
+ default long onMvccCoordinatorChange(MvccCoordinator newCrd) {
+ return MVCC_TRACKER_ID_NA;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java
deleted file mode 100644
index 26e4574..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccPreviousCoordinatorQueries.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker.MVCC_TRACKER_ID_NA;
-
-/**
- *
- */
-class MvccPreviousCoordinatorQueries {
- /** */
- private volatile boolean prevQueriesDone;
-
- /** Map of nodes to active {@link MvccQueryTracker} IDs list. */
- private final ConcurrentHashMap<UUID, Set<Long>> activeQueries = new ConcurrentHashMap<>();
-
- /** */
- private final ConcurrentHashMap<UUID, Set<Long>> rcvdAcks = new ConcurrentHashMap<>();
-
- /** */
- private Set<UUID> rcvd;
-
- /** */
- private Set<UUID> waitNodes;
-
- /** */
- private boolean initDone;
-
- /** */
- void init() {
- init(null, Collections.emptyList(), null);
- }
-
- /**
- * @param nodeQueries Active queries map.
- * @param nodes Cluster nodes.
- * @param mgr Discovery manager.
- */
- void init(GridLongList nodeQueries, Collection<ClusterNode> nodes, GridDiscoveryManager mgr) {
- synchronized (this) {
- assert !initDone;
- assert waitNodes == null;
-
- waitNodes = new HashSet<>();
-
- for (ClusterNode node : nodes) {
- if (!node.isLocal() && mgr.alive(node) && !F.contains(rcvd, node.id()))
- waitNodes.add(node.id());
- }
-
- initDone = waitNodes.isEmpty();
-
- if (nodeQueries != null)
- mergeToActiveQueries(mgr.localNode().id(), nodeQueries);
-
- if (initDone && !prevQueriesDone)
- prevQueriesDone = activeQueries.isEmpty() && rcvdAcks.isEmpty();
- }
- }
-
- /**
- * @return {@code True} if all queries on
- */
- boolean previousQueriesDone() {
- return prevQueriesDone;
- }
-
- /**
- * Merges current node active queries with the given ones.
- *
- * @param nodeId Node ID.
- * @param nodeTrackers Active query trackers started on node.
- */
- private void mergeToActiveQueries(UUID nodeId, GridLongList nodeTrackers) {
- if (nodeTrackers == null || nodeTrackers.isEmpty() || prevQueriesDone)
- return;
-
- Set<Long> currTrackers = activeQueries.get(nodeId);
-
- if (currTrackers == null)
- activeQueries.put(nodeId, currTrackers = addAll(nodeTrackers, null));
- else
- addAll(nodeTrackers, currTrackers);
-
- // Check if there were any acks had been arrived before.
- Set<Long> currAcks = rcvdAcks.get(nodeId);
-
- if (!currTrackers.isEmpty() && currAcks != null && !currAcks.isEmpty()) {
- Collection<Long> intersection = new HashSet<>(currAcks);
-
- intersection.retainAll(currTrackers);
-
- currAcks.removeAll(intersection);
- currTrackers.removeAll(intersection);
-
- if (currTrackers.isEmpty())
- activeQueries.remove(nodeId);
-
- if (currAcks.isEmpty())
- rcvdAcks.remove(nodeId);
- }
-
- if (initDone && !prevQueriesDone)
- prevQueriesDone = activeQueries.isEmpty() && rcvdAcks.isEmpty();
- }
-
- /**
- * @param nodeId Node ID.
- * @param nodeTrackers Active query trackers started on node.
- */
- void addNodeActiveQueries(UUID nodeId, @Nullable GridLongList nodeTrackers) {
- synchronized (this) {
- if (initDone)
- return;
-
- if (waitNodes == null) {
- if (rcvd == null)
- rcvd = new HashSet<>();
-
- rcvd.add(nodeId);
- }
- else {
- waitNodes.remove(nodeId);
-
- initDone = waitNodes.isEmpty();
- }
-
- mergeToActiveQueries(nodeId, nodeTrackers);
-
- if (initDone && !prevQueriesDone)
- prevQueriesDone = activeQueries.isEmpty() && rcvdAcks.isEmpty();
- }
- }
-
- /**
- * @param nodeId Failed node ID.
- */
- void onNodeFailed(UUID nodeId) {
- synchronized (this) {
- if (waitNodes != null) {
- waitNodes.remove(nodeId);
-
- initDone = waitNodes.isEmpty();
- }
-
- if (initDone && !prevQueriesDone && activeQueries.remove(nodeId) != null)
- prevQueriesDone = activeQueries.isEmpty() && rcvdAcks.isEmpty();
- }
- }
-
- /**
- * @param nodeId Node ID.
- * @param qryTrackerId Query tracker Id.
- */
- void onQueryDone(UUID nodeId, long qryTrackerId) {
- if (qryTrackerId == MVCC_TRACKER_ID_NA)
- return;
-
- synchronized (this) {
- Set<Long> nodeTrackers = activeQueries.get(nodeId);
-
- if (nodeTrackers == null || !nodeTrackers.remove(qryTrackerId)) {
- Set<Long> nodeAcks = rcvdAcks.get(nodeId);
-
- if (nodeAcks == null)
- rcvdAcks.put(nodeId, nodeAcks = new HashSet<>());
-
- // We received qry done ack before the active qry message. Need to save it.
- nodeAcks.add(qryTrackerId);
- }
-
- if (nodeTrackers != null && nodeTrackers.isEmpty())
- activeQueries.remove(nodeId);
-
- if (initDone && !prevQueriesDone)
- prevQueriesDone = activeQueries.isEmpty() && rcvdAcks.isEmpty();
- }
- }
-
- /**
- * @param from Long list.
- * @param to Set.
- */
- private Set<Long> addAll(GridLongList from, Set<Long> to) {
- assert from != null;
-
- if (to == null)
- to = new HashSet<>(from.size());
-
- for (int i = 0; i < from.size(); i++)
- to.add(from.get(i));
-
- return to;
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
index 05fd9cf..f41d615 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessor.java
@@ -101,8 +101,7 @@ public interface MvccProcessor extends GridProcessor {
* @param blockerVer Version the entry is locked by.
* @return Future, which is completed as soon as the lock is released.
*/
- IgniteInternalFuture<Void> waitForLock(GridCacheContext cctx, MvccVersion waiterVer,
- MvccVersion blockerVer);
+ IgniteInternalFuture<Void> waitForLock(GridCacheContext cctx, MvccVersion waiterVer, MvccVersion blockerVer);
/**
* @param locked Version the entry is locked by.
@@ -110,6 +109,23 @@ public interface MvccProcessor extends GridProcessor {
void releaseWaiters(MvccVersion locked);
/**
+ * Checks whether one tx is waiting for another tx.
+ * It is assumed that locks on data nodes are requested one by one, so tx can wait only for one another tx here.
+ *
+ * @param mvccVer Version of transaction which is checked for being waiting.
+ * @return Version of tx which blocks checked tx.
+ */
+ Optional<? extends MvccVersion> checkWaiting(MvccVersion mvccVer);
+
+ /**
+ * Unfreezes waiter for specific version failing it with passed exception.
+ *
+ * @param mvccVer Version of a waiter to fail.
+ * @param e Exception reflecting failure reason.
+ */
+ void failWaiter(MvccVersion mvccVer, Exception e);
+
+ /**
* @param tracker Query tracker.
*/
void addQueryTracker(MvccQueryTracker tracker);
@@ -123,12 +139,6 @@ public interface MvccProcessor extends GridProcessor {
* @return {@link MvccSnapshot} if this is a coordinator node and coordinator is initialized.
* {@code Null} in other cases.
*/
- MvccSnapshot requestWriteSnapshotLocal();
-
- /**
- * @return {@link MvccSnapshot} if this is a coordinator node and coordinator is initialized.
- * {@code Null} in other cases.
- */
MvccSnapshot requestReadSnapshotLocal();
/**
@@ -141,17 +151,23 @@ public interface MvccProcessor extends GridProcessor {
/**
* Requests snapshot on Mvcc coordinator.
*
- * @return Result future.
+ * @param crd Expected coordinator.
+ * @param lsnr Request listener.
*/
- IgniteInternalFuture<MvccSnapshot> requestWriteSnapshotAsync();
+ void requestReadSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr);
+
+ /**
+ * @return {@link MvccSnapshot} if this is a coordinator node and coordinator is initialized.
+ * {@code Null} in other cases.
+ */
+ MvccSnapshot requestWriteSnapshotLocal();
/**
* Requests snapshot on Mvcc coordinator.
*
- * @param crd Expected coordinator.
- * @param lsnr Request listener.
+ * @return Result future.
*/
- void requestReadSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr);
+ IgniteInternalFuture<MvccSnapshot> requestWriteSnapshotAsync();
/**
* Requests snapshot on Mvcc coordinator.
@@ -162,6 +178,12 @@ public interface MvccProcessor extends GridProcessor {
void requestWriteSnapshotAsync(MvccCoordinator crd, MvccSnapshotResponseListener lsnr);
/**
+ * @param snapshot Query version.
+ * @param qryId Query tracker ID.
+ */
+ void ackQueryDone(MvccSnapshot snapshot, long qryId);
+
+ /**
* @param updateVer Transaction update version.
* @return Acknowledge future.
*/
@@ -173,18 +195,6 @@ public interface MvccProcessor extends GridProcessor {
void ackTxRollback(MvccVersion updateVer);
/**
- * @param snapshot Query version.
- * @param qryId Query tracker ID.
- */
- void ackQueryDone(MvccSnapshot snapshot, long qryId);
-
- /**
- * @param log Logger.
- * @param diagCtx Diagnostic request.
- */
- void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx);
-
- /**
* @return {@code True} if at least one cache with
* {@code CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT} mode is registered.
*/
@@ -213,23 +223,6 @@ public interface MvccProcessor extends GridProcessor {
void ensureStarted() throws IgniteCheckedException;
/**
- * Checks whether one tx is waiting for another tx.
- * It is assumed that locks on data nodes are requested one by one, so tx can wait only for one another tx here.
- *
- * @param mvccVer Version of transaction which is checked for being waiting.
- * @return Version of tx which blocks checked tx.
- */
- Optional<? extends MvccVersion> checkWaiting(MvccVersion mvccVer);
-
- /**
- * Unfreezes waiter for specific version failing it with passed exception.
- *
- * @param mvccVer Version of a waiter to fail.
- * @param e Exception reflecting failure reason.
- */
- void failWaiter(MvccVersion mvccVer, Exception e);
-
- /**
* Cache stop callback.
* @param cctx Cache context.
*
@@ -240,4 +233,10 @@ public interface MvccProcessor extends GridProcessor {
* Force txLog stop.
*/
void stopTxLog();
+
+ /**
+ * @param log Logger.
+ * @param diagCtx Diagnostic request.
+ */
+ void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
index 0fa8226..dfe4f75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccProcessorImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -112,6 +113,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorChangeAware.ID_FILTER;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker.MVCC_TRACKER_ID_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
@@ -204,7 +206,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
private final ActiveQueries activeQueries = new ActiveQueries();
/** */
- private final MvccPreviousCoordinatorQueries prevCrdQueries = new MvccPreviousCoordinatorQueries();
+ private final PreviousQueries prevQueries = new PreviousQueries();
/** */
private final GridFutureAdapter<Void> initFut = new GridFutureAdapter<>();
@@ -444,7 +446,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
if (evt.type() == EVT_NODE_JOINED) {
if (curCrd0.disconnected()) // Handle join event only if coordinator has not been elected yet.
- onCoordinatorChanged(topVer, nodes, false);
+ onCoordinatorChanged(topVer, nodes, true);
}
else if (Objects.equals(nodeId, curCrd0.nodeId())) {
// 1. Notify all listeners waiting for a snapshot.
@@ -459,7 +461,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
activeQueries.onNodeFailed(nodeId);
// 2. Notify previous queries.
- prevCrdQueries.onNodeFailed(nodeId);
+ prevQueries.onNodeFailed(nodeId);
// 3. Recover transactions started by the failed node.
recoveryBallotBoxes.forEach((nearNodeId, ballotBox) -> {
@@ -509,7 +511,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
/**
* Coordinator change callback. Performs all needed actions for handling new coordinator assignment.
*
- * @param sndQrys {@code True} if it is need to send an active queries list to the new coordinator.
+ * @param sndQrys {@code True} if it is need to collect/send an active queries list.
*/
private void onCoordinatorChanged(AffinityTopologyVersion topVer, Collection<ClusterNode> nodes, boolean sndQrys) {
MvccCoordinator newCrd = pickMvccCoordinator(nodes, topVer);
@@ -524,29 +526,28 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
curCrd = newCrd;
- if (newCrd.local() && !sndQrys)
- // Coordinator was assigned on local join. There was no coordinator before.
- prevCrdQueries.init();
+ processActiveQueries(nodes, newCrd, sndQrys);
+ }
- if (sndQrys) {
- GridLongList activeQryTrackers = new GridLongList();
+ /** */
+ private void processActiveQueries(Collection<ClusterNode> nodes, MvccCoordinator newCrd, boolean sndQrys) {
+ GridLongList qryIds = sndQrys ? new GridLongList(Stream.concat(activeTrackers.values().stream(),
+ ctx.cache().context().tm().activeTransactions().stream()
+ .filter(tx -> tx.near() && tx.local()))
+ .mapToLong(q -> ((MvccCoordinatorChangeAware)q).onMvccCoordinatorChange(newCrd))
+ .filter(ID_FILTER).toArray()) : new GridLongList();
- for (MvccQueryTracker tracker : activeTrackers.values()) {
- long trackerId = tracker.onMvccCoordinatorChange(newCrd);
+ if (newCrd.local()) {
+ prevQueries.addActiveQueries(ctx.localNodeId(), qryIds);
- if (trackerId != MVCC_TRACKER_ID_NA)
- activeQryTrackers.add(trackerId);
+ prevQueries.init(nodes, ctx.discovery()::alive);
+ }
+ else if (sndQrys) {
+ try {
+ sendMessage(newCrd.nodeId(), new MvccActiveQueriesMessage(qryIds));
}
-
- if (newCrd.local())
- prevCrdQueries.init(activeQryTrackers, nodes, ctx.discovery());
- else {
- try {
- sendMessage(newCrd.nodeId(), new MvccActiveQueriesMessage(activeQryTrackers));
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send active queries to mvcc coordinator: " + e);
- }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send active queries to mvcc coordinator: " + e);
}
}
}
@@ -1036,7 +1037,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
if (minQry != -1)
cleanup = Math.min(cleanup, minQry);
- cleanup = prevCrdQueries.previousQueriesDone() ? cleanup - 1 : MVCC_COUNTER_NA;
+ cleanup = prevQueries.done() ? cleanup - 1 : MVCC_COUNTER_NA;
res.init(futId, curCrd.version(), ver, MVCC_START_OP_CNTR, cleanup, tracking);
@@ -1464,7 +1465,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
* @param msg Message.
*/
private void processNewCoordinatorQueryAckRequest(UUID nodeId, MvccAckRequestQueryId msg) {
- prevCrdQueries.onQueryDone(nodeId, msg.queryTrackerId());
+ prevQueries.onQueryDone(nodeId, msg.queryTrackerId());
}
/**
@@ -1477,7 +1478,7 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
if (msg.queryCounter() != MVCC_COUNTER_NA)
onQueryDone(nodeId, msg.queryCounter());
else if (msg.queryTrackerId() != MVCC_TRACKER_ID_NA)
- prevCrdQueries.onQueryDone(nodeId, msg.queryTrackerId());
+ prevQueries.onQueryDone(nodeId, msg.queryTrackerId());
if (!msg.skipResponse()) {
try {
@@ -1515,7 +1516,11 @@ public class MvccProcessorImpl extends GridProcessorAdapter implements MvccProce
* @param msg Message.
*/
private void processActiveQueriesMessage(UUID nodeId, MvccActiveQueriesMessage msg) {
- prevCrdQueries.addNodeActiveQueries(nodeId, msg.activeQueries());
+ GridLongList queryIds = msg.activeQueries();
+
+ assert queryIds != null;
+
+ prevQueries.addActiveQueries(nodeId, queryIds);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
index c8ce98e..c362e9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.mvcc;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -25,17 +24,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
/**
* Mvcc tracker.
*/
-public interface MvccQueryTracker {
- /** */
- public static final AtomicLong ID_CNTR = new AtomicLong();
-
- /** */
- public static final long MVCC_TRACKER_ID_NA = -1;
-
+public interface MvccQueryTracker extends MvccCoordinatorChangeAware {
/**
* @return Tracker id.
*/
- public long id();
+ default long id() {
+ return MVCC_TRACKER_ID_NA;
+ }
/**
* @return Requested MVCC snapshot.
@@ -64,11 +59,4 @@ public interface MvccQueryTracker {
*/
public void onDone();
- /**
- * Mvcc coordinator change callback.
- *
- * @param newCrd New mvcc coordinator.
- * @return Query id if exists.
- */
- long onMvccCoordinatorChange(MvccCoordinator newCrd);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
index 725f123..f34416f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccUtils.java
@@ -829,23 +829,12 @@ public class MvccUtils {
* @return Mvcc snapshot.
*/
public static MvccSnapshot requestSnapshot(GridCacheContext cctx,
- GridNearTxLocal tx) throws IgniteCheckedException {
- assert tx != null;
-
- MvccSnapshot snapshot;
-
- tx = checkActive(tx);
-
- if ((snapshot = tx.mvccSnapshot()) == null) {
- MvccProcessor prc = cctx.shared().coordinators();
+ @NotNull GridNearTxLocal tx) throws IgniteCheckedException {
+ MvccSnapshot snapshot = tx.mvccSnapshot();
- snapshot = prc.requestWriteSnapshotLocal();
-
- if (snapshot == null)
- snapshot = prc.requestWriteSnapshotAsync().get();
-
- tx.mvccSnapshot(snapshot);
- }
+ if (snapshot == null)
+ // TODO IGNITE-7388
+ return tx.requestSnapshot().get();
return snapshot;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousQueries.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousQueries.java
new file mode 100644
index 0000000..c6005ef
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/PreviousQueries.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Predicate;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ *
+ */
+class PreviousQueries {
+ /** */
+ private static class Node {
+ /** */
+ @GridToStringInclude
+ boolean init;
+
+ /** */
+ @GridToStringInclude
+ Set<Long> cntrs;
+
+ /** */
+ boolean isDone() {
+ return init && (cntrs == null || cntrs.stream().allMatch(l -> l < 0));
+ }
+
+ @Override public String toString() {
+ return S.toString(Node.class, this);
+ }
+ }
+
+ /** */
+ private Map<UUID, Node> active = new HashMap<>();
+
+ /** */
+ private boolean init;
+
+ /** */
+ private volatile boolean done;
+
+ /**
+ * @param nodes Waiting nodes.
+ * @param alivePredicate Alive nodes filter.
+ */
+ synchronized void init(Collection<ClusterNode> nodes, Predicate<UUID> alivePredicate) {
+ assert !init && !done;
+
+ nodes.stream().map(ClusterNode::id).forEach(uuid -> active.putIfAbsent(uuid, new Node()));
+
+ active.entrySet().removeIf(e -> !alivePredicate.test(e.getKey()) || e.getValue().isDone());
+
+ if (active.isEmpty())
+ done = true;
+
+ init = true;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ void onNodeFailed(@NotNull UUID nodeId) {
+ if (done())
+ return;
+
+ synchronized (this) {
+ if (init)
+ removeAndCheckDone(nodeId);
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param qryId Query tracker Id.
+ */
+ void onQueryDone(@NotNull UUID nodeId, long qryId) {
+ if (!done())
+ onQueryDone0(nodeId, qryId);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param queryIds Query tracker Ids.
+ */
+ void addActiveQueries(@NotNull UUID nodeId, @NotNull GridLongList queryIds) {
+ if (!done())
+ addActiveQueries0(nodeId, queryIds);
+ }
+
+ /**
+ * @return {@code True} if all queries mapped on previous coordinator are done.
+ */
+ boolean done() {
+ return done;
+ }
+
+ /** */
+ private synchronized void onQueryDone0(@NotNull UUID nodeId, long qryId) {
+ assert qryId > 0;
+
+ Node node = active.get(nodeId);
+
+ if (node == null && !init)
+ active.put(nodeId, node = new Node());
+
+ if (node != null) {
+ Set<Long> cntrs = node.cntrs;
+
+ boolean wasNull = cntrs == null;
+
+ if (cntrs == null)
+ cntrs = node.cntrs = new HashSet<>();
+
+ if (wasNull || !cntrs.remove(qryId))
+ cntrs.add(-qryId);
+
+ if (init && node.isDone())
+ removeAndCheckDone(nodeId);
+ }
+ }
+
+ /** */
+ private synchronized void addActiveQueries0(@NotNull UUID nodeId, @NotNull GridLongList queryIds) {
+ Node node = active.get(nodeId);
+
+ if (node == null && !init)
+ active.put(nodeId, node = new Node());
+
+ if (node != null) {
+ Set<Long> cntrs = node.cntrs;
+
+ boolean wasNull = cntrs == null, hasQueries = false;
+
+ for (int i = 0; i < queryIds.size(); i++) {
+ long qryId = queryIds.get(i);
+
+ assert qryId > 0;
+
+ if (cntrs == null)
+ cntrs = node.cntrs = new HashSet<>();
+
+ if (wasNull || !cntrs.remove(-qryId))
+ hasQueries |= cntrs.add(qryId);
+ }
+
+ if (init && !hasQueries)
+ removeAndCheckDone(nodeId);
+ else
+ node.init = true;
+ }
+ }
+
+ /** */
+ private void removeAndCheckDone(@NotNull UUID nodeId) {
+ assert init;
+
+ active.remove(nodeId);
+
+ if (active.isEmpty())
+ done = true;
+ }
+}
\ No newline at end of file
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java
index 95a1664..552a16c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/StaticMvccQueryTracker.java
@@ -66,14 +66,4 @@ public class StaticMvccQueryTracker implements MvccQueryTracker {
@Override public void onDone() {
// No-op.
}
-
- /** {@inheritDoc} */
- @Override public long onMvccCoordinatorChange(MvccCoordinator newCrd) {
- return MVCC_TRACKER_ID_NA;
- }
-
- /** {@inheritDoc} */
- @Override public long id() {
- return MVCC_TRACKER_ID_NA;
- }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index a026849..1459f0a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -1723,12 +1723,19 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
protected final void checkActiveQueriesCleanup(Ignite node) throws Exception {
- final MvccProcessorImpl crd = mvccProcessor(node);
+ final MvccProcessorImpl prc = mvccProcessor(node);
- assertTrue("Active queries not cleared: " + node.name(), GridTestUtils.waitForCondition(
+ MvccCoordinator crd = prc.currentCoordinator();
+
+ if (!crd.local())
+ return;
+
+ assertTrue("Coordinator is not initialized: " + prc, GridTestUtils.waitForCondition(crd::initialized, 8_000));
+
+ assertTrue("Active queries are not cleared: " + node.name(), GridTestUtils.waitForCondition(
new GridAbsPredicate() {
@Override public boolean apply() {
- Object activeQueries = GridTestUtils.getFieldValue(crd, "activeQueries");
+ Object activeQueries = GridTestUtils.getFieldValue(prc, "activeQueries");
synchronized (activeQueries) {
Long minQry = GridTestUtils.getFieldValue(activeQueries, "minQry");
@@ -1754,16 +1761,20 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest {
}, 8_000)
);
- assertTrue("Previous coordinator queries not empty: " + node.name(), GridTestUtils.waitForCondition(
+ assertTrue("Previous coordinator queries are not empty: " + node.name(), GridTestUtils.waitForCondition(
new GridAbsPredicate() {
@Override public boolean apply() {
- Map queries = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "activeQueries");
- Boolean prevDone = GridTestUtils.getFieldValue(crd, "prevCrdQueries", "prevQueriesDone");
+ PreviousQueries prevQueries = GridTestUtils.getFieldValue(prc, "prevQueries");
+
+ synchronized (prevQueries) {
+ Map queries = GridTestUtils.getFieldValue(prevQueries, "active");
+ Boolean prevDone = GridTestUtils.getFieldValue(prevQueries, "done");
- if (!queries.isEmpty() || !prevDone)
- log.info("Previous coordinator state [prevDone=" + prevDone + ", queries=" + queries + ']');
+ if (!queries.isEmpty() || !prevDone)
+ log.info("Previous coordinator state [prevDone=" + prevDone + ", queries=" + queries + ']');
- return queries.isEmpty();
+ return queries.isEmpty();
+ }
}
}, 8_000)
);
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java
index 2111fcb..6ad2ea5 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractSqlCoordinatorFailoverTest.java
@@ -18,12 +18,15 @@
package org.apache.ignite.internal.processors.cache.mvcc;
import java.util.concurrent.Callable;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheServerNotFoundException;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
@@ -31,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffini
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.Transaction;
import org.junit.Ignore;
import org.junit.Test;
@@ -171,6 +175,83 @@ public abstract class CacheMvccAbstractSqlCoordinatorFailoverTest extends CacheM
* @throws Exception If failed.
*/
@Test
+ public void testTxReadAfterCoordinatorChangeDirectOrder() throws Exception {
+ testTxReadAfterCoordinatorChange(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTxReadAfterCoordinatorChangeReverseOrder() throws Exception {
+ testTxReadAfterCoordinatorChange(false);
+ }
+
+ /** */
+ private void testTxReadAfterCoordinatorChange(boolean directOrder) throws Exception {
+ ccfg = cacheConfiguration(cacheMode(), FULL_SYNC, 0, DFLT_PARTITION_COUNT)
+ .setIndexedTypes(Integer.class, Integer.class).setNodeFilter(new CoordinatorNodeFilter());
+
+ MvccProcessorImpl.coordinatorAssignClosure(new CoordinatorAssignClosure());
+
+ IgniteEx node = startGrid(0);
+
+ nodeAttr = CRD_ATTR;
+
+ startGrid(1);
+
+ IgniteCache<Integer, Integer> cache = node.cache(DEFAULT_CACHE_NAME);
+
+ cache.put(1,1);
+
+ Semaphore sem = new Semaphore(0);
+
+ IgniteInternalFuture future = GridTestUtils.runAsync(() -> {
+ IgniteCache<Integer, Integer> cache0 = node.cache(DEFAULT_CACHE_NAME);
+
+ try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ Integer res = cache0.get(1);
+
+ sem.release();
+
+ // wait for coordinator change.
+ assertTrue(sem.tryAcquire(2, getTestTimeout(), TimeUnit.MILLISECONDS));
+
+ assertEquals(res, cache0.get(1));
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertTrue(sem.tryAcquire(getTestTimeout(), TimeUnit.MILLISECONDS));
+
+ if (directOrder)
+ stopGrid(1);
+
+ MvccProcessorImpl prc = mvccProcessor(startGrid(2));
+
+ if (!directOrder)
+ stopGrid(1);
+
+ awaitPartitionMapExchange();
+
+ MvccCoordinator crd = prc.currentCoordinator();
+
+ assert crd.local() && crd.initialized();
+
+ cache.put(1,2);
+ cache.put(1,3);
+
+ sem.release(2);
+
+ future.get(getTestTimeout());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
public void testStartLastServerFails() throws Exception {
testSpi = true;