You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/01/13 07:32:36 UTC

[1/3] ignite git commit: Fixes: - allow 'committing' -> 'marked_rollback' tx state change only for thread committing transaction - fixed 'full_sync' mode for case when tx primary nodes fail - fixed race between statically configured cache start and GridDhtAffinityAssignmentRequest - fixed 'prepareMarshal' methods to marshal only once (ignite-2219)

Repository: ignite
Updated Branches:
  refs/heads/master 1d8c4e259 -> 457a9ae4d


http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index d07a1e6..34872c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -85,7 +85,6 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
     }
 
     /** */
-    @SuppressWarnings({"deprecation"})
     private class TestListener implements CommunicationListener<Message> {
         /** */
         private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index 7521f2e..b7c0deb 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -49,11 +49,13 @@ import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
+import org.apache.ignite.testframework.junits.spi.GridSpiTest;
 import org.eclipse.jetty.util.ConcurrentHashSet;
 
 /**
  *
  */
+@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
 public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi>
     extends GridSpiAbstractTest<T> {
     /** */
@@ -87,7 +89,6 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
     }
 
     /** */
-    @SuppressWarnings({"deprecation"})
     private class TestListener implements CommunicationListener<Message> {
         /** */
         private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>();
@@ -151,6 +152,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
 
             int expMsgs = 0;
 
+            long totAcked = 0;
+
             for (int i = 0; i < 5; i++) {
                 info("Iteration: " + i);
 
@@ -172,6 +175,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
 
                 expMsgs += msgPerIter;
 
+                final long totAcked0 = totAcked;
+
                 for (TcpCommunicationSpi spi : spis) {
                     GridNioServer srv = U.field(spi, "nioSrvr");
 
@@ -189,6 +194,14 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
 
                             GridTestUtils.waitForCondition(new GridAbsPredicate() {
                                 @Override public boolean apply() {
+                                    long acked = GridTestUtils.getFieldValue(recoveryDesc, "acked");
+
+                                    return acked > totAcked0;
+                                }
+                            }, 5000);
+
+                            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                                @Override public boolean apply() {
                                     return recoveryDesc.messagesFutures().isEmpty();
                                 }
                             }, 10_000);
@@ -218,6 +231,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
                 }
 
                 assertEquals(msgPerIter * 2, ackMsgs.get());
+
+                totAcked += msgPerIter;
             }
         }
         finally {
@@ -337,6 +352,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
                 return expMsgs == ackMsgs.get();
             }
         }, 5000);
+
+        assertEquals(expMsgs, ackMsgs.get());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 0df7da6..4f329e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -118,6 +118,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         TcpDiscoverySpi spi = nodeSpi.get();
 
         if (spi == null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index e0ffc60..949290e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -978,7 +978,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
     /**
      * @param key Key.
      * @param cacheName Cache name.
-     * @return Ignite instance which has primary cache for given key.
+     * @return Ignite instance which has backup cache for given key.
      */
     protected Ignite backupNode(Object key, String cacheName) {
         List<Ignite> allGrids = Ignition.allGrids();
@@ -1001,8 +1001,38 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
     }
 
     /**
+     * @param key Key.
+     * @param cacheName Cache name.
+     * @return Ignite instances which have backup cache for given key.
+     */
+    protected List<Ignite> backupNodes(Object key, String cacheName) {
+        List<Ignite> allGrids = Ignition.allGrids();
+
+        assertFalse("There are no alive nodes.", F.isEmpty(allGrids));
+
+        Ignite ignite = allGrids.get(0);
+
+        Affinity<Object> aff = ignite.affinity(cacheName);
+
+        Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+
+        assertTrue("Expected more than one node for key [key=" + key + ", nodes=" + nodes +']', nodes.size() > 1);
+
+        Iterator<ClusterNode> it = nodes.iterator();
+
+        it.next(); // Skip primary.
+
+        List<Ignite> backups = new ArrayList<>(nodes.size() - 1);
+
+        while (it.hasNext())
+            backups.add(grid(it.next()));
+
+        return backups;
+    }
+
+    /**
      * In ATOMIC cache with CLOCK mode if key is updated from different nodes at same time
-     * only one update wins others are ignored (can happen in test event when updates are executed from
+     * only one update wins others are ignored (can happen in test even when updates are executed from
      * different nodes sequentially), this delay is used to avoid lost updates.
      *
      * @param cache Cache.

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
index d81efd9..7363c7c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledTxOriginatingNodeFailureSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedTxOriginatingNodeFailureSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheCommitDelayTxRecoveryTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedPrimaryNodeFailureRecoveryTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest;
@@ -39,6 +40,8 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite {
     public static TestSuite suite() throws Exception {
         TestSuite suite = new TestSuite("Cache tx recovery test suite");
 
+        suite.addTestSuite(IgniteCacheCommitDelayTxRecoveryTest.class);
+
         suite.addTestSuite(IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.class);
         suite.addTestSuite(IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.class);
         suite.addTestSuite(IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
index c19a8fc..e53f335 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
@@ -31,8 +31,10 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -41,7 +43,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
  * @author Raul Kripalani
  */
 public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
-
     /** ZK Cluster size. */
     private static final int ZK_CLUSTER_SIZE = 3;
 
@@ -79,7 +80,6 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
         // start the Curator client so we can perform assertions on the ZK state later
         zkCurator = CuratorFrameworkFactory.newClient(zkCluster.getConnectString(), new RetryNTimes(10, 1000));
         zkCurator.start();
-
     }
 
     /**
@@ -98,22 +98,21 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
         }
 
         stopAllGrids();
-
     }
 
     /**
      * Enhances the default configuration with the {#TcpDiscoveryZookeeperIpFinder}.
      *
      * @param gridName Grid name.
-     * @return
-     * @throws Exception
+     * @return Ignite configuration.
+     * @throws Exception If failed.
      */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration configuration = super.getConfiguration(gridName);
 
         TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi) configuration.getDiscoverySpi();
         TcpDiscoveryZookeeperIpFinder zkIpFinder = new TcpDiscoveryZookeeperIpFinder();
-        zkIpFinder.setAllowDuplicateRegistrations(isAllowDuplicateRegistrations());
+        zkIpFinder.setAllowDuplicateRegistrations(allowDuplicateRegistrations);
 
         // first node => configure with zkUrl; second node => configure with CuratorFramework; third and subsequent
         // shall be configured through system property
@@ -126,11 +125,12 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
         }
 
         tcpDisco.setIpFinder(zkIpFinder);
+
         return configuration;
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testOneIgniteNodeIsAlone() throws Exception {
         startGrid(0);
@@ -141,7 +141,7 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testTwoIgniteNodesFindEachOther() throws Exception {
         // start one node
@@ -164,7 +164,7 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testThreeNodesWithThreeDifferentConfigMethods() throws Exception {
         // start one node
@@ -195,7 +195,7 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testFourNodesStartingAndStopping() throws Exception {
         // start one node
@@ -242,10 +242,10 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testFourNodesWithDuplicateRegistrations() throws Exception {
-        setAllowDuplicateRegistrations(true);
+        allowDuplicateRegistrations = true;
 
         // start 4 nodes
         System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
@@ -265,10 +265,10 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testFourNodesWithNoDuplicateRegistrations() throws Exception {
-        setAllowDuplicateRegistrations(false);
+        allowDuplicateRegistrations = false;
 
         // start 4 nodes
         System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
@@ -288,10 +288,10 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testFourNodesRestartLastSeveralTimes() throws Exception {
-        setAllowDuplicateRegistrations(false);
+        allowDuplicateRegistrations = false;
 
         // start 4 nodes
         System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
@@ -321,14 +321,13 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
         stopAllGrids();
 
         assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
-
     }
 
     /**
-     * @throws Exception
+     * @throws Exception If failed.
      */
     public void testFourNodesKillRestartZookeeper() throws Exception {
-        setAllowDuplicateRegistrations(false);
+        allowDuplicateRegistrations = false;
 
         // start 4 nodes
         System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
@@ -357,14 +356,28 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
 
         // stop all grids
         stopAllGrids();
-        Thread.sleep(2000);
+
+        GridTestUtils.waitForCondition(new GridAbsPredicate() {
+            @Override public boolean apply() {
+                try {
+                    return zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size() == 0;
+                }
+                catch (Exception e) {
+                    fail("Unexpected error: ");
+
+                    return true;
+                }
+            }
+        }, 5000);
 
         // check that all nodes are gone in ZK
         assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
     }
 
     /**
-     * @throws Exception
+     * @param ignite Node.
+     * @param joinEventCount Expected events number.
+     * @return Events latch.
      */
     private CountDownLatch expectJoinEvents(Ignite ignite, int joinEventCount) {
         final CountDownLatch latch = new CountDownLatch(joinEventCount);
@@ -378,18 +391,4 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
 
         return latch;
     }
-
-    /**
-     * @throws Exception
-     */
-    public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
-        this.allowDuplicateRegistrations = allowDuplicateRegistrations;
-    }
-
-    /**
-     * @throws Exception
-     */
-    public boolean isAllowDuplicateRegistrations() {
-        return allowDuplicateRegistrations;
-    }
 }
\ No newline at end of file


[3/3] ignite git commit: Fixes: - allow 'committing' -> 'marked_rollback' tx state change only for thread committing transaction - fixed 'full_sync' mode for case when tx primary nodes fail - fixed race between statically configured cache start and GridDhtAffinityAssignmentRequest - fixed 'prepareMarshal' methods to marshal only once (ignite-2219)

Posted by sb...@apache.org.
Fixes:
 - allow 'committing' -> 'marked_rollback' tx state change only for thread committing transaction
 - fixed 'full_sync' mode for case when tx primary nodes fail
 - fixed race between statically configured cache start and GridDhtAffinityAssignmentRequest
 - fixed 'prepareMarshal' methods to marshal only once (ignite-2219)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/457a9ae4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/457a9ae4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/457a9ae4

Branch: refs/heads/master
Commit: 457a9ae4d3b0d6eef6e92a15f5ef79c15ccf1f95
Parents: 1d8c4e2
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jan 13 09:21:09 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jan 13 09:29:17 2016 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/IgniteTransactions.java   |   4 +-
 .../apache/ignite/internal/IgniteKernal.java    |  95 +++-
 .../cache/CacheEntrySerializablePredicate.java  |   3 +-
 .../cache/CacheInvokeDirectResult.java          |   4 +-
 .../processors/cache/GridCacheIoManager.java    |  23 +
 .../processors/cache/GridCacheProcessor.java    |  52 ++-
 .../processors/cache/GridCacheReturn.java       |   2 +
 .../processors/cache/IgniteCacheProxy.java      |   2 +-
 .../GridDistributedLockResponse.java            |   2 +-
 .../GridDistributedTxFinishRequest.java         |  11 +-
 .../GridDistributedTxPrepareRequest.java        |   2 +-
 .../GridDistributedTxPrepareResponse.java       |   4 +-
 .../dht/GridDhtAffinityAssignmentResponse.java  |   4 +-
 .../distributed/dht/GridDhtLockFuture.java      |  44 +-
 .../distributed/dht/GridDhtLockRequest.java     |   2 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |  90 ++--
 .../dht/GridDhtTxFinishResponse.java            |   4 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |   1 +
 .../dht/GridDhtTxPrepareRequest.java            |   2 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |  21 +-
 .../dht/atomic/GridDhtAtomicUpdateResponse.java |   6 +-
 .../dht/atomic/GridNearAtomicUpdateRequest.java |  22 +-
 .../atomic/GridNearAtomicUpdateResponse.java    |   4 +-
 .../dht/preloader/GridDhtForceKeysResponse.java |   6 +-
 .../GridDhtPartitionDemandMessage.java          |   6 +-
 .../GridDhtPartitionSupplyMessageV2.java        |   6 +-
 .../preloader/GridDhtPartitionsFullMessage.java |   2 +-
 .../GridDhtPartitionsSingleMessage.java         |   4 +-
 .../distributed/near/GridNearGetResponse.java   |   4 +-
 ...ridNearOptimisticTxPrepareFutureAdapter.java |   6 +-
 .../near/GridNearSingleGetResponse.java         |   2 +-
 .../near/GridNearTxFinishFuture.java            | 432 ++++++++++++++-----
 .../near/GridNearTxFinishRequest.java           |   5 +
 .../near/GridNearTxFinishResponse.java          |   4 +-
 .../cache/distributed/near/GridNearTxLocal.java |   2 +-
 .../near/GridNearTxPrepareResponse.java         |   4 +-
 .../cache/query/GridCacheQueryRequest.java      |  16 +-
 .../cache/query/GridCacheQueryResponse.java     |  18 +-
 .../cache/transactions/IgniteInternalTx.java    |   6 +
 .../cache/transactions/IgniteTxAdapter.java     |  23 +-
 .../cache/transactions/IgniteTxEntry.java       |  11 +-
 .../cache/transactions/IgniteTxHandler.java     |  26 +-
 .../transactions/IgniteTxLocalAdapter.java      |   6 +-
 .../cache/transactions/IgniteTxManager.java     |  20 +
 .../datastreamer/DataStreamerRequest.java       |   1 +
 .../datastructures/DataStructuresProcessor.java |  11 +-
 .../processors/igfs/IgfsAckMessage.java         |   4 +-
 .../handlers/cache/GridCacheCommandHandler.java |   6 +-
 .../ignite/spi/discovery/DiscoverySpi.java      |   2 +
 .../ignite/stream/socket/SocketStreamer.java    |   3 +-
 ...cheAbstractFullApiMultithreadedSelfTest.java |  13 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |   2 +-
 .../processors/cache/GridCacheStopSelfTest.java |   2 +-
 .../cache/IgniteDynamicCacheStartSelfTest.java  |  30 +-
 .../IgniteClientDataStructuresAbstractTest.java |   3 +
 .../dht/GridCacheTxNodeFailureSelfTest.java     |   7 +-
 .../IgniteCacheCommitDelayTxRecoveryTest.java   | 376 ++++++++++++++++
 .../IgniteCachePutRetryAbstractSelfTest.java    |  36 +-
 ...gniteCachePutRetryTransactionalSelfTest.java |  21 +
 .../continuous/GridEventConsumeSelfTest.java    |   3 +
 .../internal/util/nio/GridNioSelfTest.java      |  11 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   1 -
 ...CommunicationRecoveryAckClosureSelfTest.java |  19 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java |   2 +
 .../junits/common/GridCommonAbstractTest.java   |  34 +-
 .../IgniteCacheTxRecoverySelfTestSuite.java     |   3 +
 .../tcp/ipfinder/zk/ZookeeperIpFinderTest.java  |  69 ++-
 67 files changed, 1314 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
index 875b647..dfe6a1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
@@ -18,7 +18,7 @@
 package org.apache.ignite;
 
 import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -54,7 +54,7 @@ import org.apache.ignite.transactions.TransactionMetrics;
 public interface IgniteTransactions {
     /**
      * Starts transaction with default isolation, concurrency, timeout, and invalidation policy.
-     * All defaults are set in {@link CacheConfiguration} at startup.
+     * All defaults are set in {@link TransactionConfiguration} at startup.
      *
      * @return New transaction
      * @throws IllegalStateException If transaction is already started by this thread.

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 14b5816..3def718 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1424,8 +1424,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     }
 
     /** @throws IgniteCheckedException If registration failed. */
-    private void registerExecutorMBeans(ExecutorService execSvc, ExecutorService sysExecSvc, ExecutorService p2pExecSvc,
-        ExecutorService mgmtExecSvc, ExecutorService restExecSvc) throws IgniteCheckedException {
+    private void registerExecutorMBeans(ExecutorService execSvc,
+        ExecutorService sysExecSvc,
+        ExecutorService p2pExecSvc,
+        ExecutorService mgmtExecSvc,
+        ExecutorService restExecSvc) throws IgniteCheckedException {
         pubExecSvcMBean = registerExecutorMBean(execSvc, "GridExecutionExecutor");
         sysExecSvcMBean = registerExecutorMBean(sysExecSvc, "GridSystemExecutor");
         mgmtExecSvcMBean = registerExecutorMBean(mgmtExecSvc, "GridManagementExecutor");
@@ -2414,7 +2417,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         guard();
 
         try {
-            return ctx.cache().publicJCache(name, false);
+            return ctx.cache().publicJCache(name, false, true);
         }
         catch (IgniteCheckedException e) {
             throw CU.convertToCacheException(e);
@@ -2431,7 +2434,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         guard();
 
         try {
-            ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), null, true, true).get();
+            ctx.cache().dynamicStartCache(cacheCfg,
+                cacheCfg.getName(),
+                null,
+                true,
+                true,
+                true).get();
 
             return ctx.cache().publicJCache(cacheCfg.getName());
         }
@@ -2467,8 +2475,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         guard();
 
         try {
-            if (ctx.cache().cache(cacheCfg.getName()) == null)
-                ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), null, false, true).get();
+            if (ctx.cache().cache(cacheCfg.getName()) == null) {
+                ctx.cache().dynamicStartCache(cacheCfg,
+                    cacheCfg.getName(),
+                    null,
+                    false,
+                    true,
+                    true).get();
+            }
 
             return ctx.cache().publicJCache(cacheCfg.getName());
         }
@@ -2491,7 +2505,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         guard();
 
         try {
-            ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, true, true).get();
+            ctx.cache().dynamicStartCache(cacheCfg,
+                cacheCfg.getName(),
+                nearCfg,
+                true,
+                true,
+                true).get();
 
             return ctx.cache().publicJCache(cacheCfg.getName());
         }
@@ -2514,11 +2533,23 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         try {
             IgniteInternalCache<Object, Object> cache = ctx.cache().cache(cacheCfg.getName());
 
-            if (cache == null)
-                ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, false, true).get();
+            if (cache == null) {
+                ctx.cache().dynamicStartCache(cacheCfg,
+                    cacheCfg.getName(),
+                    nearCfg,
+                    false,
+                    true,
+                    true).get();
+            }
             else {
-                if (cache.configuration().getNearConfiguration() == null)
-                    ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, false, true).get();
+                if (cache.configuration().getNearConfiguration() == null) {
+                    ctx.cache().dynamicStartCache(cacheCfg,
+                        cacheCfg.getName(),
+                        nearCfg,
+                        false,
+                        true,
+                        true).get();
+                }
             }
 
             return ctx.cache().publicJCache(cacheCfg.getName());
@@ -2538,7 +2569,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         guard();
 
         try {
-            ctx.cache().dynamicStartCache(null, cacheName, nearCfg, true, true).get();
+            ctx.cache().dynamicStartCache(null,
+                cacheName,
+                nearCfg,
+                true,
+                true,
+                true).get();
 
             IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
 
@@ -2564,11 +2600,23 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         try {
             IgniteInternalCache<Object, Object> internalCache = ctx.cache().cache(cacheName);
 
-            if (internalCache == null)
-                ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false, true).get();
+            if (internalCache == null) {
+                ctx.cache().dynamicStartCache(null,
+                    cacheName,
+                    nearCfg,
+                    false,
+                    true,
+                    true).get();
+            }
             else {
-                if (internalCache.configuration().getNearConfiguration() == null)
-                    ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false, true).get();
+                if (internalCache.configuration().getNearConfiguration() == null) {
+                    ctx.cache().dynamicStartCache(null,
+                        cacheName,
+                        nearCfg,
+                        false,
+                        true,
+                        true).get();
+                }
             }
 
             IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
@@ -2587,6 +2635,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
     /**
      * @param cache Cache.
+     * @throws IgniteCheckedException If cache without near cache was already started.
      */
     private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) throws IgniteCheckedException {
         if (!cache.context().isNear())
@@ -2596,7 +2645,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
     /** {@inheritDoc} */
     @Override public void destroyCache(String cacheName) {
-        IgniteInternalFuture stopFut = destroyCacheAsync(cacheName);
+        IgniteInternalFuture stopFut = destroyCacheAsync(cacheName, true);
 
         try {
             stopFut.get();
@@ -2608,13 +2657,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
     /**
      * @param cacheName Cache name.
+     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
      * @return Ignite future.
      */
-    public IgniteInternalFuture<?> destroyCacheAsync(String cacheName) {
+    public IgniteInternalFuture<?> destroyCacheAsync(String cacheName, boolean checkThreadTx) {
         guard();
 
         try {
-            return ctx.cache().dynamicDestroyCache(cacheName);
+            return ctx.cache().dynamicDestroyCache(cacheName, checkThreadTx);
         }
         finally {
             unguard();
@@ -2627,7 +2677,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
         try {
             if (ctx.cache().cache(cacheName) == null)
-                ctx.cache().getOrCreateFromTemplate(cacheName).get();
+                ctx.cache().getOrCreateFromTemplate(cacheName, true).get();
 
             return ctx.cache().publicJCache(cacheName);
         }
@@ -2641,14 +2691,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
     /**
      * @param cacheName Cache name.
+     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
      * @return Future that will be completed when cache is deployed.
      */
-    public IgniteInternalFuture<?> getOrCreateCacheAsync(String cacheName) {
+    public IgniteInternalFuture<?> getOrCreateCacheAsync(String cacheName, boolean checkThreadTx) {
         guard();
 
         try {
             if (ctx.cache().cache(cacheName) == null)
-                return ctx.cache().getOrCreateFromTemplate(cacheName);
+                return ctx.cache().getOrCreateFromTemplate(cacheName, checkThreadTx);
 
             return new GridFinishedFuture<>();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java
index a243c4e..20cc005 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java
@@ -86,7 +86,8 @@ public class CacheEntrySerializablePredicate implements CacheEntryPredicate {
 
         p.prepareMarshal(ctx);
 
-        bytes = ctx.marshaller().marshal(p);
+        if (bytes == null)
+            bytes = ctx.marshaller().marshal(p);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
index bee1427..fefa422 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
@@ -104,7 +104,7 @@ public class CacheInvokeDirectResult implements Message {
     public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException {
         key.prepareMarshal(ctx.cacheObjectContext());
 
-        if (err != null)
+        if (err != null && errBytes == null)
             errBytes = ctx.marshaller().marshal(err);
 
         if (res != null)
@@ -119,7 +119,7 @@ public class CacheInvokeDirectResult implements Message {
     public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         key.finishUnmarshal(ctx.cacheObjectContext(), ldr);
 
-        if (errBytes != null)
+        if (errBytes != null && err == null)
             err = ctx.marshaller().unmarshal(errBytes, ldr);
 
         if (res != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 0aa8b1b..b297827 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
@@ -122,6 +123,28 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
             IgniteInternalFuture<?> fut = null;
 
             if (cacheMsg.partitionExchangeMessage()) {
+                if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
+                    assert cacheMsg.topologyVersion() != null : cacheMsg;
+
+                    AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order());
+
+                    assert cacheMsg.topologyVersion().compareTo(startTopVer) > 0 :
+                        "Invalid affinity request [startTopVer=" + startTopVer + ", msg=" + cacheMsg + ']';
+
+                    // Need to wait for initial exchange to avoid race between cache start and affinity request.
+                    fut = cctx.exchange().affinityReadyFuture(startTopVer);
+
+                    if (fut != null && !fut.isDone()) {
+                        cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+                            @Override public void run() {
+                                lsnr.onMessage(nodeId, cacheMsg);
+                            }
+                        });
+
+                        return;
+                    }
+                }
+
                 long locTopVer = cctx.discovery().topologyVersion();
                 long rmtTopVer = cacheMsg.topologyVersion().topologyVersion();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index ff02e70..eb6d98e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2030,7 +2030,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         try {
             CacheConfiguration cfg = createConfigFromTemplate(cacheName);
 
-            return dynamicStartCache(cfg, cacheName, null, true, true);
+            return dynamicStartCache(cfg, cacheName, null, true, true, true);
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -2041,16 +2041,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * Dynamically starts cache using template configuration.
      *
      * @param cacheName Cache name.
+     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
      * @return Future that will be completed when cache is deployed.
      */
-    public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName) {
+    public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName, boolean checkThreadTx) {
         try {
-            if (publicJCache(cacheName, false) != null) // Cache with given name already started.
+            if (publicJCache(cacheName, false, checkThreadTx) != null) // Cache with given name already started.
                 return new GridFinishedFuture<>();
 
             CacheConfiguration cfg = createConfigFromTemplate(cacheName);
 
-            return dynamicStartCache(cfg, cacheName, null, false, true);
+            return dynamicStartCache(cfg, cacheName, null, false, true, checkThreadTx);
         }
         catch (IgniteCheckedException e) {
             return new GridFinishedFuture<>(e);
@@ -2060,6 +2061,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     /**
      * @param cacheName Cache name.
      * @return Cache configuration.
+     * @throws IgniteCheckedException If failed.
      */
     private CacheConfiguration createConfigFromTemplate(String cacheName) throws IgniteCheckedException {
         CacheConfiguration cfgTemplate = null;
@@ -2138,6 +2140,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param cacheName Cache name.
      * @param nearCfg Near cache configuration.
      * @param failIfExists Fail if exists flag.
+     * @param failIfNotStarted If {@code true} fails if cache is not started.
+     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
      * @return Future that will be completed when cache is deployed.
      */
     @SuppressWarnings("IfMayBeConditional")
@@ -2146,9 +2150,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         String cacheName,
         @Nullable NearCacheConfiguration nearCfg,
         boolean failIfExists,
-        boolean failIfNotStarted
+        boolean failIfNotStarted,
+        boolean checkThreadTx
     ) {
-        return dynamicStartCache(ccfg, cacheName, nearCfg, CacheType.USER, failIfExists, failIfNotStarted);
+        return dynamicStartCache(ccfg,
+            cacheName,
+            nearCfg,
+            CacheType.USER,
+            failIfExists,
+            failIfNotStarted,
+            checkThreadTx);
     }
 
     /**
@@ -2157,7 +2168,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @param ccfg Cache configuration.
      * @param cacheName Cache name.
      * @param nearCfg Near cache configuration.
+     * @param cacheType Cache type.
      * @param failIfExists Fail if exists flag.
+     * @param failIfNotStarted If {@code true} fails if cache is not started.
+     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
      * @return Future that will be completed when cache is deployed.
      */
     @SuppressWarnings("IfMayBeConditional")
@@ -2167,9 +2181,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         @Nullable NearCacheConfiguration nearCfg,
         CacheType cacheType,
         boolean failIfExists,
-        boolean failIfNotStarted
+        boolean failIfNotStarted,
+        boolean checkThreadTx
     ) {
-        checkEmptyTransactions();
+        if (checkThreadTx)
+            checkEmptyTransactions();
 
         DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName));
 
@@ -2260,10 +2276,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
     /**
      * @param cacheName Cache name to destroy.
+     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
      * @return Future that will be completed when cache is destroyed.
      */
-    public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName) {
-        checkEmptyTransactions();
+    public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName, boolean checkThreadTx) {
+        if (checkThreadTx)
+            checkEmptyTransactions();
 
         DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
 
@@ -2898,7 +2916,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         IgniteCacheProxy<?, ?> cache = jCacheProxies.get(masked);
 
         if (cache == null) {
-            dynamicStartCache(null, name, null, false, true).get();
+            dynamicStartCache(null, name, null, false, true, true).get();
 
             cache = jCacheProxies.get(masked);
         }
@@ -3001,21 +3019,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      * @throws IgniteCheckedException If failed.
      */
     public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException {
-        return publicJCache(cacheName, true);
+        return publicJCache(cacheName, true, true);
     }
 
     /**
      * @param cacheName Cache name.
      * @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started,
      *        otherwise returns {@code null} in this case.
-     * @param <K> type of keys.
-     * @param <V> type of values.
+     * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
      * @return Cache instance for given name.
      * @throws IgniteCheckedException If failed.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted)
-        throws IgniteCheckedException
+    @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName,
+        boolean failIfNotStarted,
+        boolean checkThreadTx) throws IgniteCheckedException
     {
         if (log.isDebugEnabled())
             log.debug("Getting public cache for name: " + cacheName);
@@ -3030,7 +3048,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName);
 
         if (cache == null) {
-            dynamicStartCache(null, cacheName, null, false, failIfNotStarted).get();
+            dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get();
 
             cache = jCacheProxies.get(masked);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
index 21154c9..a9edb95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
@@ -169,6 +169,7 @@ public class GridCacheReturn implements Externalizable, Message {
      * @param cctx Cache context.
      * @param cacheObj Value to set.
      * @param success Success flag to set.
+     * @param keepBinary Keep binary flag.
      * @return This instance for chaining.
      */
     public GridCacheReturn set(
@@ -187,6 +188,7 @@ public class GridCacheReturn implements Externalizable, Message {
     /**
      * @param cctx Cache context.
      * @param cacheObj Cache object.
+     * @param keepBinary Keep binary flag.
      */
     private void initValue(GridCacheContext cctx, @Nullable CacheObject cacheObj, boolean keepBinary) {
         if (loc)

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 27a7587..b64c69c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1626,7 +1626,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
         IgniteInternalFuture<?> fut;
 
         try {
-            fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name());
+            fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name(), true);
         }
         finally {
             onLeave(gate);

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index bb3f9ff..f088e1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -194,7 +194,7 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
 
         prepareMarshalCacheObjects(vals, ctx.cacheContext(cacheId));
 
-        if (err != null)
+        if (err != null && errBytes == null)
             errBytes = ctx.marshaller().marshal(err);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 34b3112..a761fec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -20,8 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
 import org.apache.ignite.lang.IgniteUuid;
@@ -85,6 +83,8 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
      * @param invalidate Invalidate flag.
      * @param sys System transaction flag.
      * @param plc IO policy.
+     * @param syncCommit Sync commit flag.
+     * @param syncRollback Sync rollback flag.
      * @param baseVer Base version.
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.
@@ -184,6 +184,13 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
     }
 
     /**
+     * @param syncCommit Sync commit flag.
+     */
+    public void syncCommit(boolean syncCommit) {
+        this.syncCommit = syncCommit;
+    }
+
+    /**
      * @return Sync rollback flag.
      */
     public boolean syncRollback() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index e595942..0d26c84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -317,7 +317,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
         if (reads != null)
             marshalTx(reads, ctx);
 
-        if (dhtVers != null) {
+        if (dhtVers != null && dhtVerKeys == null) {
             for (IgniteTxKey key : dhtVers.keySet()) {
                 GridCacheContext cctx = ctx.cacheContext(key.cacheId());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index d2c5aa4..4d22213 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -93,7 +93,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (err != null)
+        if (err != null && errBytes == null)
             errBytes = ctx.marshaller().marshal(err);
     }
 
@@ -101,7 +101,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (errBytes != null)
+        if (errBytes != null && err == null)
             err = ctx.marshaller().unmarshal(errBytes, ldr);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index e731406..8e041c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -104,7 +104,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (affAssignment != null)
+        if (affAssignment != null && affAssignmentBytes == null)
             affAssignmentBytes = ctx.marshaller().marshal(affAssignment);
     }
 
@@ -113,7 +113,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (affAssignmentBytes != null) {
+        if (affAssignmentBytes != null && affAssignment == null) {
             affAssignment = ctx.marshaller().unmarshal(affAssignmentBytes, ldr);
 
             // TODO IGNITE-2110: setting 'local' for nodes not needed when IGNITE-2110 is implemented.

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 98711b8..1c3e052 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -494,14 +494,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
         boolean found = false;
 
         for (IgniteInternalFuture<?> fut : futures()) {
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
+            MiniFuture f = (MiniFuture)fut;
 
-                if (f.node().id().equals(nodeId)) {
-                    f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will ignore): " + nodeId));
+            if (f.node().id().equals(nodeId)) {
+                f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will ignore): " + nodeId));
 
-                    found = true;
-                }
+                found = true;
             }
         }
 
@@ -551,12 +549,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
         synchronized (futs) {
             // Avoid iterator creation.
             for (int i = 0; i < futs.size(); i++) {
-                IgniteInternalFuture<Boolean> fut = futs.get(i);
-
-                if (!isMini(fut))
-                    continue;
-
-                MiniFuture mini = (MiniFuture)fut;
+                MiniFuture mini = (MiniFuture)futs.get(i);
 
                 if (mini.futureId().equals(miniId)) {
                     if (!mini.isDone())
@@ -772,14 +765,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
     }
 
     /**
-     * @param f Future.
-     * @return {@code True} if mini-future.
-     */
-    private boolean isMini(IgniteInternalFuture<?> f) {
-        return f.getClass().equals(MiniFuture.class);
-    }
-
-    /**
      *
      */
     public void map() {
@@ -1006,7 +991,24 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridDhtLockFuture.class, this, super.toString());
+        Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
+            @Override public String apply(IgniteInternalFuture<?> f) {
+                MiniFuture m = (MiniFuture)f;
+
+                return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+            }
+        });
+
+        Collection<KeyCacheObject> locks;
+
+        synchronized (this) {
+            locks = new HashSet<>(pendingLocks);
+        }
+
+        return S.toString(GridDhtLockFuture.class, this,
+            "innerFuts", futs,
+            "pendingLocks", locks,
+            "super", super.toString());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index 62cf69d..50167d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -311,7 +311,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
 
         prepareMarshalCacheObjects(nearKeys, ctx.cacheContext(cacheId));
 
-        if (owned != null) {
+        if (owned != null && ownedKeys == null) {
             ownedKeys = new KeyCacheObject[owned.size()];
             ownedValues = new GridCacheVersion[ownedKeys.length];
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 65f1cb4..2d98e0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.UUID;
 import org.apache.ignite.internal.GridDirectCollection;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -44,6 +43,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /** */
     private static final long serialVersionUID = 0L;
 
+    /** */
+    public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
+
     /** Near node ID. */
     private UUID nearNodeId;
 
@@ -64,7 +66,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     @GridDirectCollection(GridCacheVersion.class)
     private Collection<GridCacheVersion> pendingVers;
 
-    /** Check comitted flag. */
+    /** Check committed flag. */
     private boolean checkCommitted;
 
     /** Partition update counter. */
@@ -81,6 +83,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     /** Task name hash. */
     private int taskNameHash;
 
+    /** Bit flags (see {@code WAIT_REMOTE_TX_FLAG_MASK}). */
+    private byte flags;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -100,6 +105,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param commit Commit flag.
      * @param invalidate Invalidate flag.
      * @param sys System flag.
+     * @param plc IO policy.
      * @param sysInvalidate System invalidation flag.
      * @param syncCommit Synchronous commit flag.
      * @param syncRollback Synchronous rollback flag.
@@ -180,6 +186,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param commit Commit flag.
      * @param invalidate Invalidate flag.
      * @param sys System flag.
+     * @param plc IO policy.
      * @param sysInvalidate System invalidation flag.
      * @param syncCommit Synchronous commit flag.
      * @param syncRollback Synchronous rollback flag.
@@ -302,16 +309,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
     }
 
     /**
-     * Gets versions of not acquired locks with version less then one of transaction being committed.
-     *
-     * @return Versions of locks for entries participating in transaction that have not been acquired yet
-     *      have version less then one of transaction being committed.
-     */
-    public Collection<GridCacheVersion> pendingVersions() {
-        return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
-    }
-
-    /**
      * @return Check committed flag.
      */
     public boolean checkCommitted() {
@@ -325,6 +322,23 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
         this.checkCommitted = checkCommitted;
     }
 
+    /**
+     * @return {@code True} if the wait remote transactions flag is set.
+     */
+    public boolean waitRemoteTransactions() {
+        return (flags & WAIT_REMOTE_TX_FLAG_MASK) != 0;
+    }
+
+    /**
+     * @param waitRemoteTxs Wait remote transactions flag.
+     */
+    public void waitRemoteTransactions(boolean waitRemoteTxs) {
+        if (waitRemoteTxs)
+            flags = (byte)(flags | WAIT_REMOTE_TX_FLAG_MASK);
+        else
+            flags &= ~WAIT_REMOTE_TX_FLAG_MASK;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
@@ -352,60 +366,66 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeIgniteUuid("miniId", miniId))
+                if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeUuid("nearNodeId", nearNodeId))
+                if (!writer.writeIgniteUuid("miniId", miniId))
                     return false;
 
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
+                if (!writer.writeUuid("nearNodeId", nearNodeId))
                     return false;
 
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
                     return false;
 
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 25:
-                if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
                     return false;
 
                 writer.incrementState();
 
             case 27:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 28:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 29:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -436,6 +456,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
                 reader.incrementState();
 
             case 19:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
                 byte isolationOrd;
 
                 isolationOrd = reader.readByte("isolation");
@@ -447,7 +475,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
                 reader.incrementState();
 
-            case 20:
+            case 21:
                 miniId = reader.readIgniteUuid("miniId");
 
                 if (!reader.isLastRead())
@@ -455,7 +483,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
                 reader.incrementState();
 
-            case 21:
+            case 22:
                 nearNodeId = reader.readUuid("nearNodeId");
 
                 if (!reader.isLastRead())
@@ -463,7 +491,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
                 reader.incrementState();
 
-            case 22:
+            case 23:
                 partUpdateCnt = reader.readMessage("partUpdateCnt");
 
                 if (!reader.isLastRead())
@@ -471,7 +499,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
                 reader.incrementState();
 
-            case 23:
+            case 24:
                 pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -479,7 +507,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
                 reader.incrementState();
 
-            case 24:
+            case 25:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -487,7 +515,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
                 reader.incrementState();
 
-            case 25:
+            case 26:
                 sysInvalidate = reader.readBoolean("sysInvalidate");
 
                 if (!reader.isLastRead())
@@ -495,7 +523,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
                 reader.incrementState();
 
-            case 26:
+            case 27:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -503,7 +531,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
                 reader.incrementState();
 
-            case 27:
+            case 28:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -511,7 +539,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
                 reader.incrementState();
 
-            case 28:
+            case 29:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -531,6 +559,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 29;
+        return 30;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index fb4d97d..626ad89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -109,7 +109,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (checkCommittedErr != null)
+        if (checkCommittedErr != null && checkCommittedErrBytes == null)
             checkCommittedErrBytes = ctx.marshaller().marshal(checkCommittedErr);
     }
 
@@ -118,7 +118,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
         throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (checkCommittedErrBytes != null)
+        if (checkCommittedErrBytes != null && checkCommittedErr == null)
             checkCommittedErr = ctx.marshaller().unmarshal(checkCommittedErrBytes, ldr);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index e026b4e..ebf1002 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -421,6 +421,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
             if (!state(PREPARING)) {
                 if (state() == PREPARED && isSystemInvalidate())
                     fut.complete();
+
                 if (setRollbackOnly()) {
                     if (timedOut())
                         fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 394ff89..d31ecba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -281,7 +281,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (owned != null) {
+        if (owned != null && ownedKeys == null) {
             ownedKeys = owned.keySet();
 
             ownedVals = owned.values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7bee5a3..7cc276f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -656,11 +656,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
             if (!addDepInfo && ctx.deploymentEnabled())
                 addDepInfo = true;
 
-            invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+            if (invokeArgsBytes == null)
+                invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
 
-            entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+            if (entryProcessorsBytes == null)
+                entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
 
-            nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
+            if (nearEntryProcessorsBytes == null)
+                nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
         }
     }
 
@@ -681,13 +684,15 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         finishUnmarshalCacheObjects(prevVals, cctx, ldr);
 
         if (forceTransformBackups) {
-            entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+            if (entryProcessors == null)
+                entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
 
-            invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
-        }
+            if (invokeArgs == null)
+                invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
 
-        if (forceTransformBackups)
-            nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
+            if (nearEntryProcessors == null)
+                nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index f1bb323..95fdeb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -165,7 +165,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
         prepareMarshalCacheObjects(nearEvicted, cctx);
 
-        errBytes = ctx.marshaller().marshal(err);
+        if (err != null && errBytes == null)
+            errBytes = ctx.marshaller().marshal(err);
     }
 
     /** {@inheritDoc} */
@@ -178,7 +179,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
         finishUnmarshalCacheObjects(nearEvicted, cctx, ldr);
 
-        err = ctx.marshaller().unmarshal(errBytes, ldr);
+        if (errBytes != null && err == null)
+            err = ctx.marshaller().unmarshal(errBytes, ldr);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 7c0aba5..9c4b486 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -184,6 +184,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
+     * @param keepBinary Keep binary flag.
      * @param clientReq Client node request flag.
      * @param addDepInfo Deployment info flag.
      */
@@ -593,7 +594,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 filter = null;
         }
 
-        if (expiryPlc != null)
+        if (expiryPlc != null && expiryPlcBytes == null)
             expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
 
         if (op == TRANSFORM) {
@@ -601,9 +602,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
             if (!addDepInfo && ctx.deploymentEnabled())
                 addDepInfo = true;
 
-            entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+            if (entryProcessorsBytes == null)
+                entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
 
-            invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+            if (invokeArgsBytes == null)
+                invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
         }
         else
             prepareMarshalCacheObjects(vals, cctx);
@@ -617,8 +620,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
         finishUnmarshalCacheObjects(keys, cctx, ldr);
 
-        if (op == TRANSFORM)
-            entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+        if (op == TRANSFORM) {
+            if (entryProcessors == null)
+                entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+            if (invokeArgs == null)
+                invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+        }
         else
             finishUnmarshalCacheObjects(vals, cctx, ldr);
 
@@ -629,9 +637,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
             }
         }
 
-        invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
-
-        if (expiryPlcBytes != null)
+        if (expiryPlcBytes != null && expiryPlc == null)
             expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index b164e7e..3e3ac29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -394,7 +394,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (err != null)
+        if (err != null && errBytes == null)
             errBytes = ctx.marshaller().marshal(err);
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
@@ -413,7 +413,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (errBytes != null)
+        if (errBytes != null && err == null)
             err = ctx.marshaller().unmarshal(errBytes, ldr);
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index 4cdecec..9c5238a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -170,7 +170,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
                 info.marshal(cctx);
         }
 
-        errBytes = ctx.marshaller().marshal(err);
+        if (err != null && errBytes == null)
+            errBytes = ctx.marshaller().marshal(err);
     }
 
     /** {@inheritDoc} */
@@ -187,7 +188,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
                 info.unmarshal(cctx, ldr);
         }
 
-        err = ctx.marshaller().unmarshal(errBytes, ldr);
+        if (errBytes != null && err == null)
+            err = ctx.marshaller().unmarshal(errBytes, ldr);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 53c3d90..5cb84dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -66,6 +66,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
     /**
      * @param updateSeq Update sequence for this node.
      * @param topVer Topology version.
+     * @param cacheId Cache ID.
      */
     GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) {
         this.cacheId = cacheId;
@@ -75,6 +76,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
 
     /**
      * @param cp Message to copy from.
+     * @param parts Partitions.
      */
     GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts) {
         cacheId = cp.cacheId;
@@ -181,7 +183,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (topic != null)
+        if (topic != null && topicBytes == null)
             topicBytes = ctx.marshaller().marshal(topic);
     }
 
@@ -189,7 +191,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (topicBytes != null)
+        if (topicBytes != null && topic == null)
             topic = ctx.marshaller().unmarshal(topicBytes, ldr);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
index 41454f9..4451cbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -75,9 +75,13 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
     /**
      * @param updateSeq Update sequence for this node.
      * @param cacheId Cache ID.
+     * @param topVer Topology version.
      * @param addDepInfo Deployment info flag.
      */
-    GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer, boolean addDepInfo) {
+    GridDhtPartitionSupplyMessageV2(long updateSeq,
+        int cacheId,
+        AffinityTopologyVersion topVer,
+        boolean addDepInfo) {
         this.cacheId = cacheId;
         this.updateSeq = updateSeq;
         this.topVer = topVer;

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 0cbdc91..6afb9b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -134,7 +134,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
         if (parts != null && partsBytes == null)
             partsBytes = ctx.marshaller().marshal(parts);
 
-        if (partCntrs != null)
+        if (partCntrs != null && partCntrsBytes == null)
             partCntrsBytes = ctx.marshaller().marshal(partCntrs);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index c07a508..1185913 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -138,7 +138,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         if (partsBytes == null && parts != null)
             partsBytes = ctx.marshaller().marshal(parts);
 
-        if (partCntrs != null)
+        if (partCntrsBytes == null && partCntrs != null)
             partCntrsBytes = ctx.marshaller().marshal(partCntrs);
     }
 
@@ -149,7 +149,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
         if (partsBytes != null && parts == null)
             parts = ctx.marshaller().unmarshal(partsBytes, ldr);
 
-        if (partCntrsBytes != null)
+        if (partCntrsBytes != null && partCntrs == null)
             partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index 15a791f..6ac91cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -188,7 +188,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
                 info.marshal(cctx);
         }
 
-        if (err != null)
+        if (err != null && errBytes == null)
             errBytes = ctx.marshaller().marshal(err);
     }
 
@@ -203,7 +203,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
                 info.unmarshal(cctx, ldr);
         }
 
-        if (errBytes != null)
+        if (errBytes != null && err == null)
             err = ctx.marshaller().unmarshal(errBytes, ldr);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index fe6180a..7132567 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -54,9 +54,13 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
 
         AffinityTopologyVersion topVer = null;
 
-        if (tx.system())
+        if (tx.system()) {
             topVer = tx.topologyVersionSnapshot();
 
+            if (topVer == null)
+                topVer = cctx.exchange().readyAffinityVersion();
+        }
+
         if (topVer == null)
             topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
index 42ad7ed..314c35c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
@@ -167,7 +167,7 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC
                 ((GridCacheEntryInfo)res).marshal(cctx);
         }
 
-        if (err != null)
+        if (err != null && errBytes == null)
             errBytes = ctx.marshaller().marshal(err);
     }
 


[2/3] ignite git commit: Fixes: - allow 'committing' -> 'marked_rollback' tx state change only for thread committing transaction - fixed 'full_sync' mode for case when tx primary nodes fail - fixed race between statically configured cache start and Gr

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 291c88a..1b40d6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteCheckedException;
@@ -44,6 +47,7 @@ import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -64,6 +68,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     public static final IgniteProductVersion FINISH_NEAR_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.4.0");
 
     /** */
+    public static final IgniteProductVersion WAIT_REMOTE_TXS_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+    /** */
     private static final long serialVersionUID = 0L;
 
     /** Logger reference. */
@@ -122,22 +129,23 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
     @Override public boolean onNodeLeft(UUID nodeId) {
+        boolean found = false;
+
         for (IgniteInternalFuture<?> fut : futures())
             if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
+                MinFuture f = (MinFuture)fut;
 
-                if (f.node().id().equals(nodeId)) {
+                if (f.onNodeLeft(nodeId)) {
                     // Remove previous mapping.
                     mappings.remove(nodeId);
 
-                    f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will fail): " + nodeId));
-
-                    return true;
+                    found = true;
                 }
             }
 
-        return false;
+        return found;
     }
 
     /** {@inheritDoc} */
@@ -156,19 +164,32 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
      * @param nodeId Sender.
      * @param res Result.
      */
+    @SuppressWarnings("ForLoopReplaceableByForEach")
     public void onResult(UUID nodeId, GridNearTxFinishResponse res) {
-        if (!isDone())
-            for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
+        if (!isDone()) {
+            FinishMiniFuture finishFut = null;
 
-                    if (f.futureId().equals(res.miniId())) {
-                        assert f.node().id().equals(nodeId);
+            synchronized (futs) {
+                for (int i = 0; i < futs.size(); i++) {
+                    IgniteInternalFuture<IgniteInternalTx> fut = futs.get(i);
+
+                    if (fut.getClass() == FinishMiniFuture.class) {
+                        FinishMiniFuture f = (FinishMiniFuture)fut;
 
-                        f.onResult(res);
+                        if (f.futureId().equals(res.miniId())) {
+                            assert f.node().id().equals(nodeId);
+
+                            finishFut = f;
+
+                            break;
+                        }
                     }
                 }
             }
+
+            if (finishFut != null)
+                finishFut.onNearFinishResponse(res);
+        }
     }
 
     /**
@@ -178,15 +199,21 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     public void onResult(UUID nodeId, GridDhtTxFinishResponse res) {
         if (!isDone())
             for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
-                if (isMini(fut)) {
-                    MiniFuture f = (MiniFuture)fut;
+                if (fut.getClass() == CheckBackupMiniFuture.class) {
+                    CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
 
                     if (f.futureId().equals(res.miniId())) {
                         assert f.node().id().equals(nodeId);
 
-                        f.onResult(res);
+                        f.onDhtFinishResponse(res);
                     }
                 }
+                else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
+                    CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
+
+                    if (f.futureId().equals(res.miniId()))
+                        f.onDhtFinishResponse(nodeId);
+                }
             }
     }
 
@@ -204,9 +231,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                 boolean marked = tx.setRollbackOnly();
 
-                if (err instanceof NodeStoppingException)
-                    return super.onDone(null, err);
-
                 if (err instanceof IgniteTxRollbackCheckedException) {
                     if (marked) {
                         try {
@@ -289,11 +313,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
-     * @param f Future.
+     * @param fut Future.
      * @return {@code True} if mini-future.
      */
-    private boolean isMini(IgniteInternalFuture<?> f) {
-        return f.getClass().equals(MiniFuture.class);
+    private boolean isMini(IgniteInternalFuture<?> fut) {
+        return fut.getClass() == FinishMiniFuture.class ||
+            fut.getClass() == CheckBackupMiniFuture.class ||
+            fut.getClass() == CheckRemoteTxMiniFuture.class;
     }
 
     /**
@@ -393,7 +419,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                 ClusterNode backup = cctx.discovery().node(backupId);
 
-                MiniFuture mini = new MiniFuture(backup, mapping);
+                final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(backup, mapping);
 
                 add(mini);
 
@@ -414,8 +440,25 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
 
                     readyNearMappingFromBackup(mapping);
 
-                    if (committed)
+                    if (committed) {
+                        if (tx.syncCommit()) {
+                            GridCacheVersion nearXidVer = tx.nearXidVersion();
+
+                            assert nearXidVer != null : tx;
+
+                            IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer);
+
+                            fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                @Override public void apply(IgniteInternalFuture<?> fut) {
+                                    mini.onDone(tx);
+                                }
+                            });
+
+                            return;
+                        }
+
                         mini.onDone(tx);
+                    }
                     else {
                         ClusterTopologyCheckedException cause =
                             new ClusterTopologyCheckedException("Primary node left grid: " + nodeId);
@@ -427,46 +470,26 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                     }
                 }
                 else {
-                    GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
-                        cctx.localNodeId(),
-                        futureId(),
-                        mini.futureId(),
-                        tx.topologyVersion(),
-                        tx.xidVersion(),
-                        tx.commitVersion(),
-                        tx.threadId(),
-                        tx.isolation(),
-                        true,
-                        false,
-                        tx.system(),
-                        tx.ioPolicy(),
-                        false,
-                        true,
-                        true,
-                        null,
-                        null,
-                        null,
-                        null,
-                        0,
-                        null,
-                        0,
-                        tx.activeCachesDeploymentEnabled());
-
-                    finishReq.checkCommitted(true);
+                    GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId());
+
+                    // Preserve old behavior, otherwise response is not sent.
+                    if (WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) > 0)
+                        finishReq.syncCommit(true);
 
                     try {
                         if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0)
                             cctx.io().send(backup, finishReq, tx.ioPolicy());
-                        else
+                        else {
                             mini.onDone(new IgniteTxHeuristicCheckedException("Failed to check for tx commit on " +
                                 "the backup node (node has an old Ignite version) [rmtNodeId=" + backup.id() +
                                 ", ver=" + backup.version() + ']'));
+                        }
                     }
                     catch (ClusterTopologyCheckedException e) {
-                        mini.onResult(e);
+                        mini.onNodeLeft(backupId);
                     }
                     catch (IgniteCheckedException e) {
-                        mini.onResult(e);
+                        mini.onDone(e);
                     }
                 }
             }
@@ -476,7 +499,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
-     *
+     * @return {@code True} if need to send finish request for one phase commit transaction.
      */
     private boolean needFinishOnePhase() {
         if (tx.mappings().empty())
@@ -584,7 +607,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 add(fut);
         }
         else {
-            MiniFuture fut = new MiniFuture(m);
+            FinishMiniFuture fut = new FinishMiniFuture(m);
 
             req.miniId(fut.futureId());
 
@@ -604,11 +627,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 // Remove previous mapping.
                 mappings.remove(m.node().id());
 
-                fut.onResult(e);
+                fut.onNodeLeft(n.id());
             }
             catch (IgniteCheckedException e) {
                 // Fail the whole thing.
-                fut.onResult(e);
+                fut.onDone(e);
             }
         }
     }
@@ -618,10 +641,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
         Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
             @SuppressWarnings("unchecked")
             @Override public String apply(IgniteInternalFuture<?> f) {
-                if (isMini(f)) {
-                    MiniFuture m = (MiniFuture)f;
+                if (f.getClass() == FinishMiniFuture.class) {
+                    FinishMiniFuture fut = (FinishMiniFuture)f;
+
+                    return "FinishFuture[node=" + fut.node().id() +
+                        ", loc=" + fut.node().isLocal() +
+                        ", done=" + fut.isDone() + "]";
+                }
+                else if (f.getClass() == CheckBackupMiniFuture.class) {
+                    CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f;
 
-                    return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+                    return "CheckBackupFuture[node=" + fut.node().id() +
+                        ", loc=" + fut.node().isLocal() +
+                        ", done=" + f.isDone() + "]";
+                }
+                else if (f.getClass() == CheckRemoteTxMiniFuture.class) {
+                    CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f;
+
+                    return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", done=" + f.isDone() + "]";
                 }
                 else
                     return "[loc=true, done=" + f.isDone() + "]";
@@ -634,108 +671,217 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
     }
 
     /**
+     * @param miniId Mini future ID.
+     * @return Finish request.
+     */
+    private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId) {
+        GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
+            cctx.localNodeId(),
+            futureId(),
+            miniId,
+            tx.topologyVersion(),
+            tx.xidVersion(),
+            tx.commitVersion(),
+            tx.threadId(),
+            tx.isolation(),
+            true,
+            false,
+            tx.system(),
+            tx.ioPolicy(),
+            false,
+            tx.syncCommit(),
+            tx.syncRollback(),
+            null,
+            null,
+            null,
+            null,
+            0,
+            null,
+            0,
+            tx.activeCachesDeploymentEnabled());
+
+        finishReq.checkCommitted(true);
+
+        return finishReq;
+    }
+
+    /**
+     *
+     */
+    private abstract class MinFuture extends GridFutureAdapter<IgniteInternalTx> {
+        /** */
+        private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+        /**
+         * @param nodeId Node ID.
+         * @return {@code True} if future processed node failure.
+         */
+        abstract boolean onNodeLeft(UUID nodeId);
+
+        /**
+         * @return Future ID.
+         */
+        final IgniteUuid futureId() {
+            return futId;
+        }
+    }
+
+    /**
      * Mini-future for get operations. Mini-futures are only waiting on a single
      * node as opposed to multiple nodes.
      */
-    private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+    private class FinishMiniFuture extends MinFuture {
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** */
-        private final IgniteUuid futId = IgniteUuid.randomUuid();
-
         /** Keys. */
         @GridToStringInclude
         private GridDistributedTxMapping m;
 
-        /** Backup check flag. */
-        private ClusterNode backup;
-
         /**
          * @param m Mapping.
          */
-        MiniFuture(GridDistributedTxMapping m) {
+        FinishMiniFuture(GridDistributedTxMapping m) {
             this.m = m;
         }
 
         /**
-         * @param backup Backup to check.
-         * @param m Mapping associated with the backup.
+         * @return Node this mini-future is mapped to.
          */
-        MiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
-            this.backup = backup;
-            this.m = m;
+        ClusterNode node() {
+            return m.node();
         }
 
         /**
-         * @return Future ID.
+         * @return Keys.
          */
-        IgniteUuid futureId() {
-            return futId;
+        public GridDistributedTxMapping mapping() {
+            return m;
         }
 
         /**
-         * @return Node ID.
+         * @param nodeId Failed node ID.
          */
-        public ClusterNode node() {
-            assert m != null || backup != null;
+        boolean onNodeLeft(UUID nodeId) {
+            if (nodeId.equals(m.node().id())) {
+                if (log.isDebugEnabled())
+                    log.debug("Remote node left grid while sending or waiting for reply: " + this);
+
+                if (isSync()) {
+                    Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes();
+
+                    if (txNodes != null) {
+                        Collection<UUID> backups = txNodes.get(nodeId);
+
+                        if (!F.isEmpty(backups)) {
+                            final CheckRemoteTxMiniFuture mini = new CheckRemoteTxMiniFuture(new HashSet<>(backups));
+
+                            add(mini);
+
+                            GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId());
+
+                            req.waitRemoteTransactions(true);
+
+                            for (UUID backupId : backups) {
+                                ClusterNode backup = cctx.discovery().node(backupId);
+
+                                if (backup != null && WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) <= 0) {
+                                    if (backup.isLocal()) {
+                                        IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(tx.nearXidVersion());
+
+                                        fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                                            @Override public void apply(IgniteInternalFuture<?> fut) {
+                                                mini.onDhtFinishResponse(cctx.localNodeId());
+                                            }
+                                        });
+                                    }
+                                    else {
+                                        try {
+                                            cctx.io().send(backup, req, tx.ioPolicy());
+                                        }
+                                        catch (ClusterTopologyCheckedException e) {
+                                            mini.onNodeLeft(backupId);
+                                        }
+                                        catch (IgniteCheckedException e) {
+                                            mini.onDone(e);
+                                        }
+                                    }
+                                }
+                                else
+                                    mini.onDhtFinishResponse(backupId);
+                            }
+                        }
+                    }
+                }
 
-            return backup != null ? backup : m.node();
+                onDone(tx);
+
+                return true;
+            }
+
+            return false;
         }
 
         /**
-         * @return Keys.
+         * @param res Near finish response.
          */
-        public GridDistributedTxMapping mapping() {
-            return m;
+        void onNearFinishResponse(GridNearTxFinishResponse res) {
+            if (res.error() != null)
+                onDone(res.error());
+            else
+                onDone(tx);
         }
 
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(FinishMiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+        }
+    }
+
+    /**
+     *
+     */
+    private class CheckBackupMiniFuture extends MinFuture {
+        /** Keys. */
+        @GridToStringInclude
+        private GridDistributedTxMapping m;
+
+        /** Backup node to check. */
+        private ClusterNode backup;
+
         /**
-         * @param e Error.
+         * @param backup Backup to check.
+         * @param m Mapping associated with the backup.
          */
-        void onResult(Throwable e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
-            // Fail.
-            onDone(e);
+        CheckBackupMiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
+            this.backup = backup;
+            this.m = m;
         }
 
         /**
-         * @param e Node failure.
+         * @return Backup node being checked.
          */
-        void onResult(ClusterTopologyCheckedException e) {
-            if (log.isDebugEnabled())
-                log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
+        public ClusterNode node() {
+            return backup;
+        }
 
-            if (backup != null) {
+        /** {@inheritDoc} */
+        @Override boolean onNodeLeft(UUID nodeId) {
+            if (nodeId.equals(backup.id())) {
                 readyNearMappingFromBackup(m);
 
-                onDone(e);
-            }
-            else
-                // Complete future with tx.
-                onDone(tx);
-        }
+                onDone(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
 
-        /**
-         * @param res Result callback.
-         */
-        void onResult(GridNearTxFinishResponse res) {
-            assert backup == null;
+                return true;
+            }
 
-            if (res.error() != null)
-                onDone(res.error());
-            else
-                onDone(tx);
+            return false;
         }
 
         /**
          * @param res Response.
          */
-        void onResult(GridDhtTxFinishResponse res) {
-            assert backup != null;
-
+        void onDhtFinishResponse(GridDhtTxFinishResponse res) {
             readyNearMappingFromBackup(m);
 
             Throwable err = res.checkCommittedError();
@@ -755,9 +901,67 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
                 onDone(tx);
         }
 
+    }
+
+    /**
+     *
+     */
+    private class CheckRemoteTxMiniFuture extends MinFuture {
+        /** */
+        private Set<UUID> nodes;
+
+        /**
+         * @param nodes Backup nodes.
+         */
+        public CheckRemoteTxMiniFuture(Set<UUID> nodes) {
+            this.nodes = nodes;
+        }
+
+        /**
+         * @return Backup nodes.
+         */
+        Set<UUID> nodes() {
+            synchronized (this) {
+                return new HashSet<>(nodes);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override boolean onNodeLeft(UUID nodeId) {
+            return onResponse(nodeId);
+        }
+
+        /**
+         * @param nodeId Node ID.
+         */
+        void onDhtFinishResponse(UUID nodeId) {
+            onResponse(nodeId);
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @return {@code True} if processed node response.
+         */
+        private boolean onResponse(UUID nodeId) {
+            boolean done;
+
+            boolean ret;
+
+            synchronized (this) {
+                ret = nodes.remove(nodeId);
+
+                done = nodes.isEmpty();
+            }
+
+            if (done)
+                onDone(tx);
+
+            return ret;
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+            return S.toString(CheckRemoteTxMiniFuture.class, this);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 3e5e28f..65eac63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -70,6 +70,9 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param commit Commit flag.
      * @param invalidate Invalidate flag.
      * @param sys System flag.
+     * @param plc IO policy.
+     * @param syncCommit Sync commit flag.
+     * @param syncRollback Sync rollback flag.
      * @param explicitLock Explicit lock flag.
      * @param storeEnabled Store enabled flag.
      * @param topVer Topology version.
@@ -77,6 +80,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
      * @param committedVers Committed versions.
      * @param rolledbackVers Rolled back versions.
      * @param txSize Expected transaction size.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
      * @param addDepInfo Deployment info flag.
      */
     public GridNearTxFinishRequest(

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index 4904ad8..b84d2fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -99,7 +99,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (err != null)
+        if (err != null && errBytes == null)
             errBytes = ctx.marshaller().marshal(err);
     }
 
@@ -107,7 +107,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (errBytes != null)
+        if (errBytes != null && err == null)
             err = ctx.marshaller().unmarshal(errBytes, ldr);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index aa4e929f..b7b480e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -202,7 +202,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridCacheVersion nearXidVersion() {
+    @Override public GridCacheVersion nearXidVersion() {
         return xidVer;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index d886243..8812709 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -264,7 +264,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
         super.prepareMarshal(ctx);
 
-        if (ownedVals != null) {
+        if (ownedVals != null && ownedValKeys == null) {
             ownedValKeys = ownedVals.keySet();
 
             ownedValVals = ownedVals.values();
@@ -287,7 +287,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
         }
 
         if (filterFailedKeys != null) {
-            for (IgniteTxKey key :filterFailedKeys) {
+            for (IgniteTxKey key : filterFailedKeys) {
                 GridCacheContext cctx = ctx.cacheContext(key.cacheId());
 
                 key.prepareMarshal(cctx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 59d8b5b..dc98eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -280,28 +280,28 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        if (keyValFilter != null) {
+        if (keyValFilter != null && keyValFilterBytes == null) {
             if (addDepInfo)
                 prepareObject(keyValFilter, cctx);
 
             keyValFilterBytes = CU.marshal(cctx, keyValFilter);
         }
 
-        if (rdc != null) {
+        if (rdc != null && rdcBytes == null) {
             if (addDepInfo)
                 prepareObject(rdc, cctx);
 
             rdcBytes = CU.marshal(cctx, rdc);
         }
 
-        if (trans != null) {
+        if (trans != null && transBytes == null) {
             if (addDepInfo)
                 prepareObject(trans, cctx);
 
             transBytes = CU.marshal(cctx, trans);
         }
 
-        if (!F.isEmpty(args)) {
+        if (!F.isEmpty(args) && argsBytes == null) {
             if (addDepInfo) {
                 for (Object arg : args)
                     prepareObject(arg, cctx);
@@ -317,16 +317,16 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
 
         Marshaller mrsh = ctx.marshaller();
 
-        if (keyValFilterBytes != null)
+        if (keyValFilterBytes != null && keyValFilter == null)
             keyValFilter = mrsh.unmarshal(keyValFilterBytes, ldr);
 
-        if (rdcBytes != null)
+        if (rdcBytes != null && rdc == null)
             rdc = mrsh.unmarshal(rdcBytes, ldr);
 
-        if (transBytes != null)
+        if (transBytes != null && trans == null)
             trans = mrsh.unmarshal(transBytes, ldr);
 
-        if (argsBytes != null)
+        if (argsBytes != null && args == null)
             args = mrsh.unmarshal(argsBytes, ldr);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index cce465b..ab882d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -122,11 +122,14 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        if (err != null)
+        if (err != null && errBytes == null)
             errBytes = ctx.marshaller().marshal(err);
 
-        metaDataBytes = marshalCollection(metadata, cctx);
-        dataBytes = marshalCollection(data, cctx);
+        if (metaDataBytes == null)
+            metaDataBytes = marshalCollection(metadata, cctx);
+
+        if (dataBytes == null)
+            dataBytes = marshalCollection(data, cctx);
 
         if (addDepInfo && !F.isEmpty(data)) {
             for (Object o : data) {
@@ -144,11 +147,14 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
     @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(ctx, ldr);
 
-        if (errBytes != null)
+        if (errBytes != null && err == null)
             err = ctx.marshaller().unmarshal(errBytes, ldr);
 
-        metadata = unmarshalCollection(metaDataBytes, ctx, ldr);
-        data = unmarshalCollection(dataBytes, ctx, ldr);
+        if (metadata == null)
+            metadata = unmarshalCollection(metaDataBytes, ctx, ldr);
+
+        if (data == null)
+            data = unmarshalCollection(dataBytes, ctx, ldr);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index f5f99f5..914b4ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -160,6 +160,12 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
     public long timeout(long timeout);
 
     /**
+     * Changes transaction state from {@code COMMITTING} to {@code MARKED_ROLLBACK}.
+     * Must be called only from the thread that is committing the transaction.
+     */
+    public void errorWhenCommitting();
+
+    /**
      * Modify the transaction associated with the current thread such that the
      * only possible outcome of the transaction is to roll back the
      * transaction.

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 22e27c3..ed44c49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -804,6 +804,22 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     }
 
     /** {@inheritDoc} */
+    @Override public final void errorWhenCommitting() {
+        synchronized (this) {
+            TransactionState prev = state;
+
+            assert prev == COMMITTING : prev;
+
+            state = MARKED_ROLLBACK;
+
+            if (log.isDebugEnabled())
+                log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']');
+
+            notifyAll(); // Wake up any threads waiting on this transaction's monitor.
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean setRollbackOnly() {
         return state(MARKED_ROLLBACK);
     }
@@ -1083,7 +1099,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                 }
 
                 case MARKED_ROLLBACK: {
-                    valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == COMMITTING;
+                    valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED;
 
                     break;
                 }
@@ -1705,6 +1721,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
+        @Override public void errorWhenCommitting() {
+            throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
+        }
+
+        /** {@inheritDoc} */
         @Override public void commit() {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index c42bc7f..f731975 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -828,7 +828,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
         val.marshal(ctx, context());
 
-        expiryPlcBytes = transferExpiryPlc ?  CU.marshal(this.ctx, new IgniteExternalizableExpiryPolicy(expiryPlc)) : null;
+        if (transferExpiryPlc) {
+            if (expiryPlcBytes == null)
+                expiryPlcBytes = CU.marshal(this.ctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+        }
+        else
+            expiryPlcBytes = null;
     }
 
     /**
@@ -871,8 +876,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
         val.unmarshal(this.ctx, clsLdr);
 
-        if (expiryPlcBytes != null)
-            expiryPlc =  ctx.marshaller().unmarshal(expiryPlcBytes, clsLdr);
+        if (expiryPlcBytes != null && expiryPlc == null)
+            expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, clsLdr);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index b25baf8..547c018 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -721,14 +721,12 @@ public class IgniteTxHandler {
 
             IgniteInternalFuture<IgniteInternalTx> res = null;
 
-            if (tx != null) {
-                IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
+            IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
 
-                // Only for error logging.
-                rollbackFut.listen(CU.errorLogger(log));
+            // Only for error logging.
+            rollbackFut.listen(CU.errorLogger(log));
 
-                res = rollbackFut;
-            }
+            res = rollbackFut;
 
             if (e instanceof Error)
                 throw (Error)e;
@@ -875,7 +873,19 @@ public class IgniteTxHandler {
             log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']');
 
         if (req.checkCommitted()) {
-            sendReply(nodeId, req, !ctx.tm().addRolledbackTx(null, req.version()));
+            boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
+
+            if (!committed || !req.syncCommit())
+                sendReply(nodeId, req, committed);
+            else {
+                IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
+
+                fut.listen(new CI1<IgniteInternalFuture<?>>() {
+                    @Override public void apply(IgniteInternalFuture<?> fut) {
+                        sendReply(nodeId, req, true);
+                    }
+                });
+            }
 
             return;
         }
@@ -1044,7 +1054,7 @@ public class IgniteTxHandler {
      * @param committed {@code True} if transaction committed on this node.
      */
     protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
-        if (req.replyRequired()) {
+        if (req.replyRequired() || req.checkCommitted()) {
             GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
 
             if (req.checkCommitted()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 21ff0cf..926eaf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -809,7 +809,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             catch (IgniteCheckedException ex) {
                 commitError(ex);
 
-                setRollbackOnly();
+                errorWhenCommitting();
 
                 // Safe to remove transaction from committed tx list because nothing was committed yet.
                 cctx.tm().removeCommittedTx(this);
@@ -819,7 +819,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             catch (Throwable ex) {
                 commitError(ex);
 
-                setRollbackOnly();
+                errorWhenCommitting();
 
                 // Safe to remove transaction from committed tx list because nothing was committed yet.
                 cctx.tm().removeCommittedTx(this);
@@ -1161,7 +1161,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                             // Set operation to NOOP.
                             txEntry.op(NOOP);
 
-                            setRollbackOnly();
+                            errorWhenCommitting();
 
                             throw ex;
                         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index d384e4e..ca15e20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1063,6 +1063,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
         if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
             uncommitTx(tx);
 
+            tx.errorWhenCommitting();
+
             throw new IgniteException("Missing commit version (consider increasing " +
                 IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() +
                 ", tx=" + tx.getClass().getSimpleName() + ']');
@@ -1616,6 +1618,24 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * @param nearVer Near version.
+     * @return Finish future for related remote transactions.
+     */
+    @SuppressWarnings("unchecked")
+    public IgniteInternalFuture<?> remoteTxFinishFuture(GridCacheVersion nearVer) {
+        GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>();
+
+        for (final IgniteInternalTx tx : txs()) {
+            if (!tx.local() && nearVer.equals(tx.nearXidVersion()))
+                fut.add((IgniteInternalFuture) tx.finishFuture());
+        }
+
+        fut.markInitialized();
+
+        return fut;
+    }
+
+    /**
      * @param nearVer Near version ID.
      * @param txNum Number of transactions.
      * @param fut Result future.

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
index 3d65304..77c802d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -105,6 +105,7 @@ public class DataStreamerRequest implements Message {
      * @param entries Entries to put.
      * @param ignoreDepOwnership Ignore ownership.
      * @param skipStore Skip store flag.
+     * @param keepBinary Keep binary flag.
      * @param depMode Deployment mode.
      * @param sampleClsName Sample class name.
      * @param userVer User version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index cd783e4..98848ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -926,8 +926,15 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
 
         CacheConfiguration newCfg = cacheConfiguration(cfg, cacheName);
 
-        if (ctx.cache().cache(cacheName) == null)
-            ctx.cache().dynamicStartCache(newCfg, cacheName, null, CacheType.INTERNAL, false, true).get();
+        if (ctx.cache().cache(cacheName) == null) {
+            ctx.cache().dynamicStartCache(newCfg,
+                cacheName,
+                null,
+                CacheType.INTERNAL,
+                false,
+                true,
+                true).get();
+        }
 
         assert ctx.cache().cache(cacheName) != null : cacheName;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
index f4a8fad..ecb892e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
@@ -90,7 +90,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
     @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
         super.prepareMarshal(marsh);
 
-        if (err != null)
+        if (err != null && errBytes == null)
             errBytes = marsh.marshal(err);
     }
 
@@ -98,7 +98,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
     @Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
         super.finishUnmarshal(marsh, ldr);
 
-        if (errBytes != null)
+        if (errBytes != null && err == null)
             err = marsh.unmarshal(errBytes, ldr);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 65dca08..a89913f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -357,7 +357,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
 
             switch (cmd) {
                 case DESTROY_CACHE: {
-                    fut = ((IgniteKernal)ctx.grid()).destroyCacheAsync(cacheName).chain(
+                    // Do not check thread tx here since there can be active system cache txs.
+                    fut = ((IgniteKernal)ctx.grid()).destroyCacheAsync(cacheName, false).chain(
                         new CX1<IgniteInternalFuture<?>, GridRestResponse>() {
                             @Override public GridRestResponse applyx(IgniteInternalFuture<?> f)
                                 throws IgniteCheckedException {
@@ -369,7 +370,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
                 }
 
                 case GET_OR_CREATE_CACHE: {
-                    fut = ((IgniteKernal)ctx.grid()).getOrCreateCacheAsync(cacheName).chain(
+                    // Do not check thread tx here since there can be active system cache txs.
+                    fut = ((IgniteKernal)ctx.grid()).getOrCreateCacheAsync(cacheName, false).chain(
                         new CX1<IgniteInternalFuture<?>, GridRestResponse>() {
                             @Override public GridRestResponse applyx(IgniteInternalFuture<?> f)
                                 throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 1ea5014..8c23d92 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -104,6 +104,7 @@ public interface DiscoverySpi extends IgniteSpi {
      * Sets a handler for initial data exchange between Ignite nodes.
      *
      * @param exchange Discovery data exchange handler.
+     * @return {@code this} for chaining.
      */
     public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange);
 
@@ -113,6 +114,7 @@ public interface DiscoverySpi extends IgniteSpi {
      * dynamic metrics between nodes.
      *
      * @param metricsProvider Provider of metrics data.
+     * @return {@code this} for chaining.
      */
     public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 066a5fd..21204c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -204,7 +204,8 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
      * Stops streamer.
      */
     public void stop() {
-        srv.stop();
+        if (srv != null)
+            srv.stop();
 
         if (log.isDebugEnabled())
             log.debug("Socket streaming server stopped");

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
index 95ca9b5..9908b87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
@@ -324,10 +324,14 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
             @Override public void applyx(IgniteCache<String, Integer> cache) {
                 int rnd = random();
 
+                Set<Integer> ids = new HashSet<>(set);
+
                 cache.removeAll(rangeKeys(0, rnd));
 
-                for (int i = 0; i < rnd; i++)
-                    assert cache.localPeek("key" + i, CachePeekMode.ONHEAP) == null;
+                for (int i = 0; i < rnd; i++) {
+                    if (ids.contains(i))
+                        assertNull(cache.localPeek("key" + i));
+                }
             }
         });
     }
@@ -350,7 +354,7 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
 
                 for (int i = 0; i < rnd; i++) {
                     if (ids.contains(i))
-                        assert cache.localPeek("key" + i, CachePeekMode.ONHEAP) == null;
+                        assertNull(cache.localPeek("key" + i));
                 }
             }
         });
@@ -359,6 +363,7 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
     /**
      * @param cache Cache.
      * @param key Key.
+     * @return Removed value.
      */
     private <K, V> V removeAsync(IgniteCache<K, V> cache, K key) {
         IgniteCache<K, V> cacheAsync = cache.withAsync();
@@ -371,6 +376,8 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
     /**
      * @param cache Cache.
      * @param key Key.
+     * @param val Value.
+     * @return Remove result.
      */
     private <K, V> boolean removeAsync(IgniteCache<K, V> cache, K key, V val) {
         IgniteCache<K, V> cacheAsync = cache.withAsync();

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 0d9c541..1e0071e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -4337,7 +4337,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
 
                 log.info("Set iterators not cleared, will wait");
 
-                Thread.sleep(500);
+                Thread.sleep(1000);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
index a34857f..e70c97b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
@@ -299,7 +299,7 @@ public class GridCacheStopSelfTest extends GridCommonAbstractTest {
 
                         return null;
                     }
-                }));
+                }, "cache-thread"));
             }
 
             readyLatch.await();

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 8a5dfd4..c9cd750 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -158,7 +158,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
 
                 ccfg.setName(DYNAMIC_CACHE_NAME);
 
-                futs.add(kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true, true));
+                futs.add(kernal.context().cache().dynamicStartCache(ccfg,
+                    ccfg.getName(),
+                    null,
+                    true,
+                    true,
+                    true));
 
                 return null;
             }
@@ -190,7 +195,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
 
         GridTestUtils.runMultiThreaded(new Callable<Object>() {
             @Override public Object call() throws Exception {
-                futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME));
+                futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true));
 
                 return null;
             }
@@ -218,7 +223,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
 
                 IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount()));
 
-                futs.add(kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true, true));
+                futs.add(kernal.context().cache().dynamicStartCache(ccfg,
+                    ccfg.getName(),
+                    null,
+                    true,
+                    true,
+                    true));
 
                 return null;
             }
@@ -252,7 +262,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             @Override public Object call() throws Exception {
                 IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount()));
 
-                futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME));
+                futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true));
 
                 return null;
             }
@@ -315,7 +325,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
         for (int g = 0; g < nodeCount(); g++)
             caches[g] = grid(g).cache(DYNAMIC_CACHE_NAME);
 
-        kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+        kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
 
         for (int g = 0; g < nodeCount(); g++) {
             final IgniteKernal kernal0 = (IgniteKernal) grid(g);
@@ -368,7 +378,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             }
 
             // Undeploy cache.
-            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
 
             startGrid(nodeCount() + 1);
 
@@ -445,7 +455,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
                     }, IllegalArgumentException.class, null);
             }
 
-            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
 
             stopGrid(nodeCount() + 1);
             stopGrid(nodeCount());
@@ -512,7 +522,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             for (int g = 0; g < nodeCount() + 1; g++)
                 assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
 
-            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
         }
         finally {
             stopGrid(nodeCount());
@@ -554,7 +564,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             for (int g = 0; g < nodeCount() + 1; g++)
                 assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
 
-            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
         }
         finally {
             stopGrid(nodeCount());
@@ -600,7 +610,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
             for (int g = 0; g < nodeCount() + 1; g++)
                 assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
 
-            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+            kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
         }
         finally {
             stopGrid(nodeCount());

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
index bf6dcda..34e7080 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.configuration.CollectionConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -59,6 +60,8 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         return cfg;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
index 84838db..a08d080 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -188,6 +188,9 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param conc Transaction concurrency.
+     * @param backup Check backup flag.
+     * @param commit Check commit flag.
      * @throws Exception If failed.
      */
     private void checkPrimaryNodeFailureBackupCommit(
@@ -197,6 +200,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
     ) throws Exception {
         try {
             startGrids(gridCount());
+
             awaitPartitionMapExchange();
 
             for (int i = 0; i < gridCount(); i++)
@@ -290,7 +294,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
 
                     return null;
                 }
-            });
+            }, "tx-thread");
 
             commitLatch.await();
 
@@ -366,6 +370,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
 
     /**
      * @param ignite Ignite instance to generate key.
+     * @param backup Backup key flag.
      * @return Generated key that is not primary nor backup for {@code ignite(0)} and primary for
      *      {@code ignite(1)}.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java
new file mode 100644
index 0000000..c47401c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
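+ * Tests transaction recovery when the primary node is stopped while backup nodes are delayed in the middle of commit.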
+ */
+public class IgniteCacheCommitDelayTxRecoveryTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int SRVS = 4;
+
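+    /** Set to {@code true} by the updating thread right before commit; enables the delay in the entry processor. */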
+    private static volatile boolean commit;
+
+    /** Counted down by the entry processor on each delayed node once it is invoked during commit. */
+    private static volatile CountDownLatch commitStartedLatch;
+
+    /** Released by the test after the primary node is stopped; lets the delayed entry processors finish. */
+    private static volatile CountDownLatch commitFinishLatch;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRecovery1() throws Exception {
+        checkRecovery(1, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRecovery2() throws Exception {
+        checkRecovery(2, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRecoveryStoreEnabled1() throws Exception {
+        checkRecovery(1, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRecoveryStoreEnabled2() throws Exception {
+        checkRecovery(2, true);
+    }
+
+    /**
+     * @param backups Number of cache backups.
+     * @param useStore If {@code true} tests cache with store configured.
+     * @throws Exception If failed.
+     */
+    private void checkRecovery(int backups, boolean useStore) throws Exception {
+        startGridsMultiThreaded(SRVS, false);
+
+        client = true;
+
+        Ignite clientNode = startGrid(SRVS);
+
+        assertTrue(clientNode.configuration().isClientMode());
+
+        client = false;
+
+        clientNode.createCache(cacheConfiguration(backups, useStore));
+
+        awaitPartitionMapExchange();
+
+        Ignite srv = ignite(0);
+
+        assertFalse(srv.configuration().isClientMode());
+
+        for (Boolean pessimistic : Arrays.asList(false, true)) {
+            checkRecovery(backupKey(srv.cache(null)), srv, pessimistic, useStore);
+
+            checkRecovery(nearKey(srv.cache(null)), srv, pessimistic, useStore);
+
+            checkRecovery(nearKey(clientNode.cache(null)), clientNode, pessimistic, useStore);
+
+            srv = ignite(0);
+
+            assertFalse(srv.configuration().isClientMode());
+        }
+    }
+
+    /**
+     * @param key Key.
+     * @param ignite Node executing update.
+     * @param pessimistic If {@code true} uses pessimistic transaction.
+     * @param useStore {@code True} if store is used.
+     * @throws Exception If failed.
+     */
+    private void checkRecovery(final Integer key,
+        final Ignite ignite,
+        final boolean pessimistic,
+        final boolean useStore) throws Exception {
+        Ignite primary = primaryNode(key, null);
+
+        assertNotSame(ignite, primary);
+
+        List<Ignite> backups = backupNodes(key, null);
+
+        assertFalse(backups.isEmpty());
+
+        final Set<String> backupNames = new HashSet<>();
+
+        for (Ignite node : backups)
+            backupNames.add(node.name());
+
+        log.info("Check recovery [key=" + key +
+            ", pessimistic=" + pessimistic +
+            ", primary=" + primary.name() +
+            ", backups=" + backupNames +
+            ", node=" + ignite.name() + ']');
+
+        final IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+        cache.put(key, 0);
+
+        commitStartedLatch = new CountDownLatch(backupNames.size());
+        commitFinishLatch = new CountDownLatch(1);
+
+        commit = false;
+
+        TestEntryProcessor.skipFirst = useStore ? ignite.name() : null;
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                log.info("Start update.");
+
+                if (pessimistic) {
+                    try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                        cache.invoke(key, new TestEntryProcessor(backupNames));
+
+                        commit = true;
+
+                        log.info("Start commit.");
+
+                        assertEquals(backupNames.size(), commitStartedLatch.getCount());
+
+                        tx.commit();
+                    }
+                }
+                else {
+                    commit = true;
+
+                    cache.invoke(key, new TestEntryProcessor(backupNames));
+                }
+
+                log.info("End update, execute get.");
+
+                Integer val = cache.get(key);
+
+                log.info("Get value: " + val);
+
+                assertEquals(1, (Object)val);
+
+                return null;
+            }
+        }, "update-thread");
+
+        assertTrue(commitStartedLatch.await(30, SECONDS));
+
+        log.info("Stop node: " + primary.name());
+
+        primary.close();
+
+        commitFinishLatch.countDown();
+
+        fut.get();
+
+        for (Ignite node : G.allGrids())
+            assertEquals(1, node.cache(null).get(key));
+
+        cache.put(key, 2);
+
+        for (Ignite node : G.allGrids())
+            assertEquals(2, node.cache(null).get(key));
+
+        startGrid(primary.name());
+
+        for (Ignite node : G.allGrids())
+            assertEquals(2, node.cache(null).get(key));
+
+        cache.put(key, 3);
+
+        for (Ignite node : G.allGrids())
+            assertEquals(3, node.cache(null).get(key));
+
+        awaitPartitionMapExchange();
+    }
+
+    /**
+     * Entry processor that sets the value to 1; once commit has started it first blocks on the given nodes until released.
+     */
+    static class TestEntryProcessor implements CacheEntryProcessor<Integer, Integer, Void> {
+        /** Names of nodes on which the processor waits for the commit finish latch. */
+        private Set<String> nodeNames;
+
+        /** Skips first call for given node (used to skip call for store update). Volatile: shared between threads. */
+        private static volatile String skipFirst;
+
+        /**
+         * @param nodeNames Names of nodes where processing is delayed until the commit finish latch is released.
+         */
+        public TestEntryProcessor(Set<String> nodeNames) {
+            this.nodeNames = nodeNames;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<Integer, Integer> entry, Object... args) {
+            Ignite ignite = entry.unwrap(Ignite.class);
+
+            System.out.println(Thread.currentThread().getName() + " process [node=" + ignite.name() +
+                ", commit=" + commit + ", skipFirst=" + skipFirst + ']');
+
+            boolean skip = false;
+
+            if (commit && ignite.name().equals(skipFirst)) {
+                skipFirst = null;
+
+                skip = true;
+            }
+
+            if (!skip && commit && nodeNames.contains(ignite.name())) {
+                try {
+                    System.out.println(Thread.currentThread().getName() + " start process invoke.");
+
+                    assertTrue(commitStartedLatch != null && commitStartedLatch.getCount() > 0);
+
+                    commitStartedLatch.countDown();
+
+                    assertTrue(commitFinishLatch.await(10, SECONDS));
+
+                    System.out.println(Thread.currentThread().getName() + " end process invoke.");
+                }
+                catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            else
+                System.out.println(Thread.currentThread().getName() + " invoke set value.");
+
+            entry.setValue(1);
+
+            return null;
+        }
+    }
+
+    /**
+     * @param backups Number of backups.
+     * @param useStore If {@code true} adds cache store.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Object, Object> cacheConfiguration(int backups, boolean useStore) {
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setBackups(backups);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setRebalanceMode(SYNC);
+
+        if (useStore) {
+            ccfg.setCacheStoreFactory(new TestStoreFactory());
+
+            ccfg.setWriteThrough(true);
+        }
+
+        return ccfg;
+    }
+
+    /**
+     * Factory creating a no-op cache store: load returns {@code null}, write and delete do nothing.
+     */
+    private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+        /** {@inheritDoc} */
+        @Override public CacheStore<Object, Object> create() {
+            return new CacheStoreAdapter<Object, Object>() {
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                @Override public void write(Cache.Entry entry) throws CacheWriterException {
+                    // No-op.
+                }
+
+                @Override public void delete(Object key) throws CacheWriterException {
+                    // No-op.
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 4eb8a6b..7532354 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -30,6 +30,8 @@ import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
+
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -238,7 +240,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
      * @param store If {@code true} uses cache with store.
      * @throws Exception If failed.
      */
-    private void checkRetry(Test test, TestMemoryMode memMode, boolean store) throws Exception {
+    protected final void checkRetry(Test test, TestMemoryMode memMode, boolean store) throws Exception {
         ignite(0).createCache(cacheConfiguration(memMode, store));
 
         final AtomicBoolean finished = new AtomicBoolean();
@@ -259,7 +261,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
             }
         });
 
-        IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+        final IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
 
         int iter = 0;
 
@@ -309,6 +311,31 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
                     break;
                 }
 
+                case TX_PUT: {
+                    while (System.currentTimeMillis() < stopTime) {
+                        final Integer val = ++iter;
+
+                        Ignite ignite = ignite(0);
+
+                        for (int i = 0; i < keysCnt; i++) {
+                            final Integer key = i;
+
+                            doInTransaction(ignite, new Callable<Void>() {
+                                @Override public Void call() throws Exception {
+                                    cache.put(key, val);
+
+                                    return null;
+                                }
+                            });
+                        }
+
+                        for (int i = 0; i < keysCnt; i++)
+                            assertEquals(val, cache.get(i));
+                    }
+
+                    break;
+                }
+
                 case PUT_ALL: {
                     while (System.currentTimeMillis() < stopTime) {
                         Integer val = ++iter;
@@ -541,7 +568,10 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
         INVOKE,
 
         /** */
-        INVOKE_ALL
+        INVOKE_ALL,
+
+        /** */
+        TX_PUT
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 7655464..9204bc8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -101,6 +101,20 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
     /**
      * @throws Exception If failed.
      */
+    public void testExplicitTransactionRetriesSingleValue() throws Exception {
+        checkRetry(Test.TX_PUT, TestMemoryMode.HEAP, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testExplicitTransactionRetriesSingleValueStoreEnabled() throws Exception {
+        checkRetry(Test.TX_PUT, TestMemoryMode.HEAP, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testExplicitTransactionRetries() throws Exception {
         explicitTransactionRetries(TestMemoryMode.HEAP, false);
     }
@@ -108,6 +122,13 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
     /**
      * @throws Exception If failed.
      */
+    public void testExplicitTransactionRetriesSingleOperation() throws Exception {
+        explicitTransactionRetries(TestMemoryMode.HEAP, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testExplicitTransactionRetriesStoreEnabled() throws Exception {
         explicitTransactionRetries(TestMemoryMode.HEAP, true);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index d239ea8..91eecbb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -104,6 +105,8 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
 
         cfg.setDiscoverySpi(disc);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         if (include)
             cfg.setUserAttributes(F.asMap("include", true));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index 6089795..552dd28 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -569,9 +569,9 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
         GridNioServerListener lsnr,
         @Nullable Integer queueLimit) throws Exception {
         for (int i = 0; i < 10; i++) {
-            try {
-                int srvPort = port++;
+            int srvPort = port++;
 
+            try {
                 GridNioServer.Builder<?> builder = serverBuilder(srvPort, parser, lsnr);
 
                 if (queueLimit != null)
@@ -584,8 +584,11 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
                 return srvr;
             }
             catch (IgniteCheckedException e) {
-                if (i < 9 && e.hasCause(BindException.class))
-                    log.error("Failed to start server, will try another port: " + e);
+                if (i < 9 && e.hasCause(BindException.class)) {
+                    log.error("Failed to start server, will try another port [err=" + e + ", port=" + srvPort + ']');
+
+                    U.sleep(5000);
+                }
                 else
                     throw e;
             }