You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/02 09:03:15 UTC
[01/11] ignite git commit: IGNITE-5866: Fix MetadataJob filter to
exclude any of system caches - Fixes #2367.
Repository: ignite
Updated Branches:
refs/heads/ignite-5578 ee7062e4e -> 760adcab1
IGNITE-5866: Fix MetadataJob filter to exclude any of system caches - Fixes #2367.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bdb838f0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bdb838f0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bdb838f0
Branch: refs/heads/ignite-5578
Commit: bdb838f0fe364f07049b9441f8689ef606aff002
Parents: f575a245
Author: Nikolay Izhikov <NV...@sberbank.ru>
Authored: Tue Aug 1 14:35:16 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Aug 1 14:35:16 2017 +0300
----------------------------------------------------------------------
.../internal/processors/cache/query/GridCacheQueryManager.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bdb838f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index f107038..3e772cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.datastructures.GridSetQueryPredicate;
import org.apache.ignite.internal.processors.datastructures.SetItemKey;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
@@ -2067,7 +2068,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
},
new P1<IgniteInternalCache<?, ?>>() {
@Override public boolean apply(IgniteInternalCache<?, ?> c) {
- return !CU.UTILITY_CACHE_NAME.equals(c.name());
+ return !CU.isSystemCache(c.name()) && !DataStructuresProcessor.isDataStructureCache(c.name());
}
}
);
[09/11] ignite git commit: 5578
Posted by sb...@apache.org.
5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1e9602fb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1e9602fb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1e9602fb
Branch: refs/heads/ignite-5578
Commit: 1e9602fbf9653106302418f914d565ded4db4bc9
Parents: 977b752
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 2 11:55:41 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 2 11:55:41 2017 +0300
----------------------------------------------------------------------
.../affinity/GridAffinityAssignmentCache.java | 8 --------
.../cache/CacheAffinitySharedManager.java | 20 +++++++++++---------
...CacheExchangeMessageDuplicatedStateTest.java | 3 ---
3 files changed, 11 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9602fb/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index c60a690..410da73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -337,14 +337,6 @@ public class GridAffinityAssignmentCache {
}
/**
- * @param topVer
- * @return
- */
- public boolean lastVersionEquals(AffinityTopologyVersion topVer) {
- return topVer.equals(lastVersion());
- }
-
- /**
* @return Last calculated affinity version.
*/
public AffinityTopologyVersion lastVersion() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9602fb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 2138261..1ff2b56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1053,13 +1053,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
- * @return All registered cache groups.
- */
- public Collection<CacheGroupDescriptor> cacheGroups() {
- return caches.allGroups();
- }
-
- /**
* @param c Cache closure.
* @throws IgniteCheckedException If failed
*/
@@ -1486,6 +1479,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
continue;
if (fut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom())) {
+ // Do not calculate affinity since it can change in case of exchange merge.
if (!fut.context().mergeExchanges()) {
List<List<ClusterNode>> assignment = grp.affinity().calculate(topVer,
fut.discoveryEvent(),
@@ -1657,7 +1651,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
final GridAffinityAssignmentCache aff = grpHolder.affinity();
if (newAff) {
- if (!aff.lastVersionEquals(topVer)) {
+ if (!aff.lastVersion().equals(topVer)) {
List<List<ClusterNode>> assign =
aff.calculate(topVer, fut.discoveryEvent(), fut.discoCache());
@@ -1717,7 +1711,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
if (newAff) {
GridAffinityAssignmentCache aff = grpHolder.affinity();
- if (!aff.lastVersionEquals(topVer)) {
+ if (!aff.lastVersion().equals(topVer)) {
List<List<ClusterNode>> assign = aff.calculate(topVer,
fut.discoveryEvent(),
fut.discoCache());
@@ -1786,6 +1780,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @param fut Current exchange future.
* @param crd Coordinator flag.
* @throws IgniteCheckedException If failed.
* @return Rebalance info.
@@ -2175,6 +2170,13 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap
}
/**
+ * @return All registered cache groups.
+ */
+ public Collection<CacheGroupDescriptor> cacheGroups() {
+ return caches.allGroups();
+ }
+
+ /**
*
*/
public void dumpDebugInfo() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/1e9602fb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
index dc88852..bff63fb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheExchangeMessageDuplicatedStateTest.java
@@ -237,9 +237,6 @@ public class CacheExchangeMessageDuplicatedStateTest extends GridCommonAbstractT
private void checkFullMessage(GridDhtPartitionsFullMessage msg) {
Map<Integer, Integer> dupPartsData = getFieldValue(msg, "dupPartsData");
- if (dupPartsData == null)
- System.out.println();
-
assertNotNull(dupPartsData);
checkFullMessage(AFF1_CACHE1, AFF1_CACHE2, dupPartsData, msg);
[03/11] ignite git commit: IGNITE-5859 IgniteUtils.ceilPow2 overflow
for values greater than 2^30
Posted by sb...@apache.org.
IGNITE-5859 IgniteUtils.ceilPow2 overflow for values greater than 2^30
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/160dab09
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/160dab09
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/160dab09
Branch: refs/heads/ignite-5578
Commit: 160dab09587a4c6ebdcfd71368360cfcb153575b
Parents: 5224c9d
Author: Vitaliy Biryukov <Bi...@gmail.com>
Authored: Tue Aug 1 17:02:53 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Aug 1 17:02:53 2017 +0300
----------------------------------------------------------------------
.../ignite/internal/util/IgniteUtils.java | 4 ++-
.../internal/util/IgniteUtilsSelfTest.java | 26 ++++++++++++++++++++
2 files changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/160dab09/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 4116d07..524286c 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -987,7 +987,9 @@ public abstract class IgniteUtils {
* @return Nearest power of 2.
*/
public static int ceilPow2(int v) {
- return Integer.highestOneBit(v - 1) << 1;
+ int i = v - 1;
+
+ return Integer.highestOneBit(i) << 1 - (i >>> 30 ^ v >> 31);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/160dab09/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 057d343..963c1d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -822,6 +822,32 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
}
/**
+ *
+ */
+ public void testCeilPow2() throws Exception {
+ assertEquals(2, U.ceilPow2(2));
+ assertEquals(4, U.ceilPow2(3));
+ assertEquals(4, U.ceilPow2(4));
+ assertEquals(8, U.ceilPow2(5));
+ assertEquals(8, U.ceilPow2(6));
+ assertEquals(8, U.ceilPow2(7));
+ assertEquals(8, U.ceilPow2(8));
+ assertEquals(16, U.ceilPow2(9));
+ assertEquals(1 << 15, U.ceilPow2((1 << 15) - 1));
+ assertEquals(1 << 15, U.ceilPow2(1 << 15));
+ assertEquals(1 << 16, U.ceilPow2((1 << 15) + 1));
+ assertEquals(1 << 26, U.ceilPow2((1 << 26) - 100));
+ assertEquals(1 << 26, U.ceilPow2(1 << 26));
+ assertEquals(1 << 27, U.ceilPow2((1 << 26) + 100));
+
+ for (int i = (int)Math.pow(2, 30); i < Integer.MAX_VALUE; i++)
+ assertEquals((int)Math.pow(2, 30), U.ceilPow2(i));
+
+ for (int i = Integer.MIN_VALUE; i < 0; i++)
+ assertEquals(0, U.ceilPow2(i));
+ }
+
+ /**
* Test enum.
*/
private enum TestEnum {
[05/11] ignite git commit: Added Redis commands package description.
Posted by sb...@apache.org.
Added Redis commands package description.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb034965
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb034965
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb034965
Branch: refs/heads/ignite-5578
Commit: cb034965451d865408de7320b6ce31872f7dc279
Parents: 40c91c5
Author: shroman <rs...@yahoo.com>
Authored: Wed Aug 2 12:58:31 2017 +0900
Committer: shroman <rs...@yahoo.com>
Committed: Wed Aug 2 12:58:31 2017 +0900
----------------------------------------------------------------------
.../rest/handlers/redis/package-info.java | 22 ++++++++++++++++++++
1 file changed, 22 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cb034965/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/redis/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/redis/package-info.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/redis/package-info.java
new file mode 100644
index 0000000..9c4a9bc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/redis/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * <!-- Package description. -->
+ * Contains Redis protocol commands.
+ */
+package org.apache.ignite.internal.processors.rest.handlers.redis;
\ No newline at end of file
[10/11] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-5578
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-5578
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
# modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2d4b1940
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2d4b1940
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2d4b1940
Branch: refs/heads/ignite-5578
Commit: 2d4b1940fca80bb5f3d86ae8b5a38544e4a94d7f
Parents: 1e9602f 20d9ad5
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 2 12:02:51 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 2 12:02:51 2017 +0300
----------------------------------------------------------------------
.../ignite/tests/utils/TestTransaction.java | 10 +
.../processors/cache/GridCacheMapEntry.java | 3 +
.../cache/GridCacheSharedContext.java | 24 +
.../dht/GridDhtPartitionTopology.java | 4 +-
.../dht/GridDhtPartitionTopologyImpl.java | 21 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 25 +-
.../distributed/near/GridNearCacheEntry.java | 2 +-
.../cache/distributed/near/GridNearTxLocal.java | 51 ++
.../near/GridNearTxPrepareRequest.java | 8 +-
.../store/GridCacheStoreManagerAdapter.java | 10 +
.../cache/transactions/IgniteTxAdapter.java | 20 +-
.../cache/transactions/IgniteTxManager.java | 77 +-
.../cache/transactions/IgniteTxMap.java | 2 +-
.../transactions/TransactionProxyImpl.java | 46 +-
.../apache/ignite/transactions/Transaction.java | 14 +
.../ignite/transactions/TransactionState.java | 7 +-
...ptimisticTxSuspendResumeMultiServerTest.java | 30 +
.../IgniteOptimisticTxSuspendResumeTest.java | 751 +++++++++++++++++++
.../IgnitePessimisticTxSuspendResumeTest.java | 91 +++
.../IgniteRejectConnectOnNodeStopTest.java | 7 +-
.../ignite/testframework/GridTestUtils.java | 26 +
.../testframework/junits/GridAbstractTest.java | 3 +-
.../cache/GridAbstractCacheStoreSelfTest.java | 10 +
.../testsuites/IgniteCacheTestSuite6.java | 7 +
.../processors/cache/jta/CacheJtaManager.java | 5 +-
.../processors/cache/jta/CacheJtaResource.java | 28 +-
.../GridJtaTransactionManagerSelfTest.java | 208 +++++
.../ignite/testsuites/IgniteJtaTestSuite.java | 3 +
28 files changed, 1459 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d4b1940/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d4b1940/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d4b1940/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index d3f1e94,880a102..ab0aec7
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@@ -667,7 -621,12 +664,12 @@@ public class GridDhtPartitionTopologyIm
}
}
- updateRebalanceVersion(grp.affinity().readyAssignments(topVer));
- List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
++ List<List<ClusterNode>> aff = grp.affinity().readyAssignments(topVer);
+
+ updateRebalanceVersion(aff);
+
+ if (node2part != null && node2part.valid())
+ changed |= checkEvictions(updateSeq, aff);
consistencyCheck();
}
@@@ -787,11 -746,12 +789,12 @@@
if (!belongs)
throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
"(often may be caused by inconsistent 'key.hashCode()' implementation) " +
- "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
+ "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.readyTopVer + ']');
}
else if (loc != null && state == RENTING && !showRenting)
- throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted " +
- "[part=" + p + ", shouldBeMoving=" + loc.reload() + "]");
+ throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " +
+ "evicted [part=" + p + ", shouldBeMoving=" + loc.reload() + ", belongs=" + belongs +
+ ", topVer=" + topVer + ", curTopVer=" + this.topVer + "]");
if (loc == null) {
if (!belongs)
@@@ -1377,10 -1312,11 +1380,11 @@@
long updateSeq = this.updateSeq.incrementAndGet();
- if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
- List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
+ if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
+ List<List<ClusterNode>> aff = grp.affinity().readyAssignments(readyTopVer);
- changed |= checkEvictions(updateSeq, readyTopVer, aff);
+ if (exchangeVer == null)
- changed |= checkEvictions(updateSeq, aff);
++ changed |= checkEvictions(updateSeq, readyTopVer, aff);
updateRebalanceVersion(aff);
}
@@@ -1536,57 -1455,58 +1540,58 @@@
node2part.put(parts.nodeId(), parts);
- AffinityTopologyVersion affVer = grp.affinity().lastVersion();
-
- if (affVer.compareTo(diffFromAffinityVer) >= 0) {
- AffinityAssignment affAssignment = grp.affinity().cachedAffinity(affVer);
-
- // Add new mappings.
- for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) {
- int p = e.getKey();
+ // During exchange calculate diff after all messages are received and affinity initialized.
+ if (exchId == null) {
+ if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
+ AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
- Set<UUID> diffIds = diffFromAffinity.get(p);
+ // Add new mappings.
+ for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) {
+ int p = e.getKey();
- if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING)
- && !affAssignment.getIds(p).contains(parts.nodeId())) {
- if (diffIds == null)
- diffFromAffinity.put(p, diffIds = U.newHashSet(3));
+ Set<UUID> diffIds = diffFromAffinity.get(p);
- if (diffIds.add(parts.nodeId()))
- changed = true;
- }
- else {
- if (diffIds != null && diffIds.remove(parts.nodeId())) {
- changed = true;
+ if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING)
+ && !affAssignment.getIds(p).contains(parts.nodeId())) {
+ if (diffIds == null)
+ diffFromAffinity.put(p, diffIds = U.newHashSet(3));
- if (diffIds.isEmpty())
- diffFromAffinity.remove(p);
+ if (diffIds.add(parts.nodeId()))
+ changed = true;
}
+ else {
+ if (diffIds != null && diffIds.remove(parts.nodeId())) {
+ changed = true;
+ if (diffIds.isEmpty())
+ diffFromAffinity.remove(p);
+ }
+ }
}
- }
- // Remove obsolete mappings.
- if (cur != null) {
- for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
- Set<UUID> ids = diffFromAffinity.get(p);
+ // Remove obsolete mappings.
+ if (cur != null) {
+ for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
+ Set<UUID> ids = diffFromAffinity.get(p);
- if (ids != null && ids.remove(parts.nodeId())) {
- changed = true;
+ if (ids != null && ids.remove(parts.nodeId())) {
+ changed = true;
- if (ids.isEmpty())
- diffFromAffinity.remove(p);
+ if (ids.isEmpty())
+ diffFromAffinity.remove(p);
+ }
}
}
- }
- diffFromAffinityVer = affVer;
+ diffFromAffinityVer = readyTopVer;
+ }
}
- if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
- List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
+ if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
+ List<List<ClusterNode>> aff = grp.affinity().assignments(readyTopVer);
- changed |= checkEvictions(updateSeq, readyTopVer, aff);
+ if (exchId == null)
- changed |= checkEvictions(updateSeq, aff);
++ changed |= checkEvictions(updateSeq, readyTopVer, aff);
updateRebalanceVersion(aff);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d4b1940/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d4b1940/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2d4b1940/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index a0b4c8d,771e974..86725e1
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@@ -18,8 -18,10 +18,11 @@@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.distributed.CacheExchangeMergeTest;
import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
+ import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest;
+ import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
+ import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
/**
* Test suite.
@@@ -34,8 -36,10 +37,12 @@@ public class IgniteCacheTestSuite6 exte
suite.addTestSuite(CachePartitionStateTest.class);
+ suite.addTestSuite(IgniteOptimisticTxSuspendResumeTest.class);
+ suite.addTestSuite(IgniteOptimisticTxSuspendResumeMultiServerTest.class);
+ suite.addTestSuite(IgnitePessimisticTxSuspendResumeTest.class);
+
+ suite.addTestSuite(CacheExchangeMergeTest.class);
+
return suite;
}
}
[04/11] ignite git commit: IGNITE-5775: Fix removing jobs from
activeJobs for jobAlwaysActivate. This closes #2319.
Posted by sb...@apache.org.
IGNITE-5775: Fix removing jobs from activeJobs for jobAlwaysActivate. This closes #2319.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/40c91c59
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/40c91c59
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/40c91c59
Branch: refs/heads/ignite-5578
Commit: 40c91c5983c4237b329f0e0b532d8ba2d9ae9743
Parents: 160dab0
Author: Evgenii Zhuravlev <ez...@gridgain.com>
Authored: Tue Aug 1 17:28:45 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Aug 1 17:37:21 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/processors/job/GridJobProcessor.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/40c91c59/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index cc8d903..9052543 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -1785,6 +1785,11 @@ public class GridJobProcessor extends GridProcessorAdapter {
if (jobAlwaysActivate) {
if (metricsUpdateFreq > -1L)
updateJobMetrics();
+
+ if (!activeJobs.remove(worker.getJobId(), worker))
+ cancelledJobs.remove(worker.getJobId(), worker);
+
+ heldJobs.remove(worker.getJobId());
}
else {
if (!rwLock.tryReadLock()) {
[11/11] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-5578
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-5578
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
# modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/760adcab
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/760adcab
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/760adcab
Branch: refs/heads/ignite-5578
Commit: 760adcab14d6dcf31998c25ddee5b1cb9e803794
Parents: 2d4b194
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 2 12:03:06 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 2 12:03:06 2017 +0300
----------------------------------------------------------------------
.../dht/GridDhtPartitionTopologyImpl.java | 34 +++++++++++---------
1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/760adcab/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index ab0aec7..bbc3962 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -669,7 +669,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
updateRebalanceVersion(aff);
if (node2part != null && node2part.valid())
- changed |= checkEvictions(updateSeq, aff);
+ changed |= checkEvictions(updateSeq, topVer, aff);
consistencyCheck();
}
@@ -786,15 +786,17 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (loc != null && state == EVICTED) {
locParts.set(p, loc = null);
- if (!belongs)
+ if (!belongs) {
throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
"(often may be caused by inconsistent 'key.hashCode()' implementation) " +
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.readyTopVer + ']');
+ }
}
- else if (loc != null && state == RENTING && !showRenting)
+ else if (loc != null && state == RENTING && !showRenting) {
throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " +
"evicted [part=" + p + ", shouldBeMoving=" + loc.reload() + ", belongs=" + belongs +
- ", topVer=" + topVer + ", curTopVer=" + this.topVer + "]");
+ ", topVer=" + topVer + ", curTopVer=" + this.readyTopVer + "]");
+ }
if (loc == null) {
if (!belongs)
@@ -1143,13 +1145,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Override public boolean update(
- @Nullable AffinityTopologyVersion exchangeResVer,
+ @Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, T2<Long, Long>> incomeCntrMap,
Set<Integer> partsToReload,
@Nullable AffinityTopologyVersion msgTopVer) {
if (log.isDebugEnabled())
- log.debug("Updating full partition map [exchVer=" + exchangeResVer + ", parts=" + fullMapString() + ']');
+ log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']');
assert partMap != null;
@@ -1182,20 +1184,20 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- if (exchangeResVer != null) {
+ if (exchangeVer != null) {
// Ignore if exchange already finished or new exchange started.
- if (readyTopVer.compareTo(exchangeResVer) > 0 || lastTopChangeVer.compareTo(exchangeResVer) > 0) {
+ if (readyTopVer.compareTo(exchangeVer) > 0 || lastTopChangeVer.compareTo(exchangeVer) > 0) {
if (log.isDebugEnabled()) {
log.debug("Stale exchange id for full partition map update (will ignore) [" +
"lastTopChange=" + lastTopChangeVer +
", readTopVer=" + readyTopVer +
- ", exchVer=" + exchangeResVer + ']');
+ ", exchVer=" + exchangeVer + ']');
}
U.warn(log, "Stale exchange id for full partition map update (will ignore) [" +
"lastTopChange=" + lastTopChangeVer +
", readTopVer=" + readyTopVer +
- ", exchVer=" + exchangeResVer + ']');
+ ", exchVer=" + exchangeVer + ']');
return false;
}
@@ -1227,7 +1229,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
fullMapUpdated = true;
if (log.isDebugEnabled()) {
- log.debug("Overriding partition map in full update map [exchVer=" + exchangeResVer +
+ log.debug("Overriding partition map in full update map [exchVer=" + exchangeVer +
", curPart=" + mapString(part) +
", newPart=" + mapString(newPart) + ']');
}
@@ -1264,7 +1266,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (!fullMapUpdated) {
if (log.isDebugEnabled()) {
log.debug("No updates for full partition map (will ignore) [lastExch=" + lastTopChangeVer +
- ", exchVer=" + exchangeResVer +
+ ", exchVer=" + exchangeVer +
", curMap=" + node2part +
", newMap=" + partMap + ']');
}
@@ -1272,15 +1274,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
return false;
}
- if (exchangeResVer != null) {
- assert exchangeResVer.compareTo(readyTopVer) >= 0 && exchangeResVer.compareTo(lastTopChangeVer) >= 0;
+ if (exchangeVer != null) {
+ assert exchangeVer.compareTo(readyTopVer) >= 0 && exchangeVer.compareTo(lastTopChangeVer) >= 0;
- lastTopChangeVer = readyTopVer = exchangeResVer;
+ lastTopChangeVer = readyTopVer = exchangeVer;
}
node2part = partMap;
- if (exchangeResVer == null && (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) {
+ if (exchangeVer == null && (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) {
AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
[06/11] ignite git commit: IGNITE-5757 - Rent partitions on exchange
completion
Posted by sb...@apache.org.
IGNITE-5757 - Rent partitions on exchange completion
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c6fbe2d8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c6fbe2d8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c6fbe2d8
Branch: refs/heads/ignite-5578
Commit: c6fbe2d82a9f56f96c94551b09e85a12d192f32e
Parents: cb03496
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Aug 2 11:25:08 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Aug 2 11:25:22 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 3 +++
.../dht/GridDhtPartitionTopology.java | 4 +++-
.../dht/GridDhtPartitionTopologyImpl.java | 21 +++++++++-------
.../distributed/dht/GridDhtTxPrepareFuture.java | 25 ++++++++++++++++++--
.../distributed/near/GridNearCacheEntry.java | 2 +-
.../near/GridNearTxPrepareRequest.java | 8 +++----
.../cache/transactions/IgniteTxManager.java | 3 ++-
.../IgniteRejectConnectOnNodeStopTest.java | 7 +++++-
.../testframework/junits/GridAbstractTest.java | 3 ++-
9 files changed, 57 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index edfa950..2ee80a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2088,6 +2088,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
long expireTime = expireTimeExtras();
if (expireTime > 0 && (expireTime - U.currentTimeMillis() <= 0)) {
+ if (obsoleteVer == null)
+ obsoleteVer = nextVersion();
+
if (onExpired(this.val, obsoleteVer)) {
if (cctx.deferredDelete()) {
deferred = true;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 81d92e0..8688c4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -203,7 +203,9 @@ public interface GridDhtPartitionTopology {
* @param p Partition ID.
* @param affAssignment Assignments.
* @param affNodes Node assigned for given partition by affinity.
- * @return Collection of all nodes responsible for this partition with primary node being first.
+ * @return Collection of all nodes responsible for this partition with primary node being first. The first N
+ * elements of this collection (with N being 1 + backups) are actual DHT affinity nodes, other nodes
+ * are current additional owners of the partition after topology change.
*/
@Nullable public List<ClusterNode> nodes(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a8e13a0..880a102 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -380,9 +380,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
}
-
- if (node2part != null && node2part.valid())
- checkEvictions(updateSeq, aff);
}
updateRebalanceVersion(aff);
@@ -624,7 +621,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
- updateRebalanceVersion(grp.affinity().assignments(topVer));
+ List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
+
+ updateRebalanceVersion(aff);
+
+ if (node2part != null && node2part.valid())
+ changed |= checkEvictions(updateSeq, aff);
consistencyCheck();
}
@@ -747,8 +749,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
}
else if (loc != null && state == RENTING && !showRenting)
- throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted " +
- "[part=" + p + ", shouldBeMoving=" + loc.reload() + "]");
+ throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " +
+ "evicted [part=" + p + ", shouldBeMoving=" + loc.reload() + ", belongs=" + belongs +
+ ", topVer=" + topVer + ", curTopVer=" + this.topVer + "]");
if (loc == null) {
if (!belongs)
@@ -1312,7 +1315,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
- changed |= checkEvictions(updateSeq, aff);
+ if (exchangeVer == null)
+ changed |= checkEvictions(updateSeq, aff);
updateRebalanceVersion(aff);
}
@@ -1501,7 +1505,8 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
List<List<ClusterNode>> aff = grp.affinity().assignments(topVer);
- changed |= checkEvictions(updateSeq, aff);
+ if (exchId == null)
+ changed |= checkEvictions(updateSeq, aff);
updateRebalanceVersion(aff);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index a31c540..03d99fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1510,7 +1510,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
try {
List<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
- assert dhtNodes.size() > 0 && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
+ assert !dhtNodes.isEmpty() && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes;
if (log.isDebugEnabled())
log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) +
@@ -1531,7 +1531,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
ClusterNode readerNode = cctx.discovery().node(readerId);
- if (readerNode == null || dhtNodes.contains(readerNode))
+ if (readerNode == null || canSkipNearReader(dht, readerNode, dhtNodes))
continue;
if (log.isDebugEnabled())
@@ -1554,6 +1554,27 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
}
/**
+ * This method checks if we should skip mapping of an entry update to the near reader. We can skip the update
+ * if the reader is a primary or a backup. If the reader is a partition owner, but not a primary or a backup,
+ * we cannot skip the reader update and must attempt to update a near entry anyway.
+ *
+ * @param dhtCache DHT cache to check mapping.
+ * @param readerNode Reader node.
+ * @param dhtNodes Current DHT nodes (primary + backups first and other DHT nodes afterwards).
+ * @return {@code true} if reader is either a primary or a backup.
+ */
+ private boolean canSkipNearReader(GridDhtCacheAdapter<?, ?> dhtCache, ClusterNode readerNode, List<ClusterNode> dhtNodes) {
+ int limit = Math.min(dhtCache.configuration().getBackups() + 1, dhtNodes.size());
+
+ for (int i = 0; i < limit; i++) {
+ if (dhtNodes.get(i).id().equals(readerNode.id()))
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* @param entry Entry.
* @param n Node.
* @param globalMap Map.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 646281b..6e606bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -249,7 +249,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
// If we are here, then we already tried to evict this entry.
// If cannot evict, then update.
if (this.dhtVer == null) {
- if (!markObsolete(dhtVer)) {
+ if (!markObsolete(cctx.versions().next())) {
value(val);
ttlAndExpireTimeExtras((int) ttl, expireTime);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 29c7aad..875f397 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -406,13 +406,13 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
StringBuilder flags = new StringBuilder();
if (near())
- flags.append("near");
+ flags.append("[near]");
if (firstClientRequest())
- flags.append("clientReq");
+ flags.append("[firstClientReq]");
if (implicitSingle())
- flags.append("single");
+ flags.append("[implicitSingle]");
if (explicitLock())
- flags.append("explicitLock");
+ flags.append("[explicitLock]");
return S.toString(GridNearTxPrepareRequest.class, this,
"flags", flags.toString(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index cd68bc9..82692ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -863,7 +863,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
GridNearCacheEntry e = near.peekExx(entry.key());
- if (e != null && e.markObsoleteIfEmpty(tx.xidVersion()))
+ if (e != null && e.markObsoleteIfEmpty(null))
near.removeEntry(e);
}
}
@@ -1191,6 +1191,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
throw new IgniteCheckedException("Missing commit version (consider increasing " +
IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() +
+ ", committed0=" + committed0 +
", tx=" + tx.getClass().getSimpleName() + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
index d34de12..97d685f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteRejectConnectOnNodeStopTest.java
@@ -81,6 +81,11 @@ public class IgniteRejectConnectOnNodeStopTest extends GridCommonAbstractTest {
return cfg;
}
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids(true);
+ }
+
/**
* @throws Exception If failed.
*/
@@ -126,7 +131,7 @@ public class IgniteRejectConnectOnNodeStopTest extends GridCommonAbstractTest {
boolean err = false;
- try{
+ try {
stopStartLatch.await();
IgniteCacheMessageRecoveryAbstractTest.closeSessions(srv);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c6fbe2d8/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index 9b99e01..4965d16 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -766,7 +766,8 @@ public abstract class GridAbstractTest extends TestCase {
Thread.sleep(1000);
}
- throw new Exception("Failed to wait for proper topology: " + cnt);
+ throw new Exception("Failed to wait for proper topology [expCnt=" + cnt +
+ ", actualTopology=" + grid(0).cluster().nodes() + ']');
}
/** */
[08/11] ignite git commit: ignite-5712 Context switching for
optimistic transactions
Posted by sb...@apache.org.
ignite-5712 Context switching for optimistic transactions
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/20d9ad5a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/20d9ad5a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/20d9ad5a
Branch: refs/heads/ignite-5578
Commit: 20d9ad5a4f66e6e38af087cc02dc9f52b94237ec
Parents: c6fbe2d
Author: Nikolay Izhikov <ni...@gmail.com>
Authored: Wed Aug 2 11:52:44 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 2 11:52:44 2017 +0300
----------------------------------------------------------------------
.../ignite/tests/utils/TestTransaction.java | 10 +
.../cache/GridCacheSharedContext.java | 24 +
.../cache/distributed/near/GridNearTxLocal.java | 51 ++
.../store/GridCacheStoreManagerAdapter.java | 10 +
.../cache/transactions/IgniteTxAdapter.java | 20 +-
.../cache/transactions/IgniteTxManager.java | 74 ++
.../cache/transactions/IgniteTxMap.java | 2 +-
.../transactions/TransactionProxyImpl.java | 46 +-
.../apache/ignite/transactions/Transaction.java | 14 +
.../ignite/transactions/TransactionState.java | 7 +-
...ptimisticTxSuspendResumeMultiServerTest.java | 30 +
.../IgniteOptimisticTxSuspendResumeTest.java | 751 +++++++++++++++++++
.../IgnitePessimisticTxSuspendResumeTest.java | 91 +++
.../ignite/testframework/GridTestUtils.java | 26 +
.../cache/GridAbstractCacheStoreSelfTest.java | 10 +
.../testsuites/IgniteCacheTestSuite6.java | 7 +
.../processors/cache/jta/CacheJtaManager.java | 5 +-
.../processors/cache/jta/CacheJtaResource.java | 28 +-
.../GridJtaTransactionManagerSelfTest.java | 208 +++++
.../ignite/testsuites/IgniteJtaTestSuite.java | 3 +
20 files changed, 1402 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
index 4a03d25..e587bd7 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
@@ -140,4 +140,14 @@ public class TestTransaction implements Transaction {
@Override public IgniteFuture<Void> rollbackAsync() throws IgniteException {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public void suspend() throws IgniteException{
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resume() throws IgniteException {
+ // No-op.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 1876023..82d960a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -944,6 +944,30 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * Suspends transaction. It could be resumed later. Supported only for optimistic transactions.
+ *
+ * @param tx Transaction to suspend.
+ * @throws IgniteCheckedException If suspension failed.
+ */
+ public void suspendTx(GridNearTxLocal tx) throws IgniteCheckedException {
+ tx.txState().awaitLastFuture(this);
+
+ tx.suspend();
+ }
+
+ /**
+ * Resume transaction if it was previously suspended.
+ *
+ * @param tx Transaction to resume.
+ * @throws IgniteCheckedException If resume failed.
+ */
+ public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException {
+ tx.txState().awaitLastFuture(this);
+
+ tx.resume();
+ }
+
+ /**
* @return Store session listeners.
*/
@Nullable public Collection<CacheStoreSessionListener> storeSessionListeners() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 58ecee9..55d6bdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -105,12 +105,14 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER;
+import static org.apache.ignite.transactions.TransactionState.ACTIVE;
import static org.apache.ignite.transactions.TransactionState.COMMITTED;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
/**
@@ -2851,6 +2853,47 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
}
/**
+ * Suspends transaction. It could be resumed later. Supported only for optimistic transactions.
+ *
+ * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out.
+ */
+ public void suspend() throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Suspend near local tx: " + this);
+
+ if (pessimistic())
+ throw new UnsupportedOperationException("Suspension is not supported for pessimistic transactions.");
+
+ if (threadId() != Thread.currentThread().getId())
+ throw new IgniteCheckedException("Only thread started transaction can suspend it.");
+
+ synchronized (this) {
+ checkValid();
+
+ cctx.tm().suspendTx(this);
+ }
+ }
+
+ /**
+ * Resumes transaction (possibly in another thread) if it was previously suspended.
+ *
+ * @throws IgniteCheckedException If the transaction is in an incorrect state, or timed out.
+ */
+ public void resume() throws IgniteCheckedException {
+ if (log.isDebugEnabled())
+ log.debug("Resume near local tx: " + this);
+
+ if (pessimistic())
+ throw new UnsupportedOperationException("Resume is not supported for pessimistic transactions.");
+
+ synchronized (this) {
+ checkValid();
+
+ cctx.tm().resumeTx(this);
+ }
+ }
+
+ /**
* @param maps Mappings.
*/
void addEntryMapping(@Nullable Collection<GridDistributedTxMapping> maps) {
@@ -3956,6 +3999,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
}
/**
+ * Sets the owner thread; meant to be called only in the middle of resume or suspend (not checked here).
+ * @param threadId ID of the thread that becomes the new owner of the transaction.
+ */
+ public void threadId(long threadId) {
+ this.threadId = threadId;
+ }
+
+ /**
* Post-lock closure.
*
* @param <T> Return type.
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 9cba3dd..83f07fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -1362,6 +1362,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/** {@inheritDoc} */
+ @Override public void suspend() throws IgniteException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public long timeout() {
return tx.timeout();
}
@@ -1407,6 +1412,11 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
}
/** {@inheritDoc} */
+ @Override public void resume() throws IgniteException {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteAsyncSupport withAsync() {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 91ce3ce..61ca78c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -97,6 +97,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
/**
* Managed transaction adapter.
@@ -977,10 +978,10 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
switch (state) {
case ACTIVE: {
- valid = false;
+ valid = prev == SUSPENDED;
break;
- } // Active is initial state and cannot be transitioned to.
+ }
case PREPARING: {
valid = prev == ACTIVE;
@@ -1025,15 +1026,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
}
case MARKED_ROLLBACK: {
- valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED;
+ valid = prev == ACTIVE || prev == PREPARING || prev == PREPARED || prev == SUSPENDED;
break;
}
case ROLLING_BACK: {
- valid =
- prev == ACTIVE || prev == MARKED_ROLLBACK || prev == PREPARING ||
- prev == PREPARED || (prev == COMMITTING && local() && !dht());
+ valid = prev == ACTIVE || prev == MARKED_ROLLBACK || prev == PREPARING ||
+ prev == PREPARED || prev == SUSPENDED || (prev == COMMITTING && local() && !dht());
+
+ break;
+ }
+
+ case SUSPENDED: {
+ valid = prev == ACTIVE;
break;
}
@@ -1064,7 +1070,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
if (valid) {
// Seal transactions maps.
- if (state != ACTIVE)
+ if (state != ACTIVE && state != SUSPENDED)
seal();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 82692ae..20d306c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -113,6 +113,7 @@ import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
@@ -2241,6 +2242,79 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * Suspends transaction.
+ * Should not be used directly. Use tx.suspend() instead.
+ *
+ * @param tx Transaction to be suspended.
+ * @throws IgniteCheckedException If the transaction state could not be changed to SUSPENDED.
+ *
+ * @see #resumeTx(GridNearTxLocal)
+ * @see GridNearTxLocal#suspend()
+ * @see GridNearTxLocal#resume()
+ */
+ public void suspendTx(final GridNearTxLocal tx) throws IgniteCheckedException {
+ assert tx != null && !tx.system() : tx;
+
+ // A false result means the transition to SUSPENDED was rejected; the error message expects ACTIVE.
+ if (!tx.state(SUSPENDED)) {
+ throw new IgniteCheckedException("Trying to suspend transaction with incorrect state "
+ + "[expected=" + ACTIVE + ", actual=" + tx.state() + ']');
+ }
+
+ // Unregister the transaction: clearThreadMap presumably drops the thread mapping (confirm),
+ // then the entry keyed by the xid version is removed from the transaction map.
+ clearThreadMap(tx);
+
+ transactionMap(tx).remove(tx.xidVersion(), tx);
+ }
+
+ /**
+ * Resume transaction in current thread.
+ * Please don't use directly. Use tx.resume() instead.
+ *
+ * @param tx Transaction to be resumed.
+ * @throws IgniteCheckedException If the transaction state could not be changed to ACTIVE, if the current
+ * thread already has a transaction, or if a transaction with the same version is already registered.
+ *
+ * @see #suspendTx(GridNearTxLocal)
+ * @see GridNearTxLocal#suspend()
+ * @see GridNearTxLocal#resume()
+ */
+ public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException {
+ assert tx != null && !tx.system() : tx;
+ assert !threadMap.containsValue(tx) : tx;
+ assert !transactionMap(tx).containsValue(tx) : tx;
+ assert !haveSystemTxForThread(Thread.currentThread().getId());
+
+ // A false result means the transition to ACTIVE was rejected; the error message expects SUSPENDED.
+ if (!tx.state(ACTIVE)) {
+ throw new IgniteCheckedException("Trying to resume transaction with incorrect state "
+ + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']');
+ }
+
+ long threadId = Thread.currentThread().getId();
+
+ // NOTE(review): the state is already ACTIVE here; if either registration below fails the transaction
+ // stays ACTIVE without being fully registered - confirm that callers roll it back.
+ if (threadMap.putIfAbsent(threadId, tx) != null)
+ throw new IgniteCheckedException("Thread has already started a transaction.");
+
+ if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null) {
+ throw new IgniteCheckedException("Transaction with the same version is already registered "
+ + "[xidVer=" + tx.xidVersion() + ']');
+ }
+
+ tx.threadId(threadId);
+ }
+
+ /**
+ * Checks whether the given thread has a system transaction.
+ *
+ * @param threadId Thread id.
+ * @return {@code true} if the system thread map holds an entry for this thread and any cache
+ * that uses system transactions, {@code false} otherwise.
+ */
+ private boolean haveSystemTxForThread(long threadId) {
+ // Nothing registered at all: skip the scan over cache contexts.
+ if (sysThreadMap.isEmpty())
+ return false;
+
+ for (GridCacheContext cacheCtx : cctx.cache().context().cacheContexts()) {
+ if (cacheCtx.systemTx() && sysThreadMap.containsKey(new TxThreadKey(threadId, cacheCtx.cacheId())))
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* Timeout object for node failure handler.
*/
private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter {
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
index 429c995..6b79550 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java
@@ -190,4 +190,4 @@ public class IgniteTxMap extends AbstractMap<IgniteTxKey, IgniteTxEntry> impleme
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
throw new IllegalStateException("Transaction view map should never be serialized: " + this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 8750cab..f25fc36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -44,6 +44,8 @@ import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionState;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
+
/**
* Cache transaction proxy.
*/
@@ -98,6 +100,18 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
* Enters a call.
*/
private void enter() {
+ enter(false);
+ }
+
+ /**
+ * Enters a call.
+ *
+ * @param resume Flag to indicate that resume operation in progress.
+ */
+ private void enter(boolean resume) {
+ if (!resume && state() == SUSPENDED)
+ throw new IgniteException("Tx in SUSPENDED state. All operations except resume are prohibited.");
+
if (cctx.deploymentEnabled())
cctx.deploy().onEnter();
@@ -204,6 +218,21 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
}
/** {@inheritDoc} */
+ @Override public void suspend() throws IgniteException {
+ // enter() delegates to enter(false), which throws if the transaction is already SUSPENDED.
+ enter();
+
+ try {
+ cctx.suspendTx(tx);
+ }
+ catch (IgniteCheckedException e) {
+ // Convert the checked exception via U.convertException before rethrowing.
+ throw U.convertException(e);
+ }
+ finally {
+ // Always pair enter() with leave(), even when suspension fails.
+ leave();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public long timeout(long timeout) {
return tx.timeout(timeout);
}
@@ -333,6 +362,21 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
}
}
+ /** {@inheritDoc} */
+ @Override public void resume() throws IgniteException {
+ // Pass resume=true: enter(boolean) otherwise throws for a transaction in SUSPENDED state.
+ enter(true);
+
+ try {
+ cctx.resumeTx(tx);
+ }
+ catch (IgniteCheckedException e) {
+ // Convert the checked exception via U.convertException before rethrowing.
+ throw U.convertException(e);
+ }
+ finally {
+ // Always pair enter(boolean) with leave(), even when resuming fails.
+ leave();
+ }
+ }
+
/**
* @param res Result to convert to finished future.
*/
@@ -377,4 +421,4 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza
@Override public String toString() {
return S.toString(TransactionProxyImpl.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
index 57a2b00..a1b4d78 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java
@@ -272,4 +272,18 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport {
* @throws IgniteException If rollback failed.
*/
public IgniteFuture<Void> rollbackAsync() throws IgniteException;
+
+ /**
+ * Resume transaction if it was previously suspended. <strong>Supported only for optimistic transactions.</strong>
+ *
+ * @throws IgniteException If resume failed.
+ */
+ public void resume() throws IgniteException;
+
+ /**
+ * Suspends transaction. It could be resumed later. <strong>Supported only for optimistic transactions.</strong>
+ *
+ * @throws IgniteException If suspension failed.
+ */
+ public void suspend() throws IgniteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java
index 1980242..d01c0fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java
+++ b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionState.java
@@ -48,7 +48,10 @@ public enum TransactionState {
ROLLED_BACK,
/** Transaction rollback failed or is otherwise unknown state. */
- UNKNOWN;
+ UNKNOWN,
+
+ /** Transaction has been suspended by user. */
+ SUSPENDED;
/** Enumerated values. */
private static final TransactionState[] VALS = values();
@@ -62,4 +65,4 @@ public enum TransactionState {
@Nullable public static TransactionState fromOrdinal(int ord) {
return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java
new file mode 100644
index 0000000..a6318d4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeMultiServerTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+/**
+ * Multi-server variant of {@link IgniteOptimisticTxSuspendResumeTest}: inherits its tests with four server nodes.
+ */
+public class IgniteOptimisticTxSuspendResumeMultiServerTest extends IgniteOptimisticTxSuspendResumeTest {
+ /**
+ * @return Number of server nodes.
+ */
+ @Override protected int serversNumber() {
+ return 4;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
new file mode 100644
index 0000000..d16aebd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java
@@ -0,0 +1,751 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionTimeoutException;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionState.ACTIVE;
+import static org.apache.ignite.transactions.TransactionState.COMMITTED;
+import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
+import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
+
+/**
+ *
+ */
+public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest {
+ /** Transaction timeout. */
+ private static final long TX_TIMEOUT = 100;
+
+ /** Future timeout. */
+ private static final int FUT_TIMEOUT = 5000;
+
+ private boolean client = false;
+
+ /**
+ * List of closures executing transaction operations that are prohibited in suspended state.
+ */
+ private static final List<CI1Exc<Transaction>> SUSPENDED_TX_PROHIBITED_OPS = Arrays.asList(
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.suspend();
+ }
+ },
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.close();
+ }
+ },
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.commit();
+ }
+ },
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.commitAsync().get(FUT_TIMEOUT);
+ }
+ },
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.rollback();
+ }
+ },
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.rollbackAsync().get(FUT_TIMEOUT);
+ }
+ },
+ new CI1Exc<Transaction>() {
+ @Override public void applyx(Transaction tx) throws Exception {
+ tx.setRollbackOnly();
+ }
+ }
+ );
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(serversNumber());
+
+ if (serversNumber() > 1) {
+ client = true;
+
+ startGrid(serversNumber());
+
+ startGrid(serversNumber() + 1);
+
+ client = false;
+ }
+
+ awaitPartitionMapExchange();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids(true);
+ }
+
+ /**
+ * @return Number of server nodes.
+ */
+ protected int serversNumber() {
+ return 1;
+ }
+
+ /**
+ * Test for transaction starting in one thread, continuing in another.
+ *
+ * @throws Exception If failed.
+ */
+ public void testResumeTxInAnotherThread() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ final AtomicInteger cntr = new AtomicInteger(0);
+
+ cache.put(-1, -1);
+ cache.put(cntr.get(), cntr.getAndIncrement());
+
+ tx.suspend();
+
+ assertEquals(SUSPENDED, tx.state());
+
+ assertNull("Thread already have tx", ignite.transactions().tx());
+
+ assertNull(cache.get(-1));
+ assertNull(cache.get(cntr.get()));
+
+ for (int i = 0; i < 10; i++) {
+ GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ assertEquals(SUSPENDED, tx.state());
+
+ tx.resume();
+
+ assertEquals(ACTIVE, tx.state());
+
+ cache.put(cntr.get(), cntr.getAndIncrement());
+
+ tx.suspend();
+ }
+ }).get(FUT_TIMEOUT);
+ }
+
+ tx.resume();
+
+ cache.remove(-1);
+
+ tx.commit();
+
+ assertEquals(COMMITTED, tx.state());
+
+ for (int i = 0; i < cntr.get(); i++)
+ assertEquals(i, (int)cache.get(i));
+
+ assertFalse(cache.containsKey(-1));
+
+ cache.removeAll();
+ }
+ }
+ });
+ }
+
+ /**
+ * Test for transaction starting in one thread, continuing in another, and resuming in initiating thread.
+ * Cache operations performed for a couple of caches.
+ *
+ * @throws Exception If failed.
+ */
+ public void testCrossCacheTxInAnotherThread() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final IgniteCache<Integer, Integer> otherCache =
+ ignite.getOrCreateCache(cacheConfiguration(PARTITIONED, 0, false).setName("otherCache"));
+
+ final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ final AtomicInteger cntr = new AtomicInteger(0);
+
+ cache.put(-1, -1);
+ otherCache.put(-1, -1);
+
+ tx.suspend();
+
+ for (int i = 0; i < 10; i++) {
+ GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ tx.resume();
+
+ assertEquals(ACTIVE, tx.state());
+
+ cache.put(cntr.get(), cntr.get());
+ otherCache.put(cntr.get(), cntr.getAndIncrement());
+
+ tx.suspend();
+ }
+ }).get(FUT_TIMEOUT);
+ }
+
+ tx.resume();
+
+ cache.remove(-1);
+ otherCache.remove(-1);
+
+ tx.commit();
+
+ assertEquals(COMMITTED, tx.state());
+
+ for (int i = 0; i < cntr.get(); i++) {
+ assertEquals(i, (int)cache.get(i));
+ assertEquals(i, (int)otherCache.get(i));
+ }
+
+ assertFalse(cache.containsKey(-1));
+ assertFalse(otherCache.containsKey(-1));
+
+ cache.removeAll();
+ otherCache.removeAll();
+ }
+ }
+ });
+ }
+
+ /**
+ * Test for transaction rollback.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTxRollback() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ cache.put(1, 1);
+ cache.put(2, 2);
+
+ tx.suspend();
+
+ assertNull("There is no transaction for current thread", ignite.transactions().tx());
+
+ assertEquals(SUSPENDED, tx.state());
+
+ GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ tx.resume();
+
+ assertEquals(ACTIVE, tx.state());
+
+ cache.put(3, 3);
+
+ tx.rollback();
+ }
+ }).get(FUT_TIMEOUT);
+
+ assertEquals(ROLLED_BACK, tx.state());
+
+ assertFalse(cache.containsKey(1));
+ assertFalse(cache.containsKey(2));
+ assertFalse(cache.containsKey(3));
+
+ cache.removeAll();
+ }
+ }
+ });
+ }
+
+ /**
+ * Test for starting and suspending transactions, and then resuming and committing in another thread.
+ *
+ * @throws Exception If failed.
+ */
+ public void testMultiTxSuspendResume() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final List<Transaction> clientTxs = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ cache.put(i, i);
+
+ tx.suspend();
+
+ clientTxs.add(tx);
+ }
+
+ GridTestUtils.runMultiThreaded(new CI1Exc<Integer>() {
+ public void applyx(Integer idx) throws Exception {
+ Transaction tx = clientTxs.get(idx);
+
+ assertEquals(SUSPENDED, tx.state());
+
+ tx.resume();
+
+ assertEquals(ACTIVE, tx.state());
+
+ tx.commit();
+ }
+ }, 10, "th-suspend");
+
+ for (int i = 0; i < 10; i++)
+ assertEquals(i, (int)cache.get(i));
+
+ cache.removeAll();
+ }
+ }
+ });
+ }
+
+ /**
+ * Test checking that all operations (except resume) on a suspended transaction are prohibited from another thread.
+ *
+ * @throws Exception If failed.
+ */
+ public void testOpsProhibitedOnSuspendedTxFromOtherThread() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (final CI1Exc<Transaction> txOperation : SUSPENDED_TX_PROHIBITED_OPS) {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ cache.put(1, 1);
+
+ tx.suspend();
+
+ multithreaded(new RunnableX() {
+ @Override public void runx() throws Exception {
+ GridTestUtils.assertThrowsWithCause(txOperation, tx, IgniteException.class);
+ }
+ }, 1);
+
+ tx.resume();
+ tx.close();
+
+ assertNull(cache.get(1));
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Test checking that all operations (except resume) on a suspended transaction are prohibited.
+ *
+ * @throws Exception If failed.
+ */
+ public void testOpsProhibitedOnSuspendedTx() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (CI1Exc<Transaction> txOperation : SUSPENDED_TX_PROHIBITED_OPS) {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ cache.put(1, 1);
+
+ tx.suspend();
+
+ GridTestUtils.assertThrowsWithCause(txOperation, tx, IgniteException.class);
+
+ tx.resume();
+ tx.close();
+
+ assertNull(cache.get(1));
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Test checking timeout on resumed transaction.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTxTimeoutOnResumed() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation, TX_TIMEOUT, 0);
+
+ cache.put(1, 1);
+
+ tx.suspend();
+
+ Thread.sleep(TX_TIMEOUT * 2);
+
+ GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ tx.resume();
+
+ return null;
+ }
+ }, TransactionTimeoutException.class);
+
+ assertEquals(MARKED_ROLLBACK, tx.state());
+
+ tx.close();
+ }
+ }
+ });
+ }
+
+ /**
+ * Test checking timeout on suspended transaction.
+ *
+ * @throws Exception If failed.
+ */
+ public void testTxTimeoutOnSuspend() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation, TX_TIMEOUT, 0);
+
+ cache.put(1, 1);
+
+ Thread.sleep(TX_TIMEOUT * 2);
+
+ GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ tx.suspend();
+
+ return null;
+ }
+ }, TransactionTimeoutException.class);
+
+ assertEquals(MARKED_ROLLBACK, tx.state());
+
+ tx.close();
+
+ assertNull(cache.get(1));
+ }
+ }
+ });
+ }
+
+ /**
+ * Test starts one transaction and suspends it. Then it starts another transaction, writes
+ * the same key and commits it.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSuspendTxAndStartNew() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation tx1Isolation : TransactionIsolation.values()) {
+ for (TransactionIsolation tx2Isolation : TransactionIsolation.values()) {
+ Transaction tx1 = ignite.transactions().txStart(OPTIMISTIC, tx1Isolation);
+
+ cache.put(1, 1);
+
+ tx1.suspend();
+
+ assertFalse(cache.containsKey(1));
+
+ Transaction tx2 = ignite.transactions().txStart(OPTIMISTIC, tx2Isolation);
+
+ cache.put(1, 2);
+
+ tx2.commit();
+
+ assertEquals(2, (int)cache.get(1));
+
+ tx1.resume();
+
+ assertEquals(1, (int)cache.get(1));
+
+ tx1.close();
+
+ cache.removeAll();
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Test starts one transaction and suspends it. Then it starts another transaction and writes
+ * the same key without committing it.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSuspendTxAndStartNewWithoutCommit() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation tx1Isolation : TransactionIsolation.values()) {
+ for (TransactionIsolation tx2Isolation : TransactionIsolation.values()) {
+ Transaction tx1 = ignite.transactions().txStart(OPTIMISTIC, tx1Isolation);
+
+ cache.put(1, 1);
+
+ tx1.suspend();
+
+ assertFalse(cache.containsKey(1));
+
+ Transaction tx2 = ignite.transactions().txStart(OPTIMISTIC, tx2Isolation);
+
+ cache.put(1, 2);
+
+ tx2.suspend();
+
+ assertFalse(cache.containsKey(1));
+
+ tx1.resume();
+
+ assertEquals(1, (int)cache.get(1));
+
+ tx1.suspend();
+
+ tx2.resume();
+
+ assertEquals(2, (int)cache.get(1));
+
+ tx2.rollback();
+
+ tx1.resume();
+ tx1.rollback();
+
+ cache.removeAll();
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Test we can resume and complete transaction if topology changed while transaction is suspended.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSuspendTxAndResumeAfterTopologyChange() throws Exception {
+ executeTestForAllCaches(new CI2Exc<Ignite, IgniteCache<Integer, Integer>>() {
+ @Override public void applyx(Ignite ignite, final IgniteCache<Integer, Integer> cache) throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ Transaction tx = ignite.transactions().txStart(OPTIMISTIC, isolation);
+
+ cache.put(1, 1);
+
+ tx.suspend();
+
+ assertEquals(SUSPENDED, tx.state());
+
+ try (IgniteEx g = startGrid(serversNumber() + 3)) {
+ tx.resume();
+
+ assertEquals(ACTIVE, tx.state());
+
+ assertEquals(1, (int)cache.get(1));
+
+ tx.commit();
+
+ assertEquals(1, (int)cache.get(1));
+ }
+
+ cache.removeAll();
+ }
+ }
+ });
+ }
+
+ /**
+ * @return Cache configurations to test.
+ */
+ private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() {
+ List<CacheConfiguration<Integer, Integer>> cfgs = new ArrayList<>();
+
+ cfgs.add(cacheConfiguration(PARTITIONED, 0, false));
+ cfgs.add(cacheConfiguration(PARTITIONED, 1, false));
+ cfgs.add(cacheConfiguration(PARTITIONED, 1, true));
+ cfgs.add(cacheConfiguration(REPLICATED, 0, false));
+
+ return cfgs;
+ }
+
+ /**
+ * @param cacheMode Cache mode.
+ * @param backups Number of backups.
+ * @param nearCache If {@code true} near cache is enabled.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, Integer> cacheConfiguration(
+ CacheMode cacheMode,
+ int backups,
+ boolean nearCache) {
+ CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+ ccfg.setCacheMode(cacheMode);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+ if (cacheMode == PARTITIONED)
+ ccfg.setBackups(backups);
+
+ if (nearCache)
+ ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>());
+
+ return ccfg;
+ }
+
+ /**
+ * @param c Closure.
+ * @throws Exception If failed.
+ */
+ private void executeTestForAllCaches(CI2<Ignite, IgniteCache<Integer, Integer>> c) throws Exception {
+ for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+ ignite(0).createCache(ccfg);
+
+ log.info("Run test for cache [cache=" + ccfg.getCacheMode() +
+ ", backups=" + ccfg.getBackups() +
+ ", near=" + (ccfg.getNearConfiguration() != null) + "]");
+
+ int srvNum = serversNumber();
+ if (serversNumber() > 1) {
+ ignite(serversNumber() + 1).createNearCache(ccfg.getName(), new NearCacheConfiguration<>());
+ srvNum += 2;
+ }
+
+ try {
+ for (int i = 0; i < srvNum; i++) {
+ Ignite ignite = ignite(i);
+
+ log.info("Run test for node [node=" + i + ", client=" + ignite.configuration().isClientMode() + ']');
+
+ c.apply(ignite, ignite.<Integer, Integer>cache(ccfg.getName()));
+ }
+ }
+ finally {
+ ignite(0).destroyCache(ccfg.getName());
+ }
+ }
+ }
+
+ /**
+ * Closure with 2 parameters that can throw any exception.
+ *
+ * @param <E1> Type of first closure parameter.
+ * @param <E2> Type of second closure parameter.
+ */
+ public static abstract class CI2Exc<E1, E2> implements CI2<E1, E2> {
+ /**
+ * Closure body.
+ *
+ * @param e1 First closure argument.
+ * @param e2 Second closure argument.
+ * @throws Exception If failed.
+ */
+ public abstract void applyx(E1 e1, E2 e2) throws Exception;
+
+ /** {@inheritDoc} */
+ @Override public void apply(E1 e1, E2 e2) {
+ try {
+ applyx(e1, e2);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Closure that can throw any exception.
+ *
+ * @param <T> Type of closure parameter.
+ */
+ public static abstract class CI1Exc<T> implements CI1<T> {
+ /**
+ * Closure body.
+ *
+ * @param o Closure argument.
+ * @throws Exception If failed.
+ */
+ public abstract void applyx(T o) throws Exception;
+
+ /** {@inheritDoc} */
+ @Override public void apply(T o) {
+ try {
+ applyx(o);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Runnable that can throw any exception.
+ */
+ public static abstract class RunnableX implements Runnable {
+ /**
+ * Closure body.
+ *
+ * @throws Exception If failed.
+ */
+ public abstract void runx() throws Exception;
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ runx();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgnitePessimisticTxSuspendResumeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgnitePessimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgnitePessimisticTxSuspendResumeTest.java
new file mode 100644
index 0000000..57a1470
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgnitePessimisticTxSuspendResumeTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.Callable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+
+/**
+ *
+ */
+public class IgnitePessimisticTxSuspendResumeTest extends GridCommonAbstractTest {
+ /**
+ * Creates new cache configuration.
+ *
+ * @return CacheConfiguration New cache configuration.
+ */
+ protected CacheConfiguration<Integer, String> getCacheConfiguration() {
+ CacheConfiguration<Integer, String> cacheCfg = defaultCacheConfiguration();
+
+ cacheCfg.setCacheMode(PARTITIONED);
+
+ return cacheCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setClientMode(false);
+ cfg.setCacheConfiguration(getCacheConfiguration());
+
+ return cfg;
+ }
+
+ /**
+ * Test for suspension on pessimistic transaction.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSuspendPessimisticTx() throws Exception {
+ try (Ignite g = startGrid()) {
+ IgniteCache<Integer, String> cache = jcache();
+
+ IgniteTransactions txs = g.transactions();
+
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ final Transaction tx = txs.txStart(TransactionConcurrency.PESSIMISTIC, isolation);
+
+ cache.put(1, "1");
+
+ GridTestUtils.assertThrowsWithCause(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ tx.suspend();
+
+ return null;
+ }
+ }, UnsupportedOperationException.class);
+
+ tx.close();
+
+ assertNull(cache.get(1));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index cbcbaee..585c759 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -426,6 +426,32 @@ public final class GridTestUtils {
}
/**
+ * Checks whether closure throws exception, which is itself of a specified
+ * class, or has a cause of the specified class.
+ *
+ * @param call Closure.
+ * @param p Parameter passed to closure.
+ * @param cls Expected class.
+ * @return Thrown throwable.
+ */
+ public static <P> Throwable assertThrowsWithCause(IgniteInClosure<P> call, P p, Class<? extends Throwable> cls) {
+ assert call != null;
+ assert cls != null;
+
+ try {
+ call.apply(p);
+ }
+ catch (Throwable e) {
+ if (!X.hasCause(e, cls))
+ fail("Exception is neither of a specified class, nor has a cause of the specified class: " + cls, e);
+
+ return e; // Expected failure: hand the throwable back to the caller.
+ }
+
+ throw new AssertionError("Exception has not been thrown."); // Reached only if the closure returned normally.
+ }
+
+ /**
* Throw assertion error with specified error message and initialized cause.
*
* @param msg Error message.
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
index c5673b3..f764212 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/cache/GridAbstractCacheStoreSelfTest.java
@@ -579,6 +579,16 @@ public abstract class GridAbstractCacheStoreSelfTest<T extends CacheStore<Object
}
/** {@inheritDoc} */
+ @Override public void suspend() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resume() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteFuture<Void> rollbackAsync() throws IgniteException {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index bb32d24..771e974 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -19,6 +19,9 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.distributed.CachePartitionStateTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeMultiServerTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
/**
* Test suite.
@@ -33,6 +36,10 @@ public class IgniteCacheTestSuite6 extends TestSuite {
suite.addTestSuite(CachePartitionStateTest.class);
+ suite.addTestSuite(IgniteOptimisticTxSuspendResumeTest.class);
+ suite.addTestSuite(IgniteOptimisticTxSuspendResumeMultiServerTest.class);
+ suite.addTestSuite(IgnitePessimisticTxSuspendResumeTest.class);
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
index 5047491..dd5f6b7 100644
--- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
+++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaManager.java
@@ -28,10 +28,11 @@ import org.apache.ignite.cache.jta.CacheTmLookup;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.transactions.TransactionState.SUSPENDED;
+
/**
* Implementation of {@link CacheJtaManagerAdapter}.
*/
@@ -147,7 +148,7 @@ public class CacheJtaManager extends CacheJtaManagerAdapter {
if (jtaTm != null) {
CacheJtaResource rsrc = this.rsrc.get();
- if (rsrc == null || rsrc.isFinished()) {
+ if (rsrc == null || rsrc.isFinished() || rsrc.cacheTx().state() == SUSPENDED) {
try {
Transaction jtaTx = jtaTm.getTransaction();
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
index c63dafa..5b1b37a 100644
--- a/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
+++ b/modules/jta/src/main/java/org/apache/ignite/internal/processors/cache/jta/CacheJtaResource.java
@@ -75,12 +75,21 @@ final class CacheJtaResource implements XAResource, Synchronization {
}
/** {@inheritDoc} */
- @Override public void start(Xid xid, int flags) {
+ @Override public void start(Xid xid, int flags) throws XAException {
if (log.isDebugEnabled())
log.debug("XA resource start(...) [xid=" + xid + ", flags=<" + flags(flags) + ">]");
// Simply save global transaction id.
this.xid = xid;
+
+ if ((flags & TMRESUME) == TMRESUME) {
+ try {
+ cacheTx.resume();
+ }
+ catch (IgniteCheckedException e) {
+ throwException("Failed to resume cache transaction: " + e.getMessage(), e);
+ }
+ }
}
/**
@@ -132,7 +141,7 @@ final class CacheJtaResource implements XAResource, Synchronization {
}
/** {@inheritDoc} */
- @Override public void end(Xid xid, int flags) {
+ @Override public void end(Xid xid, int flags) throws XAException {
assert this.xid.equals(xid);
if (log.isDebugEnabled())
@@ -140,6 +149,14 @@ final class CacheJtaResource implements XAResource, Synchronization {
if ((flags & TMFAIL) > 0)
cacheTx.setRollbackOnly();
+ else if ((flags & TMSUSPEND) == TMSUSPEND) {
+ try {
+ cacheTx.suspend();
+ }
+ catch (IgniteCheckedException e) {
+ throwException("Failed to suspend cache transaction: " + e.getMessage(), e);
+ }
+ }
}
/** {@inheritDoc} */
@@ -299,6 +316,13 @@ final class CacheJtaResource implements XAResource, Synchronization {
return state == COMMITTED || state == ROLLED_BACK;
}
+ /**
+ * @return The {@code cacheTx} transaction held by this resource.
+ */
+ GridNearTxLocal cacheTx() {
+ return cacheTx;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheJtaResource.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
new file mode 100644
index 0000000..a181068
--- /dev/null
+++ b/modules/jta/src/test/java/org/apache/ignite/internal/processors/cache/GridJtaTransactionManagerSelfTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import javax.cache.configuration.Factory;
+import javax.transaction.Transaction;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.objectweb.jotm.Current;
+import org.objectweb.jotm.Jotm;
+import org.objectweb.transaction.jta.TransactionManager;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.transactions.TransactionState.ACTIVE;
+
+/**
+ * JTA Tx Manager test.
+ */
+public class GridJtaTransactionManagerSelfTest extends GridCommonAbstractTest {
+ /** Java Open Transaction Manager facade. */
+ private static Jotm jotm;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName).
+ setCacheConfiguration(defaultCacheConfiguration().setCacheMode(PARTITIONED));
+
+ cfg.getTransactionConfiguration().setTxManagerFactory(new Factory<TransactionManager>() {
+ private static final long serialVersionUID = 0L;
+
+ @Override public TransactionManager create() {
+ return jotm.getTransactionManager();
+ }
+ });
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ jotm = new Jotm(true, false);
+
+ Current.setAppServer(false);
+
+ startGrid();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+
+ jotm.stop();
+ }
+
+ /**
+ * Test for switching tx context by JTA Manager.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJtaTxContextSwitch() throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ TransactionConfiguration cfg = grid().context().config().getTransactionConfiguration();
+
+ cfg.setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC);
+ cfg.setDefaultTxIsolation(isolation);
+
+ TransactionManager jtaTm = jotm.getTransactionManager();
+
+ IgniteCache<Integer, String> cache = jcache();
+
+ assertNull(grid().transactions().tx());
+
+ jtaTm.begin();
+
+ Transaction tx1 = jtaTm.getTransaction();
+
+ cache.put(1, Integer.toString(1));
+
+ assertNotNull(grid().transactions().tx());
+
+ assertEquals(ACTIVE, grid().transactions().tx().state());
+
+ assertEquals(Integer.toString(1), cache.get(1));
+
+ jtaTm.suspend();
+
+ assertNull(grid().transactions().tx());
+
+ assertNull(cache.get(1));
+
+ jtaTm.begin();
+
+ Transaction tx2 = jtaTm.getTransaction();
+
+ assertNotSame(tx1, tx2);
+
+ cache.put(2, Integer.toString(2));
+
+ assertNotNull(grid().transactions().tx());
+
+ assertEquals(ACTIVE, grid().transactions().tx().state());
+
+ assertEquals(Integer.toString(2), cache.get(2));
+
+ jtaTm.commit();
+
+ assertNull(grid().transactions().tx());
+
+ assertEquals(Integer.toString(2), cache.get(2));
+
+ jtaTm.resume(tx1);
+
+ assertNotNull(grid().transactions().tx());
+
+ assertEquals(ACTIVE, grid().transactions().tx().state());
+
+ cache.put(3, Integer.toString(3));
+
+ jtaTm.commit();
+
+ assertEquals("1", cache.get(1));
+ assertEquals("2", cache.get(2));
+ assertEquals("3", cache.get(3));
+
+ assertNull(grid().transactions().tx());
+
+ cache.removeAll();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJtaTxContextSwitchWithExistingTx() throws Exception {
+ for (TransactionIsolation isolation : TransactionIsolation.values()) {
+ TransactionConfiguration cfg = grid().context().config().getTransactionConfiguration();
+
+ cfg.setDefaultTxConcurrency(TransactionConcurrency.OPTIMISTIC);
+ cfg.setDefaultTxIsolation(isolation);
+
+ TransactionManager jtaTm = jotm.getTransactionManager();
+
+ IgniteCache<Integer, String> cache = jcache();
+
+ jtaTm.begin();
+
+ Transaction tx1 = jtaTm.getTransaction();
+
+ cache.put(1, Integer.toString(1));
+
+ assertNotNull(grid().transactions().tx());
+
+ assertEquals(ACTIVE, grid().transactions().tx().state());
+
+ assertEquals(Integer.toString(1), cache.get(1));
+
+ jtaTm.suspend();
+
+ jtaTm.begin();
+
+ Transaction tx2 = jtaTm.getTransaction();
+
+ assertNotSame(tx1, tx2);
+
+ cache.put(2, Integer.toString(2));
+
+ try {
+ jtaTm.resume(tx1);
+
+ fail("jtaTm.resume shouldn't success.");
+ }
+ catch (IllegalStateException ignored) {
+ // No-op.
+ }
+ finally {
+ jtaTm.rollback(); //rolling back tx2
+ }
+
+ jtaTm.resume(tx1);
+ jtaTm.rollback();
+
+ cache.removeAll();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/20d9ad5a/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java b/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java
index 677f485..3cc7935 100644
--- a/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java
+++ b/modules/jta/src/test/java/org/apache/ignite/testsuites/IgniteJtaTestSuite.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.CacheJndiTmFactorySelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheJtaConfigurationValidationSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheJtaFactoryConfigValidationSelfTest;
+import org.apache.ignite.internal.processors.cache.GridJtaTransactionManagerSelfTest;
import org.apache.ignite.internal.processors.cache.jta.GridPartitionedCacheJtaFactorySelfTest;
import org.apache.ignite.internal.processors.cache.jta.GridPartitionedCacheJtaFactoryUseSyncSelfTest;
import org.apache.ignite.internal.processors.cache.jta.GridPartitionedCacheJtaLookupClassNameSelfTest;
@@ -54,6 +55,8 @@ public class IgniteJtaTestSuite extends TestSuite {
suite.addTestSuite(GridCacheJtaConfigurationValidationSelfTest.class);
suite.addTestSuite(GridCacheJtaFactoryConfigValidationSelfTest.class);
+ suite.addTestSuite(GridJtaTransactionManagerSelfTest.class);
+
// Factory
suite.addTestSuite(CacheJndiTmFactorySelfTest.class);
[07/11] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-5578
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/977b7528
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/977b7528
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/977b7528
Branch: refs/heads/ignite-5578
Commit: 977b752805f378b45b135b3634b2a6c673e5774d
Parents: ee7062e cb03496
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 2 11:26:00 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 2 11:26:00 2017 +0300
----------------------------------------------------------------------
modules/core/pom.xml | 7 ------
.../cache/query/GridCacheQueryManager.java | 3 ++-
.../processors/job/GridJobProcessor.java | 5 ++++
.../rest/handlers/redis/package-info.java | 22 +++++++++++++++++
.../ignite/internal/util/IgniteUtils.java | 4 ++-
.../internal/util/IgniteUtilsSelfTest.java | 26 ++++++++++++++++++++
6 files changed, 58 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
[02/11] ignite git commit: IGNITE-5882 Duplicated dependency in
pom.xml of core module
Posted by sb...@apache.org.
IGNITE-5882 Duplicated dependency in pom.xml of core module
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5224c9de
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5224c9de
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5224c9de
Branch: refs/heads/ignite-5578
Commit: 5224c9de4bea8d905bd53cd1699e5da2267f70c4
Parents: bdb838f
Author: daradurvs <da...@gmail.com>
Authored: Tue Aug 1 15:52:37 2017 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Aug 1 15:52:37 2017 +0300
----------------------------------------------------------------------
modules/core/pom.xml | 7 -------
1 file changed, 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5224c9de/modules/core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/core/pom.xml b/modules/core/pom.xml
index 8385dd8..6861ac2 100644
--- a/modules/core/pom.xml
+++ b/modules/core/pom.xml
@@ -159,13 +159,6 @@
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>14.0.1</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.gridgain</groupId>
<artifactId>ignite-shmem</artifactId>
<version>1.0.0</version>