You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/01/13 07:32:36 UTC
[1/3] ignite git commit: Fixes: - allow 'committing' ->
'marked_rollback' tx state change only for thread committing transaction -
fixed 'full_sync' mode for case when tx primary nodes fail - fixed race
between statically configured cache start and Gr
Repository: ignite
Updated Branches:
refs/heads/master 1d8c4e259 -> 457a9ae4d
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index d07a1e6..34872c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -85,7 +85,6 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
}
/** */
- @SuppressWarnings({"deprecation"})
private class TestListener implements CommunicationListener<Message> {
/** */
private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index 7521f2e..b7c0deb 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -49,11 +49,13 @@ import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
+import org.apache.ignite.testframework.junits.spi.GridSpiTest;
import org.eclipse.jetty.util.ConcurrentHashSet;
/**
*
*/
+@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI")
public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi>
extends GridSpiAbstractTest<T> {
/** */
@@ -87,7 +89,6 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
}
/** */
- @SuppressWarnings({"deprecation"})
private class TestListener implements CommunicationListener<Message> {
/** */
private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>();
@@ -151,6 +152,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
int expMsgs = 0;
+ long totAcked = 0;
+
for (int i = 0; i < 5; i++) {
info("Iteration: " + i);
@@ -172,6 +175,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
expMsgs += msgPerIter;
+ final long totAcked0 = totAcked;
+
for (TcpCommunicationSpi spi : spis) {
GridNioServer srv = U.field(spi, "nioSrvr");
@@ -189,6 +194,14 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
+ long acked = GridTestUtils.getFieldValue(recoveryDesc, "acked");
+
+ return acked > totAcked0;
+ }
+ }, 5000);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
return recoveryDesc.messagesFutures().isEmpty();
}
}, 10_000);
@@ -218,6 +231,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
}
assertEquals(msgPerIter * 2, ackMsgs.get());
+
+ totAcked += msgPerIter;
}
}
finally {
@@ -337,6 +352,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
return expMsgs == ackMsgs.get();
}
}, 5000);
+
+ assertEquals(expMsgs, ackMsgs.get());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 0df7da6..4f329e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -118,6 +118,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
TcpDiscoverySpi spi = nodeSpi.get();
if (spi == null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index e0ffc60..949290e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -978,7 +978,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
/**
* @param key Key.
* @param cacheName Cache name.
- * @return Ignite instance which has primary cache for given key.
+ * @return Ignite instance which has backup cache for given key.
*/
protected Ignite backupNode(Object key, String cacheName) {
List<Ignite> allGrids = Ignition.allGrids();
@@ -1001,8 +1001,38 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
}
/**
+ * @param key Key.
+ * @param cacheName Cache name.
+ * @return Ignite instances which has backup cache for given key.
+ */
+ protected List<Ignite> backupNodes(Object key, String cacheName) {
+ List<Ignite> allGrids = Ignition.allGrids();
+
+ assertFalse("There are no alive nodes.", F.isEmpty(allGrids));
+
+ Ignite ignite = allGrids.get(0);
+
+ Affinity<Object> aff = ignite.affinity(cacheName);
+
+ Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+
+ assertTrue("Expected more than one node for key [key=" + key + ", nodes=" + nodes +']', nodes.size() > 1);
+
+ Iterator<ClusterNode> it = nodes.iterator();
+
+ it.next(); // Skip primary.
+
+ List<Ignite> backups = new ArrayList<>(nodes.size() - 1);
+
+ while (it.hasNext())
+ backups.add(grid(it.next()));
+
+ return backups;
+ }
+
+ /**
* In ATOMIC cache with CLOCK mode if key is updated from different nodes at same time
- * only one update wins others are ignored (can happen in test event when updates are executed from
+ * only one update wins others are ignored (can happen in test even when updates are executed from
* different nodes sequentially), this delay is used to avoid lost updates.
*
* @param cache Cache.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
index d81efd9..7363c7c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTxRecoverySelfTestSuite.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheColocatedTxPessimisticOriginatingNodeFailureSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedNearDisabledTxOriginatingNodeFailureSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCachePartitionedTxOriginatingNodeFailureSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheCommitDelayTxRecoveryTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedPrimaryNodeFailureRecoveryTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest;
@@ -39,6 +40,8 @@ public class IgniteCacheTxRecoverySelfTestSuite extends TestSuite {
public static TestSuite suite() throws Exception {
TestSuite suite = new TestSuite("Cache tx recovery test suite");
+ suite.addTestSuite(IgniteCacheCommitDelayTxRecoveryTest.class);
+
suite.addTestSuite(IgniteCachePartitionedPrimaryNodeFailureRecoveryTest.class);
suite.addTestSuite(IgniteCachePartitionedNearDisabledPrimaryNodeFailureRecoveryTest.class);
suite.addTestSuite(IgniteCachePartitionedTwoBackupsPrimaryNodeFailureRecoveryTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
index c19a8fc..e53f335 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/zk/ZookeeperIpFinderTest.java
@@ -31,8 +31,10 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
/**
@@ -41,7 +43,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
* @author Raul Kripalani
*/
public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
-
/** ZK Cluster size. */
private static final int ZK_CLUSTER_SIZE = 3;
@@ -79,7 +80,6 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
// start the Curator client so we can perform assertions on the ZK state later
zkCurator = CuratorFrameworkFactory.newClient(zkCluster.getConnectString(), new RetryNTimes(10, 1000));
zkCurator.start();
-
}
/**
@@ -98,22 +98,21 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
}
stopAllGrids();
-
}
/**
* Enhances the default configuration with the {#TcpDiscoveryZookeeperIpFinder}.
*
* @param gridName Grid name.
- * @return
- * @throws Exception
+ * @return Ignite configuration.
+ * @throws Exception If failed.
*/
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration configuration = super.getConfiguration(gridName);
TcpDiscoverySpi tcpDisco = (TcpDiscoverySpi) configuration.getDiscoverySpi();
TcpDiscoveryZookeeperIpFinder zkIpFinder = new TcpDiscoveryZookeeperIpFinder();
- zkIpFinder.setAllowDuplicateRegistrations(isAllowDuplicateRegistrations());
+ zkIpFinder.setAllowDuplicateRegistrations(allowDuplicateRegistrations);
// first node => configure with zkUrl; second node => configure with CuratorFramework; third and subsequent
// shall be configured through system property
@@ -126,11 +125,12 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
}
tcpDisco.setIpFinder(zkIpFinder);
+
return configuration;
}
/**
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testOneIgniteNodeIsAlone() throws Exception {
startGrid(0);
@@ -141,7 +141,7 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testTwoIgniteNodesFindEachOther() throws Exception {
// start one node
@@ -164,7 +164,7 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testThreeNodesWithThreeDifferentConfigMethods() throws Exception {
// start one node
@@ -195,7 +195,7 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testFourNodesStartingAndStopping() throws Exception {
// start one node
@@ -242,10 +242,10 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testFourNodesWithDuplicateRegistrations() throws Exception {
- setAllowDuplicateRegistrations(true);
+ allowDuplicateRegistrations = true;
// start 4 nodes
System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
@@ -265,10 +265,10 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testFourNodesWithNoDuplicateRegistrations() throws Exception {
- setAllowDuplicateRegistrations(false);
+ allowDuplicateRegistrations = false;
// start 4 nodes
System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
@@ -288,10 +288,10 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testFourNodesRestartLastSeveralTimes() throws Exception {
- setAllowDuplicateRegistrations(false);
+ allowDuplicateRegistrations = false;
// start 4 nodes
System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
@@ -321,14 +321,13 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
stopAllGrids();
assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
-
}
/**
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testFourNodesKillRestartZookeeper() throws Exception {
- setAllowDuplicateRegistrations(false);
+ allowDuplicateRegistrations = false;
// start 4 nodes
System.setProperty(TcpDiscoveryZookeeperIpFinder.PROP_ZK_CONNECTION_STRING, zkCluster.getConnectString());
@@ -357,14 +356,28 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
// stop all grids
stopAllGrids();
- Thread.sleep(2000);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ return zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size() == 0;
+ }
+ catch (Exception e) {
+ fail("Unexpected error: ");
+
+ return true;
+ }
+ }
+ }, 5000);
// check that all nodes are gone in ZK
assertEquals(0, zkCurator.getChildren().forPath(SERVICES_IGNITE_ZK_PATH).size());
}
/**
- * @throws Exception
+ * @param ignite Node.
+ * @param joinEventCount Expected events number.
+ * @return Events latch.
*/
private CountDownLatch expectJoinEvents(Ignite ignite, int joinEventCount) {
final CountDownLatch latch = new CountDownLatch(joinEventCount);
@@ -378,18 +391,4 @@ public class ZookeeperIpFinderTest extends GridCommonAbstractTest {
return latch;
}
-
- /**
- * @throws Exception
- */
- public void setAllowDuplicateRegistrations(boolean allowDuplicateRegistrations) {
- this.allowDuplicateRegistrations = allowDuplicateRegistrations;
- }
-
- /**
- * @throws Exception
- */
- public boolean isAllowDuplicateRegistrations() {
- return allowDuplicateRegistrations;
- }
}
\ No newline at end of file
[3/3] ignite git commit: Fixes: - allow 'committing' ->
'marked_rollback' tx state change only for thread committing transaction -
fixed 'full_sync' mode for case when tx primary nodes fail - fixed race
between statically configured cache start and Gr
Posted by sb...@apache.org.
Fixes:
- allow 'committing' -> 'marked_rollback' tx state change only for thread committing transaction
- fixed 'full_sync' mode for case when tx primary nodes fail
- fixed race between statically configured cache start and GridDhtAffinityAssignmentRequest
- fixed 'prepareMarshal' methods to marshal only once (ignite-2219)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/457a9ae4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/457a9ae4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/457a9ae4
Branch: refs/heads/master
Commit: 457a9ae4d3b0d6eef6e92a15f5ef79c15ccf1f95
Parents: 1d8c4e2
Author: sboikov <sb...@gridgain.com>
Authored: Wed Jan 13 09:21:09 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Jan 13 09:29:17 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/IgniteTransactions.java | 4 +-
.../apache/ignite/internal/IgniteKernal.java | 95 +++-
.../cache/CacheEntrySerializablePredicate.java | 3 +-
.../cache/CacheInvokeDirectResult.java | 4 +-
.../processors/cache/GridCacheIoManager.java | 23 +
.../processors/cache/GridCacheProcessor.java | 52 ++-
.../processors/cache/GridCacheReturn.java | 2 +
.../processors/cache/IgniteCacheProxy.java | 2 +-
.../GridDistributedLockResponse.java | 2 +-
.../GridDistributedTxFinishRequest.java | 11 +-
.../GridDistributedTxPrepareRequest.java | 2 +-
.../GridDistributedTxPrepareResponse.java | 4 +-
.../dht/GridDhtAffinityAssignmentResponse.java | 4 +-
.../distributed/dht/GridDhtLockFuture.java | 44 +-
.../distributed/dht/GridDhtLockRequest.java | 2 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 90 ++--
.../dht/GridDhtTxFinishResponse.java | 4 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 1 +
.../dht/GridDhtTxPrepareRequest.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 21 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 6 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 22 +-
.../atomic/GridNearAtomicUpdateResponse.java | 4 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 6 +-
.../GridDhtPartitionDemandMessage.java | 6 +-
.../GridDhtPartitionSupplyMessageV2.java | 6 +-
.../preloader/GridDhtPartitionsFullMessage.java | 2 +-
.../GridDhtPartitionsSingleMessage.java | 4 +-
.../distributed/near/GridNearGetResponse.java | 4 +-
...ridNearOptimisticTxPrepareFutureAdapter.java | 6 +-
.../near/GridNearSingleGetResponse.java | 2 +-
.../near/GridNearTxFinishFuture.java | 432 ++++++++++++++-----
.../near/GridNearTxFinishRequest.java | 5 +
.../near/GridNearTxFinishResponse.java | 4 +-
.../cache/distributed/near/GridNearTxLocal.java | 2 +-
.../near/GridNearTxPrepareResponse.java | 4 +-
.../cache/query/GridCacheQueryRequest.java | 16 +-
.../cache/query/GridCacheQueryResponse.java | 18 +-
.../cache/transactions/IgniteInternalTx.java | 6 +
.../cache/transactions/IgniteTxAdapter.java | 23 +-
.../cache/transactions/IgniteTxEntry.java | 11 +-
.../cache/transactions/IgniteTxHandler.java | 26 +-
.../transactions/IgniteTxLocalAdapter.java | 6 +-
.../cache/transactions/IgniteTxManager.java | 20 +
.../datastreamer/DataStreamerRequest.java | 1 +
.../datastructures/DataStructuresProcessor.java | 11 +-
.../processors/igfs/IgfsAckMessage.java | 4 +-
.../handlers/cache/GridCacheCommandHandler.java | 6 +-
.../ignite/spi/discovery/DiscoverySpi.java | 2 +
.../ignite/stream/socket/SocketStreamer.java | 3 +-
...cheAbstractFullApiMultithreadedSelfTest.java | 13 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 2 +-
.../processors/cache/GridCacheStopSelfTest.java | 2 +-
.../cache/IgniteDynamicCacheStartSelfTest.java | 30 +-
.../IgniteClientDataStructuresAbstractTest.java | 3 +
.../dht/GridCacheTxNodeFailureSelfTest.java | 7 +-
.../IgniteCacheCommitDelayTxRecoveryTest.java | 376 ++++++++++++++++
.../IgniteCachePutRetryAbstractSelfTest.java | 36 +-
...gniteCachePutRetryTransactionalSelfTest.java | 21 +
.../continuous/GridEventConsumeSelfTest.java | 3 +
.../internal/util/nio/GridNioSelfTest.java | 11 +-
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 1 -
...CommunicationRecoveryAckClosureSelfTest.java | 19 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 2 +
.../junits/common/GridCommonAbstractTest.java | 34 +-
.../IgniteCacheTxRecoverySelfTestSuite.java | 3 +
.../tcp/ipfinder/zk/ZookeeperIpFinderTest.java | 69 ++-
67 files changed, 1314 insertions(+), 358 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
index 875b647..dfe6a1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteTransactions.java
@@ -18,7 +18,7 @@
package org.apache.ignite;
import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -54,7 +54,7 @@ import org.apache.ignite.transactions.TransactionMetrics;
public interface IgniteTransactions {
/**
* Starts transaction with default isolation, concurrency, timeout, and invalidation policy.
- * All defaults are set in {@link CacheConfiguration} at startup.
+ * All defaults are set in {@link TransactionConfiguration} at startup.
*
* @return New transaction
* @throws IllegalStateException If transaction is already started by this thread.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 14b5816..3def718 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1424,8 +1424,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
/** @throws IgniteCheckedException If registration failed. */
- private void registerExecutorMBeans(ExecutorService execSvc, ExecutorService sysExecSvc, ExecutorService p2pExecSvc,
- ExecutorService mgmtExecSvc, ExecutorService restExecSvc) throws IgniteCheckedException {
+ private void registerExecutorMBeans(ExecutorService execSvc,
+ ExecutorService sysExecSvc,
+ ExecutorService p2pExecSvc,
+ ExecutorService mgmtExecSvc,
+ ExecutorService restExecSvc) throws IgniteCheckedException {
pubExecSvcMBean = registerExecutorMBean(execSvc, "GridExecutionExecutor");
sysExecSvcMBean = registerExecutorMBean(sysExecSvc, "GridSystemExecutor");
mgmtExecSvcMBean = registerExecutorMBean(mgmtExecSvc, "GridManagementExecutor");
@@ -2414,7 +2417,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- return ctx.cache().publicJCache(name, false);
+ return ctx.cache().publicJCache(name, false, true);
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
@@ -2431,7 +2434,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), null, true, true).get();
+ ctx.cache().dynamicStartCache(cacheCfg,
+ cacheCfg.getName(),
+ null,
+ true,
+ true,
+ true).get();
return ctx.cache().publicJCache(cacheCfg.getName());
}
@@ -2467,8 +2475,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- if (ctx.cache().cache(cacheCfg.getName()) == null)
- ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), null, false, true).get();
+ if (ctx.cache().cache(cacheCfg.getName()) == null) {
+ ctx.cache().dynamicStartCache(cacheCfg,
+ cacheCfg.getName(),
+ null,
+ false,
+ true,
+ true).get();
+ }
return ctx.cache().publicJCache(cacheCfg.getName());
}
@@ -2491,7 +2505,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, true, true).get();
+ ctx.cache().dynamicStartCache(cacheCfg,
+ cacheCfg.getName(),
+ nearCfg,
+ true,
+ true,
+ true).get();
return ctx.cache().publicJCache(cacheCfg.getName());
}
@@ -2514,11 +2533,23 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
try {
IgniteInternalCache<Object, Object> cache = ctx.cache().cache(cacheCfg.getName());
- if (cache == null)
- ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, false, true).get();
+ if (cache == null) {
+ ctx.cache().dynamicStartCache(cacheCfg,
+ cacheCfg.getName(),
+ nearCfg,
+ false,
+ true,
+ true).get();
+ }
else {
- if (cache.configuration().getNearConfiguration() == null)
- ctx.cache().dynamicStartCache(cacheCfg, cacheCfg.getName(), nearCfg, false, true).get();
+ if (cache.configuration().getNearConfiguration() == null) {
+ ctx.cache().dynamicStartCache(cacheCfg,
+ cacheCfg.getName(),
+ nearCfg,
+ false,
+ true,
+ true).get();
+ }
}
return ctx.cache().publicJCache(cacheCfg.getName());
@@ -2538,7 +2569,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
guard();
try {
- ctx.cache().dynamicStartCache(null, cacheName, nearCfg, true, true).get();
+ ctx.cache().dynamicStartCache(null,
+ cacheName,
+ nearCfg,
+ true,
+ true,
+ true).get();
IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
@@ -2564,11 +2600,23 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
try {
IgniteInternalCache<Object, Object> internalCache = ctx.cache().cache(cacheName);
- if (internalCache == null)
- ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false, true).get();
+ if (internalCache == null) {
+ ctx.cache().dynamicStartCache(null,
+ cacheName,
+ nearCfg,
+ false,
+ true,
+ true).get();
+ }
else {
- if (internalCache.configuration().getNearConfiguration() == null)
- ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false, true).get();
+ if (internalCache.configuration().getNearConfiguration() == null) {
+ ctx.cache().dynamicStartCache(null,
+ cacheName,
+ nearCfg,
+ false,
+ true,
+ true).get();
+ }
}
IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
@@ -2587,6 +2635,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/**
* @param cache Cache.
+ * @throws IgniteCheckedException If cache without near cache was already started.
*/
private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) throws IgniteCheckedException {
if (!cache.context().isNear())
@@ -2596,7 +2645,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/** {@inheritDoc} */
@Override public void destroyCache(String cacheName) {
- IgniteInternalFuture stopFut = destroyCacheAsync(cacheName);
+ IgniteInternalFuture stopFut = destroyCacheAsync(cacheName, true);
try {
stopFut.get();
@@ -2608,13 +2657,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/**
* @param cacheName Cache name.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Ignite future.
*/
- public IgniteInternalFuture<?> destroyCacheAsync(String cacheName) {
+ public IgniteInternalFuture<?> destroyCacheAsync(String cacheName, boolean checkThreadTx) {
guard();
try {
- return ctx.cache().dynamicDestroyCache(cacheName);
+ return ctx.cache().dynamicDestroyCache(cacheName, checkThreadTx);
}
finally {
unguard();
@@ -2627,7 +2677,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
try {
if (ctx.cache().cache(cacheName) == null)
- ctx.cache().getOrCreateFromTemplate(cacheName).get();
+ ctx.cache().getOrCreateFromTemplate(cacheName, true).get();
return ctx.cache().publicJCache(cacheName);
}
@@ -2641,14 +2691,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/**
* @param cacheName Cache name.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
- public IgniteInternalFuture<?> getOrCreateCacheAsync(String cacheName) {
+ public IgniteInternalFuture<?> getOrCreateCacheAsync(String cacheName, boolean checkThreadTx) {
guard();
try {
if (ctx.cache().cache(cacheName) == null)
- return ctx.cache().getOrCreateFromTemplate(cacheName);
+ return ctx.cache().getOrCreateFromTemplate(cacheName, checkThreadTx);
return new GridFinishedFuture<>();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java
index a243c4e..20cc005 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntrySerializablePredicate.java
@@ -86,7 +86,8 @@ public class CacheEntrySerializablePredicate implements CacheEntryPredicate {
p.prepareMarshal(ctx);
- bytes = ctx.marshaller().marshal(p);
+ if (bytes == null)
+ bytes = ctx.marshaller().marshal(p);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
index bee1427..fefa422 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
@@ -104,7 +104,7 @@ public class CacheInvokeDirectResult implements Message {
public void prepareMarshal(GridCacheContext ctx) throws IgniteCheckedException {
key.prepareMarshal(ctx.cacheObjectContext());
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
if (res != null)
@@ -119,7 +119,7 @@ public class CacheInvokeDirectResult implements Message {
public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException {
key.finishUnmarshal(ctx.cacheObjectContext(), ldr);
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
if (res != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 0aa8b1b..b297827 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
@@ -122,6 +123,28 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
IgniteInternalFuture<?> fut = null;
if (cacheMsg.partitionExchangeMessage()) {
+ if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
+ assert cacheMsg.topologyVersion() != null : cacheMsg;
+
+ AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order());
+
+ assert cacheMsg.topologyVersion().compareTo(startTopVer) > 0 :
+ "Invalid affinity request [startTopVer=" + startTopVer + ", msg=" + cacheMsg + ']';
+
+ // Need to wait for initial exchange to avoid race between cache start and affinity request.
+ fut = cctx.exchange().affinityReadyFuture(startTopVer);
+
+ if (fut != null && !fut.isDone()) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ lsnr.onMessage(nodeId, cacheMsg);
+ }
+ });
+
+ return;
+ }
+ }
+
long locTopVer = cctx.discovery().topologyVersion();
long rmtTopVer = cacheMsg.topologyVersion().topologyVersion();
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index ff02e70..eb6d98e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2030,7 +2030,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
CacheConfiguration cfg = createConfigFromTemplate(cacheName);
- return dynamicStartCache(cfg, cacheName, null, true, true);
+ return dynamicStartCache(cfg, cacheName, null, true, true, true);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -2041,16 +2041,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* Dynamically starts cache using template configuration.
*
* @param cacheName Cache name.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
- public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName) {
+ public IgniteInternalFuture<?> getOrCreateFromTemplate(String cacheName, boolean checkThreadTx) {
try {
- if (publicJCache(cacheName, false) != null) // Cache with given name already started.
+ if (publicJCache(cacheName, false, checkThreadTx) != null) // Cache with given name already started.
return new GridFinishedFuture<>();
CacheConfiguration cfg = createConfigFromTemplate(cacheName);
- return dynamicStartCache(cfg, cacheName, null, false, true);
+ return dynamicStartCache(cfg, cacheName, null, false, true, checkThreadTx);
}
catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
@@ -2060,6 +2061,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param cacheName Cache name.
* @return Cache configuration.
+ * @throws IgniteCheckedException If failed.
*/
private CacheConfiguration createConfigFromTemplate(String cacheName) throws IgniteCheckedException {
CacheConfiguration cfgTemplate = null;
@@ -2138,6 +2140,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param cacheName Cache name.
* @param nearCfg Near cache configuration.
* @param failIfExists Fail if exists flag.
+ * @param failIfNotStarted If {@code true} fails if cache is not started.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
@SuppressWarnings("IfMayBeConditional")
@@ -2146,9 +2150,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
String cacheName,
@Nullable NearCacheConfiguration nearCfg,
boolean failIfExists,
- boolean failIfNotStarted
+ boolean failIfNotStarted,
+ boolean checkThreadTx
) {
- return dynamicStartCache(ccfg, cacheName, nearCfg, CacheType.USER, failIfExists, failIfNotStarted);
+ return dynamicStartCache(ccfg,
+ cacheName,
+ nearCfg,
+ CacheType.USER,
+ failIfExists,
+ failIfNotStarted,
+ checkThreadTx);
}
/**
@@ -2157,7 +2168,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @param ccfg Cache configuration.
* @param cacheName Cache name.
* @param nearCfg Near cache configuration.
+ * @param cacheType Cache type.
* @param failIfExists Fail if exists flag.
+ * @param failIfNotStarted If {@code true} fails if cache is not started.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is deployed.
*/
@SuppressWarnings("IfMayBeConditional")
@@ -2167,9 +2181,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
@Nullable NearCacheConfiguration nearCfg,
CacheType cacheType,
boolean failIfExists,
- boolean failIfNotStarted
+ boolean failIfNotStarted,
+ boolean checkThreadTx
) {
- checkEmptyTransactions();
+ if (checkThreadTx)
+ checkEmptyTransactions();
DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName));
@@ -2260,10 +2276,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param cacheName Cache name to destroy.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Future that will be completed when cache is destroyed.
*/
- public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName) {
- checkEmptyTransactions();
+ public IgniteInternalFuture<?> dynamicDestroyCache(String cacheName, boolean checkThreadTx) {
+ if (checkThreadTx)
+ checkEmptyTransactions();
DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(cacheName, ctx.localNodeId());
@@ -2898,7 +2916,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
IgniteCacheProxy<?, ?> cache = jCacheProxies.get(masked);
if (cache == null) {
- dynamicStartCache(null, name, null, false, true).get();
+ dynamicStartCache(null, name, null, false, true, true).get();
cache = jCacheProxies.get(masked);
}
@@ -3001,21 +3019,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If failed.
*/
public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName) throws IgniteCheckedException {
- return publicJCache(cacheName, true);
+ return publicJCache(cacheName, true, true);
}
/**
* @param cacheName Cache name.
* @param failIfNotStarted If {@code true} throws {@link IllegalArgumentException} if cache is not started,
* otherwise returns {@code null} in this case.
- * @param <K> type of keys.
- * @param <V> type of values.
+ * @param checkThreadTx If {@code true} checks that current thread does not have active transactions.
* @return Cache instance for given name.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings({"unchecked", "ConstantConditions"})
- @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName, boolean failIfNotStarted)
- throws IgniteCheckedException
+ @Nullable public <K, V> IgniteCacheProxy<K, V> publicJCache(@Nullable String cacheName,
+ boolean failIfNotStarted,
+ boolean checkThreadTx) throws IgniteCheckedException
{
if (log.isDebugEnabled())
log.debug("Getting public cache for name: " + cacheName);
@@ -3030,7 +3048,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
throw new IllegalStateException("Failed to get cache because it is a system cache: " + cacheName);
if (cache == null) {
- dynamicStartCache(null, cacheName, null, false, failIfNotStarted).get();
+ dynamicStartCache(null, cacheName, null, false, failIfNotStarted, checkThreadTx).get();
cache = jCacheProxies.get(masked);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
index 21154c9..a9edb95 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
@@ -169,6 +169,7 @@ public class GridCacheReturn implements Externalizable, Message {
* @param cctx Cache context.
* @param cacheObj Value to set.
* @param success Success flag to set.
+ * @param keepBinary Keep binary flag.
* @return This instance for chaining.
*/
public GridCacheReturn set(
@@ -187,6 +188,7 @@ public class GridCacheReturn implements Externalizable, Message {
/**
* @param cctx Cache context.
* @param cacheObj Cache object.
+ * @param keepBinary Keep binary flag.
*/
private void initValue(GridCacheContext cctx, @Nullable CacheObject cacheObj, boolean keepBinary) {
if (loc)
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 27a7587..b64c69c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1626,7 +1626,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
IgniteInternalFuture<?> fut;
try {
- fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name());
+ fut = ctx.kernalContext().cache().dynamicDestroyCache(ctx.name(), true);
}
finally {
onLeave(gate);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index bb3f9ff..f088e1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -194,7 +194,7 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
prepareMarshalCacheObjects(vals, ctx.cacheContext(cacheId));
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 34b3112..a761fec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -20,8 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.lang.IgniteUuid;
@@ -85,6 +83,8 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
* @param invalidate Invalidate flag.
* @param sys System transaction flag.
* @param plc IO policy.
+ * @param syncCommit Sync commit flag.
+ * @param syncRollback Sync rollback flag.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -184,6 +184,13 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage {
}
/**
+ * @param syncCommit Sync commit flag.
+ */
+ public void syncCommit(boolean syncCommit) {
+ this.syncCommit = syncCommit;
+ }
+
+ /**
* @return Sync rollback flag.
*/
public boolean syncRollback() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index e595942..0d26c84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -317,7 +317,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
if (reads != null)
marshalTx(reads, ctx);
- if (dhtVers != null) {
+ if (dhtVers != null && dhtVerKeys == null) {
for (IgniteTxKey key : dhtVers.keySet()) {
GridCacheContext cctx = ctx.cacheContext(key.cacheId());
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
index d2c5aa4..4d22213 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java
@@ -93,7 +93,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
}
@@ -101,7 +101,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index e731406..8e041c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -104,7 +104,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (affAssignment != null)
+ if (affAssignment != null && affAssignmentBytes == null)
affAssignmentBytes = ctx.marshaller().marshal(affAssignment);
}
@@ -113,7 +113,7 @@ public class GridDhtAffinityAssignmentResponse extends GridCacheMessage {
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (affAssignmentBytes != null) {
+ if (affAssignmentBytes != null && affAssignment == null) {
affAssignment = ctx.marshaller().unmarshal(affAssignmentBytes, ldr);
// TODO IGNITE-2110: setting 'local' for nodes not needed when IGNITE-2110 is implemented.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 98711b8..1c3e052 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -494,14 +494,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
boolean found = false;
for (IgniteInternalFuture<?> fut : futures()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ MiniFuture f = (MiniFuture)fut;
- if (f.node().id().equals(nodeId)) {
- f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will ignore): " + nodeId));
+ if (f.node().id().equals(nodeId)) {
+ f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will ignore): " + nodeId));
- found = true;
- }
+ found = true;
}
}
@@ -551,12 +549,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
synchronized (futs) {
// Avoid iterator creation.
for (int i = 0; i < futs.size(); i++) {
- IgniteInternalFuture<Boolean> fut = futs.get(i);
-
- if (!isMini(fut))
- continue;
-
- MiniFuture mini = (MiniFuture)fut;
+ MiniFuture mini = (MiniFuture)futs.get(i);
if (mini.futureId().equals(miniId)) {
if (!mini.isDone())
@@ -772,14 +765,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
}
/**
- * @param f Future.
- * @return {@code True} if mini-future.
- */
- private boolean isMini(IgniteInternalFuture<?> f) {
- return f.getClass().equals(MiniFuture.class);
- }
-
- /**
*
*/
public void map() {
@@ -1006,7 +991,24 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtLockFuture.class, this, super.toString());
+ Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
+ @Override public String apply(IgniteInternalFuture<?> f) {
+ MiniFuture m = (MiniFuture)f;
+
+ return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+ }
+ });
+
+ Collection<KeyCacheObject> locks;
+
+ synchronized (this) {
+ locks = new HashSet<>(pendingLocks);
+ }
+
+ return S.toString(GridDhtLockFuture.class, this,
+ "innerFuts", futs,
+ "pendingLocks", locks,
+ "super", super.toString());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index 62cf69d..50167d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -311,7 +311,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
prepareMarshalCacheObjects(nearKeys, ctx.cacheContext(cacheId));
- if (owned != null) {
+ if (owned != null && ownedKeys == null) {
ownedKeys = new KeyCacheObject[owned.size()];
ownedValues = new GridCacheVersion[ownedKeys.length];
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 65f1cb4..2d98e0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.Collections;
import java.util.UUID;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -44,6 +43,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01;
+
/** Near node ID. */
private UUID nearNodeId;
@@ -64,7 +66,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
@GridDirectCollection(GridCacheVersion.class)
private Collection<GridCacheVersion> pendingVers;
- /** Check comitted flag. */
+ /** Check committed flag. */
private boolean checkCommitted;
/** Partition update counter. */
@@ -81,6 +83,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** Task name hash. */
private int taskNameHash;
+ /** */
+ private byte flags;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -100,6 +105,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param commit Commit flag.
* @param invalidate Invalidate flag.
* @param sys System flag.
+ * @param plc IO policy.
* @param sysInvalidate System invalidation flag.
* @param syncCommit Synchronous commit flag.
* @param syncRollback Synchronous rollback flag.
@@ -180,6 +186,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param commit Commit flag.
* @param invalidate Invalidate flag.
* @param sys System flag.
+ * @param plc IO policy.
* @param sysInvalidate System invalidation flag.
* @param syncCommit Synchronous commit flag.
* @param syncRollback Synchronous rollback flag.
@@ -302,16 +309,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
}
/**
- * Gets versions of not acquired locks with version less then one of transaction being committed.
- *
- * @return Versions of locks for entries participating in transaction that have not been acquired yet
- * have version less then one of transaction being committed.
- */
- public Collection<GridCacheVersion> pendingVersions() {
- return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
- }
-
- /**
* @return Check committed flag.
*/
public boolean checkCommitted() {
@@ -325,6 +322,23 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
this.checkCommitted = checkCommitted;
}
+ /**
+ * @return {@code True}
+ */
+ public boolean waitRemoteTransactions() {
+ return (flags & WAIT_REMOTE_TX_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @param waitRemoteTxs Wait remote transactions flag.
+ */
+ public void waitRemoteTransactions(boolean waitRemoteTxs) {
+ if (waitRemoteTxs)
+ flags = (byte)(flags | WAIT_REMOTE_TX_FLAG_MASK);
+ else
+ flags &= ~WAIT_REMOTE_TX_FLAG_MASK;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
@@ -352,60 +366,66 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
writer.incrementState();
case 19:
- if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
+ if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
case 20:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1))
return false;
writer.incrementState();
case 21:
- if (!writer.writeUuid("nearNodeId", nearNodeId))
+ if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
case 22:
- if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
case 23:
- if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("partUpdateCnt", partUpdateCnt))
return false;
writer.incrementState();
case 24:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 25:
- if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 26:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeBoolean("sysInvalidate", sysInvalidate))
return false;
writer.incrementState();
case 27:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
case 28:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 29:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -436,6 +456,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
case 19:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
byte isolationOrd;
isolationOrd = reader.readByte("isolation");
@@ -447,7 +475,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 20:
+ case 21:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -455,7 +483,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 21:
+ case 22:
nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
@@ -463,7 +491,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 22:
+ case 23:
partUpdateCnt = reader.readMessage("partUpdateCnt");
if (!reader.isLastRead())
@@ -471,7 +499,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 23:
+ case 24:
pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -479,7 +507,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 24:
+ case 25:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -487,7 +515,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 25:
+ case 26:
sysInvalidate = reader.readBoolean("sysInvalidate");
if (!reader.isLastRead())
@@ -495,7 +523,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 26:
+ case 27:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -503,7 +531,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 27:
+ case 28:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -511,7 +539,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
reader.incrementState();
- case 28:
+ case 29:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -531,6 +559,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 29;
+ return 30;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
index fb4d97d..626ad89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java
@@ -109,7 +109,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (checkCommittedErr != null)
+ if (checkCommittedErr != null && checkCommittedErrBytes == null)
checkCommittedErrBytes = ctx.marshaller().marshal(checkCommittedErr);
}
@@ -118,7 +118,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse {
throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (checkCommittedErrBytes != null)
+ if (checkCommittedErrBytes != null && checkCommittedErr == null)
checkCommittedErr = ctx.marshaller().unmarshal(checkCommittedErrBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index e026b4e..ebf1002 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -421,6 +421,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (!state(PREPARING)) {
if (state() == PREPARED && isSystemInvalidate())
fut.complete();
+
if (setRollbackOnly()) {
if (timedOut())
fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 394ff89..d31ecba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -281,7 +281,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (owned != null) {
+ if (owned != null && ownedKeys == null) {
ownedKeys = owned.keySet();
ownedVals = owned.values();
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7bee5a3..7cc276f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -656,11 +656,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
if (!addDepInfo && ctx.deploymentEnabled())
addDepInfo = true;
- invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+ if (invokeArgsBytes == null)
+ invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
- entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+ if (entryProcessorsBytes == null)
+ entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
- nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
+ if (nearEntryProcessorsBytes == null)
+ nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
}
}
@@ -681,13 +684,15 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
finishUnmarshalCacheObjects(prevVals, cctx, ldr);
if (forceTransformBackups) {
- entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+ if (entryProcessors == null)
+ entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
- invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
- }
+ if (invokeArgs == null)
+ invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
- if (forceTransformBackups)
- nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
+ if (nearEntryProcessors == null)
+ nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index f1bb323..95fdeb6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -165,7 +165,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
prepareMarshalCacheObjects(nearEvicted, cctx);
- errBytes = ctx.marshaller().marshal(err);
+ if (err != null && errBytes == null)
+ errBytes = ctx.marshaller().marshal(err);
}
/** {@inheritDoc} */
@@ -178,7 +179,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
finishUnmarshalCacheObjects(nearEvicted, cctx, ldr);
- err = ctx.marshaller().unmarshal(errBytes, ldr);
+ if (errBytes != null && err == null)
+ err = ctx.marshaller().unmarshal(errBytes, ldr);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 7c0aba5..9c4b486 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -184,6 +184,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
+ * @param keepBinary Keep binary flag.
* @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
*/
@@ -593,7 +594,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
filter = null;
}
- if (expiryPlc != null)
+ if (expiryPlc != null && expiryPlcBytes == null)
expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
if (op == TRANSFORM) {
@@ -601,9 +602,11 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
if (!addDepInfo && ctx.deploymentEnabled())
addDepInfo = true;
- entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+ if (entryProcessorsBytes == null)
+ entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
- invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+ if (invokeArgsBytes == null)
+ invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
}
else
prepareMarshalCacheObjects(vals, cctx);
@@ -617,8 +620,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
finishUnmarshalCacheObjects(keys, cctx, ldr);
- if (op == TRANSFORM)
- entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+ if (op == TRANSFORM) {
+ if (entryProcessors == null)
+ entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+ if (invokeArgs == null)
+ invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+ }
else
finishUnmarshalCacheObjects(vals, cctx, ldr);
@@ -629,9 +637,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
}
}
- invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
-
- if (expiryPlcBytes != null)
+ if (expiryPlcBytes != null && expiryPlc == null)
expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index b164e7e..3e3ac29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -394,7 +394,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
GridCacheContext cctx = ctx.cacheContext(cacheId);
@@ -413,7 +413,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
GridCacheContext cctx = ctx.cacheContext(cacheId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index 4cdecec..9c5238a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -170,7 +170,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
info.marshal(cctx);
}
- errBytes = ctx.marshaller().marshal(err);
+ if (err != null && errBytes == null)
+ errBytes = ctx.marshaller().marshal(err);
}
/** {@inheritDoc} */
@@ -187,7 +188,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
info.unmarshal(cctx, ldr);
}
- err = ctx.marshaller().unmarshal(errBytes, ldr);
+ if (errBytes != null && err == null)
+ err = ctx.marshaller().unmarshal(errBytes, ldr);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
index 53c3d90..5cb84dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandMessage.java
@@ -66,6 +66,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
/**
* @param updateSeq Update sequence for this node.
* @param topVer Topology version.
+ * @param cacheId Cache ID.
*/
GridDhtPartitionDemandMessage(long updateSeq, @NotNull AffinityTopologyVersion topVer, int cacheId) {
this.cacheId = cacheId;
@@ -75,6 +76,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
/**
* @param cp Message to copy from.
+ * @param parts Partitions.
*/
GridDhtPartitionDemandMessage(GridDhtPartitionDemandMessage cp, Collection<Integer> parts) {
cacheId = cp.cacheId;
@@ -181,7 +183,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (topic != null)
+ if (topic != null && topicBytes == null)
topicBytes = ctx.marshaller().marshal(topic);
}
@@ -189,7 +191,7 @@ public class GridDhtPartitionDemandMessage extends GridCacheMessage {
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (topicBytes != null)
+ if (topicBytes != null && topic == null)
topic = ctx.marshaller().unmarshal(topicBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
index 41454f9..4451cbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessageV2.java
@@ -75,9 +75,13 @@ public class GridDhtPartitionSupplyMessageV2 extends GridCacheMessage implements
/**
* @param updateSeq Update sequence for this node.
* @param cacheId Cache ID.
+ * @param topVer Topology version.
* @param addDepInfo Deployment info flag.
*/
- GridDhtPartitionSupplyMessageV2(long updateSeq, int cacheId, AffinityTopologyVersion topVer, boolean addDepInfo) {
+ GridDhtPartitionSupplyMessageV2(long updateSeq,
+ int cacheId,
+ AffinityTopologyVersion topVer,
+ boolean addDepInfo) {
this.cacheId = cacheId;
this.updateSeq = updateSeq;
this.topVer = topVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 0cbdc91..6afb9b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -134,7 +134,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
if (parts != null && partsBytes == null)
partsBytes = ctx.marshaller().marshal(parts);
- if (partCntrs != null)
+ if (partCntrs != null && partCntrsBytes == null)
partCntrsBytes = ctx.marshaller().marshal(partCntrs);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index c07a508..1185913 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -138,7 +138,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (partsBytes == null && parts != null)
partsBytes = ctx.marshaller().marshal(parts);
- if (partCntrs != null)
+ if (partCntrsBytes == null && partCntrs != null)
partCntrsBytes = ctx.marshaller().marshal(partCntrs);
}
@@ -149,7 +149,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
if (partsBytes != null && parts == null)
parts = ctx.marshaller().unmarshal(partsBytes, ldr);
- if (partCntrsBytes != null)
+ if (partCntrsBytes != null && partCntrs == null)
partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index 15a791f..6ac91cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -188,7 +188,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
info.marshal(cctx);
}
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
}
@@ -203,7 +203,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
info.unmarshal(cctx, ldr);
}
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index fe6180a..7132567 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -54,9 +54,13 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
AffinityTopologyVersion topVer = null;
- if (tx.system())
+ if (tx.system()) {
topVer = tx.topologyVersionSnapshot();
+ if (topVer == null)
+ topVer = cctx.exchange().readyAffinityVersion();
+ }
+
if (topVer == null)
topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
index 42ad7ed..314c35c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
@@ -167,7 +167,7 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC
((GridCacheEntryInfo)res).marshal(cctx);
}
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
}
[2/3] ignite git commit: Fixes: - allow 'committing' ->
'marked_rollback' tx state change only for thread committing transaction -
fixed 'full_sync' mode for case when tx primary nodes fail - fixed race
between statically configured cache start and Gr
Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 291c88a..1b40d6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -19,6 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
@@ -44,6 +47,7 @@ import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -64,6 +68,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
public static final IgniteProductVersion FINISH_NEAR_ONE_PHASE_SINCE = IgniteProductVersion.fromString("1.4.0");
/** */
+ public static final IgniteProductVersion WAIT_REMOTE_TXS_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+ /** */
private static final long serialVersionUID = 0L;
/** Logger reference. */
@@ -122,22 +129,23 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override public boolean onNodeLeft(UUID nodeId) {
+ boolean found = false;
+
for (IgniteInternalFuture<?> fut : futures())
if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ MinFuture f = (MinFuture)fut;
- if (f.node().id().equals(nodeId)) {
+ if (f.onNodeLeft(nodeId)) {
// Remove previous mapping.
mappings.remove(nodeId);
- f.onResult(new ClusterTopologyCheckedException("Remote node left grid (will fail): " + nodeId));
-
- return true;
+ found = true;
}
}
- return false;
+ return found;
}
/** {@inheritDoc} */
@@ -156,19 +164,32 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
* @param nodeId Sender.
* @param res Result.
*/
+ @SuppressWarnings("ForLoopReplaceableByForEach")
public void onResult(UUID nodeId, GridNearTxFinishResponse res) {
- if (!isDone())
- for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ if (!isDone()) {
+ FinishMiniFuture finishFut = null;
- if (f.futureId().equals(res.miniId())) {
- assert f.node().id().equals(nodeId);
+ synchronized (futs) {
+ for (int i = 0; i < futs.size(); i++) {
+ IgniteInternalFuture<IgniteInternalTx> fut = futs.get(i);
+
+ if (fut.getClass() == FinishMiniFuture.class) {
+ FinishMiniFuture f = (FinishMiniFuture)fut;
- f.onResult(res);
+ if (f.futureId().equals(res.miniId())) {
+ assert f.node().id().equals(nodeId);
+
+ finishFut = f;
+
+ break;
+ }
}
}
}
+
+ if (finishFut != null)
+ finishFut.onNearFinishResponse(res);
+ }
}
/**
@@ -178,15 +199,21 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
public void onResult(UUID nodeId, GridDhtTxFinishResponse res) {
if (!isDone())
for (IgniteInternalFuture<IgniteInternalTx> fut : futures()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ if (fut.getClass() == CheckBackupMiniFuture.class) {
+ CheckBackupMiniFuture f = (CheckBackupMiniFuture)fut;
if (f.futureId().equals(res.miniId())) {
assert f.node().id().equals(nodeId);
- f.onResult(res);
+ f.onDhtFinishResponse(res);
}
}
+ else if (fut.getClass() == CheckRemoteTxMiniFuture.class) {
+ CheckRemoteTxMiniFuture f = (CheckRemoteTxMiniFuture)fut;
+
+ if (f.futureId().equals(res.miniId()))
+ f.onDhtFinishResponse(nodeId);
+ }
}
}
@@ -204,9 +231,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
boolean marked = tx.setRollbackOnly();
- if (err instanceof NodeStoppingException)
- return super.onDone(null, err);
-
if (err instanceof IgniteTxRollbackCheckedException) {
if (marked) {
try {
@@ -289,11 +313,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
- * @param f Future.
+ * @param fut Future.
* @return {@code True} if mini-future.
*/
- private boolean isMini(IgniteInternalFuture<?> f) {
- return f.getClass().equals(MiniFuture.class);
+ private boolean isMini(IgniteInternalFuture<?> fut) {
+ return fut.getClass() == FinishMiniFuture.class ||
+ fut.getClass() == CheckBackupMiniFuture.class ||
+ fut.getClass() == CheckRemoteTxMiniFuture.class;
}
/**
@@ -393,7 +419,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
ClusterNode backup = cctx.discovery().node(backupId);
- MiniFuture mini = new MiniFuture(backup, mapping);
+ final CheckBackupMiniFuture mini = new CheckBackupMiniFuture(backup, mapping);
add(mini);
@@ -414,8 +440,25 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
readyNearMappingFromBackup(mapping);
- if (committed)
+ if (committed) {
+ if (tx.syncCommit()) {
+ GridCacheVersion nearXidVer = tx.nearXidVersion();
+
+ assert nearXidVer != null : tx;
+
+ IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(nearXidVer);
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ mini.onDone(tx);
+ }
+ });
+
+ return;
+ }
+
mini.onDone(tx);
+ }
else {
ClusterTopologyCheckedException cause =
new ClusterTopologyCheckedException("Primary node left grid: " + nodeId);
@@ -427,46 +470,26 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
}
else {
- GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
- cctx.localNodeId(),
- futureId(),
- mini.futureId(),
- tx.topologyVersion(),
- tx.xidVersion(),
- tx.commitVersion(),
- tx.threadId(),
- tx.isolation(),
- true,
- false,
- tx.system(),
- tx.ioPolicy(),
- false,
- true,
- true,
- null,
- null,
- null,
- null,
- 0,
- null,
- 0,
- tx.activeCachesDeploymentEnabled());
-
- finishReq.checkCommitted(true);
+ GridDhtTxFinishRequest finishReq = checkCommittedRequest(mini.futureId());
+
+ // Preserve old behavior, otherwise response is not sent.
+ if (WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) > 0)
+ finishReq.syncCommit(true);
try {
if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(backup.version()) <= 0)
cctx.io().send(backup, finishReq, tx.ioPolicy());
- else
+ else {
mini.onDone(new IgniteTxHeuristicCheckedException("Failed to check for tx commit on " +
"the backup node (node has an old Ignite version) [rmtNodeId=" + backup.id() +
", ver=" + backup.version() + ']'));
+ }
}
catch (ClusterTopologyCheckedException e) {
- mini.onResult(e);
+ mini.onNodeLeft(backupId);
}
catch (IgniteCheckedException e) {
- mini.onResult(e);
+ mini.onDone(e);
}
}
}
@@ -476,7 +499,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
- *
+ * @return {@code True} if need to send finish request for one phase commit transaction.
*/
private boolean needFinishOnePhase() {
if (tx.mappings().empty())
@@ -584,7 +607,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
add(fut);
}
else {
- MiniFuture fut = new MiniFuture(m);
+ FinishMiniFuture fut = new FinishMiniFuture(m);
req.miniId(fut.futureId());
@@ -604,11 +627,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
// Remove previous mapping.
mappings.remove(m.node().id());
- fut.onResult(e);
+ fut.onNodeLeft(n.id());
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
- fut.onResult(e);
+ fut.onDone(e);
}
}
}
@@ -618,10 +641,24 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
@SuppressWarnings("unchecked")
@Override public String apply(IgniteInternalFuture<?> f) {
- if (isMini(f)) {
- MiniFuture m = (MiniFuture)f;
+ if (f.getClass() == FinishMiniFuture.class) {
+ FinishMiniFuture fut = (FinishMiniFuture)f;
+
+ return "FinishFuture[node=" + fut.node().id() +
+ ", loc=" + fut.node().isLocal() +
+ ", done=" + fut.isDone() + "]";
+ }
+ else if (f.getClass() == CheckBackupMiniFuture.class) {
+ CheckBackupMiniFuture fut = (CheckBackupMiniFuture)f;
- return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+ return "CheckBackupFuture[node=" + fut.node().id() +
+ ", loc=" + fut.node().isLocal() +
+ ", done=" + f.isDone() + "]";
+ }
+ else if (f.getClass() == CheckRemoteTxMiniFuture.class) {
+ CheckRemoteTxMiniFuture fut = (CheckRemoteTxMiniFuture)f;
+
+ return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", done=" + f.isDone() + "]";
}
else
return "[loc=true, done=" + f.isDone() + "]";
@@ -634,108 +671,217 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
}
/**
+ * @param miniId Mini future ID.
+ * @return Finish request.
+ */
+ private GridDhtTxFinishRequest checkCommittedRequest(IgniteUuid miniId) {
+ GridDhtTxFinishRequest finishReq = new GridDhtTxFinishRequest(
+ cctx.localNodeId(),
+ futureId(),
+ miniId,
+ tx.topologyVersion(),
+ tx.xidVersion(),
+ tx.commitVersion(),
+ tx.threadId(),
+ tx.isolation(),
+ true,
+ false,
+ tx.system(),
+ tx.ioPolicy(),
+ false,
+ tx.syncCommit(),
+ tx.syncRollback(),
+ null,
+ null,
+ null,
+ null,
+ 0,
+ null,
+ 0,
+ tx.activeCachesDeploymentEnabled());
+
+ finishReq.checkCommitted(true);
+
+ return finishReq;
+ }
+
+ /**
+ *
+ */
+ private abstract class MinFuture extends GridFutureAdapter<IgniteInternalTx> {
+ /** */
+ private final IgniteUuid futId = IgniteUuid.randomUuid();
+
+ /**
+ * @param nodeId Node ID.
+ * @return {@code True} if future processed node failure.
+ */
+ abstract boolean onNodeLeft(UUID nodeId);
+
+ /**
+ * @return Future ID.
+ */
+ final IgniteUuid futureId() {
+ return futId;
+ }
+ }
+
+ /**
* Mini-future for get operations. Mini-futures are only waiting on a single
* node as opposed to multiple nodes.
*/
- private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
+ private class FinishMiniFuture extends MinFuture {
/** */
private static final long serialVersionUID = 0L;
- /** */
- private final IgniteUuid futId = IgniteUuid.randomUuid();
-
/** Keys. */
@GridToStringInclude
private GridDistributedTxMapping m;
- /** Backup check flag. */
- private ClusterNode backup;
-
/**
* @param m Mapping.
*/
- MiniFuture(GridDistributedTxMapping m) {
+ FinishMiniFuture(GridDistributedTxMapping m) {
this.m = m;
}
/**
- * @param backup Backup to check.
- * @param m Mapping associated with the backup.
+ * @return Node ID.
*/
- MiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
- this.backup = backup;
- this.m = m;
+ ClusterNode node() {
+ return m.node();
}
/**
- * @return Future ID.
+ * @return Keys.
*/
- IgniteUuid futureId() {
- return futId;
+ public GridDistributedTxMapping mapping() {
+ return m;
}
/**
- * @return Node ID.
+ * @param nodeId Failed node ID.
*/
- public ClusterNode node() {
- assert m != null || backup != null;
+ boolean onNodeLeft(UUID nodeId) {
+ if (nodeId.equals(m.node().id())) {
+ if (log.isDebugEnabled())
+ log.debug("Remote node left grid while sending or waiting for reply: " + this);
+
+ if (isSync()) {
+ Map<UUID, Collection<UUID>> txNodes = tx.transactionNodes();
+
+ if (txNodes != null) {
+ Collection<UUID> backups = txNodes.get(nodeId);
+
+ if (!F.isEmpty(backups)) {
+ final CheckRemoteTxMiniFuture mini = new CheckRemoteTxMiniFuture(new HashSet<>(backups));
+
+ add(mini);
+
+ GridDhtTxFinishRequest req = checkCommittedRequest(mini.futureId());
+
+ req.waitRemoteTransactions(true);
+
+ for (UUID backupId : backups) {
+ ClusterNode backup = cctx.discovery().node(backupId);
+
+ if (backup != null && WAIT_REMOTE_TXS_SINCE.compareTo(backup.version()) <= 0) {
+ if (backup.isLocal()) {
+ IgniteInternalFuture<?> fut = cctx.tm().remoteTxFinishFuture(tx.nearXidVersion());
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ mini.onDhtFinishResponse(cctx.localNodeId());
+ }
+ });
+ }
+ else {
+ try {
+ cctx.io().send(backup, req, tx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ mini.onNodeLeft(backupId);
+ }
+ catch (IgniteCheckedException e) {
+ mini.onDone(e);
+ }
+ }
+ }
+ else
+ mini.onDhtFinishResponse(backupId);
+ }
+ }
+ }
+ }
- return backup != null ? backup : m.node();
+ onDone(tx);
+
+ return true;
+ }
+
+ return false;
}
/**
- * @return Keys.
+ * @param res Result callback.
*/
- public GridDistributedTxMapping mapping() {
- return m;
+ void onNearFinishResponse(GridNearTxFinishResponse res) {
+ if (res.error() != null)
+ onDone(res.error());
+ else
+ onDone(tx);
}
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(FinishMiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+ }
+ }
+
+ /**
+ *
+ */
+ private class CheckBackupMiniFuture extends MinFuture {
+ /** Keys. */
+ @GridToStringInclude
+ private GridDistributedTxMapping m;
+
+ /** Backup node to check. */
+ private ClusterNode backup;
+
/**
- * @param e Error.
+ * @param backup Backup to check.
+ * @param m Mapping associated with the backup.
*/
- void onResult(Throwable e) {
- if (log.isDebugEnabled())
- log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
- // Fail.
- onDone(e);
+ CheckBackupMiniFuture(ClusterNode backup, GridDistributedTxMapping m) {
+ this.backup = backup;
+ this.m = m;
}
/**
- * @param e Node failure.
+ * @return Node ID.
*/
- void onResult(ClusterTopologyCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Remote node left grid while sending or waiting for reply (will fail): " + this);
+ public ClusterNode node() {
+ return backup;
+ }
- if (backup != null) {
+ /** {@inheritDoc} */
+ @Override boolean onNodeLeft(UUID nodeId) {
+ if (nodeId.equals(backup.id())) {
readyNearMappingFromBackup(m);
- onDone(e);
- }
- else
- // Complete future with tx.
- onDone(tx);
- }
+ onDone(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
- /**
- * @param res Result callback.
- */
- void onResult(GridNearTxFinishResponse res) {
- assert backup == null;
+ return true;
+ }
- if (res.error() != null)
- onDone(res.error());
- else
- onDone(tx);
+ return false;
}
/**
* @param res Response.
*/
- void onResult(GridDhtTxFinishResponse res) {
- assert backup != null;
-
+ void onDhtFinishResponse(GridDhtTxFinishResponse res) {
readyNearMappingFromBackup(m);
Throwable err = res.checkCommittedError();
@@ -755,9 +901,67 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
onDone(tx);
}
+ }
+
+ /**
+ *
+ */
+ private class CheckRemoteTxMiniFuture extends MinFuture {
+ /** */
+ private Set<UUID> nodes;
+
+ /**
+ * @param nodes Backup nodes.
+ */
+ public CheckRemoteTxMiniFuture(Set<UUID> nodes) {
+ this.nodes = nodes;
+ }
+
+ /**
+ * @return Backup nodes.
+ */
+ Set<UUID> nodes() {
+ synchronized (this) {
+ return new HashSet<>(nodes);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override boolean onNodeLeft(UUID nodeId) {
+ return onResponse(nodeId);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ void onDhtFinishResponse(UUID nodeId) {
+ onResponse(nodeId);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @return {@code True} if processed node response.
+ */
+ private boolean onResponse(UUID nodeId) {
+ boolean done;
+
+ boolean ret;
+
+ synchronized (this) {
+ ret = nodes.remove(nodeId);
+
+ done = nodes.isEmpty();
+ }
+
+ if (done)
+ onDone(tx);
+
+ return ret;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error());
+ return S.toString(CheckRemoteTxMiniFuture.class, this);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 3e5e28f..65eac63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -70,6 +70,9 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
* @param commit Commit flag.
* @param invalidate Invalidate flag.
* @param sys System flag.
+ * @param plc IO policy.
+ * @param syncCommit Sync commit flag.
+ * @param syncRollback Sync rollback flag.
* @param explicitLock Explicit lock flag.
* @param storeEnabled Store enabled flag.
* @param topVer Topology version.
@@ -77,6 +80,8 @@ public class GridNearTxFinishRequest extends GridDistributedTxFinishRequest {
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
* @param txSize Expected transaction size.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash.
* @param addDepInfo Deployment info flag.
*/
public GridNearTxFinishRequest(
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
index 4904ad8..b84d2fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishResponse.java
@@ -99,7 +99,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
}
@@ -107,7 +107,7 @@ public class GridNearTxFinishResponse extends GridDistributedTxFinishResponse {
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index aa4e929f..b7b480e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -202,7 +202,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/** {@inheritDoc} */
- @Nullable @Override public GridCacheVersion nearXidVersion() {
+ @Override public GridCacheVersion nearXidVersion() {
return xidVer;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index d886243..8812709 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@ -264,7 +264,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (ownedVals != null) {
+ if (ownedVals != null && ownedValKeys == null) {
ownedValKeys = ownedVals.keySet();
ownedValVals = ownedVals.values();
@@ -287,7 +287,7 @@ public class GridNearTxPrepareResponse extends GridDistributedTxPrepareResponse
}
if (filterFailedKeys != null) {
- for (IgniteTxKey key :filterFailedKeys) {
+ for (IgniteTxKey key : filterFailedKeys) {
GridCacheContext cctx = ctx.cacheContext(key.cacheId());
key.prepareMarshal(cctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 59d8b5b..dc98eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -280,28 +280,28 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
GridCacheContext cctx = ctx.cacheContext(cacheId);
- if (keyValFilter != null) {
+ if (keyValFilter != null && keyValFilterBytes == null) {
if (addDepInfo)
prepareObject(keyValFilter, cctx);
keyValFilterBytes = CU.marshal(cctx, keyValFilter);
}
- if (rdc != null) {
+ if (rdc != null && rdcBytes == null) {
if (addDepInfo)
prepareObject(rdc, cctx);
rdcBytes = CU.marshal(cctx, rdc);
}
- if (trans != null) {
+ if (trans != null && transBytes == null) {
if (addDepInfo)
prepareObject(trans, cctx);
transBytes = CU.marshal(cctx, trans);
}
- if (!F.isEmpty(args)) {
+ if (!F.isEmpty(args) && argsBytes == null) {
if (addDepInfo) {
for (Object arg : args)
prepareObject(arg, cctx);
@@ -317,16 +317,16 @@ public class GridCacheQueryRequest extends GridCacheMessage implements GridCache
Marshaller mrsh = ctx.marshaller();
- if (keyValFilterBytes != null)
+ if (keyValFilterBytes != null && keyValFilter == null)
keyValFilter = mrsh.unmarshal(keyValFilterBytes, ldr);
- if (rdcBytes != null)
+ if (rdcBytes != null && rdc == null)
rdc = mrsh.unmarshal(rdcBytes, ldr);
- if (transBytes != null)
+ if (transBytes != null && trans == null)
trans = mrsh.unmarshal(transBytes, ldr);
- if (argsBytes != null)
+ if (argsBytes != null && args == null)
args = mrsh.unmarshal(argsBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index cce465b..ab882d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@ -122,11 +122,14 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
GridCacheContext cctx = ctx.cacheContext(cacheId);
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = ctx.marshaller().marshal(err);
- metaDataBytes = marshalCollection(metadata, cctx);
- dataBytes = marshalCollection(data, cctx);
+ if (metaDataBytes == null)
+ metaDataBytes = marshalCollection(metadata, cctx);
+
+ if (dataBytes == null)
+ dataBytes = marshalCollection(data, cctx);
if (addDepInfo && !F.isEmpty(data)) {
for (Object o : data) {
@@ -144,11 +147,14 @@ public class GridCacheQueryResponse extends GridCacheMessage implements GridCach
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
- metadata = unmarshalCollection(metaDataBytes, ctx, ldr);
- data = unmarshalCollection(dataBytes, ctx, ldr);
+ if (metadata == null)
+ metadata = unmarshalCollection(metaDataBytes, ctx, ldr);
+
+ if (data == null)
+ data = unmarshalCollection(dataBytes, ctx, ldr);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index f5f99f5..914b4ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -160,6 +160,12 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
public long timeout(long timeout);
/**
+ * Changes transaction state from COMMITTING to MARKED_ROLLBACK.
+ * Must be called only from thread committing transaction.
+ */
+ public void errorWhenCommitting();
+
+ /**
* Modify the transaction associated with the current thread such that the
* only possible outcome of the transaction is to roll back the
* transaction.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 22e27c3..ed44c49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -804,6 +804,22 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
+ public final void errorWhenCommitting() {
+ synchronized (this) {
+ TransactionState prev = state;
+
+ assert prev == COMMITTING : prev;
+
+ state = MARKED_ROLLBACK;
+
+ if (log.isDebugEnabled())
+ log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']');
+
+ notifyAll();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public boolean setRollbackOnly() {
return state(MARKED_ROLLBACK);
}
@@ -1083,7 +1099,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
case MARKED_ROLLBACK: {
- valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == COMMITTING;
+ valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED;
break;
}
@@ -1705,6 +1721,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
/** {@inheritDoc} */
+ @Override public void errorWhenCommitting() {
+ throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
+ }
+
+ /** {@inheritDoc} */
@Override public void commit() {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index c42bc7f..f731975 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -828,7 +828,12 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
val.marshal(ctx, context());
- expiryPlcBytes = transferExpiryPlc ? CU.marshal(this.ctx, new IgniteExternalizableExpiryPolicy(expiryPlc)) : null;
+ if (transferExpiryPlc) {
+ if (expiryPlcBytes == null)
+ expiryPlcBytes = CU.marshal(this.ctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+ }
+ else
+ expiryPlcBytes = null;
}
/**
@@ -871,8 +876,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
val.unmarshal(this.ctx, clsLdr);
- if (expiryPlcBytes != null)
- expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, clsLdr);
+ if (expiryPlcBytes != null && expiryPlc == null)
+ expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, clsLdr);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index b25baf8..547c018 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -721,14 +721,12 @@ public class IgniteTxHandler {
IgniteInternalFuture<IgniteInternalTx> res = null;
- if (tx != null) {
- IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
+ IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync();
- // Only for error logging.
- rollbackFut.listen(CU.errorLogger(log));
+ // Only for error logging.
+ rollbackFut.listen(CU.errorLogger(log));
- res = rollbackFut;
- }
+ res = rollbackFut;
if (e instanceof Error)
throw (Error)e;
@@ -875,7 +873,19 @@ public class IgniteTxHandler {
log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']');
if (req.checkCommitted()) {
- sendReply(nodeId, req, !ctx.tm().addRolledbackTx(null, req.version()));
+ boolean committed = req.waitRemoteTransactions() || !ctx.tm().addRolledbackTx(null, req.version());
+
+ if (!committed || !req.syncCommit())
+ sendReply(nodeId, req, committed);
+ else {
+ IgniteInternalFuture<?> fut = ctx.tm().remoteTxFinishFuture(req.version());
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ sendReply(nodeId, req, true);
+ }
+ });
+ }
return;
}
@@ -1044,7 +1054,7 @@ public class IgniteTxHandler {
* @param committed {@code True} if transaction committed on this node.
*/
protected void sendReply(UUID nodeId, GridDhtTxFinishRequest req, boolean committed) {
- if (req.replyRequired()) {
+ if (req.replyRequired() || req.checkCommitted()) {
GridDhtTxFinishResponse res = new GridDhtTxFinishResponse(req.version(), req.futureId(), req.miniId());
if (req.checkCommitted()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 21ff0cf..926eaf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -809,7 +809,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
catch (IgniteCheckedException ex) {
commitError(ex);
- setRollbackOnly();
+ errorWhenCommitting();
// Safe to remove transaction from committed tx list because nothing was committed yet.
cctx.tm().removeCommittedTx(this);
@@ -819,7 +819,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
catch (Throwable ex) {
commitError(ex);
- setRollbackOnly();
+ errorWhenCommitting();
// Safe to remove transaction from committed tx list because nothing was committed yet.
cctx.tm().removeCommittedTx(this);
@@ -1161,7 +1161,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
// Set operation to NOOP.
txEntry.op(NOOP);
- setRollbackOnly();
+ errorWhenCommitting();
throw ex;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index d384e4e..ca15e20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1063,6 +1063,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) {
uncommitTx(tx);
+ tx.errorWhenCommitting();
+
throw new IgniteException("Missing commit version (consider increasing " +
IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() +
", tx=" + tx.getClass().getSimpleName() + ']');
@@ -1616,6 +1618,24 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param nearVer Near version.
+ * @return Finish future for related remote transactions.
+ */
+ @SuppressWarnings("unchecked")
+ public IgniteInternalFuture<?> remoteTxFinishFuture(GridCacheVersion nearVer) {
+ GridCompoundFuture<Void, Void> fut = new GridCompoundFuture<>();
+
+ for (final IgniteInternalTx tx : txs()) {
+ if (!tx.local() && nearVer.equals(tx.nearXidVersion()))
+ fut.add((IgniteInternalFuture) tx.finishFuture());
+ }
+
+ fut.markInitialized();
+
+ return fut;
+ }
+
+ /**
* @param nearVer Near version ID.
* @param txNum Number of transactions.
* @param fut Result future.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
index 3d65304..77c802d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -105,6 +105,7 @@ public class DataStreamerRequest implements Message {
* @param entries Entries to put.
* @param ignoreDepOwnership Ignore ownership.
* @param skipStore Skip store flag.
+ * @param keepBinary Keep binary flag.
* @param depMode Deployment mode.
* @param sampleClsName Sample class name.
* @param userVer User version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index cd783e4..98848ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -926,8 +926,15 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
CacheConfiguration newCfg = cacheConfiguration(cfg, cacheName);
- if (ctx.cache().cache(cacheName) == null)
- ctx.cache().dynamicStartCache(newCfg, cacheName, null, CacheType.INTERNAL, false, true).get();
+ if (ctx.cache().cache(cacheName) == null) {
+ ctx.cache().dynamicStartCache(newCfg,
+ cacheName,
+ null,
+ CacheType.INTERNAL,
+ false,
+ true,
+ true).get();
+ }
assert ctx.cache().cache(cacheName) != null : cacheName;
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
index f4a8fad..ecb892e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAckMessage.java
@@ -90,7 +90,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
super.prepareMarshal(marsh);
- if (err != null)
+ if (err != null && errBytes == null)
errBytes = marsh.marshal(err);
}
@@ -98,7 +98,7 @@ public class IgfsAckMessage extends IgfsCommunicationMessage {
@Override public void finishUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(marsh, ldr);
- if (errBytes != null)
+ if (errBytes != null && err == null)
err = marsh.unmarshal(errBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
index 65dca08..a89913f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/cache/GridCacheCommandHandler.java
@@ -357,7 +357,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
switch (cmd) {
case DESTROY_CACHE: {
- fut = ((IgniteKernal)ctx.grid()).destroyCacheAsync(cacheName).chain(
+ // Do not check thread tx here since there can be active system cache txs.
+ fut = ((IgniteKernal)ctx.grid()).destroyCacheAsync(cacheName, false).chain(
new CX1<IgniteInternalFuture<?>, GridRestResponse>() {
@Override public GridRestResponse applyx(IgniteInternalFuture<?> f)
throws IgniteCheckedException {
@@ -369,7 +370,8 @@ public class GridCacheCommandHandler extends GridRestCommandHandlerAdapter {
}
case GET_OR_CREATE_CACHE: {
- fut = ((IgniteKernal)ctx.grid()).getOrCreateCacheAsync(cacheName).chain(
+ // Do not check thread tx here since there can be active system cache txs.
+ fut = ((IgniteKernal)ctx.grid()).getOrCreateCacheAsync(cacheName, false).chain(
new CX1<IgniteInternalFuture<?>, GridRestResponse>() {
@Override public GridRestResponse applyx(IgniteInternalFuture<?> f)
throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 1ea5014..8c23d92 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -104,6 +104,7 @@ public interface DiscoverySpi extends IgniteSpi {
* Sets a handler for initial data exchange between Ignite nodes.
*
* @param exchange Discovery data exchange handler.
+ * @return {@code this} for chaining.
*/
public TcpDiscoverySpi setDataExchange(DiscoverySpiDataExchange exchange);
@@ -113,6 +114,7 @@ public interface DiscoverySpi extends IgniteSpi {
* dynamic metrics between nodes.
*
* @param metricsProvider Provider of metrics data.
+ * @return {@code this} for chaining.
*/
public TcpDiscoverySpi setMetricsProvider(DiscoveryMetricsProvider metricsProvider);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 066a5fd..21204c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -204,7 +204,8 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
* Stops streamer.
*/
public void stop() {
- srv.stop();
+ if (srv != null)
+ srv.stop();
if (log.isDebugEnabled())
log.debug("Socket streaming server stopped");
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
index 95ca9b5..9908b87 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiMultithreadedSelfTest.java
@@ -324,10 +324,14 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
@Override public void applyx(IgniteCache<String, Integer> cache) {
int rnd = random();
+ Set<Integer> ids = new HashSet<>(set);
+
cache.removeAll(rangeKeys(0, rnd));
- for (int i = 0; i < rnd; i++)
- assert cache.localPeek("key" + i, CachePeekMode.ONHEAP) == null;
+ for (int i = 0; i < rnd; i++) {
+ if (ids.contains(i))
+ assertNull(cache.localPeek("key" + i));
+ }
}
});
}
@@ -350,7 +354,7 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
for (int i = 0; i < rnd; i++) {
if (ids.contains(i))
- assert cache.localPeek("key" + i, CachePeekMode.ONHEAP) == null;
+ assertNull(cache.localPeek("key" + i));
}
}
});
@@ -359,6 +363,7 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
/**
* @param cache Cache.
* @param key Key.
+ * @return Removed value.
*/
private <K, V> V removeAsync(IgniteCache<K, V> cache, K key) {
IgniteCache<K, V> cacheAsync = cache.withAsync();
@@ -371,6 +376,8 @@ public abstract class GridCacheAbstractFullApiMultithreadedSelfTest extends Grid
/**
* @param cache Cache.
* @param key Key.
+ * @param val Value.
+ * @return Remove result.
*/
private <K, V> boolean removeAsync(IgniteCache<K, V> cache, K key, V val) {
IgniteCache<K, V> cacheAsync = cache.withAsync();
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
index 0d9c541..1e0071e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java
@@ -4337,7 +4337,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract
log.info("Set iterators not cleared, will wait");
- Thread.sleep(500);
+ Thread.sleep(1000);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
index a34857f..e70c97b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheStopSelfTest.java
@@ -299,7 +299,7 @@ public class GridCacheStopSelfTest extends GridCommonAbstractTest {
return null;
}
- }));
+ }, "cache-thread"));
}
readyLatch.await();
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
index 8a5dfd4..c9cd750 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheStartSelfTest.java
@@ -158,7 +158,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
ccfg.setName(DYNAMIC_CACHE_NAME);
- futs.add(kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true, true));
+ futs.add(kernal.context().cache().dynamicStartCache(ccfg,
+ ccfg.getName(),
+ null,
+ true,
+ true,
+ true));
return null;
}
@@ -190,7 +195,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
GridTestUtils.runMultiThreaded(new Callable<Object>() {
@Override public Object call() throws Exception {
- futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME));
+ futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true));
return null;
}
@@ -218,7 +223,12 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount()));
- futs.add(kernal.context().cache().dynamicStartCache(ccfg, ccfg.getName(), null, true, true));
+ futs.add(kernal.context().cache().dynamicStartCache(ccfg,
+ ccfg.getName(),
+ null,
+ true,
+ true,
+ true));
return null;
}
@@ -252,7 +262,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
@Override public Object call() throws Exception {
IgniteEx kernal = grid(ThreadLocalRandom.current().nextInt(nodeCount()));
- futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME));
+ futs.add(kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true));
return null;
}
@@ -315,7 +325,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount(); g++)
caches[g] = grid(g).cache(DYNAMIC_CACHE_NAME);
- kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
for (int g = 0; g < nodeCount(); g++) {
final IgniteKernal kernal0 = (IgniteKernal) grid(g);
@@ -368,7 +378,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
}
// Undeploy cache.
- kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
startGrid(nodeCount() + 1);
@@ -445,7 +455,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
}, IllegalArgumentException.class, null);
}
- kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
stopGrid(nodeCount() + 1);
stopGrid(nodeCount());
@@ -512,7 +522,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount() + 1; g++)
assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
- kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
}
finally {
stopGrid(nodeCount());
@@ -554,7 +564,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount() + 1; g++)
assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
- kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
}
finally {
stopGrid(nodeCount());
@@ -600,7 +610,7 @@ public class IgniteDynamicCacheStartSelfTest extends GridCommonAbstractTest {
for (int g = 0; g < nodeCount() + 1; g++)
assertEquals("1", ignite(g).cache(DYNAMIC_CACHE_NAME).get("1"));
- kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME).get();
+ kernal.context().cache().dynamicDestroyCache(DYNAMIC_CACHE_NAME, true).get();
}
finally {
stopGrid(nodeCount());
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
index bf6dcda..34e7080 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteClientDataStructuresAbstractTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -59,6 +60,8 @@ public abstract class IgniteClientDataStructuresAbstractTest extends GridCommonA
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
return cfg;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
index 84838db..a08d080 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java
@@ -188,6 +188,9 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
}
/**
+ * @param conc Transaction concurrency.
+ * @param backup Check backup flag.
+ * @param commit Check commit flag.
* @throws Exception If failed.
*/
private void checkPrimaryNodeFailureBackupCommit(
@@ -197,6 +200,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
) throws Exception {
try {
startGrids(gridCount());
+
awaitPartitionMapExchange();
for (int i = 0; i < gridCount(); i++)
@@ -290,7 +294,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
return null;
}
- });
+ }, "tx-thread");
commitLatch.await();
@@ -366,6 +370,7 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest {
/**
* @param ignite Ignite instance to generate key.
+ * @param backup Backup key flag.
* @return Generated key that is not primary nor backup for {@code ignite(0)} and primary for
* {@code ignite(1)}.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java
new file mode 100644
index 0000000..c47401c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheCommitDelayTxRecoveryTest.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class IgniteCacheCommitDelayTxRecoveryTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int SRVS = 4;
+
+ /** */
+ private static volatile boolean commit;
+
+ /** */
+ private static volatile CountDownLatch commitStartedLatch;
+
+ /** */
+ private static volatile CountDownLatch commitFinishLatch;
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRecovery1() throws Exception {
+ checkRecovery(1, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRecovery2() throws Exception {
+ checkRecovery(2, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRecoveryStoreEnabled1() throws Exception {
+ checkRecovery(1, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRecoveryStoreEnabled2() throws Exception {
+ checkRecovery(2, true);
+ }
+
+ /**
+ * @param backups Number of cache backups.
+ * @param useStore If {@code true} tests cache with store configured.
+ * @throws Exception If failed.
+ */
+ private void checkRecovery(int backups, boolean useStore) throws Exception {
+ startGridsMultiThreaded(SRVS, false);
+
+ client = true;
+
+ Ignite clientNode = startGrid(SRVS);
+
+ assertTrue(clientNode.configuration().isClientMode());
+
+ client = false;
+
+ clientNode.createCache(cacheConfiguration(backups, useStore));
+
+ awaitPartitionMapExchange();
+
+ Ignite srv = ignite(0);
+
+ assertFalse(srv.configuration().isClientMode());
+
+ for (Boolean pessimistic : Arrays.asList(false, true)) {
+ checkRecovery(backupKey(srv.cache(null)), srv, pessimistic, useStore);
+
+ checkRecovery(nearKey(srv.cache(null)), srv, pessimistic, useStore);
+
+ checkRecovery(nearKey(clientNode.cache(null)), clientNode, pessimistic, useStore);
+
+ srv = ignite(0);
+
+ assertFalse(srv.configuration().isClientMode());
+ }
+ }
+
+ /**
+ * @param key Key.
+ * @param ignite Node executing update.
+ * @param pessimistic If {@code true} uses pessimistic transaction.
+ * @param useStore {@code True} if store is used.
+ * @throws Exception If failed.
+ */
+ private void checkRecovery(final Integer key,
+ final Ignite ignite,
+ final boolean pessimistic,
+ final boolean useStore) throws Exception {
+ Ignite primary = primaryNode(key, null);
+
+ assertNotSame(ignite, primary);
+
+ List<Ignite> backups = backupNodes(key, null);
+
+ assertFalse(backups.isEmpty());
+
+ final Set<String> backupNames = new HashSet<>();
+
+ for (Ignite node : backups)
+ backupNames.add(node.name());
+
+ log.info("Check recovery [key=" + key +
+ ", pessimistic=" + pessimistic +
+ ", primary=" + primary.name() +
+ ", backups=" + backupNames +
+ ", node=" + ignite.name() + ']');
+
+ final IgniteCache<Integer, Integer> cache = ignite.cache(null);
+
+ cache.put(key, 0);
+
+ commitStartedLatch = new CountDownLatch(backupNames.size());
+ commitFinishLatch = new CountDownLatch(1);
+
+ commit = false;
+
+ TestEntryProcessor.skipFirst = useStore ? ignite.name() : null;
+
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ log.info("Start update.");
+
+ if (pessimistic) {
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.invoke(key, new TestEntryProcessor(backupNames));
+
+ commit = true;
+
+ log.info("Start commit.");
+
+ assertEquals(backupNames.size(), commitStartedLatch.getCount());
+
+ tx.commit();
+ }
+ }
+ else {
+ commit = true;
+
+ cache.invoke(key, new TestEntryProcessor(backupNames));
+ }
+
+ log.info("End update, execute get.");
+
+ Integer val = cache.get(key);
+
+ log.info("Get value: " + val);
+
+ assertEquals(1, (Object)val);
+
+ return null;
+ }
+ }, "update-thread");
+
+ assertTrue(commitStartedLatch.await(30, SECONDS));
+
+ log.info("Stop node: " + primary.name());
+
+ primary.close();
+
+ commitFinishLatch.countDown();
+
+ fut.get();
+
+ for (Ignite node : G.allGrids())
+ assertEquals(1, node.cache(null).get(key));
+
+ cache.put(key, 2);
+
+ for (Ignite node : G.allGrids())
+ assertEquals(2, node.cache(null).get(key));
+
+ startGrid(primary.name());
+
+ for (Ignite node : G.allGrids())
+ assertEquals(2, node.cache(null).get(key));
+
+ cache.put(key, 3);
+
+ for (Ignite node : G.allGrids())
+ assertEquals(3, node.cache(null).get(key));
+
+ awaitPartitionMapExchange();
+ }
+
+ /**
+ *
+ */
+ static class TestEntryProcessor implements CacheEntryProcessor<Integer, Integer, Void> {
+ /** */
+ private Set<String> nodeNames;
+
+ /** Skips first call for given node (used to skip call for store update). */
+ private static String skipFirst;
+
+ /**
+ * @param nodeNames Node names where sleep will be called.
+ */
+ public TestEntryProcessor(Set<String> nodeNames) {
+ this.nodeNames = nodeNames;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<Integer, Integer> entry, Object... args) {
+ Ignite ignite = entry.unwrap(Ignite.class);
+
+ System.out.println(Thread.currentThread().getName() + " process [node=" + ignite.name() +
+ ", commit=" + commit + ", skipFirst=" + skipFirst + ']');
+
+ boolean skip = false;
+
+ if (commit && ignite.name().equals(skipFirst)) {
+ skipFirst = null;
+
+ skip = true;
+ }
+
+ if (!skip && commit && nodeNames.contains(ignite.name())) {
+ try {
+ System.out.println(Thread.currentThread().getName() + " start process invoke.");
+
+ assertTrue(commitStartedLatch != null && commitStartedLatch.getCount() > 0);
+
+ commitStartedLatch.countDown();
+
+ assertTrue(commitFinishLatch.await(10, SECONDS));
+
+ System.out.println(Thread.currentThread().getName() + " end process invoke.");
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ else
+ System.out.println(Thread.currentThread().getName() + " invoke set value.");
+
+ entry.setValue(1);
+
+ return null;
+ }
+ }
+
+ /**
+ * @param backups Number of backups.
+ * @param useStore If {@code true} adds cache store.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Object, Object> cacheConfiguration(int backups, boolean useStore) {
+ CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
+
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setBackups(backups);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setRebalanceMode(SYNC);
+
+ if (useStore) {
+ ccfg.setCacheStoreFactory(new TestStoreFactory());
+
+ ccfg.setWriteThrough(true);
+ }
+
+ return ccfg;
+ }
+
+ /**
+ *
+ */
+ private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> {
+ /** {@inheritDoc} */
+ @Override public CacheStore<Object, Object> create() {
+ return new CacheStoreAdapter<Object, Object>() {
+ @Override public Object load(Object key) throws CacheLoaderException {
+ return null;
+ }
+
+ @Override public void write(Cache.Entry entry) throws CacheWriterException {
+ // No-op.
+ }
+
+ @Override public void delete(Object key) throws CacheWriterException {
+ // No-op.
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 4eb8a6b..7532354 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -30,6 +30,8 @@ import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
+
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -238,7 +240,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
* @param store If {@code true} uses cache with store.
* @throws Exception If failed.
*/
- private void checkRetry(Test test, TestMemoryMode memMode, boolean store) throws Exception {
+ protected final void checkRetry(Test test, TestMemoryMode memMode, boolean store) throws Exception {
ignite(0).createCache(cacheConfiguration(memMode, store));
final AtomicBoolean finished = new AtomicBoolean();
@@ -259,7 +261,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
}
});
- IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
+ final IgniteCache<Integer, Integer> cache = ignite(0).cache(null);
int iter = 0;
@@ -309,6 +311,31 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
break;
}
+ case TX_PUT: {
+ while (System.currentTimeMillis() < stopTime) {
+ final Integer val = ++iter;
+
+ Ignite ignite = ignite(0);
+
+ for (int i = 0; i < keysCnt; i++) {
+ final Integer key = i;
+
+ doInTransaction(ignite, new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ cache.put(key, val);
+
+ return null;
+ }
+ });
+ }
+
+ for (int i = 0; i < keysCnt; i++)
+ assertEquals(val, cache.get(i));
+ }
+
+ break;
+ }
+
case PUT_ALL: {
while (System.currentTimeMillis() < stopTime) {
Integer val = ++iter;
@@ -541,7 +568,10 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
INVOKE,
/** */
- INVOKE_ALL
+ INVOKE_ALL,
+
+ /** */
+ TX_PUT
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 7655464..9204bc8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -101,6 +101,20 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
/**
* @throws Exception If failed.
*/
+ public void testExplicitTransactionRetriesSingleValue() throws Exception {
+ checkRetry(Test.TX_PUT, TestMemoryMode.HEAP, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testExplicitTransactionRetriesSingleValueStoreEnabled() throws Exception {
+ checkRetry(Test.TX_PUT, TestMemoryMode.HEAP, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testExplicitTransactionRetries() throws Exception {
explicitTransactionRetries(TestMemoryMode.HEAP, false);
}
@@ -108,6 +122,13 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
/**
* @throws Exception If failed.
*/
+ public void testExplicitTransactionRetriesSingleOperation() throws Exception {
+ explicitTransactionRetries(TestMemoryMode.HEAP, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testExplicitTransactionRetriesStoreEnabled() throws Exception {
explicitTransactionRetries(TestMemoryMode.HEAP, true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
index d239ea8..91eecbb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -104,6 +105,8 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
cfg.setDiscoverySpi(disc);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
if (include)
cfg.setUserAttributes(F.asMap("include", true));
http://git-wip-us.apache.org/repos/asf/ignite/blob/457a9ae4/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index 6089795..552dd28 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -569,9 +569,9 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
GridNioServerListener lsnr,
@Nullable Integer queueLimit) throws Exception {
for (int i = 0; i < 10; i++) {
- try {
- int srvPort = port++;
+ int srvPort = port++;
+ try {
GridNioServer.Builder<?> builder = serverBuilder(srvPort, parser, lsnr);
if (queueLimit != null)
@@ -584,8 +584,11 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
return srvr;
}
catch (IgniteCheckedException e) {
- if (i < 9 && e.hasCause(BindException.class))
- log.error("Failed to start server, will try another port: " + e);
+ if (i < 9 && e.hasCause(BindException.class)) {
+ log.error("Failed to start server, will try another port [err=" + e + ", port=" + srvPort + ']');
+
+ U.sleep(5000);
+ }
else
throw e;
}