You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/08 11:31:23 UTC
[05/48] incubator-ignite git commit: # ignite-157-2 Tests and fix for
tx recovery issue
# ignite-157-2 Tests and fix for tx recovery issue
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5daaa278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5daaa278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5daaa278
Branch: refs/heads/ignite-373
Commit: 5daaa278afcca7e00be5002e3d5247661c6ba474
Parents: 4b775f0
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 29 12:48:27 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 29 17:13:48 2015 +0300
----------------------------------------------------------------------
...ridCacheOptimisticCheckPreparedTxFuture.java | 78 +++-
...idCacheOptimisticCheckPreparedTxRequest.java | 47 +-
.../GridDistributedTxRemoteAdapter.java | 2 +-
.../cache/transactions/IgniteInternalTx.java | 5 +-
.../cache/transactions/IgniteTxAdapter.java | 2 +-
.../cache/transactions/IgniteTxHandler.java | 3 +-
.../transactions/IgniteTxLocalAdapter.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 47 +-
...xOriginatingNodeFailureAbstractSelfTest.java | 2 +-
...rDisabledPrimaryNodeFailureRecoveryTest.java | 31 ++
...rtitionedPrimaryNodeFailureRecoveryTest.java | 31 ++
...woBackupsPrimaryNodeFailureRecoveryTest.java | 37 ++
...ePrimaryNodeFailureRecoveryAbstractTest.java | 454 +++++++++++++++++++
.../IgniteCacheTxRecoverySelfTestSuite.java | 4 +
14 files changed, 713 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index 8a14b48..3e345f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -70,6 +70,9 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
/** Transaction nodes mapping. */
private final Map<UUID, Collection<UUID>> txNodes;
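+ /** {@code True} if the near node differs from the failed node and is alive, so only its tx is checked. */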
+ private final boolean nearTxCheck;
+
/**
* @param cctx Context.
* @param tx Transaction.
@@ -77,8 +80,11 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
* @param txNodes Transaction mapping.
*/
@SuppressWarnings("ConstantConditions")
- public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx, IgniteInternalTx tx,
- UUID failedNodeId, Map<UUID, Collection<UUID>> txNodes) {
+ public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx,
+ IgniteInternalTx tx,
+ UUID failedNodeId,
+ Map<UUID, Collection<UUID>> txNodes)
+ {
super(cctx.kernalContext(), CU.boolReducer());
this.cctx = cctx;
@@ -114,6 +120,10 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
}
}
}
+
+ UUID nearNodeId = tx.eventNodeId();
+
+ nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
}
/**
@@ -121,6 +131,48 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
*/
@SuppressWarnings("ConstantConditions")
public void prepare() {
+ if (nearTxCheck) {
+ UUID nearNodeId = tx.eventNodeId();
+
+ if (cctx.localNodeId().equals(nearNodeId)) {
+ IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
+
+ fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+ try {
+ onDone(fut.get());
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ else {
+ MiniFuture fut = new MiniFuture(tx.eventNodeId());
+
+ add(fut);
+
+ GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
+ tx,
+ 0,
+ true,
+ futureId(),
+ fut.futureId());
+
+ try {
+ cctx.io().send(nearNodeId, req, tx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ fut.onError(e);
+ }
+
+ markInitialized();
+ }
+
+ return;
+ }
+
// First check transactions on local node.
int locTxNum = nodeTransactions(cctx.localNodeId());
@@ -206,6 +258,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(tx,
nodeTransactions(id),
+ false,
futureId(),
fut.futureId());
@@ -228,7 +281,11 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
add(fut);
GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
- tx, nodeTransactions(nodeId), futureId(), fut.futureId());
+ tx,
+ nodeTransactions(nodeId),
+ false,
+ futureId(),
+ fut.futureId());
try {
cctx.io().send(nodeId, req, tx.ioPolicy());
@@ -341,11 +398,18 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
cctx.tm().finishOptimisticTxOnRecovery(tx, res);
}
else {
- if (log.isDebugEnabled())
- log.debug("Failed to check prepared transactions, " +
- "invalidating transaction [err=" + err + ", tx=" + tx + ']');
+ if (nearTxCheck) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to check transaction on near node, " +
+ "ignoring [err=" + err + ", tx=" + tx + ']');
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Failed to check prepared transactions, " +
+ "invalidating transaction [err=" + err + ", tx=" + tx + ']');
- cctx.tm().salvageTx(tx);
+ cctx.tm().salvageTx(tx);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
index e83db66..4f2a1d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
@@ -27,8 +27,7 @@ import java.io.*;
import java.nio.*;
/**
- * Message sent to check that transactions related to some optimistic transaction
- * were prepared on remote node.
+ * Message sent to check that transactions related to the given transaction were prepared on the remote node.
*/
public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBaseMessage {
/** */
@@ -49,6 +48,9 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
/** System transaction flag. */
private boolean sys;
+ /** {@code True} if should check only tx on near node. */
+ private boolean nearTxCheck;
+
/**
* Empty constructor required by {@link Externalizable}
*/
@@ -59,11 +61,16 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
/**
* @param tx Transaction.
* @param txNum Expected number of transactions on remote node.
+ * @param nearTxCheck {@code True} if should check only tx on near node.
* @param futId Future ID.
* @param miniId Mini future ID.
*/
- public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx, int txNum, IgniteUuid futId,
- IgniteUuid miniId) {
+ public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx,
+ int txNum,
+ boolean nearTxCheck,
+ IgniteUuid futId,
+ IgniteUuid miniId)
+ {
super(tx.xidVersion(), 0);
nearXidVer = tx.nearXidVersion();
@@ -72,6 +79,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
this.futId = futId;
this.miniId = miniId;
this.txNum = txNum;
+ this.nearTxCheck = nearTxCheck;
+ }
+
+ /**
+ * @return {@code True} if only the transaction on the near node should be checked.
+ */
+ public boolean nearTxCheck() {
+ return nearTxCheck;
}
/**
@@ -137,18 +152,24 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
writer.incrementState();
case 10:
- if (!writer.writeMessage("nearXidVer", nearXidVer))
+ if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
return false;
writer.incrementState();
case 11:
- if (!writer.writeBoolean("sys", sys))
+ if (!writer.writeMessage("nearXidVer", nearXidVer))
return false;
writer.incrementState();
case 12:
+ if (!writer.writeBoolean("sys", sys))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
if (!writer.writeInt("txNum", txNum))
return false;
@@ -187,7 +208,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
reader.incrementState();
case 10:
- nearXidVer = reader.readMessage("nearXidVer");
+ nearTxCheck = reader.readBoolean("nearTxCheck");
if (!reader.isLastRead())
return false;
@@ -195,7 +216,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
reader.incrementState();
case 11:
- sys = reader.readBoolean("sys");
+ nearXidVer = reader.readMessage("nearXidVer");
if (!reader.isLastRead())
return false;
@@ -203,6 +224,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
reader.incrementState();
case 12:
+ sys = reader.readBoolean("sys");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
txNum = reader.readInt("txNum");
if (!reader.isLastRead())
@@ -222,7 +251,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 13;
+ return 14;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 5c75390..3215138 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -206,7 +206,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
- @Override public <K, V> GridTuple<CacheObject> peek(GridCacheContext cacheCtx,
+ @Override public GridTuple<CacheObject> peek(GridCacheContext cacheCtx,
boolean failFast,
KeyCacheObject key,
CacheEntryPredicate[] filter)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 30367c5..8dc07cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -338,8 +338,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
/**
* Gets node ID which directly started this transaction. In case of DHT local transaction it will be
- * near node ID, in case of DHT remote transaction it will be primary node ID, in case of replicated remote
- * transaction it will be starter node ID.
+ * near node ID, in case of DHT remote transaction it will be primary node ID.
*
* @return Originating node ID.
*/
@@ -507,7 +506,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
* @return Current value for the key within transaction.
* @throws GridCacheFilterFailedException If filter failed and failFast is {@code true}.
*/
- @Nullable public <K, V> GridTuple<CacheObject> peek(
+ @Nullable public GridTuple<CacheObject> peek(
GridCacheContext ctx,
boolean failFast,
KeyCacheObject key,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 1c02356..82d68b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1964,7 +1964,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
- @Nullable @Override public <K, V> GridTuple<CacheObject> peek(GridCacheContext ctx,
+ @Nullable @Override public GridTuple<CacheObject> peek(GridCacheContext ctx,
boolean failFast,
KeyCacheObject key,
@Nullable CacheEntryPredicate[] filter) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 6b45fee..6843075 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1183,7 +1183,8 @@ public class IgniteTxHandler {
if (log.isDebugEnabled())
log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
- IgniteInternalFuture<Boolean> fut = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+ IgniteInternalFuture<Boolean> fut = req.nearTxCheck() ? ctx.tm().txCommitted(req.nearXidVersion()) :
+ ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
if (fut == null || fut.isDone()) {
boolean prepared;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index dfce09c..fc3efba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -330,7 +330,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
/** {@inheritDoc} */
@SuppressWarnings({"RedundantTypeArguments"})
- @Nullable @Override public <K, V> GridTuple<CacheObject> peek(
+ @Nullable @Override public GridTuple<CacheObject> peek(
GridCacheContext cacheCtx,
boolean failFast,
KeyCacheObject key,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c494602..19efc5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -727,14 +727,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param txId Transaction ID.
- * @return Transaction with given ID.
- */
- @Nullable public IgniteInternalTx txx(GridCacheVersion txId) {
- return idMap.get(txId);
- }
-
- /**
* Handles prepare stage of 2PC.
*
* @param tx Transaction to prepare.
@@ -1770,6 +1762,45 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param ver Transaction version to look up (callers in this commit pass {@code nearXidVersion()}).
+ * @return Future with {@code true} if the tx finished {@code COMMITTED} or {@code completedVers} holds {@code true} for it.
+ */
+ public IgniteInternalFuture<Boolean> txCommitted(GridCacheVersion ver) {
+ final GridFutureAdapter<Boolean> resFut = new GridFutureAdapter<>();
+
+ final IgniteInternalTx tx = cctx.tm().tx(ver);
+
+ if (tx != null) {
+ assert tx.near() && tx.local() : tx;
+
+ if (log.isDebugEnabled())
+ log.debug("Found near transaction, will wait for completion: " + tx);
+ // Tx is still registered: the result is the state it has once its finish future completes.
+ tx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+ @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+ TransactionState state = tx.state();
+
+ if (log.isDebugEnabled())
+ log.debug("Near transaction finished with state: " + state);
+
+ resFut.onDone(state == COMMITTED);
+ }
+ });
+
+ return resFut;
+ }
+ // No tx found for this version: fall back to completedVers; a missing entry counts as not committed.
+ Boolean committed = completedVers.get(ver);
+
+ if (log.isDebugEnabled())
+ log.debug("Near transaction committed: " + committed);
+
+ resFut.onDone(committed != null && committed);
+
+ return resFut;
+ }
+
+ /**
* @param nearVer Near version ID.
* @param txNum Number of transactions.
* @param fut Result future.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
index 00bd43f..d664aa8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
@@ -156,7 +156,7 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
TransactionProxyImpl tx = (TransactionProxyImpl)txIgniteNode.transactions().txStart();
- IgniteInternalTx txEx = GridTestUtils.getFieldValue(tx, "tx");
+ IgniteInternalTx txEx = tx.tx();
assertTrue(txEx.optimistic());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java
new file mode 100644
index 0000000..62d9b79
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * {@link IgniteCachePrimaryNodeFailureRecoveryAbstractTest} variant whose {@link #nearConfiguration()} returns {@code null}.
+ */
+public class IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest
+ extends IgniteCachePrimaryNodeFailureRecoveryAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java
new file mode 100644
index 0000000..a40c989
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * {@link IgniteCachePrimaryNodeFailureRecoveryAbstractTest} variant using a default {@link NearCacheConfiguration}.
+ */
+public class IgniteCachePartitionedPrimaryNodeFailureRecoveryTest
+ extends IgniteCachePrimaryNodeFailureRecoveryAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected NearCacheConfiguration nearConfiguration() {
+ return new NearCacheConfiguration();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
new file mode 100644
index 0000000..70eef1d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Variant of {@link IgniteCachePartitionedPrimaryNodeFailureRecoveryTest} that changes the cache backup count from 1 to 2.
+ */
+public class IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest
+ extends IgniteCachePartitionedPrimaryNodeFailureRecoveryTest {
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+ assertEquals(1, ccfg.getBackups()); // The inherited configuration is expected to use exactly one backup.
+
+ ccfg.setBackups(2);
+
+ return ccfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
new file mode 100644
index 0000000..7a393d8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+import static org.apache.ignite.transactions.TransactionState.*;
+
+/**
+ * Base class for tests that check the outcome of a transaction when a primary node fails during prepare.
+ * <p>
+ * Every test starts a fresh four-node topology. A transaction started on node 0 updates one key whose
+ * primary is node 1 and one key whose primary is node 2. The near prepare request addressed to node 2
+ * is held back, node 1 is stopped as soon as its DHT transaction reaches the {@code PREPARED} state,
+ * and finally the values stored on all current owners of both keys are verified.
+ */
+public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends IgniteCacheAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Custom SPI allows tests to hold back near prepare requests sent to a chosen node.
+        cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // No-op: nodes are started in beforeTest().
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        // No-op: nodes are stopped in afterTest().
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Optimistic transaction, originating node is not a backup for the first key, commit.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryNodeFailureRecovery1() throws Exception {
+        primaryNodeFailure(false, false, true);
+    }
+
+    /**
+     * Optimistic transaction, originating node is a backup for the first key, commit.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryNodeFailureRecovery2() throws Exception {
+        primaryNodeFailure(true, false, true);
+    }
+
+    /**
+     * Optimistic transaction, originating node is not a backup for the first key, rollback.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryNodeFailureRollback1() throws Exception {
+        primaryNodeFailure(false, true, true);
+    }
+
+    /**
+     * Optimistic transaction, originating node is a backup for the first key, rollback.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryNodeFailureRollback2() throws Exception {
+        primaryNodeFailure(true, true, true);
+    }
+
+    /**
+     * Pessimistic transaction, originating node is not a backup for the first key, commit.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryNodeFailureRecovery1() throws Exception {
+        primaryNodeFailure(false, false, false);
+    }
+
+    /**
+     * Pessimistic transaction, originating node is a backup for the first key, commit.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryNodeFailureRecovery2() throws Exception {
+        primaryNodeFailure(true, false, false);
+    }
+
+    /**
+     * Pessimistic transaction, originating node is not a backup for the first key, rollback.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryNodeFailureRollback1() throws Exception {
+        primaryNodeFailure(false, true, false);
+    }
+
+    /**
+     * Pessimistic transaction, originating node is a backup for the first key, rollback.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryNodeFailureRollback2() throws Exception {
+        primaryNodeFailure(true, true, false);
+    }
+
+    /**
+     * Runs the scenario where one of two primary nodes is stopped while the transaction is being prepared
+     * and the originating node then finishes the transaction explicitly.
+     *
+     * @param locBackupKey If {@code true} the key owned by the stopped primary must have a backup on the
+     *      originating node, if {@code false} it must not.
+     * @param rollback If {@code true} tests rollback after primary node failure.
+     * @param optimistic If {@code true} tests optimistic transaction.
+     * @throws Exception If failed.
+     */
+    private void primaryNodeFailure(boolean locBackupKey, final boolean rollback, boolean optimistic) throws Exception {
+        IgniteCache<Integer, Integer> cache0 = jcache(0);
+        IgniteCache<Integer, Integer> cache2 = jcache(2);
+
+        Affinity<Integer> aff = ignite(0).affinity(null);
+
+        ClusterNode node0 = ignite(0).cluster().localNode();
+        ClusterNode node1 = ignite(1).cluster().localNode();
+
+        Integer key1 = null;
+
+        // Look for a key with primary on node 1 whose backup status on node 0 matches 'locBackupKey'.
+        for (int key = 0; key < 10_000; key++) {
+            if (aff.isPrimary(node1, key) && locBackupKey == aff.isBackup(node0, key)) {
+                key1 = key;
+
+                break;
+            }
+        }
+
+        assertNotNull(key1);
+
+        Integer key2 = primaryKey(cache2);
+
+        TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+
+        IgniteTransactions txs = ignite(0).transactions();
+
+        try (Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ)) {
+            log.info("Put key1: " + key1);
+
+            cache0.put(key1, key1);
+
+            log.info("Put key2: " + key2);
+
+            cache0.put(key2, key2);
+
+            log.info("Start prepare.");
+
+            IgniteInternalTx txEx = ((TransactionProxyImpl)tx).tx();
+
+            commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
+
+            IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
+
+            waitPrepared(ignite(1));
+
+            log.info("Stop one primary node.");
+
+            stopGrid(1);
+
+            U.sleep(1000); // Wait some time to catch possible issues in tx recovery.
+
+            commSpi.stopBlock();
+
+            prepFut.get(10_000);
+
+            if (rollback) {
+                log.info("Rollback.");
+
+                tx.rollback();
+            }
+            else {
+                log.info("Commit.");
+
+                tx.commit();
+            }
+        }
+
+        waitAndCheckKeys(key1, key2, rollback);
+    }
+
+    /**
+     * Optimistic transaction, prepare is allowed to finish before the originating node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
+        primaryAndOriginatingNodeFailure(false, true);
+    }
+
+    /**
+     * Optimistic transaction, originating node is stopped while prepare for the second key is still blocked.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
+        primaryAndOriginatingNodeFailure(true, true);
+    }
+
+    /**
+     * Pessimistic transaction, prepare is allowed to finish before the originating node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
+        primaryAndOriginatingNodeFailure(false, false);
+    }
+
+    /**
+     * Pessimistic transaction, originating node is stopped while prepare for the second key is still blocked.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
+        primaryAndOriginatingNodeFailure(true, false);
+    }
+
+    /**
+     * Runs the scenario where one of two primary nodes is stopped while the transaction is being prepared
+     * and then the originating node is stopped as well, so the transaction is never finished explicitly.
+     *
+     * @param rollback If {@code true} the prepare request for the second key is never delivered and
+     *      {@code null} values are expected, otherwise prepare completes and the written values are expected.
+     * @param optimistic If {@code true} tests optimistic transaction.
+     * @throws Exception If failed.
+     */
+    private void primaryAndOriginatingNodeFailure(final boolean rollback, boolean optimistic) throws Exception {
+        IgniteCache<Integer, Integer> cache0 = jcache(0);
+        IgniteCache<Integer, Integer> cache1 = jcache(1);
+        IgniteCache<Integer, Integer> cache2 = jcache(2);
+
+        Integer key1 = primaryKey(cache1);
+        Integer key2 = primaryKey(cache2);
+
+        TestCommunicationSpi commSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+
+        IgniteTransactions txs = ignite(0).transactions();
+
+        // The transaction is deliberately left open: its originating node is stopped below.
+        Transaction tx = txs.txStart(optimistic ? OPTIMISTIC : PESSIMISTIC, REPEATABLE_READ);
+
+        log.info("Put key1: " + key1);
+
+        cache0.put(key1, key1);
+
+        log.info("Put key2: " + key2);
+
+        cache0.put(key2, key2);
+
+        log.info("Start prepare.");
+
+        IgniteInternalTx txEx = ((TransactionProxyImpl)tx).tx();
+
+        commSpi.blockMessages(ignite(2).cluster().localNode().id()); // Do not allow to finish prepare for key2.
+
+        IgniteInternalFuture<IgniteInternalTx> prepFut = txEx.prepareAsync();
+
+        waitPrepared(ignite(1));
+
+        log.info("Stop one primary node.");
+
+        stopGrid(1);
+
+        U.sleep(1000); // Wait some time to catch possible issues in tx recovery.
+
+        // For the rollback case the prepare request for key2 stays blocked until the originating node is gone.
+        if (!rollback) {
+            commSpi.stopBlock();
+
+            prepFut.get(10_000);
+        }
+
+        log.info("Stop originating node.");
+
+        stopGrid(0);
+
+        waitAndCheckKeys(key1, key2, rollback);
+    }
+
+    /**
+     * Waits up to 5 seconds for both keys to reach the expected state and then asserts that state.
+     *
+     * @param key1 First key.
+     * @param key2 Second key.
+     * @param expNull {@code True} if {@code null} values are expected.
+     * @throws Exception If failed.
+     */
+    private void waitAndCheckKeys(final Integer key1, final Integer key2, final boolean expNull) throws Exception {
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    checkKey(key1, expNull);
+                    checkKey(key2, expNull);
+
+                    return true;
+                }
+                catch (AssertionError e) {
+                    log.info("Check failed: " + e);
+
+                    return false;
+                }
+            }
+        }, 5000);
+
+        // The wait result is ignored on purpose: the final checks fail with a descriptive assertion message.
+        checkKey(key1, expNull);
+        checkKey(key2, expNull);
+    }
+
+    /**
+     * Asserts the value stored locally for the key on each of its current primary and backup nodes.
+     *
+     * @param key Key.
+     * @param expNull {@code True} if {@code null} value is expected.
+     */
+    private void checkKey(Integer key, boolean expNull) {
+        // Node 2 is never stopped by the tests of this class, so its affinity view is always available.
+        Affinity<Integer> aff = ignite(2).affinity(null);
+
+        Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+
+        assertFalse(nodes.isEmpty());
+
+        for (ClusterNode node : nodes) {
+            Ignite ignite = grid(node);
+
+            IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+            if (expNull)
+                assertNull("Unexpected value for: " + ignite.name(), cache.localPeek(key));
+            else
+                assertEquals("Unexpected value for: " + ignite.name(), key, cache.localPeek(key));
+        }
+    }
+
+    /**
+     * Waits up to 5 seconds until the single {@link GridDhtTxLocal} transaction on the given node reaches
+     * the {@code PREPARED} state.
+     *
+     * @param ignite Node.
+     * @throws Exception If failed.
+     */
+    private void waitPrepared(Ignite ignite) throws Exception {
+        final IgniteTxManager tm = ((IgniteKernal)ignite).context().cache().context().tm();
+
+        boolean wait = GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                GridDhtTxLocal locTx = null;
+
+                for (IgniteInternalTx tx : tm.txs()) {
+                    if (tx instanceof GridDhtTxLocal) {
+                        assertNull("Only one tx is expected.", locTx);
+
+                        locTx = (GridDhtTxLocal)tx;
+                    }
+                }
+
+                log.info("Wait for tx, state: " + (locTx != null ? locTx.state() : null));
+
+                return locTx != null && locTx.state() == PREPARED;
+            }
+        }, 5000);
+
+        assertTrue("Failed to wait for tx.", wait);
+    }
+
+    /**
+     * Communication SPI that can hold back {@link GridNearTxPrepareRequest} messages addressed to one
+     * node and send them later on demand. All mutable state is guarded by the monitor of this instance.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** ID of the node whose prepare requests are held back, {@code null} when nothing is blocked. */
+        private UUID blockNodeId;
+
+        /** Held back messages paired with their destination nodes. */
+        private final List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                Object msg0 = ((GridIoMessage)msg).message();
+
+                if (msg0 instanceof GridNearTxPrepareRequest) {
+                    synchronized (this) {
+                        if (blockNodeId != null && blockNodeId.equals(node.id())) {
+                            log.info("Block message: " + msg0);
+
+                            blockedMsgs.add(new T2<>(node, (GridIoMessage)msg));
+
+                            return;
+                        }
+                    }
+                }
+            }
+
+            super.sendMessage(node, msg);
+        }
+
+        /**
+         * Starts holding back near prepare requests addressed to the given node.
+         *
+         * @param nodeId Node ID.
+         */
+        void blockMessages(UUID nodeId) {
+            // Same monitor as in sendMessage(), otherwise sender threads may not observe the new value.
+            synchronized (this) {
+                blockNodeId = nodeId;
+            }
+        }
+
+        /**
+         * Stops blocking and sends all messages held back so far.
+         */
+        void stopBlock() {
+            synchronized (this) {
+                blockNodeId = null;
+
+                for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+                    log.info("Send blocked message: " + msg.get2().message());
+
+                    super.sendMessage(msg.get1(), msg.get2());
+                }
+
+                // Forget delivered messages so that a repeated call does not send them again.
+                blockedMsgs.clear();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
index e832099..1bd0e5f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
@@ -33,6 +33,10 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite {
public static TestSuite suite() throws Exception {
TestSuite suite = new TestSuite("Cache tx recovery test suite");
+ suite.addTestSuite(IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.class);
+ suite.addTestSuite(IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.class);
+ suite.addTestSuite(IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.class);
+
suite.addTestSuite(GridCachePartitionedTxOriginatingNodeFailureSelfTest.class);
suite.addTestSuite(GridCachePartitionedNearDisabledTxOriginatingNodeFailureSelfTest.class);
suite.addTestSuite(GridCacheReplicatedTxOriginatingNodeFailureSelfTest.class);