You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2021/04/02 08:50:13 UTC
[ignite] branch master updated: IGNITE-13873 Multi-cell transaction changes may be not visible (durin… (#8822)
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new cfebe24 IGNITE-13873 Multi-cell transaction changes may be not visible (durin… (#8822)
cfebe24 is described below
commit cfebe2450f1f82ced2b63aeeb48b2326aa388e1f
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Fri Apr 2 11:49:56 2021 +0300
IGNITE-13873 Multi-cell transaction changes may be not visible (durin… (#8822)
---
.../org/apache/ignite/internal/IgnitionEx.java | 23 +-
.../communication/GridIoMessageFactory.java | 4 -
.../cache/CacheAffinitySharedManager.java | 31 --
.../processors/cache/GridCacheSharedContext.java | 7 +-
.../distributed/GridCacheTxRecoveryFuture.java | 2 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 80 +---
.../msg/PartitionCountersNeighborcastRequest.java | 173 -------
.../msg/PartitionCountersNeighborcastResponse.java | 141 ------
.../cache/transactions/IgniteTxHandler.java | 57 ---
.../cache/transactions/IgniteTxManager.java | 41 +-
.../PartitionCountersNeighborcastFuture.java | 237 ---------
...GridExchangeFreeCellularSwitchAbstractTest.java | 134 +++++-
...ngeFreeCellularSwitchComplexOperationsTest.java | 76 +--
...ridExchangeFreeCellularSwitchIsolationTest.java | 532 ++++++++++++---------
...changeFreeCellularSwitchTxContinuationTest.java | 264 ++++++++++
...idExchangeFreeCellularSwitchTxCountersTest.java | 242 ++++++++++
.../TxPartitionCounterStateConsistencyTest.java | 6 +-
...teOnePrimaryTwoBackupsHistoryRebalanceTest.java | 160 +++++++
...titionCounterStateOnePrimaryTwoBackupsTest.java | 18 +-
...titionCounterStateTwoPrimaryTwoBackupsTest.java | 2 +-
.../junits/common/GridCommonAbstractTest.java | 39 +-
.../testsuites/IgniteCacheMvccTestSuite5.java | 4 +
.../ignite/testsuites/IgniteCacheTestSuite5.java | 4 +
23 files changed, 1215 insertions(+), 1062 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 3104269..e123278 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1121,7 +1121,28 @@ public class IgnitionEx {
}
if (old != null)
- if (failIfStarted) {
+ if (old.grid() == null) { // Stopped but not removed from map yet.
+ boolean replaced;
+
+ if (name != null)
+ replaced = grids.replace(name, old, grid);
+ else {
+ synchronized (dfltGridMux) {
+ replaced = old == dfltGrid;
+
+ if (replaced)
+ dfltGrid = grid;
+ }
+ }
+
+ if (!replaced) {
+ throw new IgniteCheckedException("Ignite instance with this name has been concurrently started: " +
+ name);
+ }
+ else
+ notifyStateChange(old.getName(), old.state());
+ }
+ else if (failIfStarted) {
if (name == null)
throw new IgniteCheckedException("Default Ignite instance has already been started.");
else
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 84a84cd..d39f7c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -136,8 +136,6 @@ import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccQuerySnapshotReq
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccRecoveryFinishedMessage;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccSnapshotResponse;
import org.apache.ignite.internal.processors.cache.mvcc.msg.MvccTxSnapshotRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -362,8 +360,6 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register((short)162, GenerateEncryptionKeyRequest::new);
factory.register((short)163, GenerateEncryptionKeyResponse::new);
factory.register((short)164, MvccRecoveryFinishedMessage::new);
- factory.register((short)165, PartitionCountersNeighborcastRequest::new);
- factory.register((short)166, PartitionCountersNeighborcastResponse::new);
factory.register((short)167, ServiceDeploymentProcessId::new);
factory.register((short)168, ServiceSingleNodeDeploymentResultBatch::new);
factory.register((short)169, ServiceSingleNodeDeploymentResult::new);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index ff906be..199c5e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -82,7 +82,6 @@ import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheMode.LOCAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -2641,36 +2640,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @return Primary nodes for local backups.
- */
- public Set<ClusterNode> idealPrimaryNodesForLocalBackups() {
- Set<ClusterNode> res = new GridConcurrentHashSet<>();
-
- ClusterNode loc = cctx.localNode();
-
- forAllCacheGroups(new IgniteInClosureX<GridAffinityAssignmentCache>() {
- @Override public void applyx(GridAffinityAssignmentCache aff) {
- CacheGroupDescriptor desc = cachesRegistry.group(aff.groupId());
-
- if (desc.config().getCacheMode() == PARTITIONED) {
- List<List<ClusterNode>> assignment = aff.idealAssignmentRaw();
-
- HashSet<ClusterNode> primaries = new HashSet<>();
-
- for (List<ClusterNode> nodes : assignment) {
- if (nodes.indexOf(loc) > 0)
- primaries.add(nodes.get(0));
- }
-
- res.addAll(primaries);
- }
- }
- });
-
- return res;
- }
-
- /**
*
*/
abstract class CacheGroupHolder {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index c933f98..f3495ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -72,7 +72,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.PluginProvider;
import org.jetbrains.annotations.Nullable;
@@ -979,12 +978,10 @@ public class GridCacheSharedContext<K, V> {
*
* @param topVer Topology version.
* @param node Failed node.
- * @param filter Recovery filter.
* @return {@code true} if waiting was successful.
*/
- public IgniteInternalFuture<?> partitionRecoveryFuture(AffinityTopologyVersion topVer, ClusterNode node,
- IgnitePredicate<IgniteInternalTx> filter) {
- return tm().recoverLocalTxs(topVer, node, filter);
+ public IgniteInternalFuture<?> partitionRecoveryFuture(AffinityTopologyVersion topVer, ClusterNode node) {
+ return tm().recoverLocalTxs(topVer, node);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 64034ea..43126b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -260,7 +260,7 @@ public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<B
for (Map.Entry<UUID, Collection<UUID>> entry : txNodes.entrySet()) {
UUID nodeId = entry.getKey();
- // Skip left nodes and local node.
+ // Skipping iteration when local node is one of tx's primary.
if (!nodes.containsKey(nodeId) && nodeId.equals(cctx.localNodeId()))
continue;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 032df1e..dcb131f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -101,8 +101,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.topology.Grid
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
@@ -129,7 +127,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
@@ -394,8 +391,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** Partitions scheduled for clearing before rebalancing for this topology version. */
private Map<Integer, Set<Integer>> clearingPartitions;
- /** Specified only in case of 'cluster is fully rebalanced' state achieved. */
- private volatile RebalancedInfo rebalancedInfo;
+ /** This future finished with 'cluster is fully rebalanced' state. */
+ private volatile boolean rebalanced;
/** Some of owned by affinity partitions were changed state to moving on this exchange. */
private volatile boolean affinityReassign;
@@ -1637,37 +1634,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
if (context().exchangeFreeSwitch() && isBaselineNodeFailed()) {
// Currently MVCC does not support operations on partially switched cluster.
if (cctx.kernalContext().coordinators().mvccEnabled())
- waitPartitionRelease(EXCHANGE_FREE_LATCH_ID, true, false, null);
- else {
- boolean partitionedRecoveryRequired = rebalancedInfo.primaryNodes.contains(firstDiscoEvt.eventNode());
-
- IgnitePredicate<IgniteInternalTx> replicatedOnly = tx -> {
- Collection<IgniteTxEntry> entries = tx.writeEntries();
- for (IgniteTxEntry entry : entries)
- if (cctx.cacheContext(entry.cacheId()).isReplicated())
- return true;
-
- // Checks only affected nodes contain replicated-free txs with failed primaries.
- assert partitionedRecoveryRequired;
-
- return false;
- };
-
- // Assuming that replicated transactions are absent, non-affected nodes will wait only this short sync.
- waitPartitionRelease(EXCHANGE_FREE_LATCH_ID + "-replicated", true, false, replicatedOnly);
-
- String partitionedLatchId = EXCHANGE_FREE_LATCH_ID + "-partitioned";
-
- if (partitionedRecoveryRequired)
- // This node contain backup partitions for failed partitioned caches primaries. Waiting for recovery.
- waitPartitionRelease(partitionedLatchId, true, false, null);
- else {
- // This node contain no backup partitions for failed partitioned caches primaries. Recovery is not needed.
- Latch releaseLatch = cctx.exchange().latch().getOrCreate(partitionedLatchId, initialVersion());
-
- releaseLatch.countDown(); // Await-free confirmation.
- }
- }
+ waitPartitionRelease(EXCHANGE_FREE_LATCH_ID, true, false);
+ else
+ waitPartitionRelease(null, false, false);
}
else if (!skipWaitOnLocalJoin) { // Skip partition release if node has locally joined (it doesn't have any updates to be finished).
boolean distributed = true;
@@ -1677,11 +1646,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
distributed = false;
// On first phase we wait for finishing all local tx updates, atomic updates and lock releases on all nodes.
- waitPartitionRelease(EXCHANGE_LATCH_ID, distributed, true, null);
+ waitPartitionRelease(EXCHANGE_LATCH_ID, distributed, true);
// Second phase is needed to wait for finishing all tx updates from primary to backup nodes remaining after first phase.
if (distributed)
- waitPartitionRelease(EXCHANGE_LATCH_ID, false, false, null);
+ waitPartitionRelease(EXCHANGE_LATCH_ID, false, false);
}
else {
if (log.isInfoEnabled())
@@ -1892,17 +1861,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @param distributed If {@code true} then node should wait for partition release completion on all other nodes.
* @param doRollback If {@code true} tries to rollback transactions which lock partitions. Avoids unnecessary calls
* of {@link org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager#rollbackOnTopologyChange}
- * @param filter Recovery filter.
*
* @throws IgniteCheckedException If failed.
*/
private void waitPartitionRelease(
String latchId,
boolean distributed,
- boolean doRollback,
- IgnitePredicate<IgniteInternalTx> filter) throws IgniteCheckedException {
- assert context().exchangeFreeSwitch() || filter == null;
-
+ boolean doRollback) throws IgniteCheckedException {
Latch releaseLatch = null;
IgniteInternalFuture<?> partReleaseFut;
@@ -1915,7 +1880,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
releaseLatch = cctx.exchange().latch().getOrCreate(latchId, initialVersion());
partReleaseFut = context().exchangeFreeSwitch() && isBaselineNodeFailed() ?
- cctx.partitionRecoveryFuture(initialVersion(), firstDiscoEvt.eventNode(), filter) :
+ cctx.partitionRecoveryFuture(initialVersion(), firstDiscoEvt.eventNode()) :
cctx.partitionReleaseFuture(initialVersion());
// Assign to class variable so it will be included into toString() method.
@@ -2041,9 +2006,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
cctx.exchange().exchangerBlockingSectionEnd();
}
}
- }
- timeBag.finishGlobalStage("Wait partitions release [latch=" + latchId + "]");
+ timeBag.finishGlobalStage("Wait partitions release [latch=" + latchId + "]");
+ }
if (releaseLatch == null) {
assert !distributed : "Partitions release latch must be initialized in distributed mode.";
@@ -5438,7 +5403,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
* @return {@code True} if cluster fully rebalanced.
*/
public boolean rebalanced() {
- return rebalancedInfo != null;
+ return rebalanced;
}
/**
@@ -5458,16 +5423,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
private void markRebalanced() {
assert !rebalanced();
- rebalancedInfo = new RebalancedInfo(cctx.affinity().idealPrimaryNodesForLocalBackups());
+ rebalanced = true;
}
/**
* Keeps cluster fully rebalanced flag.
*/
private void keepRebalanced() {
- assert !rebalanced() && wasRebalanced();
+ assert wasRebalanced();
- rebalancedInfo = sharedContext().exchange().lastFinishedFuture().rebalancedInfo;
+ markRebalanced();
}
/**
@@ -5808,19 +5773,4 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
/** This exchange was merged with another one. */
MERGED
}
-
- /**
- *
- */
- private static class RebalancedInfo {
- /** Primary nodes for local backups for all registered partitioned caches. */
- private Set<ClusterNode> primaryNodes;
-
- /**
- * @param primaryNodes Primary nodes for local backups.
- */
- public RebalancedInfo(Set<ClusterNode> primaryNodes) {
- this.primaryNodes = primaryNodes;
- }
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java
deleted file mode 100644
index c053e10..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastRequest.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc.msg;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
-
-/** */
-public class PartitionCountersNeighborcastRequest extends GridCacheIdMessage {
- /** */
- private static final long serialVersionUID = -1893577108462486998L;
-
- /** */
- @GridDirectCollection(PartitionUpdateCountersMessage.class)
- private Collection<PartitionUpdateCountersMessage> updCntrs;
-
- /** */
- private IgniteUuid futId;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** */
- public PartitionCountersNeighborcastRequest() {
- }
-
- /** */
- public PartitionCountersNeighborcastRequest(
- Collection<PartitionUpdateCountersMessage> updCntrs,
- IgniteUuid futId,
- @NotNull AffinityTopologyVersion topVer
- ) {
- this.updCntrs = updCntrs;
- this.futId = futId;
- this.topVer = topVer;
- }
-
- /** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Partition update counters for remote node.
- */
- public Collection<PartitionUpdateCountersMessage> updateCounters() {
- return updCntrs;
- }
-
- /**
- * @return Sending future id.
- */
- public IgniteUuid futId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeAffinityTopologyVersion("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeCollection("updCntrs", updCntrs, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- topVer = reader.readAffinityTopologyVersion("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- updCntrs = reader.readCollection("updCntrs", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(PartitionCountersNeighborcastRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 165;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 7;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return false;
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java
deleted file mode 100644
index 44a6949..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/msg/PartitionCountersNeighborcastResponse.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc.msg;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/** */
-public class PartitionCountersNeighborcastResponse extends GridCacheIdMessage {
- /** */
- private static final long serialVersionUID = -8731050539139260521L;
-
- /** */
- private IgniteUuid futId;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** */
- public PartitionCountersNeighborcastResponse() {
- }
-
- /** */
- public PartitionCountersNeighborcastResponse(
- IgniteUuid futId,
- AffinityTopologyVersion topVer
- ) {
- this.futId = futId;
- this.topVer = topVer;
- }
-
- /** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Sending future id.
- */
- public IgniteUuid futId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeIgniteUuid("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeAffinityTopologyVersion("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- futId = reader.readIgniteUuid("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- topVer = reader.readAffinityTopologyVersion("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(PartitionCountersNeighborcastResponse.class);
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 166;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 6;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return false;
- }
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 90398f2..cb1c33a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -78,8 +78,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxRemote;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
-import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
-import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastResponse;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -282,20 +280,6 @@ public class IgniteTxHandler {
processCheckPreparedTxResponse(nodeId, res);
}
});
-
- ctx.io().addCacheHandler(0, PartitionCountersNeighborcastRequest.class,
- new CI2<UUID, PartitionCountersNeighborcastRequest>() {
- @Override public void apply(UUID nodeId, PartitionCountersNeighborcastRequest req) {
- processPartitionCountersRequest(nodeId, req);
- }
- });
-
- ctx.io().addCacheHandler(0, PartitionCountersNeighborcastResponse.class,
- new CI2<UUID, PartitionCountersNeighborcastResponse>() {
- @Override public void apply(UUID nodeId, PartitionCountersNeighborcastResponse res) {
- processPartitionCountersResponse(nodeId, res);
- }
- });
}
/**
@@ -2267,47 +2251,6 @@ public class IgniteTxHandler {
}
/**
- * @param nodeId Node id.
- * @param req Request.
- */
- private void processPartitionCountersRequest(UUID nodeId, PartitionCountersNeighborcastRequest req) {
- try {
- applyPartitionsUpdatesCounters(req.updateCounters(), true, false);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
-
- try {
- ctx.io().send(nodeId, new PartitionCountersNeighborcastResponse(req.futId(), req.topologyVersion()), SYSTEM_POOL);
- }
- catch (ClusterTopologyCheckedException ignored) {
- if (txRecoveryMsgLog.isDebugEnabled())
- txRecoveryMsgLog.debug("Failed to send partition counters response, node left [node=" + nodeId + ']');
- }
- catch (IgniteCheckedException e) {
- U.error(txRecoveryMsgLog, "Failed to send partition counters response [node=" + nodeId + ']', e);
- }
- }
-
- /**
- * @param nodeId Node id.
- * @param res Response.
- */
- private void processPartitionCountersResponse(UUID nodeId, PartitionCountersNeighborcastResponse res) {
- PartitionCountersNeighborcastFuture fut = ((PartitionCountersNeighborcastFuture)ctx.mvcc().future(res.futId()));
-
- if (fut == null) {
- log.warning("Failed to find future for partition counters response [futId=" + res.futId() +
- ", node=" + nodeId + ']');
-
- return;
- }
-
- fut.onResult(nodeId);
- }
-
- /**
* Applies partition counter updates for transactions.
* <p>
* Called after entries are written to WAL on commit or during rollback to close gaps in update counter sequence.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 53320e6..b189a3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -112,7 +112,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteOutClosure;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.spi.systemview.view.TransactionView;
import org.apache.ignite.transactions.TransactionConcurrency;
@@ -918,12 +917,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
*
* @param topVer Topology version.
* @param node Failed node.
- * @param filter Recovery filter.
* @return Future that will be completed when all affected transactions are recovered.
*/
- public IgniteInternalFuture<Boolean> recoverLocalTxs(AffinityTopologyVersion topVer, ClusterNode node,
- IgnitePredicate<IgniteInternalTx> filter) {
-
+ public IgniteInternalFuture<Boolean> recoverLocalTxs(AffinityTopologyVersion topVer, ClusterNode node) {
GridCompoundFuture<IgniteInternalTx, Boolean> res =
new CacheObjectsReleaseFuture<>(
"TxRecovery",
@@ -939,11 +935,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
});
for (IgniteInternalTx tx : activeTransactions()) {
- if (tx.dht() && !tx.local() && tx.originatingNodeId().equals(node.id())) {
+ if (tx.transactionNodes() != null && tx.transactionNodes().containsKey(node.id()) // One of tx primaries failed.
+ && (tx.dht() // Local node is a primary (not on originating) or backup.
+ || (tx.near() && tx.local() && ((GridNearTxLocal)tx).colocatedLocallyMapped()))) { // Local node is a primary (on originating).
assert needWaitTransaction(tx, topVer);
- if (filter == null || filter.apply(tx))
- res.add(tx.finishFuture());
+ res.add(tx.finishFuture());
}
}
@@ -2470,26 +2467,18 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (commit)
tx.commitAsync().listen(new CommitListener(tx));
- else if (!tx.local())
- // remote (backup) transaction sends partition counters to other backup transaction on recovery rollback
- // in order to keep counters consistent
- neighborcastPartitionCountersAndRollback(tx);
- else
- tx.rollbackAsync();
- }
+ else if (!tx.local()) {
+ // This tx was rolled back on recovery because of a primary node failure; other backups may not be aware of it.
+ TxCounters cnts = tx.txCounters(false);
- /** */
- private void neighborcastPartitionCountersAndRollback(IgniteInternalTx tx) {
- TxCounters txCounters = tx.txCounters(false);
-
- if (txCounters == null || txCounters.updateCounters() == null)
- tx.rollbackAsync();
-
- PartitionCountersNeighborcastFuture fut = new PartitionCountersNeighborcastFuture(tx, cctx);
-
- fut.listen(fut0 -> tx.rollbackAsync());
+ if (cnts != null)
+ // Skipping counters update to keep them the same everywhere without any sync.
+ // Tx counters will be finalized (gaps removed) on local txs recovery finish.
+ // Each node will have counters equal to the latest successful transaction's counters.
+ cnts.updateCounters().clear();
+ }
- fut.init();
+ tx.rollbackAsync();
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java
deleted file mode 100644
index a4ac865..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/PartitionCountersNeighborcastFuture.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.transactions;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
-import org.apache.ignite.internal.processors.cache.mvcc.msg.PartitionCountersNeighborcastRequest;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.lang.GridPlainRunnable;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
-
-/**
- * Represents partition update counters delivery to remote nodes.
- */
-public class PartitionCountersNeighborcastFuture extends GridCacheCompoundIdentityFuture<Void> {
- /** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
- /** */
- @GridToStringExclude
- private boolean trackable = true;
-
- /** */
- private final GridCacheSharedContext<?, ?> cctx;
-
- /** */
- private final IgniteInternalTx tx;
-
- /** */
- private final IgniteLogger log;
-
- /** */
- public PartitionCountersNeighborcastFuture(
- IgniteInternalTx tx, GridCacheSharedContext<?, ?> cctx) {
- super(null);
-
- this.tx = tx;
-
- this.cctx = cctx;
-
- log = cctx.logger(CU.TX_MSG_RECOVERY_LOG_CATEGORY);
- }
-
- /**
- * Starts processing.
- */
- public void init() {
- if (log.isInfoEnabled()) {
- log.info("Starting delivery partition countres to remote nodes [txId=" + tx.nearXidVersion() +
- ", futId=" + futId);
- }
-
- HashSet<UUID> siblings = siblingBackups();
-
- cctx.mvcc().addFuture(this, futId);
-
- for (UUID peer : siblings) {
- List<PartitionUpdateCountersMessage> cntrs = cctx.tm().txHandler()
- .filterUpdateCountersForBackupNode(tx, cctx.node(peer));
-
- if (F.isEmpty(cntrs))
- continue;
-
- MiniFuture miniFut = new MiniFuture(peer);
-
- try {
- // we must add mini future before sending a message, otherwise mini future must miss completion
- add(miniFut);
-
- cctx.io().send(peer, new PartitionCountersNeighborcastRequest(cntrs, futId, tx.topologyVersion()), SYSTEM_POOL);
- }
- catch (IgniteCheckedException e) {
- if (!(e instanceof ClusterTopologyCheckedException))
- log.warning("Failed to send partition counters to remote node [node=" + peer + ']', e);
- else
- logNodeLeft(peer);
-
- miniFut.onDone();
- }
- }
-
- markInitialized();
- }
-
- /** */
- private HashSet<UUID> siblingBackups() {
- Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes();
-
- assert txNodes != null;
-
- UUID locNodeId = cctx.localNodeId();
-
- HashSet<UUID> siblings = new HashSet<>();
-
- txNodes.values().stream()
- .filter(backups -> backups.contains(locNodeId))
- .forEach(siblings::addAll);
-
- siblings.remove(locNodeId);
-
- return siblings;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
- boolean comp = super.onDone(res, err);
-
- if (comp)
- cctx.mvcc().removeFuture(futId);
-
- return comp;
- }
-
- /**
- * Processes a response from a remote peer. Completes a mini future for that peer.
- *
- * @param nodeId Remote peer node id.
- */
- public void onResult(UUID nodeId) {
- if (log.isInfoEnabled()) {
- log.info("Remote peer acked partition counters delivery [futId=" + futId +
- ", node=" + nodeId + ']');
- }
-
- completeMini(nodeId);
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- logNodeLeft(nodeId);
-
- // if a left node is one of remote peers then a mini future for it is completed successfully
- completeMini(nodeId);
-
- return true;
- }
-
- /** */
- private void completeMini(UUID nodeId) {
- for (IgniteInternalFuture<?> fut : futures()) {
- assert fut instanceof MiniFuture;
-
- MiniFuture mini = (MiniFuture)fut;
-
- if (mini.nodeId.equals(nodeId)) {
- cctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)mini::onDone);
-
- return;
- }
- }
-
- if (log.isInfoEnabled()) {
- log.info("Failed to find mini future corresponding to node, can prevent parent future completion [" +
- "futId=" + futId +
- ", nodeId=" + nodeId + ']');
- }
- }
-
- /** */
- private void logNodeLeft(UUID nodeId) {
- if (log.isInfoEnabled()) {
- log.info("Failed during partition counters delivery to remote node. " +
- "Node left cluster (will ignore) [futId=" + futId +
- ", node=" + nodeId + ']');
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return trackable;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- trackable = false;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(PartitionCountersNeighborcastFuture.class, this, "innerFuts", futures());
- }
-
- /**
- * Component of compound parent future. Represents interaction with one of remote peers.
- */
- private static class MiniFuture extends GridFutureAdapter<Void> {
- /** */
- private final UUID nodeId;
-
- /** */
- private MiniFuture(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MiniFuture.class, this, "done", isDone());
- }
- }
-}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java
index 1f5ec6c..9a9cb98 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchAbstractTest.java
@@ -19,8 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
@@ -147,7 +150,7 @@ public abstract class GridExchangeFreeCellularSwitchAbstractTest extends GridCom
}
return true;
- }, 5000));
+ }, 10000));
}
/**
@@ -170,14 +173,9 @@ public abstract class GridExchangeFreeCellularSwitchAbstractTest extends GridCom
*
*/
protected void checkTransactionsCount(
- Ignite orig,
- int origCnt,
- Ignite primary,
- int primaryCnt,
- List<Ignite> backupNodes,
- int backupCnt,
- List<Ignite> nearNodes,
- int nearCnt,
+ Ignite origNode, int origCnt,
+ List<Ignite> brokenCellNodes, int brokenCellCnt,
+ List<Ignite> aliveCellNodes, int aliveCellCnt,
Set<GridCacheVersion> vers) {
Function<Ignite, Collection<GridCacheVersion>> txs = ignite -> {
Collection<IgniteInternalTx> active = ((IgniteEx)ignite).context().cache().context().tm().activeTransactions();
@@ -189,19 +187,77 @@ public abstract class GridExchangeFreeCellularSwitchAbstractTest extends GridCom
.collect(Collectors.toSet());
};
- if (orig != null)
- assertEquals(origCnt, txs.apply(orig).size());
+ long till = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
- if (primary != null && primary != orig)
- assertEquals(primaryCnt, txs.apply(primary).size());
+ while (true) {
+ try {
+ if (origNode != null)
+ assertEquals(origCnt, txs.apply(origNode).size());
- for (Ignite backup : backupNodes)
- if (backup != orig)
- assertEquals(backupCnt, txs.apply(backup).size());
+ for (Ignite brokenCellNode : brokenCellNodes)
+ if (brokenCellNode != origNode)
+ assertEquals(brokenCellCnt, txs.apply(brokenCellNode).size());
- for (Ignite near : nearNodes)
- if (near != orig)
- assertEquals(nearCnt, txs.apply(near).size());
+ for (Ignite aliveCellNode : aliveCellNodes)
+ if (aliveCellNode != origNode)
+ assertEquals(aliveCellCnt, txs.apply(aliveCellNode).size());
+
+ break;
+ }
+ catch (Throwable err) {
+ if (System.nanoTime() > till)
+ throw err;
+ }
+ }
+
+ }
+
+ /**
+ *
+ */
+ protected CellularCluster resolveCellularCluster(int nodes, TransactionCoordinatorNode startFrom) throws Exception {
+ Ignite failed = G.allGrids().get(new Random().nextInt(nodes));
+
+ Integer cellKey = primaryKey(failed.getOrCreateCache(PART_CACHE_NAME));
+
+ List<Ignite> brokenCellNodes = backupNodes(cellKey, PART_CACHE_NAME);
+ List<Ignite> aliveCellNodes = new ArrayList<>(G.allGrids());
+
+ aliveCellNodes.remove(failed);
+ aliveCellNodes.removeAll(brokenCellNodes);
+
+ assertTrue(Collections.disjoint(brokenCellNodes, aliveCellNodes));
+ assertEquals(nodes / 2 - 1, brokenCellNodes.size());
+ assertEquals(nodes / 2, aliveCellNodes.size());
+
+ Ignite orig;
+
+ switch (startFrom) {
+ case FAILED:
+ orig = failed;
+
+ break;
+
+ case BROKEN_CELL:
+ orig = brokenCellNodes.get(new Random().nextInt(brokenCellNodes.size()));
+
+ break;
+
+ case ALIVE_CELL:
+ orig = aliveCellNodes.get(new Random().nextInt(aliveCellNodes.size()));
+
+ break;
+
+ case CLIENT:
+ orig = startClientGrid();
+
+ break;
+
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ return new CellularCluster(orig, failed, brokenCellNodes, aliveCellNodes);
}
/**
@@ -275,16 +331,44 @@ public abstract class GridExchangeFreeCellularSwitchAbstractTest extends GridCom
* Specifies node starts the transaction (originating node).
*/
protected enum TransactionCoordinatorNode {
- /** Primary. */
- PRIMARY,
+ /** Failed. */
+ FAILED,
- /** Backup. */
- BACKUP,
+ /** Broken cell. */
+ BROKEN_CELL,
- /** Near. */
- NEAR,
+ /** Alive cell. */
+ ALIVE_CELL,
/** Client. */
CLIENT
}
+
+ /**
+ *
+ */
+ protected static class CellularCluster {
+ /** Originating node. */
+ public Ignite orig;
+
+ /** Failed node. */
+ public Ignite failed;
+
+ /** Broken cell's nodes. */
+ public List<Ignite> brokenCellNodes;
+
+ /** Alive cell's nodes. */
+ public List<Ignite> aliveCellNodes;
+
+ /**
+ *
+ */
+ public CellularCluster(Ignite orig, Ignite failed, List<Ignite> brokenCellNodes,
+ List<Ignite> aliveCellNodes) {
+ this.orig = orig;
+ this.failed = failed;
+ this.brokenCellNodes = brokenCellNodes;
+ this.aliveCellNodes = aliveCellNodes;
+ }
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchComplexOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchComplexOperationsTest.java
index 1586361..4a392c0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchComplexOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchComplexOperationsTest.java
@@ -19,10 +19,8 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
-import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -75,7 +73,8 @@ public class GridExchangeFreeCellularSwitchComplexOperationsTest extends GridExc
}
/**
- *
+ * Test checks that txs will be recovered on Cellular switch if prepared, regardless of their content,
+ * as well as upcoming txs will be committed.
*/
@Test
public void testComplexOperationsRecoveryOnCellularSwitch() throws Exception {
@@ -85,46 +84,12 @@ public class GridExchangeFreeCellularSwitchComplexOperationsTest extends GridExc
blockRecoveryMessages();
- Ignite failed = G.allGrids().get(new Random().nextInt(nodes));
-
- Integer partKey = primaryKey(failed.getOrCreateCache(PART_CACHE_NAME));
-
- List<Ignite> backupNodes = backupNodes(partKey, PART_CACHE_NAME);
- List<Ignite> nearNodes = new ArrayList<>(G.allGrids());
-
- nearNodes.remove(failed);
- nearNodes.removeAll(backupNodes);
-
- assertTrue(Collections.disjoint(backupNodes, nearNodes));
- assertEquals(nodes / 2 - 1, backupNodes.size()); // Cell 1.
- assertEquals(nodes / 2, nearNodes.size()); // Cell 2.
-
- Ignite orig;
-
- switch (startFrom) {
- case PRIMARY:
- orig = failed;
-
- break;
+ CellularCluster cluster = resolveCellularCluster(nodes, startFrom);
- case BACKUP:
- orig = backupNodes.get(0);
-
- break;
-
- case NEAR:
- orig = nearNodes.get(0);
-
- break;
-
- case CLIENT:
- orig = startClientGrid();
-
- break;
-
- default:
- throw new UnsupportedOperationException();
- }
+ Ignite orig = cluster.orig;
+ Ignite failed = cluster.failed;
+ List<Ignite> brokenCellNodes = cluster.brokenCellNodes;
+ List<Ignite> aliveCellNodes = cluster.aliveCellNodes;
int recFutsCnt = 7;
@@ -227,14 +192,14 @@ public class GridExchangeFreeCellularSwitchComplexOperationsTest extends GridExc
futs.add(multithreadedAsync(() -> putEverywhereToBoth.accept(2), 1));
futs.add(multithreadedAsync(() -> putEverywhereToBoth.accept(10), 1));
- Consumer<Boolean> singleTxPerCell = (partAtCell1) -> {
+ Consumer<Boolean> singleTxPerCell = (partAtBrokenCell) -> {
try {
Transaction tx = orig.transactions().txStart(concurrency, isolation);
- Integer pKey = partAtCell1 ? nextPrimaryKey.apply(failed, PART_CACHE_NAME) :
- nextPrimaryKey.apply(nearNodes.get(0), PART_CACHE_NAME);
+ Integer pKey = partAtBrokenCell ? nextPrimaryKey.apply(failed, PART_CACHE_NAME) :
+ nextPrimaryKey.apply(aliveCellNodes.get(0), PART_CACHE_NAME);
- Integer rKey = partAtCell1 ? nextPrimaryKey.apply(nearNodes.get(0), REPL_CACHE_NAME) :
+ Integer rKey = partAtBrokenCell ? nextPrimaryKey.apply(aliveCellNodes.get(0), REPL_CACHE_NAME) :
nextPrimaryKey.apply(failed, REPL_CACHE_NAME);
IgniteCache<Integer, Integer> pCache = orig.getOrCreateCache(PART_CACHE_NAME);
@@ -309,14 +274,14 @@ public class GridExchangeFreeCellularSwitchComplexOperationsTest extends GridExc
}
};
- for (Ignite backup : backupNodes) {
- futs.add(multithreadedAsync(() -> partTxRun.accept(backup), 1));
- futs.add(multithreadedAsync(() -> replTxRun.accept(backup), 1));
+ for (Ignite brokenCellNode : brokenCellNodes) {
+ futs.add(multithreadedAsync(() -> partTxRun.accept(brokenCellNode), 1));
+ futs.add(multithreadedAsync(() -> replTxRun.accept(brokenCellNode), 1));
}
- for (Ignite near : nearNodes) {
- futs.add(multithreadedAsync(() -> partTxRun.accept(near), 1));
- futs.add(multithreadedAsync(() -> replTxRun.accept(near), 1));
+ for (Ignite aliveCellNode : aliveCellNodes) {
+ futs.add(multithreadedAsync(() -> partTxRun.accept(aliveCellNode), 1));
+ futs.add(multithreadedAsync(() -> replTxRun.accept(aliveCellNode), 1));
}
// Allowing recovery.
@@ -346,9 +311,8 @@ public class GridExchangeFreeCellularSwitchComplexOperationsTest extends GridExc
// Final check that any transactions are absent.
checkTransactionsCount(
null, 0,
- null, 0,
- backupNodes, 0,
- nearNodes, 0,
- null);
+ brokenCellNodes, 0,
+ aliveCellNodes, 0,
+ null /*any*/);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java
index 3b3af0b..6d93e89 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchIsolationTest.java
@@ -19,12 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
-import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
@@ -69,111 +68,186 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
}
/**
+ * Test checks that the switch finishes only when all transactions requiring recovery are recovered.
+ * Based on corner case found at TeamCity runs:
*
+ * We have 2 cells, the first contains partitions for k1, second for k2.
+ * Tx with put(k1,v1) and put(k2,v2) started and prepared.
+ * Then node from the first cell, which is the primary for k1, failed.
+ * The second cell (with key2) should NOT finish the cellular switch before tx recovered,
+ * otherwise a stale data read is possible.
*/
@Test
- public void testOnlyAffectedNodesWaitForRecovery() throws Exception {
+ public void testMutliKeyTxRecoveryHappenBeforeTheSwitchOnCellularSwitch() throws Exception {
int nodes = 6;
- String recoveryStatusMsg = "TxRecovery Status and Timings [txs=";
+ startGridsMultiThreaded(nodes);
- LogListener lsnrAny = matches(recoveryStatusMsg).build(); // Any.
- LogListener lsnrBackup = matches(recoveryStatusMsg).times((nodes / 2) - 1).build(); // Cell 1 (backups).
- LogListener lsnrNear = matches(recoveryStatusMsg).times((nodes / 2)).build(); // Cell 2 (near).
+ blockRecoveryMessages();
- listeningLog.registerListener(lsnrAny);
+ CellularCluster cluster = resolveCellularCluster(nodes, startFrom);
- startGridsMultiThreaded(nodes);
+ Ignite orig = cluster.orig;
+ Ignite failed = cluster.failed;
+ List<Ignite> brokenCellNodes = cluster.brokenCellNodes;
+ List<Ignite> aliveCellNodes = cluster.aliveCellNodes;
- blockRecoveryMessages();
+ CountDownLatch prepLatch = new CountDownLatch(1);
+ CountDownLatch commitLatch = new CountDownLatch(1);
+
+ AtomicInteger key = new AtomicInteger();
+
+ // Puts 2 entries, each on its own cell.
+ IgniteInternalFuture<?> putFut = multithreadedAsync(() -> {
+ try {
+ Transaction tx = orig.transactions().txStart();
+
+ IgniteCache<Integer, Integer> cache = orig.getOrCreateCache(PART_CACHE_NAME);
- Ignite failed = G.allGrids().get(new Random().nextInt(nodes));
+ cache.put(primaryKey(failed.getOrCreateCache(PART_CACHE_NAME)), 42);
- Integer partKey = primaryKey(failed.getOrCreateCache(PART_CACHE_NAME));
- Integer replKey = primaryKey(failed.getOrCreateCache(REPL_CACHE_NAME));
+ key.set(primaryKey(aliveCellNodes.get(0).getOrCreateCache(PART_CACHE_NAME)));
- List<Ignite> backupNodes = backupNodes(partKey, PART_CACHE_NAME);
- List<Ignite> nearNodes = new ArrayList<>(G.allGrids());
+ cache.put(key.get(), key.get());
- nearNodes.remove(failed);
- nearNodes.removeAll(backupNodes);
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+ prepLatch.countDown();
- assertTrue(Collections.disjoint(backupNodes, nearNodes));
- assertEquals(nodes / 2 - 1, backupNodes.size()); // Cell 1.
- assertEquals(nodes / 2, nearNodes.size()); // Cell 2.
+ commitLatch.await();
- Ignite orig;
+ if (orig != failed)
+ ((TransactionProxyImpl<?, ?>)tx).commit();
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ }, 1);
+
+ prepLatch.await();
+
+ // Should be null while tx is uncommitted/unrecovered.
+ assertNull(aliveCellNodes.get(0).getOrCreateCache(PART_CACHE_NAME).get(key.get()));
+
+ failed.close(); // Stopping node.
- switch (startFrom) {
- case PRIMARY:
- orig = failed;
+ awaitForSwitchOnNodeLeft(failed);
- break;
+ checkTransactionsCount( // Making sure txs still unrecovered.
+ null, 0,
+ brokenCellNodes, 1,
+ aliveCellNodes, 1,
+ null /*any*/);
- case BACKUP:
- orig = backupNodes.get(0);
+ CountDownLatch getLatch = new CountDownLatch(1);
- break;
+ IgniteInternalFuture<?> getFut = multithreadedAsync(() -> {
+ try {
+ IgniteCache<Integer, Integer> cache = aliveCellNodes.get(0).getOrCreateCache(PART_CACHE_NAME);
- case NEAR:
- orig = nearNodes.get(0);
+ // Should be available for reading only after recovery happens (should be not null).
+ assertEquals((Integer)key.get(), cache.get(key.get()));
- break;
+ getLatch.countDown();
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ }, 1);
- case CLIENT:
- orig = startClientGrid();
+ // Get should not happen while tx is not recovered.
+ assertFalse(getLatch.await(10, TimeUnit.SECONDS));
- break;
+ // Allowing recovery.
+ for (Ignite ignite : G.allGrids()) {
+ TestRecordingCommunicationSpi spi =
+ (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
- default:
- throw new UnsupportedOperationException();
+ spi.stopBlock(true, blockedMsg -> true);
}
- Set<GridCacheVersion> vers = new GridConcurrentHashSet<>();
+ // Allowing commit.
+ commitLatch.countDown();
+
+ putFut.get();
+
+ // Awaiting for get on alive cell.
+ getLatch.await();
+
+ // Making sure get finished with recovered value.
+ getFut.get();
+
+ // Final check that any transactions are absent.
+ checkTransactionsCount(
+ null, 0,
+ brokenCellNodes, 0,
+ aliveCellNodes, 0,
+ null /*any*/);
+ }
+
+ /**
+ * Test checks that non-affected nodes (alive cells) finish the switch asap,
+ * and that they wait only for the recovery related to these nodes (e.g. replicated caches recovery that affects every node).
+ */
+ @Test
+ public void testOnlyAffectedNodesWaitForRecovery() throws Exception {
+ int nodes = 6;
+
+ String recoveryStatusMsg = "TxRecovery Status and Timings [txs=";
+
+ LogListener lsnrAny = matches(recoveryStatusMsg).build(); // Any.
+ LogListener lsnrBrokenCell = matches(recoveryStatusMsg).times((nodes / 2) - 1 /*failed*/).build();
+ LogListener lsnrAliveCell = matches(recoveryStatusMsg).times((nodes / 2)).build();
+
+ listeningLog.registerListener(lsnrAny);
+
+ startGridsMultiThreaded(nodes);
+
+ blockRecoveryMessages();
+
+ CellularCluster cluster = resolveCellularCluster(nodes, startFrom);
+
+ Ignite orig = cluster.orig;
+ Ignite failed = cluster.failed;
+ List<Ignite> brokenCellNodes = cluster.brokenCellNodes;
+ List<Ignite> aliveCellNodes = cluster.aliveCellNodes;
+
+ List<Integer> partKeys = new ArrayList<>();
+ List<Integer> replKeys = new ArrayList<>();
+
+ for (Ignite node : G.allGrids()) {
+ if (!node.configuration().isClientMode()) {
+ partKeys.add(primaryKey(node.getOrCreateCache(PART_CACHE_NAME)));
+ replKeys.add(primaryKey(node.getOrCreateCache(REPL_CACHE_NAME)));
+ }
+ }
+
+ CountDownLatch partPreparedLatch = new CountDownLatch(nodes);
+ CountDownLatch replPreparedLatch = new CountDownLatch(nodes);
- CountDownLatch partPreparedLatch = new CountDownLatch(1);
CountDownLatch partCommitLatch = new CountDownLatch(1);
- CountDownLatch replPreparedLatch = new CountDownLatch(1);
CountDownLatch replCommitLatch = new CountDownLatch(1);
+ AtomicInteger partKeyIdx = new AtomicInteger();
+ AtomicInteger replKeyIdx = new AtomicInteger();
+
+ Set<GridCacheVersion> partTxVers = new GridConcurrentHashSet<>();
+ Set<GridCacheVersion> replTxVers = new GridConcurrentHashSet<>();
+
IgniteInternalFuture<?> partFut = multithreadedAsync(() -> {
try {
- checkTransactionsCount(
- orig, 0,
- failed, 0,
- backupNodes, 0,
- nearNodes, 0,
- vers);
+ int idx = partKeyIdx.getAndIncrement();
Transaction tx = orig.transactions().txStart();
- vers.add(((TransactionProxyImpl<?, ?>)tx).tx().nearXidVersion());
+ partTxVers.add(((TransactionProxyImpl<?, ?>)tx).tx().nearXidVersion());
- checkTransactionsCount(
- orig, 1,
- failed, 0,
- backupNodes, 0,
- nearNodes, 0,
- vers);
+ int key = partKeys.get(idx);
- orig.getOrCreateCache(PART_CACHE_NAME).put(partKey, 42);
-
- checkTransactionsCount(
- orig, 1,
- failed, 1,
- backupNodes, 0,
- nearNodes, 0,
- vers);
+ orig.getOrCreateCache(PART_CACHE_NAME).put(key, key);
((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
- checkTransactionsCount(
- orig, 1,
- failed, 1,
- backupNodes, 1,
- nearNodes, 0,
- vers);
-
partPreparedLatch.countDown();
partCommitLatch.await();
@@ -184,52 +258,22 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
catch (Exception e) {
fail("Should not happen [exception=" + e + "]");
}
- }, 1);
-
- AtomicReference<GridCacheVersion> replTxVer = new AtomicReference<>();
+ }, nodes);
IgniteInternalFuture<?> replFut = multithreadedAsync(() -> {
try {
- partPreparedLatch.await(); // Waiting for partitioned cache tx preparation.
-
- checkTransactionsCount(
- orig, 1,
- failed, 1,
- backupNodes, 1,
- nearNodes, 0,
- vers);
+ int idx = replKeyIdx.getAndIncrement();
Transaction tx = orig.transactions().txStart();
- replTxVer.set(((TransactionProxyImpl<?, ?>)tx).tx().nearXidVersion());
-
- vers.add(replTxVer.get());
+ replTxVers.add(((TransactionProxyImpl<?, ?>)tx).tx().nearXidVersion());
- checkTransactionsCount(
- orig, 2,
- failed, 1,
- backupNodes, 1,
- nearNodes, 0,
- vers);
+ int key = replKeys.get(idx);
- orig.getOrCreateCache(REPL_CACHE_NAME).put(replKey, 43);
-
- checkTransactionsCount(
- orig, 2,
- failed, 2,
- backupNodes, 1,
- nearNodes, 0,
- vers);
+ orig.getOrCreateCache(REPL_CACHE_NAME).put(key, key);
((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
- checkTransactionsCount(
- orig, 2,
- failed, 2,
- backupNodes, 2,
- nearNodes, 1,
- vers);
-
replPreparedLatch.countDown();
replCommitLatch.await();
@@ -240,32 +284,47 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
catch (Exception e) {
fail("Should not happen [exception=" + e + "]");
}
- }, 1);
+ }, nodes);
partPreparedLatch.await();
replPreparedLatch.await();
checkTransactionsCount(
- orig, 2,
- failed, 2,
- backupNodes, 2,
- nearNodes, 1,
- vers);
+ orig, nodes,
+ brokenCellNodes, nodes / 2,
+ aliveCellNodes, nodes / 2,
+ partTxVers);
+
+ checkTransactionsCount(
+ orig, nodes,
+ brokenCellNodes, nodes,
+ aliveCellNodes, nodes,
+ replTxVers);
assertFalse(lsnrAny.check());
- listeningLog.registerListener(lsnrNear);
+ listeningLog.registerListener(lsnrAliveCell);
failed.close(); // Stopping node.
awaitForSwitchOnNodeLeft(failed);
+ // In case the originating node failed, all alive primaries will recover (commit) txs on tx coordinator failure.
+ // Txs with failed primary will start recovery, but can't finish it since recovery messages are blocked.
+
+ // Broken cell's nodes will have 1 unrecovered tx for partitioned cache.
checkTransactionsCount(
- orig != failed ? orig : null /*stopped*/, 2 /* replicated + partitioned */,
- null /*stopped*/, 0,
- backupNodes, 2 /* replicated + partitioned */,
- nearNodes, 1 /* replicated */,
- vers);
+ orig != failed ? orig : null /*stopped*/, nodes,
+ brokenCellNodes, orig == failed ? 1 : nodes / 2,
+ aliveCellNodes, orig == failed ? 0 : nodes / 2,
+ partTxVers);
+
+ // All cell's nodes will have 1 unrecovered tx for replicated cache.
+ checkTransactionsCount(
+ orig != failed ? orig : null /*stopped*/, nodes,
+ brokenCellNodes, orig == failed ? 1 : nodes,
+ aliveCellNodes, orig == failed ? 1 : nodes,
+ replTxVers);
BiConsumer<T2<Ignite, String>, T3<CountDownLatch, CountDownLatch, CountDownLatch>> txRun = // Counts tx's creations and preparations.
(T2<Ignite, String> pair, T3</*create*/CountDownLatch, /*put*/CountDownLatch, /*commit*/CountDownLatch> latches) -> {
@@ -278,7 +337,8 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
try (Transaction tx = ignite.transactions().txStart()) {
latches.get1().countDown(); // Create.
- cache.put(primaryKeys(cache, 100).get(99), 2);
+ // Avoiding intersection with prepared keys.
+ cache.put(primaryKeys(cache, 1, 1_000).get(0), 42);
latches.get2().countDown(); // Put.
@@ -292,63 +352,69 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
}
};
- CountDownLatch replBackupCreateLatch = new CountDownLatch(backupNodes.size());
- CountDownLatch replBackupPutLatch = new CountDownLatch(backupNodes.size());
- CountDownLatch replBackupCommitLatch = new CountDownLatch(backupNodes.size());
- CountDownLatch replNearCreateLatch = new CountDownLatch(nearNodes.size());
- CountDownLatch replNearPutLatch = new CountDownLatch(nearNodes.size());
- CountDownLatch replNearCommitLatch = new CountDownLatch(nearNodes.size());
+ CountDownLatch partBrokenCellCreateLatch = new CountDownLatch(brokenCellNodes.size());
+ CountDownLatch partBrokenCellPutLatch = new CountDownLatch(brokenCellNodes.size());
+ CountDownLatch partBrokenCellCommitLatch = new CountDownLatch(brokenCellNodes.size());
+ CountDownLatch partAliveCellCreateLatch = new CountDownLatch(aliveCellNodes.size());
+ CountDownLatch partAliveCellPutLatch = new CountDownLatch(aliveCellNodes.size());
+ CountDownLatch partAliveCellCommitLatch = new CountDownLatch(aliveCellNodes.size());
- CountDownLatch partBackupCreateLatch = new CountDownLatch(backupNodes.size());
- CountDownLatch partBackupPutLatch = new CountDownLatch(backupNodes.size());
- CountDownLatch partBackupCommitLatch = new CountDownLatch(backupNodes.size());
- CountDownLatch partNearCreateLatch = new CountDownLatch(nearNodes.size());
- CountDownLatch partNearPutLatch = new CountDownLatch(nearNodes.size());
- CountDownLatch partNearCommitLatch = new CountDownLatch(nearNodes.size());
+ CountDownLatch replBrokenCellCreateLatch = new CountDownLatch(brokenCellNodes.size());
+ CountDownLatch replBrokenCellPutLatch = new CountDownLatch(brokenCellNodes.size());
+ CountDownLatch replBrokenCellCommitLatch = new CountDownLatch(brokenCellNodes.size());
+ CountDownLatch replAliveCellCreateLatch = new CountDownLatch(aliveCellNodes.size());
+ CountDownLatch replAliveCellPutLatch = new CountDownLatch(aliveCellNodes.size());
+ CountDownLatch replAliveCellCommitLatch = new CountDownLatch(aliveCellNodes.size());
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
- for (Ignite backup : backupNodes) {
+ for (Ignite brokenCellNode : brokenCellNodes) {
futs.add(multithreadedAsync(() ->
- txRun.accept(new T2<>(backup, REPL_CACHE_NAME),
- new T3<>(replBackupCreateLatch, replBackupPutLatch, replBackupCommitLatch)), 1));
+ txRun.accept(new T2<>(brokenCellNode, REPL_CACHE_NAME),
+ new T3<>(replBrokenCellCreateLatch, replBrokenCellPutLatch, replBrokenCellCommitLatch)), 1));
futs.add(multithreadedAsync(() ->
- txRun.accept(new T2<>(backup, PART_CACHE_NAME),
- new T3<>(partBackupCreateLatch, partBackupPutLatch, partBackupCommitLatch)), 1));
+ txRun.accept(new T2<>(brokenCellNode, PART_CACHE_NAME),
+ new T3<>(partBrokenCellCreateLatch, partBrokenCellPutLatch, partBrokenCellCommitLatch)), 1));
}
- for (Ignite near : nearNodes) {
+ for (Ignite aliveCellNode : aliveCellNodes) {
futs.add(multithreadedAsync(() ->
- txRun.accept(new T2<>(near, REPL_CACHE_NAME),
- new T3<>(replNearCreateLatch, replNearPutLatch, replNearCommitLatch)), 1));
+ txRun.accept(new T2<>(aliveCellNode, REPL_CACHE_NAME),
+ new T3<>(replAliveCellCreateLatch, replAliveCellPutLatch, replAliveCellCommitLatch)), 1));
futs.add(multithreadedAsync(() ->
- txRun.accept(new T2<>(near, PART_CACHE_NAME),
- new T3<>(partNearCreateLatch, partNearPutLatch, partNearCommitLatch)), 1));
+ txRun.accept(new T2<>(aliveCellNode, PART_CACHE_NAME),
+ new T3<>(partAliveCellCreateLatch, partAliveCellPutLatch, partAliveCellCommitLatch)), 1));
}
// Switch in progress cluster-wide.
+ // Alive nodes' switch is blocked until replicated caches recovery happens.
checkUpcomingTransactionsState(
- replBackupCreateLatch, 0, // Started.
- replBackupPutLatch, backupNodes.size(),
- replBackupCommitLatch, backupNodes.size(),
- replNearCreateLatch, 0, // Started.
- replNearPutLatch, nearNodes.size(),
- replNearCommitLatch, nearNodes.size());
+ partBrokenCellCreateLatch, 0, // Started.
+ partBrokenCellPutLatch, brokenCellNodes.size(),
+ partBrokenCellCommitLatch, brokenCellNodes.size(),
+ partAliveCellCreateLatch, 0, // Started. Blocked by replicated cache recovery.
+ partAliveCellPutLatch, aliveCellNodes.size(),
+ partAliveCellCommitLatch, aliveCellNodes.size());
checkUpcomingTransactionsState(
- partBackupCreateLatch, 0, // Started.
- partBackupPutLatch, backupNodes.size(),
- partBackupCommitLatch, backupNodes.size(),
- partNearCreateLatch, 0, // Started.
- partNearPutLatch, nearNodes.size(),
- partNearCommitLatch, nearNodes.size());
+ replBrokenCellCreateLatch, 0, // Started.
+ replBrokenCellPutLatch, brokenCellNodes.size(),
+ replBrokenCellCommitLatch, brokenCellNodes.size(),
+ replAliveCellCreateLatch, 0, // Started. Blocked by replicated cache recovery.
+ replAliveCellPutLatch, aliveCellNodes.size(),
+ replAliveCellCommitLatch, aliveCellNodes.size());
checkTransactionsCount(
- orig != failed ? orig : null, 2 /* replicated + partitioned */,
- null, 0,
- backupNodes, 2 /* replicated + partitioned */,
- nearNodes, 1 /* replicated */,
- vers);
+ orig != failed ? orig : null /*stopped*/, nodes,
+ brokenCellNodes, orig == failed ? 1 : nodes / 2,
+ aliveCellNodes, orig == failed ? 0 : nodes / 2,
+ partTxVers);
+
+ checkTransactionsCount(
+ orig != failed ? orig : null /*stopped*/, nodes,
+ brokenCellNodes, orig == failed ? 1 : nodes,
+ aliveCellNodes, orig == failed ? 1 : nodes,
+ replTxVers);
// Replicated recovery.
for (Ignite ignite : G.allGrids()) {
@@ -358,7 +424,7 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
spi.stopBlock(true, blockedMsg -> {
Message msg = blockedMsg.ioMessage().message();
- return ((GridCacheTxRecoveryRequest)msg).nearXidVersion().equals(replTxVer.get());
+ return replTxVers.contains(((GridCacheTxRecoveryRequest)msg).nearXidVersion());
});
}
@@ -366,41 +432,51 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
replFut.get();
// Switch partially finished.
- // Cell 1 (backups) still in switch.
- // Cell 2 (near nodes) finished the switch.
+ // Broken cell still in switch.
+ // Alive cell finished the switch.
checkUpcomingTransactionsState(
- replBackupCreateLatch, 0, // Started.
- replBackupPutLatch, backupNodes.size(),
- replBackupCommitLatch, backupNodes.size(),
- replNearCreateLatch, 0, // Started.
- replNearPutLatch, 0, // Near nodes able to start transactions on primaries (Cell 2),
- replNearCommitLatch, nearNodes.size()); // But not able to commit, since some backups (Cell 1) still in switch.
+ partBrokenCellCreateLatch, 0, // Started.
+ partBrokenCellPutLatch, brokenCellNodes.size(),
+ partBrokenCellCommitLatch, brokenCellNodes.size(),
+ partAliveCellCreateLatch, 0, // Started.
+ partAliveCellPutLatch, 0, // Alive cell's nodes able to start transactions on primaries,
+ partAliveCellCommitLatch, 0); // Able to commit, since all primaries and backups are inside the alive cell.
checkUpcomingTransactionsState(
- partBackupCreateLatch, 0, // Started.
- partBackupPutLatch, backupNodes.size(),
- partBackupCommitLatch, backupNodes.size(),
- partNearCreateLatch, 0, // Started.
- partNearPutLatch, 0, // Near nodes able to start transactions on primaries (Cell 2),
- partNearCommitLatch, 0); // Able to commit, since all primaries and backups are in Cell 2.
+ replBrokenCellCreateLatch, 0, // Started.
+ replBrokenCellPutLatch, brokenCellNodes.size(),
+ replBrokenCellCommitLatch, brokenCellNodes.size(),
+ replAliveCellCreateLatch, 0, // Started.
+ replAliveCellPutLatch, 0, // Alive cell's nodes able to start transactions on primaries,
+ replAliveCellCommitLatch, aliveCellNodes.size()); // But not able to commit, since broken cell's nodes still in switch.
checkTransactionsCount(
- orig != failed ? orig : null, 1 /* partitioned */,
- null, 0,
- backupNodes, 1 /* partitioned */,
- nearNodes, 0,
- vers);
+ orig != failed ? orig : null /*stopped*/, nodes,
+ brokenCellNodes, orig == failed ? 1 : nodes / 2,
+ aliveCellNodes, orig == failed ? 0 : nodes / 2 /*to be committed*/, // New txs able to start while previous are in progress.
+ partTxVers);
- assertTrue(waitForCondition(lsnrNear::check, 5000));
+ checkTransactionsCount(
+ orig != failed ? orig : null /*stopped*/, 0,
+ brokenCellNodes, 0,
+ aliveCellNodes, 0,
+ replTxVers);
- listeningLog.registerListener(lsnrBackup);
+ // Recovery finished on alive cell.
+ assertTrue(waitForCondition(lsnrAliveCell::check, 5000));
+
+ listeningLog.registerListener(lsnrBrokenCell);
// Partitioned recovery.
for (Ignite ignite : G.allGrids()) {
TestRecordingCommunicationSpi spi =
(TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
- spi.stopBlock(true, blockedMsg -> true);
+ spi.stopBlock(true, blockedMsg -> {
+ Message msg = blockedMsg.ioMessage().message();
+
+ return partTxVers.contains(((GridCacheTxRecoveryRequest)msg).nearXidVersion());
+ });
}
partCommitLatch.countDown();
@@ -408,79 +484,87 @@ public class GridExchangeFreeCellularSwitchIsolationTest extends GridExchangeFre
// Switches finished cluster-wide, all transactions can be committed.
checkUpcomingTransactionsState(
- replBackupCreateLatch, 0,
- replBackupPutLatch, 0,
- replBackupCommitLatch, 0,
- replNearCreateLatch, 0,
- replNearPutLatch, 0,
- replNearCommitLatch, 0);
+ replBrokenCellCreateLatch, 0,
+ replBrokenCellPutLatch, 0,
+ replBrokenCellCommitLatch, 0,
+ replAliveCellCreateLatch, 0,
+ replAliveCellPutLatch, 0,
+ replAliveCellCommitLatch, 0);
checkUpcomingTransactionsState(
- partBackupCreateLatch, 0,
- partBackupPutLatch, 0,
- partBackupCommitLatch, 0,
- partNearCreateLatch, 0,
- partNearPutLatch, 0,
- partNearCommitLatch, 0);
+ partBrokenCellCreateLatch, 0,
+ partBrokenCellPutLatch, 0,
+ partBrokenCellCommitLatch, 0,
+ partAliveCellCreateLatch, 0,
+ partAliveCellPutLatch, 0,
+ partAliveCellCommitLatch, 0);
// Check that pre-failure transactions are absent.
checkTransactionsCount(
- orig != failed ? orig : null, 0,
- null, 0,
- backupNodes, 0,
- nearNodes, 0,
- vers);
+ orig != failed ? orig : null /*stopped*/, 0,
+ brokenCellNodes, 0,
+ aliveCellNodes, 0,
+ partTxVers);
+
+ checkTransactionsCount(
+ orig != failed ? orig : null /*stopped*/, 0,
+ brokenCellNodes, 0,
+ aliveCellNodes, 0,
+ replTxVers);
- assertTrue(waitForCondition(lsnrBackup::check, 5000));
+ // Recovery finished on broken cell.
+ assertTrue(waitForCondition(lsnrBrokenCell::check, 5000));
for (IgniteInternalFuture<?> fut : futs)
fut.get();
for (Ignite node : G.allGrids()) {
- assertEquals(42, node.getOrCreateCache(PART_CACHE_NAME).get(partKey));
- assertEquals(43, node.getOrCreateCache(REPL_CACHE_NAME).get(replKey));
+ for (int key : partKeys)
+ assertEquals(key, node.getOrCreateCache(PART_CACHE_NAME).get(key));
+
+ for (int key : replKeys)
+ assertEquals(key, node.getOrCreateCache(REPL_CACHE_NAME).get(key));
}
// Final check that any transactions are absent.
checkTransactionsCount(
null, 0,
- null, 0,
- backupNodes, 0,
- nearNodes, 0,
- null);
+ brokenCellNodes, 0,
+ aliveCellNodes, 0,
+ null /*any*/);
}
/**
*
*/
private void checkUpcomingTransactionsState(
- CountDownLatch backupCreateLatch,
- int backupCreateCnt,
- CountDownLatch backupPutLatch,
- int backupPutCnt,
- CountDownLatch backupCommitLatch,
- int backupCommitCnt,
- CountDownLatch nearCreateLatch,
- int nearCreateCnt,
- CountDownLatch nearPutLatch,
- int nearPutCnt,
- CountDownLatch nearCommitLatch,
- int nearCommitCnt) throws InterruptedException {
- checkTransactionsState(backupCreateLatch, backupCreateCnt);
- checkTransactionsState(backupPutLatch, backupPutCnt);
- checkTransactionsState(backupCommitLatch, backupCommitCnt);
- checkTransactionsState(nearCreateLatch, nearCreateCnt);
- checkTransactionsState(nearPutLatch, nearPutCnt);
- checkTransactionsState(nearCommitLatch, nearCommitCnt);
+ CountDownLatch brokenCellCreateLatch,
+ int brokenCellCreateCnt,
+ CountDownLatch brokenCellPutLatch,
+ int brokenCellPutCnt,
+ CountDownLatch brokenCellCommitLatch,
+ int brokenCellCommitCnt,
+ CountDownLatch aliveCellCreateLatch,
+ int aliveCellCreateCnt,
+ CountDownLatch aliveCellPutLatch,
+ int aliveCellPutCnt,
+ CountDownLatch aliveCellCommitLatch,
+ int aliveCellCommitCnt) throws InterruptedException {
+ checkTransactionsState(brokenCellCreateLatch, brokenCellCreateCnt);
+ checkTransactionsState(brokenCellPutLatch, brokenCellPutCnt);
+ checkTransactionsState(brokenCellCommitLatch, brokenCellCommitCnt);
+ checkTransactionsState(aliveCellCreateLatch, aliveCellCreateCnt);
+ checkTransactionsState(aliveCellPutLatch, aliveCellPutCnt);
+ checkTransactionsState(aliveCellCommitLatch, aliveCellCommitCnt);
}
/**
*
*/
private void checkTransactionsState(CountDownLatch latch, int cnt) throws InterruptedException {
- if (cnt > 0)
- assertEquals(cnt, latch.getCount()); // Switch in progress.
- else
- latch.await(); // Switch finished (finishing).
+ if (cnt == 0)
+ latch.await(10, TimeUnit.SECONDS); // Switch finished (finishing).
+
+ assertEquals(cnt, latch.getCount()); // Switch in progress.
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchTxContinuationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchTxContinuationTest.java
new file mode 100644
index 0000000..f214710
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchTxContinuationTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionState;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class GridExchangeFreeCellularSwitchTxContinuationTest extends GridExchangeFreeCellularSwitchAbstractTest {
+ /** Concurrency. */
+ @Parameterized.Parameter(0)
+ public TransactionConcurrency concurrency;
+
+ /** Isolation. */
+ @Parameterized.Parameter(1)
+ public TransactionIsolation isolation;
+
+ /** Start from. */
+ @Parameterized.Parameter(2)
+ public TransactionCoordinatorNode startFrom;
+
+ /**
+ * @return Test parameters: each concurrency/isolation pair for every tx coordinator node type except the failed one.
+ */
+ @Parameterized.Parameters(name = "Concurrency = {0}, Isolation = {1}, Started from = {2}")
+ public static Collection<Object[]> runConfig() {
+ ArrayList<Object[]> params = new ArrayList<>();
+ for (TransactionConcurrency concurrency : TransactionConcurrency.values())
+ for (TransactionIsolation isolation : TransactionIsolation.values())
+ for (TransactionCoordinatorNode from : TransactionCoordinatorNode.values())
+ if (from != TransactionCoordinatorNode.FAILED) // Impossible to continue tx started at failed node :)
+ params.add(new Object[] {concurrency, isolation, from});
+
+ return params;
+ }
+
+ /**
+ * Test checks that txs started before the switch can be continued after the switch if they are not affected by
+ * node failure.
+ */
+ @Test
+ public void testAlreadyStartedTxsContinuationDuringAndAfterTheSwitch() throws Exception {
+ int nodes = 6;
+
+ startGridsMultiThreaded(nodes);
+
+ blockRecoveryMessages();
+
+ CellularCluster cluster = resolveCellularCluster(nodes, startFrom);
+
+ Ignite orig = cluster.orig;
+ Ignite failed = cluster.failed;
+
+ int txCnt = 1024;
+ int keysPerTx = 6; // See puts count inside the closure.
+ int prepTxCnt = 100;
+ int dataAmount = txCnt * keysPerTx + prepTxCnt;
+ int totalDataAmount = dataAmount + prepTxCnt;
+
+ Queue<Integer> keys = new ConcurrentLinkedDeque<>();
+ Queue<Integer> primaryOnFailedKeys = new ConcurrentLinkedDeque<>();
+
+ Queue<Integer> keysToCheck = new ConcurrentLinkedDeque<>();
+
+ for (int i = 0; keys.size() < dataAmount; i++)
+ if (!primaryNode(i, PART_CACHE_NAME).equals(failed)) // Will not cause node failed exception on put.
+ keys.add(i);
+
+ for (int i = 0; primaryOnFailedKeys.size() < prepTxCnt; i++)
+ if (primaryNode(i, PART_CACHE_NAME).equals(failed)) // Will cause explicit recovery on node fail.
+ primaryOnFailedKeys.add(i);
+
+ CountDownLatch putInitLatch = new CountDownLatch(txCnt);
+ CountDownLatch prepLatch = new CountDownLatch(prepTxCnt * 2);
+
+ CountDownLatch nodeFailedLatch = new CountDownLatch(1);
+ CountDownLatch nodesAwareOfFailLatch = new CountDownLatch(1);
+ CountDownLatch txsRecoveryAllowedLatch = new CountDownLatch(1);
+ CountDownLatch txsRecoveryFinishedLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> txFut = multithreadedAsync(() -> {
+ try {
+ Transaction tx = orig.transactions().txStart(concurrency, isolation);
+
+ IgniteCache<Integer, Integer> cache = orig.getOrCreateCache(PART_CACHE_NAME);
+
+ // Put before node fail.
+ put(cache, keys, keysToCheck);
+
+ long initTopVer =
+ ((IgniteEx)orig).context().cache().context().exchange().readyAffinityVersion().topologyVersion();
+
+ putInitLatch.countDown();
+
+ nodeFailedLatch.await();
+
+ // Put right after node fail.
+ put(cache, keys, keysToCheck);
+
+ nodesAwareOfFailLatch.await();
+
+ // Put when nodes are aware of fail.
+ put(cache, keys, keysToCheck);
+
+ txsRecoveryAllowedLatch.await();
+
+ // Put right after recovery allowed.
+ put(cache, keys, keysToCheck);
+
+ txsRecoveryFinishedLatch.await();
+
+ // Put right after recovery finished.
+ put(cache, keys, keysToCheck);
+
+ // Put with some random delay after recovery happens.
+ U.sleep(ThreadLocalRandom.current().nextInt(5_000));
+
+ put(cache, keys, keysToCheck);
+
+ ((TransactionProxyImpl<?, ?>)tx).commit();
+
+ long commitTopVer =
+ ((IgniteEx)orig).context().cache().context().exchange().readyAffinityVersion().topologyVersion();
+
+ assertTrue(commitTopVer > initTopVer); // Started before the switch, but continued after it.
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ }, txCnt);
+
+ IgniteInternalFuture<?> prepFut1 = multithreadedAsync(() -> { // Keys with unaffected primary.
+ try {
+ putInitLatch.await();
+
+ Transaction tx = failed.transactions().txStart();
+
+ IgniteCache<Integer, Integer> cache = failed.getOrCreateCache(PART_CACHE_NAME);
+
+ put(cache, keys, keysToCheck);
+
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+ prepLatch.countDown();
+ txsRecoveryFinishedLatch.await();
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ }, prepTxCnt);
+
+ IgniteInternalFuture<?> prepFut2 = multithreadedAsync(() -> { // Primary keys of failed primary.
+ try {
+ putInitLatch.await();
+
+ Transaction tx = failed.transactions().txStart();
+
+ IgniteCache<Integer, Integer> cache = failed.getOrCreateCache(PART_CACHE_NAME);
+
+ put(cache, primaryOnFailedKeys, keysToCheck);
+
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+ prepLatch.countDown();
+ txsRecoveryFinishedLatch.await();
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ }, prepTxCnt);
+
+ prepLatch.await();
+
+ failed.close(); // Stopping node.
+
+ nodeFailedLatch.countDown();
+
+ awaitForSwitchOnNodeLeft(failed);
+
+ nodesAwareOfFailLatch.countDown();
+
+ // Allowing recovery.
+ for (Ignite ignite : G.allGrids()) {
+ TestRecordingCommunicationSpi spi =
+ (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+ spi.stopBlock(true, blockedMsg -> true);
+ }
+
+ txsRecoveryAllowedLatch.countDown();
+
+ for (Ignite ignite : G.allGrids()) {
+ for (IgniteInternalTx tx : ((IgniteEx)ignite).context().cache().context().tm().activeTransactions()) {
+ while (tx.state() == TransactionState.PREPARED)
+ U.sleep(100);
+ }
+ }
+
+ txsRecoveryFinishedLatch.countDown();
+
+ prepFut1.get();
+ prepFut2.get();
+ txFut.get();
+
+ assertTrue(keys.isEmpty());
+ assertTrue(primaryOnFailedKeys.isEmpty());
+
+ assertEquals(totalDataAmount, keysToCheck.size());
+
+ IgniteCache<Integer, Integer> cache = orig.getOrCreateCache(PART_CACHE_NAME);
+
+ for (Integer i : keysToCheck)
+ assertEquals(i, cache.get(i));
+ }
+
+ /**
+ *
+ */
+ private void put(IgniteCache<Integer, Integer> cache, Queue<Integer> keysToPut, Queue<Integer> keysToCheck) {
+ Integer key = keysToPut.remove();
+
+ keysToCheck.add(key);
+
+ cache.put(key, key);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchTxCountersTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchTxCountersTest.java
new file mode 100644
index 0000000..c7d1f23
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridExchangeFreeCellularSwitchTxCountersTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class GridExchangeFreeCellularSwitchTxCountersTest extends GridExchangeFreeCellularSwitchAbstractTest {
+ /**
+ * Test checks that partition counters are the same across the cluster after the partial prepared txs rollback.
+ */
+ @Test
+ public void testPartitionCountersSynchronizationOnPmeFreeSwitch() throws Exception {
+ int nodes = 6;
+
+ startGridsMultiThreaded(nodes);
+
+ CellularCluster cluster = resolveCellularCluster(nodes, TransactionCoordinatorNode.FAILED);
+
+ Ignite orig = cluster.orig;
+ Ignite failed = cluster.failed;
+ List<Ignite> brokenCellNodes = cluster.brokenCellNodes;
+ List<Ignite> aliveCellNodes = cluster.aliveCellNodes;
+
+ List<Integer> keys;
+ List<Integer> putKeys;
+ List<Integer> partialPreparedKeys1;
+ List<Integer> preparedKeys;
+ List<Integer> partialPreparedKeys2;
+
+ int part = -1;
+
+ do { // Getting keys related to the primary partition on failed node.
+ keys = partitionKeys(failed.getOrCreateCache(PART_CACHE_NAME), ++part, 40, 0);
+ }
+ while (!(failed.equals(primaryNode(keys.get(0), PART_CACHE_NAME))));
+
+ putKeys = keys.subList(0, 10);
+ partialPreparedKeys1 = keys.subList(10, 20);
+ preparedKeys = keys.subList(20, 30);
+ partialPreparedKeys2 = keys.subList(30, 40);
+
+ IgniteCache<Integer, Integer> failedCache = failed.getOrCreateCache(PART_CACHE_NAME);
+
+ // Regular put.
+ for (Integer key : putKeys)
+ failedCache.put(key, key);
+
+ // Partial prepare #1.
+ IgniteInternalFuture<?> hangedPrepFut1 = partialPrepare(partialPreparedKeys1, failed, brokenCellNodes.get(0));
+
+ // Regular prepare.
+ CountDownLatch nodeFailedLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> prepFut = prepare(preparedKeys, orig, nodeFailedLatch);
+
+ // Partial prepare #2.
+ IgniteInternalFuture<?> hangedPrepFut2 = partialPrepare(partialPreparedKeys2, failed, brokenCellNodes.get(1));
+
+ assertCountersAsExpected(part, false, PART_CACHE_NAME, 10, -1 /*ignored*/);
+
+ failed.close(); // Stopping node.
+
+ nodeFailedLatch.countDown();
+
+ hangedPrepFut1.get();
+ prepFut.get();
+ hangedPrepFut2.get();
+
+ waitForTopology(nodes - 1);
+
+ awaitPartitionMapExchange();
+
+ for (Ignite ignite : G.allGrids()) {
+ IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(PART_CACHE_NAME);
+
+ for (Integer key : putKeys)
+ assertEquals(key, cache.get(key)); // Successful put. Cnts 1 - 10.
+
+ for (Integer key : partialPreparedKeys1)
+ assertEquals(null, cache.get(key)); // Rolled back due to partial preparation. Cnts 11 - 20.
+
+ for (Integer key : preparedKeys)
+ assertEquals(key, cache.get(key)); // Successful recovery due to full preparation. Cnts 21 - 30.
+
+ for (Integer key : partialPreparedKeys2)
+ assertEquals(null, cache.get(key)); // Rolled back due to partial preparation. Cnts 31 - 40.
+ }
+
+ // Finalized to last update. Gaps (11-20) filled. Gaps tail (31-40) dropped.
+ assertCountersAsExpected(part, true, PART_CACHE_NAME, 30, 30);
+
+ assertPartitionsSame(idleVerify(aliveCellNodes.get(0), PART_CACHE_NAME));
+ }
+
+ /**
+ *
+ */
+ private IgniteInternalFuture<?> prepare(List<Integer> keys, Ignite node, CountDownLatch nodeFailedLatch) throws Exception {
+ CountDownLatch initLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(() -> {
+ try {
+ IgniteCache<Integer, Integer> cache = node.getOrCreateCache(PART_CACHE_NAME);
+
+ Transaction tx = node.transactions().txStart();
+
+ for (Integer key : keys)
+ cache.put(key, key);
+
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+ initLatch.countDown();
+
+ nodeFailedLatch.await();
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ }, 1);
+
+ initLatch.await();
+
+ return fut;
+ }
+
+ /**
+ *
+ */
+ private IgniteInternalFuture<?> partialPrepare(List<Integer> keys, Ignite node, Ignite blockedBackup) throws Exception {
+ CountDownLatch prepMsgLatch = new CountDownLatch(2 /*one per node*/);
+ AtomicInteger blockedMsgCnt = new AtomicInteger();
+
+ // Blocking messages to have tx partially prepared.
+ blockPrepareMessages(blockedBackup, prepMsgLatch, blockedMsgCnt);
+
+ IgniteInternalFuture<?> fut = multithreadedAsync(() -> {
+ try {
+ IgniteCache<Integer, Integer> cache = node.getOrCreateCache(PART_CACHE_NAME);
+
+ Transaction tx = node.transactions().txStart();
+
+ for (Integer key : keys)
+ cache.put(key, key);
+
+ ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true);
+
+ fail("Should hang before this line since one prepare message is blocked.");
+ }
+ catch (NodeStoppingException ignored) {
+ // No-op.
+ }
+ catch (Exception e) {
+ fail("Should not happen [exception=" + e + "]");
+ }
+ }, 1);
+
+ prepMsgLatch.await(); // Both messages handled.
+ assertEquals(1, blockedMsgCnt.get()); // One message blocked.
+
+ stopBlockingPrepareMessages();
+
+ return fut;
+ }
+
+ /**
+ *
+ */
+ protected void blockPrepareMessages(Ignite igniteTo, CountDownLatch prepMsgLatch, AtomicInteger blockedMsgCnt) {
+ for (Ignite ignite : G.allGrids()) {
+ TestRecordingCommunicationSpi spi =
+ (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+ spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridDhtTxPrepareRequest) {
+ IgniteEx to = IgnitionEx.gridxx(node.id());
+
+ assert prepMsgLatch.getCount() > 0;
+
+ boolean block = to.equals(igniteTo);
+
+ if (block)
+ blockedMsgCnt.incrementAndGet();
+
+ prepMsgLatch.countDown();
+
+ return block;
+ }
+
+ return false;
+ }
+ });
+ }
+ }
+
+ /**
+ *
+ */
+ protected void stopBlockingPrepareMessages() {
+ for (Ignite ignite : G.allGrids()) {
+ TestRecordingCommunicationSpi spi =
+ (TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+ spi.stopBlock(false, blockedMsg -> true);
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
index 1d69c05..e6f3bbaf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java
@@ -143,7 +143,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
cache.put(keys.get(2), new TestVal(keys.get(2)));
ops.add(new T2<>(keys.get(2), op));
- assertCountersSame(PARTITION_ID, false);
+ assertCountersSame(PARTITION_ID, false, DEFAULT_CACHE_NAME);
cache.remove(keys.get(2));
ops.add(new T2<>(keys.get(2), GridCacheOperation.DELETE));
@@ -154,7 +154,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
cache.remove(keys.get(0));
ops.add(new T2<>(keys.get(0), GridCacheOperation.DELETE));
- assertCountersSame(PARTITION_ID, false);
+ assertCountersSame(PARTITION_ID, false, DEFAULT_CACHE_NAME);
for (Ignite ignite : G.allGrids()) {
if (ignite.configuration().isClientMode())
@@ -407,7 +407,7 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt
assertPartitionsSame(idleVerify(prim, DEFAULT_CACHE_NAME));
- assertCountersSame(PARTITION_ID, true);
+ assertCountersSame(PARTITION_ID, true, DEFAULT_CACHE_NAME);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest.java
index e6c64ad..e6d18b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest.java
@@ -56,4 +56,164 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsHistoryRebalanceTest
@Override public void testCommitReorderWithRollbackNoRebalanceAfterRestart() throws Exception {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_2TX_1_1() throws Exception {
+ super.testPartialPrepare_2TX_1_1();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_2TX_1_2() throws Exception {
+ super.testPartialPrepare_2TX_1_2();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_2TX_1_3() throws Exception {
+ super.testPartialPrepare_2TX_1_3();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_2TX_1_4() throws Exception {
+ super.testPartialPrepare_2TX_1_4();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_2TX_2_1() throws Exception {
+ super.testPartialPrepare_2TX_2_1();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_2TX_2_2() throws Exception {
+ super.testPartialPrepare_2TX_2_2();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_2TX_2_3() throws Exception {
+ super.testPartialPrepare_2TX_2_3();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_2TX_2_4() throws Exception {
+ super.testPartialPrepare_2TX_2_4();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_3TX_1_1() throws Exception {
+ super.testPartialPrepare_3TX_1_1();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_3TX_2_1() throws Exception {
+ super.testPartialPrepare_3TX_2_1();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_3TX_3_1() throws Exception {
+ super.testPartialPrepare_3TX_3_1();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_3TX_4_1() throws Exception {
+ super.testPartialPrepare_3TX_4_1();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_3TX_5_1() throws Exception {
+ super.testPartialPrepare_3TX_5_1();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_3TX_6_1() throws Exception {
+ super.testPartialPrepare_3TX_6_1();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_3TX_1_2() throws Exception {
+ super.testPartialPrepare_3TX_1_2();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_3TX_2_2() throws Exception {
+ super.testPartialPrepare_3TX_2_2();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_3TX_3_2() throws Exception {
+ super.testPartialPrepare_3TX_3_2();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_3TX_4_2() throws Exception {
+ super.testPartialPrepare_3TX_4_2();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_3TX_5_2() throws Exception {
+ super.testPartialPrepare_3TX_5_2();
+ }
+
+ /** {@inheritDoc} */
+ @Test
+ @Ignore("Rebalance may be not triggered because all prepared txs may have counters greater than commited tx's " +
+ "and will be rolled back on recovery, so, nothing to rebalance")
+ @Override public void testPartialPrepare_3TX_6_2() throws Exception {
+ super.testPartialPrepare_3TX_6_2();
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsTest.java
index 234e218..e3cfdca 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateOnePrimaryTwoBackupsTest.java
@@ -307,7 +307,7 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsTest extends TxPartition
IgniteEx client = grid(CLIENT_GRID_NAME);
assertPartitionsSame(idleVerify(client, DEFAULT_CACHE_NAME));
- assertCountersSame(PARTITION_ID, true);
+ assertCountersSame(PARTITION_ID, true, DEFAULT_CACHE_NAME);
PartitionUpdateCounter cntr1 = counter(PARTITION_ID, backup1.name());
assertNotNull(cntr1);
@@ -440,7 +440,7 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsTest extends TxPartition
assertPartitionsSame(idleVerify(grid(CLIENT_GRID_NAME), DEFAULT_CACHE_NAME));
- assertCountersSame(PARTITION_ID, true);
+ assertCountersSame(PARTITION_ID, true, DEFAULT_CACHE_NAME);
}
/**
@@ -664,7 +664,7 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsTest extends TxPartition
assertPartitionsSame(idleVerify(grid(CLIENT_GRID_NAME), DEFAULT_CACHE_NAME));
- assertCountersSame(PARTITION_ID, true);
+ assertCountersSame(PARTITION_ID, true, DEFAULT_CACHE_NAME);
startGrid(txTops.get(PARTITION_ID).get1().name());
@@ -672,7 +672,7 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsTest extends TxPartition
assertPartitionsSame(idleVerify(grid(CLIENT_GRID_NAME), DEFAULT_CACHE_NAME));
- assertCountersSame(PARTITION_ID, true);
+ assertCountersSame(PARTITION_ID, true, DEFAULT_CACHE_NAME);
}
/**
@@ -747,7 +747,7 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsTest extends TxPartition
assertPartitionsSame(idleVerify(grid(CLIENT_GRID_NAME), DEFAULT_CACHE_NAME));
- assertCountersSame(PARTITION_ID, true);
+ assertCountersSame(PARTITION_ID, true, DEFAULT_CACHE_NAME);
assertEquals(PRELOAD_KEYS_CNT + expCommittedSize, grid(CLIENT_GRID_NAME).cache(DEFAULT_CACHE_NAME).size());
@@ -758,7 +758,7 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsTest extends TxPartition
assertPartitionsSame(idleVerify(grid(CLIENT_GRID_NAME), DEFAULT_CACHE_NAME));
- assertCountersSame(PARTITION_ID, true);
+ assertCountersSame(PARTITION_ID, true, DEFAULT_CACHE_NAME);
}
/**
@@ -846,7 +846,7 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsTest extends TxPartition
assertPartitionsSame(idleVerify(grid(CLIENT_GRID_NAME), DEFAULT_CACHE_NAME));
- assertCountersSame(PARTITION_ID, true);
+ assertCountersSame(PARTITION_ID, true, DEFAULT_CACHE_NAME);
}
/**
@@ -899,14 +899,14 @@ public class TxPartitionCounterStateOnePrimaryTwoBackupsTest extends TxPartition
assertPartitionsSame(idleVerify(client, DEFAULT_CACHE_NAME));
- assertCountersSame(PARTITION_ID, true);
+ assertCountersSame(PARTITION_ID, true, DEFAULT_CACHE_NAME);
startGrid(txTops.get(PARTITION_ID).get1().name());
awaitPartitionMapExchange();
// TODO assert with expected lwm value.
- assertCountersSame(PARTITION_ID, true);
+ assertCountersSame(PARTITION_ID, true, DEFAULT_CACHE_NAME);
assertPartitionsSame(idleVerify(client, DEFAULT_CACHE_NAME));
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateTwoPrimaryTwoBackupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateTwoPrimaryTwoBackupsTest.java
index c097099..e93079f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateTwoPrimaryTwoBackupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateTwoPrimaryTwoBackupsTest.java
@@ -220,6 +220,6 @@ public class TxPartitionCounterStateTwoPrimaryTwoBackupsTest extends TxPartition
// Expect consistent partitions.
assertPartitionsSame(idleVerify(grid(CLIENT_GRID_NAME), DEFAULT_CACHE_NAME));
- assertCountersSame(PARTITION_ID, true);
+ assertCountersSame(PARTITION_ID, true, DEFAULT_CACHE_NAME);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 7075635..c4e3159 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -2179,6 +2179,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
* @throws IgniteException If none caches or node found.
*/
protected IdleVerifyResultV2 idleVerify(Ignite ig, @Nullable String... caches) {
+ log.info("Starting idleVerify ...");
+
IgniteEx ig0 = (IgniteEx)ig;
Set<String> cacheNames = new HashSet<>();
@@ -2393,15 +2395,16 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
/**
* @param partId Partition.
* @param withReserveCntr {@code True} to compare reserve counters. Because reserve counters are synced during
* PME invoking with {@code true} makes sense only after PME was finished.
+ * @param cacheName Cache name.
*/
- protected void assertCountersSame(int partId, boolean withReserveCntr) throws AssertionFailedError {
+ protected void assertCountersSame(int partId, boolean withReserveCntr, String cacheName) throws AssertionFailedError {
PartitionUpdateCounter cntr0 = null;
List<T3<String, @Nullable PartitionUpdateCounter, Boolean>> cntrMap = G.allGrids().stream().filter(ignite ->
!ignite.configuration().isClientMode()).map(ignite ->
- new T3<>(ignite.name(), counter(partId, ignite.name()),
- ignite.affinity(DEFAULT_CACHE_NAME).isPrimary(ignite.cluster().localNode(), partId))).collect(toList());
+ new T3<>(ignite.name(), counter(partId, cacheName, ignite.name()),
+ ignite.affinity(cacheName).isPrimary(ignite.cluster().localNode(), partId))).collect(toList());
for (T3<String, PartitionUpdateCounter, Boolean> cntr : cntrMap) {
if (cntr.get2() == null)
@@ -2422,6 +2425,36 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
}
/**
+ * @param partId Partition.
+ * @param withReserveCntr {@code True} to compare reserve counters. Because reserve counters are synced during
+ * PME, invoking with {@code true} makes sense only after PME was finished.
+ * @param cacheName Cache name.
+ * @param cnt Expected update counter value.
+ * @param reserved Expected reserved counter value (checked only when {@code withReserveCntr} is {@code true}).
+ */
+ protected void assertCountersAsExpected(int partId, boolean withReserveCntr, String cacheName, long cnt,
+ long reserved) throws AssertionFailedError {
+ List<T3<String, @Nullable PartitionUpdateCounter, Boolean>> cntrMap = G.allGrids().stream().filter(ignite ->
+ !ignite.configuration().isClientMode()).map(ignite ->
+ new T3<>(ignite.name(), counter(partId, cacheName, ignite.name()),
+ ignite.affinity(cacheName).isPrimary(ignite.cluster().localNode(), partId))).collect(toList());
+
+ for (T3<String, PartitionUpdateCounter, Boolean> cntr : cntrMap) {
+ if (cntr.get2() == null)
+ continue;
+
+ assertEquals("Expecting same counters [partId=" + partId +
+ ", cntrs=" + cntrMap + ']', cnt, cntr.get2().get());
+
+ if (withReserveCntr) {
+ assertEquals("Expecting same reservation counters [partId=" + partId +
+ ", cntrs=" + cntrMap + ']',
+ reserved, cntr.get2().reserved());
+ }
+ }
+ }
+
+ /**
* Checks that return types of all registered ignite metrics methods are correct.
* Also checks that all classes from {@code namesToCheck} are registered as mbeans.
*
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
index 189934d..f0c96f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite5.java
@@ -48,6 +48,8 @@ import org.apache.ignite.internal.processors.cache.PartitionsExchangeOnDiscovery
import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentNodeJoinValidationTest;
import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeCellularSwitchComplexOperationsTest;
import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeCellularSwitchIsolationTest;
+import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeCellularSwitchTxContinuationTest;
+import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeCellularSwitchTxCountersTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheGroupsPartitionLossPolicySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePartitionLossPolicySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxIteratorSelfTest;
@@ -113,6 +115,8 @@ public class IgniteCacheMvccTestSuite5 {
// Cellular switch can't be performed on MVCC caches, at least at the moment.
ignoredTests.add(GridExchangeFreeCellularSwitchIsolationTest.class);
ignoredTests.add(GridExchangeFreeCellularSwitchComplexOperationsTest.class);
+ ignoredTests.add(GridExchangeFreeCellularSwitchTxContinuationTest.class);
+ ignoredTests.add(GridExchangeFreeCellularSwitchTxCountersTest.class);
ignoredTests.add(IgniteCachePartitionLossPolicySelfTest.class);
ignoredTests.add(IgniteCacheGroupsPartitionLossPolicySelfTest.class);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
index 35072ab..7aa27e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite5.java
@@ -55,6 +55,8 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinity
import org.apache.ignite.internal.processors.cache.distributed.CacheLateAffinityAssignmentTest;
import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeCellularSwitchComplexOperationsTest;
import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeCellularSwitchIsolationTest;
+import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeCellularSwitchTxContinuationTest;
+import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeCellularSwitchTxCountersTest;
import org.apache.ignite.internal.processors.cache.distributed.GridExchangeFreeSwitchTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheGroupsPartitionLossPolicySelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePartitionLossPolicySelfTest;
@@ -101,6 +103,8 @@ public class IgniteCacheTestSuite5 {
GridTestUtils.addTestIfNeeded(suite, GridExchangeFreeSwitchTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridExchangeFreeCellularSwitchComplexOperationsTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridExchangeFreeCellularSwitchIsolationTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, GridExchangeFreeCellularSwitchTxContinuationTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, GridExchangeFreeCellularSwitchTxCountersTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, EntryVersionConsistencyReadThroughTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteCacheSyncRebalanceModeSelfTest.class, ignoredTests);