You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/08 11:31:23 UTC

[05/48] incubator-ignite git commit: # ignite-157-2 Tests and fix for tx recovery issue

# ignite-157-2 Tests and fix for tx recovery issue


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5daaa278
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5daaa278
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5daaa278

Branch: refs/heads/ignite-373
Commit: 5daaa278afcca7e00be5002e3d5247661c6ba474
Parents: 4b775f0
Author: sboikov <sb...@gridgain.com>
Authored: Wed Apr 29 12:48:27 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Apr 29 17:13:48 2015 +0300

----------------------------------------------------------------------
 ...ridCacheOptimisticCheckPreparedTxFuture.java |  78 +++-
 ...idCacheOptimisticCheckPreparedTxRequest.java |  47 +-
 .../GridDistributedTxRemoteAdapter.java         |   2 +-
 .../cache/transactions/IgniteInternalTx.java    |   5 +-
 .../cache/transactions/IgniteTxAdapter.java     |   2 +-
 .../cache/transactions/IgniteTxHandler.java     |   3 +-
 .../transactions/IgniteTxLocalAdapter.java      |   2 +-
 .../cache/transactions/IgniteTxManager.java     |  47 +-
 ...xOriginatingNodeFailureAbstractSelfTest.java |   2 +-
 ...rDisabledPrimaryNodeFailureRecoveryTest.java |  31 ++
 ...rtitionedPrimaryNodeFailureRecoveryTest.java |  31 ++
 ...woBackupsPrimaryNodeFailureRecoveryTest.java |  37 ++
 ...ePrimaryNodeFailureRecoveryAbstractTest.java | 454 +++++++++++++++++++
 .../IgniteCacheTxRecoverySelfTestSuite.java     |   4 +
 14 files changed, 713 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index 8a14b48..3e345f4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -70,6 +70,9 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
     /** Transaction nodes mapping. */
     private final Map<UUID, Collection<UUID>> txNodes;
 
+    /** {@code True} if near node is alive and is not the failed node, so only tx on near node is checked. */
+    private final boolean nearTxCheck;
+
     /**
      * @param cctx Context.
      * @param tx Transaction.
@@ -77,8 +80,11 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
      * @param txNodes Transaction mapping.
      */
     @SuppressWarnings("ConstantConditions")
-    public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx, IgniteInternalTx tx,
-        UUID failedNodeId, Map<UUID, Collection<UUID>> txNodes) {
+    public GridCacheOptimisticCheckPreparedTxFuture(GridCacheSharedContext<K, V> cctx,
+        IgniteInternalTx tx,
+        UUID failedNodeId,
+        Map<UUID, Collection<UUID>> txNodes)
+    {
         super(cctx.kernalContext(), CU.boolReducer());
 
         this.cctx = cctx;
@@ -114,6 +120,10 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
                 }
             }
         }
+
+        UUID nearNodeId = tx.eventNodeId();
+
+        nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
     }
 
     /**
@@ -121,6 +131,48 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
      */
     @SuppressWarnings("ConstantConditions")
     public void prepare() {
+        if (nearTxCheck) {
+            UUID nearNodeId = tx.eventNodeId();
+
+            if (cctx.localNodeId().equals(nearNodeId)) {
+                IgniteInternalFuture<Boolean> fut = cctx.tm().txCommitted(tx.nearXidVersion());
+
+                fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
+                    @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                        try {
+                            onDone(fut.get());
+                        }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
+                    }
+                });
+            }
+            else {
+                MiniFuture fut = new MiniFuture(tx.eventNodeId());
+
+                add(fut);
+
+                GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
+                    tx,
+                    0,
+                    true,
+                    futureId(),
+                    fut.futureId());
+
+                try {
+                    cctx.io().send(nearNodeId, req, tx.ioPolicy());
+                }
+                catch (IgniteCheckedException e) {
+                    fut.onError(e);
+                }
+
+                markInitialized();
+            }
+
+            return;
+        }
+
         // First check transactions on local node.
         int locTxNum = nodeTransactions(cctx.localNodeId());
 
@@ -206,6 +258,7 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
 
                     GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(tx,
                         nodeTransactions(id),
+                        false,
                         futureId(),
                         fut.futureId());
 
@@ -228,7 +281,11 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
                 add(fut);
 
                 GridCacheOptimisticCheckPreparedTxRequest req = new GridCacheOptimisticCheckPreparedTxRequest(
-                    tx, nodeTransactions(nodeId), futureId(), fut.futureId());
+                    tx,
+                    nodeTransactions(nodeId),
+                    false,
+                    futureId(),
+                    fut.futureId());
 
                 try {
                     cctx.io().send(nodeId, req, tx.ioPolicy());
@@ -341,11 +398,18 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
                 cctx.tm().finishOptimisticTxOnRecovery(tx, res);
             }
             else {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to check prepared transactions, " +
-                        "invalidating transaction [err=" + err + ", tx=" + tx + ']');
+                if (nearTxCheck) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to check transaction on near node, " +
+                            "ignoring [err=" + err + ", tx=" + tx + ']');
+                }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to check prepared transactions, " +
+                            "invalidating transaction [err=" + err + ", tx=" + tx + ']');
 
-                cctx.tm().salvageTx(tx);
+                    cctx.tm().salvageTx(tx);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
index e83db66..4f2a1d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
@@ -27,8 +27,7 @@ import java.io.*;
 import java.nio.*;
 
 /**
- * Message sent to check that transactions related to some optimistic transaction
- * were prepared on remote node.
+ * Message sent to check that transactions related to some transaction were prepared on remote node.
  */
 public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBaseMessage {
     /** */
@@ -49,6 +48,9 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
     /** System transaction flag. */
     private boolean sys;
 
+    /** {@code True} if should check only tx on near node. */
+    private boolean nearTxCheck;
+
     /**
      * Empty constructor required by {@link Externalizable}
      */
@@ -59,11 +61,16 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
     /**
      * @param tx Transaction.
      * @param txNum Expected number of transactions on remote node.
+     * @param nearTxCheck {@code True} if should check only tx on near node.
      * @param futId Future ID.
      * @param miniId Mini future ID.
      */
-    public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx, int txNum, IgniteUuid futId,
-        IgniteUuid miniId) {
+    public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx tx,
+        int txNum,
+        boolean nearTxCheck,
+        IgniteUuid futId,
+        IgniteUuid miniId)
+    {
         super(tx.xidVersion(), 0);
 
         nearXidVer = tx.nearXidVersion();
@@ -72,6 +79,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
         this.futId = futId;
         this.miniId = miniId;
         this.txNum = txNum;
+        this.nearTxCheck = nearTxCheck;
+    }
+
+    /**
+     * @return {@code True} if should check only tx on near node.
+     */
+    public boolean nearTxCheck() {
+        return nearTxCheck;
     }
 
     /**
@@ -137,18 +152,24 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeMessage("nearXidVer", nearXidVer))
+                if (!writer.writeBoolean("nearTxCheck", nearTxCheck))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeBoolean("sys", sys))
+                if (!writer.writeMessage("nearXidVer", nearXidVer))
                     return false;
 
                 writer.incrementState();
 
             case 12:
+                if (!writer.writeBoolean("sys", sys))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
                 if (!writer.writeInt("txNum", txNum))
                     return false;
 
@@ -187,7 +208,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
                 reader.incrementState();
 
             case 10:
-                nearXidVer = reader.readMessage("nearXidVer");
+                nearTxCheck = reader.readBoolean("nearTxCheck");
 
                 if (!reader.isLastRead())
                     return false;
@@ -195,7 +216,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
                 reader.incrementState();
 
             case 11:
-                sys = reader.readBoolean("sys");
+                nearXidVer = reader.readMessage("nearXidVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -203,6 +224,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
                 reader.incrementState();
 
             case 12:
+                sys = reader.readBoolean("sys");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
                 txNum = reader.readInt("txNum");
 
                 if (!reader.isLastRead())
@@ -222,7 +251,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest extends GridDistributedBa
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 13;
+        return 14;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 5c75390..3215138 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -206,7 +206,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
     }
 
     /** {@inheritDoc} */
-    @Override public <K, V> GridTuple<CacheObject> peek(GridCacheContext cacheCtx,
+    @Override public GridTuple<CacheObject> peek(GridCacheContext cacheCtx,
         boolean failFast,
         KeyCacheObject key,
         CacheEntryPredicate[] filter)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 30367c5..8dc07cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -338,8 +338,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
 
     /**
      * Gets node ID which directly started this transaction. In case of DHT local transaction it will be
-     * near node ID, in case of DHT remote transaction it will be primary node ID, in case of replicated remote
-     * transaction it will be starter node ID.
+     * near node ID, in case of DHT remote transaction it will be primary node ID.
      *
      * @return Originating node ID.
      */
@@ -507,7 +506,7 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
      * @return Current value for the key within transaction.
      * @throws GridCacheFilterFailedException If filter failed and failFast is {@code true}.
      */
-     @Nullable public <K, V> GridTuple<CacheObject> peek(
+     @Nullable public GridTuple<CacheObject> peek(
          GridCacheContext ctx,
          boolean failFast,
          KeyCacheObject key,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 1c02356..82d68b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1964,7 +1964,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public <K, V> GridTuple<CacheObject> peek(GridCacheContext ctx,
+        @Nullable @Override public GridTuple<CacheObject> peek(GridCacheContext ctx,
             boolean failFast,
             KeyCacheObject key,
             @Nullable CacheEntryPredicate[] filter) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 6b45fee..6843075 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1183,7 +1183,8 @@ public class IgniteTxHandler {
         if (log.isDebugEnabled())
             log.debug("Processing check prepared transaction requests [nodeId=" + nodeId + ", req=" + req + ']');
 
-        IgniteInternalFuture<Boolean> fut = ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
+        IgniteInternalFuture<Boolean> fut = req.nearTxCheck() ? ctx.tm().txCommitted(req.nearXidVersion()) :
+            ctx.tm().txsPreparedOrCommitted(req.nearXidVersion(), req.transactions());
 
         if (fut == null || fut.isDone()) {
             boolean prepared;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index dfce09c..fc3efba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -330,7 +330,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
 
     /** {@inheritDoc} */
     @SuppressWarnings({"RedundantTypeArguments"})
-    @Nullable @Override public <K, V> GridTuple<CacheObject> peek(
+    @Nullable @Override public GridTuple<CacheObject> peek(
         GridCacheContext cacheCtx,
         boolean failFast,
         KeyCacheObject key,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c494602..19efc5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -727,14 +727,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @param txId Transaction ID.
-     * @return Transaction with given ID.
-     */
-    @Nullable public IgniteInternalTx txx(GridCacheVersion txId) {
-        return idMap.get(txId);
-    }
-
-    /**
      * Handles prepare stage of 2PC.
      *
      * @param tx Transaction to prepare.
@@ -1770,6 +1762,45 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param ver Version.
+     * @return Future for flag indicating if transaction was committed.
+     */
+    public IgniteInternalFuture<Boolean> txCommitted(GridCacheVersion ver) {
+        final GridFutureAdapter<Boolean> resFut = new GridFutureAdapter<>();
+
+        final IgniteInternalTx tx = cctx.tm().tx(ver);
+
+        if (tx != null) {
+            assert tx.near() && tx.local() : tx;
+
+            if (log.isDebugEnabled())
+                log.debug("Found near transaction, will wait for completion: " + tx);
+
+            tx.finishFuture().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() {
+                @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) {
+                    TransactionState state = tx.state();
+
+                    if (log.isDebugEnabled())
+                        log.debug("Near transaction finished with state: " + state);
+
+                    resFut.onDone(state == COMMITTED);
+                }
+            });
+
+            return resFut;
+        }
+
+        Boolean committed = completedVers.get(ver);
+
+        if (log.isDebugEnabled())
+            log.debug("Near transaction committed: " + committed);
+
+        resFut.onDone(committed != null && committed);
+
+        return resFut;
+    }
+
+    /**
      * @param nearVer Near version ID.
      * @param txNum Number of transactions.
      * @param fut Result future.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
index 00bd43f..d664aa8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxOriginatingNodeFailureAbstractSelfTest.java
@@ -156,7 +156,7 @@ public abstract class IgniteTxOriginatingNodeFailureAbstractSelfTest extends Gri
 
                 TransactionProxyImpl tx = (TransactionProxyImpl)txIgniteNode.transactions().txStart();
 
-                IgniteInternalTx txEx = GridTestUtils.getFieldValue(tx, "tx");
+                IgniteInternalTx txEx = tx.tx();
 
                 assertTrue(txEx.optimistic());
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java
new file mode 100644
index 0000000..62d9b79
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Primary node failure recovery test for partitioned cache without near cache configuration.
+ */
+public class IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest
+    extends IgniteCachePrimaryNodeFailureRecoveryAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java
new file mode 100644
index 0000000..a40c989
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Primary node failure recovery test for partitioned cache with near cache configuration.
+ */
+public class IgniteCachePartitionedPrimaryNodeFailureRecoveryTest
+    extends IgniteCachePrimaryNodeFailureRecoveryAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return new NearCacheConfiguration();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
new file mode 100644
index 0000000..70eef1d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.configuration.*;
+
+/**
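+ * Primary node failure recovery test for partitioned cache with near cache configuration and two backups.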
+ */
+public class IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest
+    extends IgniteCachePartitionedPrimaryNodeFailureRecoveryTest {
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        assertEquals(1, ccfg.getBackups());
+
+        ccfg.setBackups(2);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
new file mode 100644
index 0000000..7a393d8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePrimaryNodeFailureRecoveryAbstractTest.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+import static org.apache.ignite.transactions.TransactionState.*;
+
+/**
+ *
+ */
+public abstract class IgniteCachePrimaryNodeFailureRecoveryAbstractTest extends IgniteCacheAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        // Tests start transactions on node 0, use keys owned by nodes 1 and 2, and stop node 1 (some also stop node 0).
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        // Tests select keys by primary/backup ownership, see primaryNodeFailure().
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        // All tests in this class run explicit transactions (txStart / prepare / commit / rollback).
+        return TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        // Every node gets the test SPI so that tests can hold back near prepare requests sent from it.
+        cfg.setCommunicationSpi(new TestCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // No-op: this override does nothing, nodes are started for each test in beforeTest().
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        // Each test stops nodes, so every test starts its own set of nodes.
+        startGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        // No-op: this override does nothing, nodes are stopped after each test in afterTest().
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // Stops whatever nodes are still running after the test stopped some of them itself.
+        stopAllGrids();
+    }
+
+    /**
+     * Optimistic transaction is committed after a primary node fails; the key owned by the failed node
+     * is chosen so that the originating node is not its backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryNodeFailureRecovery1() throws Exception {
+        primaryNodeFailure(false, false, true);
+    }
+
+    /**
+     * Optimistic transaction is committed after a primary node fails; the key owned by the failed node
+     * is chosen so that the originating node is its backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryNodeFailureRecovery2() throws Exception {
+        primaryNodeFailure(true, false, true);
+    }
+
+    /**
+     * Optimistic transaction is rolled back after a primary node fails; the key owned by the failed node
+     * is chosen so that the originating node is not its backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryNodeFailureRollback1() throws Exception {
+        primaryNodeFailure(false, true, true);
+    }
+
+    /**
+     * Optimistic transaction is rolled back after a primary node fails; the key owned by the failed node
+     * is chosen so that the originating node is its backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryNodeFailureRollback2() throws Exception {
+        primaryNodeFailure(true, true, true);
+    }
+    /**
+     * Pessimistic transaction is committed after a primary node fails; the key owned by the failed node
+     * is chosen so that the originating node is not its backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryNodeFailureRecovery1() throws Exception {
+        primaryNodeFailure(false, false, false);
+    }
+
+    /**
+     * Pessimistic transaction is committed after a primary node fails; the key owned by the failed node
+     * is chosen so that the originating node is its backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryNodeFailureRecovery2() throws Exception {
+        primaryNodeFailure(true, false, false);
+    }
+
+    /**
+     * Pessimistic transaction is rolled back after a primary node fails; the key owned by the failed node
+     * is chosen so that the originating node is not its backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryNodeFailureRollback1() throws Exception {
+        primaryNodeFailure(false, true, false);
+    }
+
+    /**
+     * Pessimistic transaction is rolled back after a primary node fails; the key owned by the failed node
+     * is chosen so that the originating node is its backup.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryNodeFailureRollback2() throws Exception {
+        primaryNodeFailure(true, true, false);
+    }
+
+    /**
+     * Runs a two-key transaction from node 0, stops the primary node of the first key (node 1) once the
+     * transaction is prepared there, then commits or rolls back and verifies both keys on their owners.
+     *
+     * @param locBackupKey If {@code true} uses recovery for local backup key.
+     * @param rollback If {@code true} tests rollback after primary node failure.
+     * @param optimistic If {@code true} tests optimistic transaction.
+     * @throws Exception If failed.
+     */
+    private void primaryNodeFailure(boolean locBackupKey, final boolean rollback, boolean optimistic) throws Exception {
+        IgniteCache<Integer, Integer> origCache = jcache(0);
+        IgniteCache<Integer, Integer> cache2 = jcache(2);
+
+        Affinity<Integer> affinity = ignite(0).affinity(null);
+
+        // Look for a key whose primary is node 1 and whose backup placement on node 0 matches 'locBackupKey'.
+        Integer found = null;
+
+        for (int cand = 0; cand < 10_000 && found == null; cand++) {
+            boolean primaryOnNode1 = affinity.isPrimary(ignite(1).cluster().localNode(), cand);
+
+            if (primaryOnNode1 && locBackupKey == affinity.isBackup(ignite(0).cluster().localNode(), cand))
+                found = cand;
+        }
+
+        assertNotNull(found);
+
+        final Integer key1 = found;
+        final Integer key2 = primaryKey(cache2);
+
+        TestCommunicationSpi origSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+
+        IgniteTransactions nodeTxs = ignite(0).transactions();
+
+        TransactionConcurrency concurrency = optimistic ? OPTIMISTIC : PESSIMISTIC;
+
+        try (Transaction tx = nodeTxs.txStart(concurrency, REPEATABLE_READ)) {
+            log.info("Put key1: " + key1);
+
+            origCache.put(key1, key1);
+
+            log.info("Put key2: " + key2);
+
+            origCache.put(key2, key2);
+
+            log.info("Start prepare.");
+
+            IgniteInternalTx internalTx = ((TransactionProxyImpl)tx).tx();
+
+            // Do not allow to finish prepare for key2.
+            UUID node2Id = ignite(2).cluster().localNode().id();
+
+            origSpi.blockMessages(node2Id);
+
+            IgniteInternalFuture<IgniteInternalTx> prepareFut = internalTx.prepareAsync();
+
+            waitPrepared(ignite(1));
+
+            log.info("Stop one primary node.");
+
+            stopGrid(1);
+
+            // Wait some time to catch possible issues in tx recovery.
+            U.sleep(1000);
+
+            origSpi.stopBlock();
+
+            prepareFut.get(10_000);
+
+            if (!rollback) {
+                log.info("Commit.");
+
+                tx.commit();
+            }
+            else {
+                log.info("Rollback.");
+
+                tx.rollback();
+            }
+        }
+
+        // Polls until both keys have the expected state on all their owners or the timeout expires.
+        GridAbsPredicate keysSettled = new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    checkKey(key1, rollback);
+                    checkKey(key2, rollback);
+                }
+                catch (AssertionError e) {
+                    log.info("Check failed: " + e);
+
+                    return false;
+                }
+
+                return true;
+            }
+        };
+
+        GridTestUtils.waitForCondition(keysSettled, 5000);
+
+        // Final check outside of the predicate so that a mismatch fails the test.
+        checkKey(key1, rollback);
+        checkKey(key2, rollback);
+    }
+
+    /**
+     * Optimistic transaction completes prepare, then both a primary node and the originating node are stopped;
+     * the written values are expected on the remaining owners.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
+        primaryAndOriginatingNodeFailure(false, true);
+    }
+
+    /**
+     * Optimistic transaction never completes prepare, a primary node and the originating node are stopped;
+     * no values are expected on the remaining owners.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOptimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
+        primaryAndOriginatingNodeFailure(true, true);
+    }
+
+    /**
+     * Pessimistic transaction completes prepare, then both a primary node and the originating node are stopped;
+     * the written values are expected on the remaining owners.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRecovery() throws Exception {
+        primaryAndOriginatingNodeFailure(false, false);
+    }
+
+    /**
+     * Pessimistic transaction never completes prepare, a primary node and the originating node are stopped;
+     * no values are expected on the remaining owners.
+     *
+     * @throws Exception If failed.
+     */
+    public void testPessimisticPrimaryAndOriginatingNodeFailureRollback() throws Exception {
+        primaryAndOriginatingNodeFailure(true, false);
+    }
+
+    /**
+     * Runs a two-key transaction from node 0, stops the primary node of the first key (node 1) once the
+     * transaction is prepared there, then stops the originating node and verifies both keys on their owners.
+     * When {@code rollback} is {@code true} the prepare request for the second key is never released.
+     *
+     * @param rollback If {@code true} tests rollback after primary node failure.
+     * @param optimistic If {@code true} tests optimistic transaction.
+     * @throws Exception If failed.
+     */
+    private void primaryAndOriginatingNodeFailure(final boolean rollback, boolean optimistic) throws Exception {
+        IgniteCache<Integer, Integer> origCache = jcache(0);
+        IgniteCache<Integer, Integer> cache1 = jcache(1);
+        IgniteCache<Integer, Integer> cache2 = jcache(2);
+
+        final Integer key1 = primaryKey(cache1);
+        final Integer key2 = primaryKey(cache2);
+
+        TestCommunicationSpi origSpi = (TestCommunicationSpi)ignite(0).configuration().getCommunicationSpi();
+
+        IgniteTransactions nodeTxs = ignite(0).transactions();
+
+        TransactionConcurrency concurrency = optimistic ? OPTIMISTIC : PESSIMISTIC;
+
+        // The transaction is not closed here: its originating node is stopped below.
+        Transaction tx = nodeTxs.txStart(concurrency, REPEATABLE_READ);
+
+        log.info("Put key1: " + key1);
+
+        origCache.put(key1, key1);
+
+        log.info("Put key2: " + key2);
+
+        origCache.put(key2, key2);
+
+        log.info("Start prepare.");
+
+        IgniteInternalTx internalTx = ((TransactionProxyImpl)tx).tx();
+
+        // Do not allow to finish prepare for key2.
+        UUID node2Id = ignite(2).cluster().localNode().id();
+
+        origSpi.blockMessages(node2Id);
+
+        IgniteInternalFuture<IgniteInternalTx> prepareFut = internalTx.prepareAsync();
+
+        waitPrepared(ignite(1));
+
+        log.info("Stop one primary node.");
+
+        stopGrid(1);
+
+        // Wait some time to catch possible issues in tx recovery.
+        U.sleep(1000);
+
+        boolean finishPrepare = !rollback;
+
+        if (finishPrepare) {
+            origSpi.stopBlock();
+
+            prepareFut.get(10_000);
+        }
+
+        log.info("Stop originating node.");
+
+        stopGrid(0);
+
+        // Polls until both keys have the expected state on all their owners or the timeout expires.
+        GridAbsPredicate keysSettled = new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    checkKey(key1, rollback);
+                    checkKey(key2, rollback);
+                }
+                catch (AssertionError e) {
+                    log.info("Check failed: " + e);
+
+                    return false;
+                }
+
+                return true;
+            }
+        };
+
+        GridTestUtils.waitForCondition(keysSettled, 5000);
+
+        // Final check outside of the predicate so that a mismatch fails the test.
+        checkKey(key1, rollback);
+        checkKey(key2, rollback);
+    }
+
+    /**
+     * Checks the locally stored value of the key on every node that node 2's affinity maps the key to
+     * (primary and backups).
+     *
+     * @param key Key.
+     * @param expNull {@code True} if {@code null} value is expected, otherwise the value must equal the key.
+     */
+    private void checkKey(Integer key, boolean expNull) {
+        Affinity<Integer> affinity = ignite(2).affinity(null);
+
+        Collection<ClusterNode> owners = affinity.mapKeyToPrimaryAndBackups(key);
+
+        assertFalse(owners.isEmpty());
+
+        for (ClusterNode owner : owners) {
+            Ignite ownerNode = grid(owner);
+
+            IgniteCache<Integer, Integer> ownerCache = ownerNode.cache(null);
+
+            String errMsg = "Unexpected value for: " + ownerNode.name();
+
+            Integer locVal = ownerCache.localPeek(key);
+
+            if (!expNull)
+                assertEquals(errMsg, key, locVal);
+            else
+                assertNull(errMsg, locVal);
+        }
+    }
+
+    /**
+     * Waits up to 5 seconds until the given node has a single {@link GridDhtTxLocal} transaction
+     * in the {@code PREPARED} state and fails if that does not happen.
+     *
+     * @param ignite Node.
+     * @throws Exception If failed.
+     */
+    private void waitPrepared(Ignite ignite) throws Exception {
+        final IgniteTxManager txMgr = ((IgniteKernal)ignite).context().cache().context().tm();
+
+        GridAbsPredicate txPrepared = new GridAbsPredicate() {
+            @Override public boolean apply() {
+                GridDhtTxLocal dhtTx = null;
+
+                for (IgniteInternalTx candidate : txMgr.txs()) {
+                    if (!(candidate instanceof GridDhtTxLocal))
+                        continue;
+
+                    assertNull("Only one tx is expected.", dhtTx);
+
+                    dhtTx = (GridDhtTxLocal)candidate;
+                }
+
+                Object state = dhtTx == null ? null : dhtTx.state();
+
+                log.info("Wait for tx, state: " + state);
+
+                if (dhtTx == null)
+                    return false;
+
+                return dhtTx.state() == PREPARED;
+            }
+        };
+
+        boolean prepared = GridTestUtils.waitForCondition(txPrepared, 5000);
+
+        assertTrue("Failed to wait for tx.", prepared);
+    }
+
+    /**
+     * Communication SPI that can hold back {@link GridNearTxPrepareRequest} messages addressed to one node
+     * and send them later. All other messages are passed to {@link TcpCommunicationSpi} unchanged.
+     */
+    private static class TestCommunicationSpi extends TcpCommunicationSpi {
+        /** Logger. */
+        @LoggerResource
+        private IgniteLogger log;
+
+        /** ID of the node whose prepare requests are held back, {@code null} if nothing is blocked. Guarded by this. */
+        private UUID blockNodeId;
+
+        /** Held back messages together with their destination nodes. Guarded by this. */
+        private final List<T2<ClusterNode, GridIoMessage>> blockedMsgs = new ArrayList<>();
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage) {
+                Object msg0 = ((GridIoMessage)msg).message();
+
+                if (msg0 instanceof GridNearTxPrepareRequest) {
+                    synchronized (this) {
+                        if (blockNodeId != null && blockNodeId.equals(node.id())) {
+                            log.info("Block message: " + msg0);
+
+                            blockedMsgs.add(new T2<>(node, (GridIoMessage)msg));
+
+                            return;
+                        }
+                    }
+                }
+            }
+
+            super.sendMessage(node, msg);
+        }
+
+        /**
+         * Starts holding back near prepare requests addressed to the given node.
+         *
+         * @param nodeId Node ID.
+         */
+        void blockMessages(UUID nodeId) {
+            // Same monitor as in sendMessage(), otherwise sender threads are not guaranteed to see the new value.
+            synchronized (this) {
+                blockNodeId = nodeId;
+            }
+        }
+
+        /**
+         * Stops blocking and sends all held back messages in the order they were blocked.
+         */
+        void stopBlock() {
+            // Sending under the monitor keeps concurrent prepare requests behind the held back ones.
+            synchronized (this) {
+                blockNodeId = null;
+
+                for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+                    log.info("Send blocked message: " + msg.get2().message());
+
+                    super.sendMessage(msg.get1(), msg.get2());
+                }
+
+                // Forget delivered messages so that a later stopBlock() does not send them again.
+                blockedMsgs.clear();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5daaa278/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
index e832099..1bd0e5f 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
@@ -33,6 +33,10 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite {
     public static TestSuite suite() throws Exception {
         TestSuite suite = new TestSuite("Cache tx recovery test suite");
 
+        suite.addTestSuite(IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.class);
+        suite.addTestSuite(IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.class);
+        suite.addTestSuite(IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.class);
+
         suite.addTestSuite(GridCachePartitionedTxOriginatingNodeFailureSelfTest.class);
         suite.addTestSuite(GridCachePartitionedNearDisabledTxOriginatingNodeFailureSelfTest.class);
         suite.addTestSuite(GridCacheReplicatedTxOriginatingNodeFailureSelfTest.class);