You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2014/12/19 04:21:58 UTC
[1/8] incubator-ignite git commit: GG-9141 - Check for cache enlist
in TX
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-1 e248ca73f -> fb1913090
GG-9141 - Check for cache enlist in TX
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a70cfa2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a70cfa2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a70cfa2c
Branch: refs/heads/ignite-1
Commit: a70cfa2c24cd56656f09e3717326a94decd2c544
Parents: e248ca7
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Tue Dec 16 23:02:17 2014 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Tue Dec 16 23:02:17 2014 -0800
----------------------------------------------------------------------
.../processors/cache/GridCacheContext.java | 1 -
.../cache/GridCacheTxLocalAdapter.java | 27 +++++++++++++++++---
2 files changed, 24 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a70cfa2c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
index c6cb355..98766fb 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
@@ -15,7 +15,6 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.portables.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.cloner.*;
import org.gridgain.grid.kernal.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a70cfa2c/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index abb9fef..59f0e97 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -2610,10 +2610,31 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
* cache (e.g. they have different stores).
*/
private void addActiveCache(GridCacheContext<K, V> cacheCtx) throws IgniteCheckedException {
+ int cacheId = cacheCtx.cacheId();
+
// If this is a first cache to work on, capture cache settings.
- if (activeCacheIds.isEmpty() ||
- !activeCacheIds.contains(cacheCtx.cacheId()) && cctx.txCompatible(activeCacheIds, cacheCtx))
- activeCacheIds.add(cacheCtx.cacheId());
+ if (activeCacheIds.isEmpty())
+ activeCacheIds.add(cacheId);
+ // Else check if we can enlist new cache to transaction.
+ else if (!activeCacheIds.contains(cacheId)) {
+ if (!cctx.txCompatible(activeCacheIds, cacheCtx)) {
+ StringBuilder cacheNames = new StringBuilder();
+
+ for (Integer activeCacheId : activeCacheIds) {
+ cacheNames.append(cctx.cacheContext(activeCacheId).name());
+
+ cacheNames.append(", ");
+ }
+
+ cacheNames.setLength(cacheNames.length() - 2);
+
+ throw new IgniteCheckedException("Failed to enlist new cache to existing transaction " +
+ "(cache configurations are not compatible) [activeCaches=[" + cacheNames +
+ "], cacheName=" + cacheCtx.name() + ']');
+ }
+ else
+ activeCacheIds.add(cacheId);
+ }
}
/**
[6/8] incubator-ignite git commit: GG-9141 - Fixing tests
Posted by ag...@apache.org.
GG-9141 - Fixing tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/738e553a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/738e553a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/738e553a
Branch: refs/heads/ignite-1
Commit: 738e553a8b089528fbdb9559df7f0f391ed53d34
Parents: 246bc86
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Dec 18 17:45:16 2014 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Dec 18 17:45:16 2014 -0800
----------------------------------------------------------------------
.../processors/cache/GridCacheSharedContext.java | 17 ++++++++++++++---
.../processors/cache/GridCacheTxLocalAdapter.java | 12 +++++-------
.../cache/distributed/dht/GridDhtTxLocal.java | 2 +-
.../preloader/GridDhtPartitionsExchangeFuture.java | 5 +++--
.../cache/GridCacheInterceptorSelfTestSuite.java | 6 +++---
5 files changed, 26 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738e553a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheSharedContext.java
index 10256b0..ba2bfe1 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheSharedContext.java
@@ -385,13 +385,24 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @param tx Transaction to check.
* @param activeCacheIds Active cache IDs.
* @param cacheCtx Cache context.
* @return {@code True} if cross-cache transaction can include this new cache.
*/
- public boolean txCompatible(Set<Integer> activeCacheIds, GridCacheContext<K, V> cacheCtx) {
- // TODO GG-9141 implement.
- return false;
+ public boolean txCompatible(GridCacheTxEx<K, V> tx, Iterable<Integer> activeCacheIds, GridCacheContext<K, V> cacheCtx) {
+ if (cacheCtx.system() ^ tx.system())
+ return false;
+
+ for (Integer cacheId : activeCacheIds) {
+ GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId);
+
+ // Check that caches have the same store.
+ if (activeCacheCtx.store().store() != cacheCtx.store().store())
+ return false;
+ }
+
+ return true;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738e553a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index 16ed64f..4af716a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -2612,12 +2612,9 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
private void addActiveCache(GridCacheContext<K, V> cacheCtx) throws IgniteCheckedException {
int cacheId = cacheCtx.cacheId();
- // If this is a first cache to work on, capture cache settings.
- if (activeCacheIds.isEmpty())
- activeCacheIds.add(cacheId);
- // Else check if we can enlist new cache to transaction.
- else if (!activeCacheIds.contains(cacheId)) {
- if (!cctx.txCompatible(activeCacheIds, cacheCtx)) {
+ // Check if we can enlist new cache to transaction.
+ if (!activeCacheIds.contains(cacheId)) {
+ if (!cctx.txCompatible(this, activeCacheIds, cacheCtx)) {
StringBuilder cacheNames = new StringBuilder();
for (Integer activeCacheId : activeCacheIds) {
@@ -2630,7 +2627,8 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
throw new IgniteCheckedException("Failed to enlist new cache to existing transaction " +
"(cache configurations are not compatible) [activeCaches=[" + cacheNames +
- "], cacheName=" + cacheCtx.name() + ']');
+ "], cacheName=" + cacheCtx.name() + ", txSystem=" + system() +
+ ", cacheSystem=" + cacheCtx.system() + ']');
}
else
activeCacheIds.add(cacheId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738e553a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 9fa40d3..c21fc19 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -604,7 +604,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
/** {@inheritDoc} */
@SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
@Override public boolean finish(boolean commit) throws IgniteCheckedException {
- assert nearFinFutId != null || isInvalidate() || !commit || isSystemInvalidate() //|| !isNearEnabled(cctx) TODO GG-9141
+ assert nearFinFutId != null || isInvalidate() || !commit || isSystemInvalidate()
|| onePhaseCommit() || state() == PREPARED :
"Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit +
", sysInvalidate=" + isSystemInvalidate() + ", state=" + state() + ']';
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738e553a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 65b4522..2c676e2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -612,8 +612,9 @@ public class GridDhtPartitionsExchangeFuture<K, V> extends GridFutureAdapter<Lon
catch (IgniteCheckedException e) {
scheduleRecheck();
- U.error(log, "Failed to send full partition map to nodes (will retry after timeout) [nodes=" +
- F.nodeId8s(rmtNodes) + ", exchangeId=" + exchId + ']', e);
+ if (!X.hasCause(e, InterruptedException.class))
+ U.error(log, "Failed to send full partition map to nodes (will retry after timeout) [nodes=" +
+ F.nodeId8s(rmtNodes) + ", exchangeId=" + exchId + ']', e);
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738e553a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorSelfTestSuite.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorSelfTestSuite.java
index 160210b..01ef118 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorSelfTestSuite.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheInterceptorSelfTestSuite.java
@@ -39,9 +39,9 @@ public class GridCacheInterceptorSelfTestSuite extends TestSuite {
suite.addTestSuite(GridCacheInterceptorSelfTest.class);
suite.addTestSuite(GridCacheInterceptorNearEnabledSelfTest.class);
-// suite.addTestSuite(GridCacheInterceptorWithStoreSelfTest.class);
-// suite.addTestSuite(GridCacheInterceptorReplicatedSelfTest.class);
-// suite.addTestSuite(GridCacheInterceptorReplicatedWithStoreSelfTest.class);
+// suite.addTestSuite(GridCacheInterceptorWithStoreSelfTest.class); TODO GG-9141
+ suite.addTestSuite(GridCacheInterceptorReplicatedSelfTest.class);
+ suite.addTestSuite(GridCacheInterceptorReplicatedWithStoreSelfTest.class);
return suite;
}
[7/8] incubator-ignite git commit: GG-9141 - Enabled passing tests.
Posted by ag...@apache.org.
GG-9141 - Enabled passing tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/cf99caac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/cf99caac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/cf99caac
Branch: refs/heads/ignite-1
Commit: cf99caac3ac652db879ab1ec8670873e5c79c0c2
Parents: 738e553
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Dec 18 18:05:09 2014 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Dec 18 18:05:09 2014 -0800
----------------------------------------------------------------------
.../cache/GridCacheQueryMultiThreadedSelfTest.java | 9 ++++-----
1 file changed, 4 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/cf99caac/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java
index 15d3b2a..1d91607 100644
--- a/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java
+++ b/modules/indexing/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheQueryMultiThreadedSelfTest.java
@@ -17,7 +17,6 @@ import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.spi.swapspace.file.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.eviction.lru.*;
import org.gridgain.grid.cache.query.*;
@@ -227,7 +226,7 @@ public class GridCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTest
* @throws Exception If failed.
*/
@SuppressWarnings({"TooBroadScope"})
- public void _testMultiThreadedSwapUnswapString() throws Exception { // TODO GG-9141
+ public void testMultiThreadedSwapUnswapString() throws Exception {
int threadCnt = 150;
final int keyCnt = 2000;
final int valCnt = 10000;
@@ -307,7 +306,7 @@ public class GridCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTest
* @throws Exception If failed.
*/
@SuppressWarnings({"TooBroadScope"})
- public void _testMultiThreadedSwapUnswapLong() throws Exception { // TODO GG-9141
+ public void testMultiThreadedSwapUnswapLong() throws Exception {
int threadCnt = 150;
final int keyCnt = 2000;
final int valCnt = 10000;
@@ -389,7 +388,7 @@ public class GridCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTest
* @throws Exception If failed.
*/
@SuppressWarnings({"TooBroadScope"})
- public void _testMultiThreadedSwapUnswapLongString() throws Exception { // TODO GG-9141
+ public void testMultiThreadedSwapUnswapLongString() throws Exception {
int threadCnt = 150;
final int keyCnt = 2000;
final int valCnt = 10000;
@@ -472,7 +471,7 @@ public class GridCacheQueryMultiThreadedSelfTest extends GridCommonAbstractTest
* @throws Exception If failed.
*/
@SuppressWarnings({"TooBroadScope"})
- public void _testMultiThreadedSwapUnswapObject() throws Exception { // TODO GG-9141
+ public void testMultiThreadedSwapUnswapObject() throws Exception {
int threadCnt = 50;
final int keyCnt = 4000;
final int valCnt = 10000;
[4/8] incubator-ignite git commit: GG-9141 - Fixes for DR.
Posted by ag...@apache.org.
GG-9141 - Fixes for DR.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/72b82b21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/72b82b21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/72b82b21
Branch: refs/heads/ignite-1
Commit: 72b82b21ccf6fcdeb076af4b17bbba66b48ef9cb
Parents: 88a2d8d
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Dec 18 12:33:23 2014 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Dec 18 12:33:23 2014 -0800
----------------------------------------------------------------------
.../kernal/processors/cache/GridCacheSharedContext.java | 10 ++++++++++
.../kernal/processors/cache/GridCacheTxLocalAdapter.java | 6 ++----
.../kernal/processors/cache/GridCacheVersionManager.java | 2 +-
.../grid/cache/spring/GridSpringCacheManager.java | 5 ++---
4 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72b82b21/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheSharedContext.java
index ab23680..10256b0 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheSharedContext.java
@@ -174,6 +174,16 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @return Data center ID.
+ */
+ public byte dataCenterId() {
+ // Data center ID is same for all caches, so grab the first one.
+ GridCacheContext<K, V> cacheCtx = F.first(cacheContexts());
+
+ return cacheCtx.dataCenterId();
+ }
+
+ /**
* @return Compound preloaders start future.
*/
public IgniteFuture<Object> preloadersStartFuture() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72b82b21/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index 384b243..16ed64f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -706,7 +706,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
txEntry.filters(),
cached.detached() ? DR_NONE : drType,
txEntry.drExpireTime(),
- near() ? null : explicitVer,
+ cached.isNear() ? null : explicitVer,
CU.subjectId(this, cctx),
resolveTaskName());
@@ -742,7 +742,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
topVer,
txEntry.filters(),
cached.detached() ? DR_NONE : drType,
- near() ? null : explicitVer,
+ cached.isNear() ? null : explicitVer,
CU.subjectId(this, cctx),
resolveTaskName());
@@ -769,8 +769,6 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
nearCached.innerReload(CU.<K, V>empty());
}
else if (op == READ) {
- assert near();
-
if (log.isDebugEnabled())
log.debug("Ignoring READ entry when committing: " + txEntry);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72b82b21/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheVersionManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheVersionManager.java
index 3c2d8e7..000b99b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheVersionManager.java
@@ -79,7 +79,7 @@ public class GridCacheVersionManager<K, V> extends GridCacheSharedManagerAdapter
@Override public void start0() throws IgniteCheckedException {
txSerEnabled = cctx.gridConfig().getTransactionsConfiguration().isTxSerializableEnabled();
- dataCenterId = 0; //cctx.dataCenterId(); TODO GG-9141 Grab data center ID from DR manager.
+ dataCenterId = cctx.dataCenterId();
last = new GridCacheVersion(0, 0, order.get(), 0, dataCenterId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/72b82b21/modules/spring/src/main/java/org/gridgain/grid/cache/spring/GridSpringCacheManager.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/gridgain/grid/cache/spring/GridSpringCacheManager.java b/modules/spring/src/main/java/org/gridgain/grid/cache/spring/GridSpringCacheManager.java
index daad4d3..4edd808 100644
--- a/modules/spring/src/main/java/org/gridgain/grid/cache/spring/GridSpringCacheManager.java
+++ b/modules/spring/src/main/java/org/gridgain/grid/cache/spring/GridSpringCacheManager.java
@@ -13,7 +13,6 @@ import org.apache.ignite.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.resources.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
@@ -67,7 +66,7 @@ import java.util.*;
* <cache:annotation-driven/>
* </beans>
* </pre>
- * Or you can provide a {@link org.apache.ignite.configuration.IgniteConfiguration} bean, like below:
+ * Or you can provide a {@link IgniteConfiguration} bean, like below:
* <pre name="code" class="xml">
* <beans xmlns="http://www.springframework.org/schema/beans"
* xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
@@ -115,7 +114,7 @@ import java.util.*;
* for node startup.
* <p>
* If neither {@link #setConfigurationPath(String) configurationPath},
- * {@link #setConfiguration(org.apache.ignite.configuration.IgniteConfiguration) configuration}, nor
+ * {@link #setConfiguration(IgniteConfiguration) configuration}, nor
* {@link #setGridName(String) gridName} are provided, cache manager
* will try to use default Grid instance (the one with the {@code null}
* name). If it doesn't exist, exception will be thrown.
[8/8] incubator-ignite git commit: GG-9141 - Fixed preloading test.
Posted by ag...@apache.org.
GG-9141 - Fixed preloading test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fb191309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fb191309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fb191309
Branch: refs/heads/ignite-1
Commit: fb19130908792ecaec93aa9fd5c9008230c25524
Parents: cf99caa
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Dec 18 19:21:43 2014 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Dec 18 19:21:43 2014 -0800
----------------------------------------------------------------------
.../kernal/processors/cache/GridCachePartitionExchangeManager.java | 2 +-
.../replicated/preloader/GridCacheReplicatedPreloadSelfTest.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb191309/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionExchangeManager.java
index 8ca5911..15703a0 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionExchangeManager.java
@@ -800,7 +800,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
changed |= cacheCtx.topology().afterExchange(exchFut.exchangeId());
// Preload event notification.
- if (cctx.gridEvents().isRecordable(EVT_CACHE_PRELOAD_STARTED)) {
+ if (!cacheCtx.system() && cctx.gridEvents().isRecordable(EVT_CACHE_PRELOAD_STARTED)) {
if (!cacheCtx.isReplicated() || !startEvtFired) {
IgniteDiscoveryEvent discoEvt = exchFut.discoveryEvent();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb191309/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
index 82a56a3..c5bb91e 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadSelfTest.java
@@ -139,7 +139,7 @@ public class GridCacheReplicatedPreloadSelfTest extends GridCommonAbstractTest {
* @throws Exception If test failed.
*/
@SuppressWarnings({"BusyWait"})
- public void _testIntegrity() throws Exception { // TODO GG-9141
+ public void testIntegrity() throws Exception {
preloadMode = SYNC;
try {
[5/8] incubator-ignite git commit: GG-9141 - Fixes for DR.
Posted by ag...@apache.org.
GG-9141 - Fixes for DR.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/246bc860
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/246bc860
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/246bc860
Branch: refs/heads/ignite-1
Commit: 246bc860cb477e8ba7061546aa7c9c55bf9c6729
Parents: 72b82b2
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Thu Dec 18 16:16:27 2014 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Thu Dec 18 16:16:27 2014 -0800
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 6 ++++-
.../processors/cache/GridCacheMvccManager.java | 4 ++--
.../GridCachePartitionExchangeManager.java | 10 ++++++---
.../processors/cache/GridCacheProcessor.java | 23 ++++++++++----------
4 files changed, 26 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/246bc860/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index b5bd597..92b5e62 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -3136,7 +3136,11 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
/** {@inheritDoc} */
@Override public GridCacheTx txStart(GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation, long timeout, int txSize) throws IllegalStateException {
- return ctx.kernalContext().cache().transactions().txStart(concurrency, isolation, timeout, txSize);
+ IgniteTransactionsEx txs = ctx.kernalContext().cache().transactions();
+
+ return ctx.system() ?
+ txs.txStartSystem(concurrency, isolation, timeout, txSize) :
+ txs.txStart(concurrency, isolation, timeout, txSize);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/246bc860/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMvccManager.java
index 10a953b..93bf3fa 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheMvccManager.java
@@ -13,7 +13,6 @@ import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.managers.discovery.*;
import org.gridgain.grid.kernal.managers.eventstorage.*;
@@ -408,9 +407,10 @@ public class GridCacheMvccManager<K, V> extends GridCacheSharedManagerAdapter<K,
// Close window in case of node is gone before the future got added to
// the map of futures.
- for (ClusterNode n : fut.nodes())
+ for (ClusterNode n : fut.nodes()) {
if (cctx.discovery().node(n.id()) == null)
fut.onNodeLeft(n.id());
+ }
// Just in case if future was complete before it was added.
if (fut.isDone())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/246bc860/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionExchangeManager.java
index 201b960..8ca5911 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCachePartitionExchangeManager.java
@@ -548,9 +548,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param exchFut Exchange.
*/
public void onExchangeDone(GridDhtPartitionsExchangeFuture<K, V> exchFut) {
- for (GridDhtPartitionsExchangeFuture<K, V> fut : exchFuts.values()) {
- if (fut.exchangeId().topologyVersion() < exchFut.exchangeId().topologyVersion() - 10)
- fut.cleanUp();
+ ExchangeFutureSet exchFuts0 = exchFuts;
+
+ if (exchFuts0 != null) {
+ for (GridDhtPartitionsExchangeFuture<K, V> fut : exchFuts0.values()) {
+ if (fut.exchangeId().topologyVersion() < exchFut.exchangeId().topologyVersion() - 10)
+ fut.cleanUp();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/246bc860/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
index 1c08267..9f11834 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheProcessor.java
@@ -15,6 +15,7 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.fs.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.lang.*;
+import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.spi.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.affinity.*;
@@ -1292,15 +1293,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
- List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers();
-
- for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size());
- it.hasPrevious();) {
- GridCacheSharedManager<?, ?> mgr = it.previous();
-
- mgr.onKernalStop(cancel);
- }
-
for (GridCacheAdapter<?, ?> cache : stopSeq) {
GridCacheContext ctx = cache.context();
@@ -1331,6 +1323,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cache.onKernalStop();
}
+
+ List<? extends GridCacheSharedManager<?, ?>> sharedMgrs = sharedCtx.managers();
+
+ for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = sharedMgrs.listIterator(sharedMgrs.size());
+ it.hasPrevious();) {
+ GridCacheSharedManager<?, ?> mgr = it.previous();
+
+ mgr.onKernalStop(cancel);
+ }
}
/** {@inheritDoc} */
@@ -1662,7 +1663,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @return Transactions interface implementation.
*/
- public IgniteTransactions transactions() {
+ public IgniteTransactionsEx transactions() {
return transactions;
}
@@ -1767,7 +1768,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
/**
* @param ccfg Cache configuration.
* @param objs Extra components.
- * @return Components provided in cache configuration which can implement {@link org.apache.ignite.lifecycle.LifecycleAware} interface.
+ * @return Components provided in cache configuration which can implement {@link LifecycleAware} interface.
*/
private Iterable<Object> lifecycleAwares(GridCacheConfiguration ccfg, Object...objs) {
Collection<Object> ret = new ArrayList<>(7 + objs.length);
[2/8] incubator-ignite git commit: GG-9141 - Fixes for DR
transactions.
Posted by ag...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
index 2b974e9..df4dd58 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -81,6 +81,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
* @param ctx Cache registry.
* @param implicit Implicit flag.
* @param implicitSingle Implicit with one key flag.
+ * @param sys System flag.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -92,6 +93,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
GridCacheSharedContext<K, V> ctx,
boolean implicit,
boolean implicitSingle,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -104,10 +106,11 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
int taskNameHash
) {
super(
+ ctx,
ctx.versions().next(),
implicit,
implicitSingle,
- ctx,
+ sys,
concurrency,
isolation,
timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 5bc5a3e..9098b73 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -30,6 +30,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
/**
@@ -640,7 +641,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
add(fut); // Append new future.
try {
- cctx.io().send(n, req);
+ cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 30b7aef..431e134 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -242,19 +242,19 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
}
switch (commState.idx) {
- case 21:
+ case 22:
if (!commState.putGridUuid(futId))
return false;
commState.idx++;
- case 22:
+ case 23:
if (!commState.putBoolean(last))
return false;
commState.idx++;
- case 23:
+ case 24:
if (lastBackups != null) {
if (commState.it == null) {
if (!commState.putInt(lastBackups.size()))
@@ -281,31 +281,31 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 24:
+ case 25:
if (!commState.putGridUuid(miniId))
return false;
commState.idx++;
- case 25:
+ case 26:
if (!commState.putBoolean(near))
return false;
commState.idx++;
- case 26:
+ case 27:
if (!commState.putLong(topVer))
return false;
commState.idx++;
- case 27:
+ case 28:
if (!commState.putUuid(subjId))
return false;
commState.idx++;
- case 28:
+ case 29:
if (!commState.putInt(taskNameHash))
return false;
@@ -325,7 +325,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
return false;
switch (commState.idx) {
- case 21:
+ case 22:
IgniteUuid futId0 = commState.getGridUuid();
if (futId0 == GRID_UUID_NOT_READ)
@@ -335,7 +335,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 22:
+ case 23:
if (buf.remaining() < 1)
return false;
@@ -343,7 +343,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 23:
+ case 24:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -372,7 +372,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 24:
+ case 25:
IgniteUuid miniId0 = commState.getGridUuid();
if (miniId0 == GRID_UUID_NOT_READ)
@@ -382,7 +382,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 25:
+ case 26:
if (buf.remaining() < 1)
return false;
@@ -390,7 +390,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 26:
+ case 27:
if (buf.remaining() < 8)
return false;
@@ -398,7 +398,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 27:
+ case 28:
UUID subjId0 = commState.getUuid();
if (subjId0 == UUID_NOT_READ)
@@ -408,7 +408,7 @@ public class GridNearTxPrepareRequest<K, V> extends GridDistributedTxPrepareRequ
commState.idx++;
- case 28:
+ case 29:
if (buf.remaining() < 4)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
index aa3546b..938e2d2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -62,6 +62,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param rmtThreadId Remote thread ID.
* @param xidVer XID version.
* @param commitVer Commit version.
+ * @param sys System flag.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
@@ -73,24 +74,25 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @throws IgniteCheckedException If unmarshalling failed.
*/
public GridNearTxRemote(
+ GridCacheSharedContext<K, V> ctx,
ClassLoader ldr,
UUID nodeId,
UUID nearNodeId,
long rmtThreadId,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
boolean invalidate,
long timeout,
Collection<GridCacheTxEntry<K, V>> writeEntries,
- GridCacheSharedContext<K, V> ctx,
int txSize,
@Nullable GridCacheTxKey grpLockKey,
@Nullable UUID subjId,
int taskNameHash
) throws IgniteCheckedException {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize,
+ super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
grpLockKey, subjId, taskNameHash);
assert nearNodeId != null;
@@ -119,6 +121,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param rmtThreadId Remote thread ID.
* @param xidVer XID version.
* @param commitVer Commit version.
+ * @param sys System flag.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
@@ -128,23 +131,24 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param grpLockKey Collection of group lock keys if this is a group-lock transaction.
*/
public GridNearTxRemote(
+ GridCacheSharedContext<K, V> ctx,
UUID nodeId,
UUID nearNodeId,
GridCacheVersion nearXidVer,
long rmtThreadId,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
boolean invalidate,
long timeout,
- GridCacheSharedContext<K, V> ctx,
int txSize,
@Nullable GridCacheTxKey grpLockKey,
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize,
+ super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
grpLockKey, subjId, taskNameHash);
assert nearNodeId != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
index cebd888..aac60fd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/local/GridLocalTx.java
@@ -60,8 +60,8 @@ class GridLocalTx<K, V> extends GridCacheTxLocalAdapter<K, V> {
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, ctx.versions().next(), implicit, implicitSingle, concurrency, isolation, timeout, false, true, txSize,
- null, false, subjId, taskNameHash);
+ super(ctx, ctx.versions().next(), implicit, implicitSingle, false, concurrency, isolation, timeout, false, true,
+ txSize, null, false, subjId, taskNameHash);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
index 60eec9e..aece7cf 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -12,6 +12,7 @@ package org.gridgain.grid.kernal.processors.cache.transactions;
import org.apache.ignite.*;
import org.apache.ignite.lang.*;
import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
@@ -25,7 +26,7 @@ import static org.gridgain.grid.cache.GridCacheTxIsolation.*;
/**
* Grid transactions implementation.
*/
-public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
+public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx {
/** Cache shared context. */
private GridCacheSharedContext<K, V> cctx;
@@ -44,7 +45,9 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
cfg.getDefaultTxConcurrency(),
cfg.getDefaultTxIsolation(),
cfg.getDefaultTxTimeout(),
- 0);
+ 0,
+ false
+ );
}
/** {@inheritDoc} */
@@ -58,7 +61,8 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
concurrency,
isolation,
cfg.getDefaultTxTimeout(),
- 0
+ 0,
+ false
);
}
@@ -74,7 +78,24 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
concurrency,
isolation,
timeout,
- txSize
+ txSize,
+ false
+ );
+ }
+
+ @Override public GridCacheTx txStartSystem(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation,
+ long timeout, int txSize) {
+ A.notNull(concurrency, "concurrency");
+ A.notNull(isolation, "isolation");
+ A.ensure(timeout >= 0, "timeout cannot be negative");
+ A.ensure(txSize >= 0, "transaction size cannot be negative");
+
+ return txStart0(
+ concurrency,
+ isolation,
+ timeout,
+ txSize,
+ true
);
}
@@ -83,10 +104,11 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
* @param isolation Transaction isolation.
* @param timeout Transaction timeout.
* @param txSize Expected transaction size.
+ * @param sys System flag.
* @return Transaction.
*/
private GridCacheTx txStart0(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation,
- long timeout, int txSize) {
+ long timeout, int txSize, boolean sys) {
GridTransactionsConfiguration cfg = cctx.gridConfig().getTransactionsConfiguration();
if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE)
@@ -102,6 +124,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
tx = cctx.tm().newTx(
false,
false,
+ sys,
concurrency,
isolation,
timeout,
@@ -128,7 +151,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
throw new IllegalArgumentException("Failed to find cache with given name (cache is not configured): " +
cacheName);
- return txStartGroupLock(cache.context(), affinityKey, concurrency, isolation, false, timeout, txSize);
+ return txStartGroupLock(cache.context(), affinityKey, concurrency, isolation, false, timeout, txSize, false);
}
/** {@inheritDoc} */
@@ -142,7 +165,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
Object grpLockKey = cache.context().affinity().partitionAffinityKey(partId);
- return txStartGroupLock(cache.context(), grpLockKey, concurrency, isolation, true, timeout, txSize);
+ return txStartGroupLock(cache.context(), grpLockKey, concurrency, isolation, true, timeout, txSize, false);
}
/**
@@ -155,13 +178,14 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
* should be a unique partition-specific key.
* @param timeout Tx timeout.
* @param txSize Expected transaction size.
+ * @param sys System flag.
* @return Started transaction.
* @throws IllegalStateException If other transaction was already started.
* @throws IgniteCheckedException In case of error.
*/
@SuppressWarnings("unchecked")
private GridCacheTx txStartGroupLock(GridCacheContext ctx, Object grpLockKey, GridCacheTxConcurrency concurrency,
- GridCacheTxIsolation isolation, boolean partLock, long timeout, int txSize)
+ GridCacheTxIsolation isolation, boolean partLock, long timeout, int txSize, boolean sys)
throws IllegalStateException, IgniteCheckedException {
GridCacheTx tx = cctx.tm().userTx();
@@ -172,6 +196,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactions {
GridCacheTxLocalAdapter<K, V> tx0 = cctx.tm().newTx(
false,
false,
+ sys,
concurrency,
isolation,
timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java
----------------------------------------------------------------------
diff --git a/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java b/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java
index 6044900..e1e24bb 100644
--- a/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java
+++ b/modules/jta/src/main/java/org/gridgain/grid/kernal/processors/cache/jta/GridCacheJtaManager.java
@@ -64,16 +64,17 @@ public class GridCacheJtaManager<K, V> extends GridCacheJtaManagerAdapter<K, V>
.getTransactionsConfiguration();
tx = cctx.tm().newTx(
- false,
- false,
+ /*implicit*/false,
+ /*implicit single*/false,
+ /*system*/false,
tCfg.getDefaultTxConcurrency(),
tCfg.getDefaultTxIsolation(),
tCfg.getDefaultTxTimeout(),
- false,
- true,
- 0,
- /** group lock keys */null,
- /** partition lock */false
+ /*invalidate*/false,
+ /*store enabled*/true,
+ /*tx size*/0,
+ /*group lock keys*/null,
+ /*partition lock*/false
);
}
[3/8] incubator-ignite git commit: GG-9141 - Fixes for DR
transactions.
Posted by ag...@apache.org.
GG-9141 - Fixes for DR transactions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/88a2d8da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/88a2d8da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/88a2d8da
Branch: refs/heads/ignite-1
Commit: 88a2d8da1ca94a6e217c796cc958dfbfc9393eca
Parents: a70cfa2
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Dec 17 18:57:14 2014 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Dec 17 18:57:14 2014 -0800
----------------------------------------------------------------------
.../gridgain/grid/kernal/GridKernalContext.java | 2 +-
.../grid/kernal/IgniteTransactionsEx.java | 33 +++++++++++++
.../processors/cache/GridCacheAdapter.java | 2 +
.../processors/cache/GridCacheContext.java | 26 ++++++----
.../processors/cache/GridCacheIoManager.java | 32 ++++++++-----
.../processors/cache/GridCacheTxAdapter.java | 14 ++++++
.../kernal/processors/cache/GridCacheTxEx.java | 9 ++++
.../processors/cache/GridCacheTxHandler.java | 25 ++++++----
.../cache/GridCacheTxLocalAdapter.java | 4 +-
.../processors/cache/GridCacheTxManager.java | 2 +
.../kernal/processors/cache/GridCacheUtils.java | 13 +++--
.../GridDistributedTxFinishRequest.java | 26 ++++++++++
.../GridDistributedTxPrepareRequest.java | 24 ++++++++++
.../GridDistributedTxRemoteAdapter.java | 3 ++
.../distributed/dht/GridDhtLockFuture.java | 6 +--
.../dht/GridDhtTransactionalCacheAdapter.java | 11 +++--
.../distributed/dht/GridDhtTxFinishFuture.java | 7 ++-
.../distributed/dht/GridDhtTxFinishRequest.java | 50 ++++++++++----------
.../cache/distributed/dht/GridDhtTxLocal.java | 9 ++--
.../distributed/dht/GridDhtTxLocalAdapter.java | 8 ++--
.../distributed/dht/GridDhtTxPrepareFuture.java | 9 ++--
.../dht/GridDhtTxPrepareRequest.java | 48 +++++++++----------
.../cache/distributed/dht/GridDhtTxRemote.java | 12 +++--
.../colocated/GridDhtColocatedLockFuture.java | 5 +-
.../distributed/near/GridNearLockFuture.java | 5 +-
.../near/GridNearTransactionalCache.java | 3 +-
.../near/GridNearTxFinishFuture.java | 5 +-
.../near/GridNearTxFinishRequest.java | 24 +++++-----
.../cache/distributed/near/GridNearTxLocal.java | 5 +-
.../near/GridNearTxPrepareFuture.java | 3 +-
.../near/GridNearTxPrepareRequest.java | 32 ++++++-------
.../distributed/near/GridNearTxRemote.java | 12 +++--
.../processors/cache/local/GridLocalTx.java | 4 +-
.../transactions/IgniteTransactionsImpl.java | 41 ++++++++++++----
.../cache/jta/GridCacheJtaManager.java | 15 +++---
35 files changed, 367 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java
index 7b161a4..e1b9d92 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/GridKernalContext.java
@@ -483,7 +483,7 @@ public interface GridKernalContext extends GridMetadataAware, Iterable<GridCompo
/**
* @param name Plugin name.
* @return Plugin provider instance.
- * @throws org.apache.ignite.plugin.PluginNotFoundException If plugin provider for the given name was not found.
+ * @throws PluginNotFoundException If plugin provider for the given name was not found.
*/
public PluginProvider pluginProvider(String name) throws PluginNotFoundException;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteTransactionsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteTransactionsEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteTransactionsEx.java
new file mode 100644
index 0000000..8666e66
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteTransactionsEx.java
@@ -0,0 +1,33 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.kernal;
+
+import org.apache.ignite.*;
+import org.gridgain.grid.cache.*;
+
+/**
+ * Extended interface to work with system transactions.
+ */
+public interface IgniteTransactionsEx extends IgniteTransactions {
+ /**
+ * Starts transaction with specified isolation, concurrency, timeout, invalidation flag,
+ * and number of participating entries.
+ *
+ * @param concurrency Concurrency.
+ * @param isolation Isolation.
+ * @param timeout Timeout.
+ * @param txSize Number of entries participating in transaction (may be approximate).
+ * @return New transaction.
+ * @throws IllegalStateException If transaction is already started by this thread.
+ * @throws UnsupportedOperationException If cache is {@link GridCacheAtomicityMode#ATOMIC}.
+ */
+ public GridCacheTx txStartSystem(GridCacheTxConcurrency concurrency, GridCacheTxIsolation isolation, long timeout,
+ int txSize);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index 720ff36..b5bd597 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -3603,6 +3603,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
tx = ctx.tm().newTx(
true,
op.single(),
+ ctx.system(),
PESSIMISTIC,
READ_COMMITTED,
tCfg.getDefaultTxTimeout(),
@@ -3677,6 +3678,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
tx = ctx.tm().newTx(
true,
op.single(),
+ ctx.system(),
PESSIMISTIC,
READ_COMMITTED,
ctx.kernalContext().config().getTransactionsConfiguration().getDefaultTxTimeout(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
index 98766fb..a15713e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheContext.java
@@ -169,6 +169,9 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Cache ID. */
private int cacheId;
+ /** System cache flag. */
+ private boolean sys;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -274,6 +277,8 @@ public class GridCacheContext<K, V> implements Externalizable {
}
else
cacheId = 1;
+
+ sys = CU.UTILITY_CACHE_NAME.equals(cacheName);
}
/**
@@ -309,6 +314,13 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @return System cache flag.
+ */
+ public boolean system() {
+ return sys;
+ }
+
+ /**
* @param cache Cache.
*/
public void cache(GridCacheAdapter<K, V> cache) {
@@ -928,8 +940,7 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
- * Same as {@link GridFunc#isAll(Object, org.apache.ignite.lang.IgnitePredicate[])}, but safely unwraps
- * exceptions.
+ * Same as {@link GridFunc#isAll(Object, IgnitePredicate[])}, but safely unwraps exceptions.
*
* @param e Element.
* @param p Predicates.
@@ -937,14 +948,13 @@ public class GridCacheContext<K, V> implements Externalizable {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings({"ErrorNotRethrown"})
- public <K, V> boolean isAll(GridCacheEntryEx<K, V> e,
- @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] p) throws IgniteCheckedException {
+ public <K1, V1> boolean isAll(GridCacheEntryEx<K1, V1> e,
+ @Nullable IgnitePredicate<GridCacheEntry<K1, V1>>[] p) throws IgniteCheckedException {
return F.isEmpty(p) || isAll(e.wrap(false), p);
}
/**
- * Same as {@link GridFunc#isAll(Object, org.apache.ignite.lang.IgnitePredicate[])}, but safely unwraps
- * exceptions.
+ * Same as {@link GridFunc#isAll(Object, IgnitePredicate[])}, but safely unwraps exceptions.
*
* @param e Element.
* @param p Predicates.
@@ -1569,7 +1579,7 @@ public class GridCacheContext<K, V> implements Externalizable {
/**
* @param obj Object.
* @return Portable object.
- * @throws org.apache.ignite.portables.PortableException In case of error.
+ * @throws PortableException In case of error.
*/
@Nullable public Object marshalToPortable(@Nullable Object obj) throws PortableException {
assert portableEnabled();
@@ -1634,7 +1644,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @param col List to unwrap.
* @return Unwrapped list.
*/
- private ArrayList<Object> unwrapPortables(ArrayList<Object> col) {
+ private Collection<Object> unwrapPortables(ArrayList<Object> col) {
int size = col.size();
for (int i = 0; i < size; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
index a222c32..92cdb9a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheIoManager.java
@@ -65,9 +65,6 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
/** Deployment enabled. */
private boolean depEnabled;
- /** IO policy. */
- private GridIoPolicy plc;
-
/** Message listener. */
private GridMessageListener lsnr = new GridMessageListener() {
@SuppressWarnings("unchecked")
@@ -132,10 +129,6 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
retryDelay = cctx.gridConfig().getNetworkSendRetryDelay();
retryCnt = cctx.gridConfig().getNetworkSendRetryCount();
- //String cacheName = cctx.name(); TODO GG-9141 how to determine policy?
-
- plc = SYSTEM_POOL; // TODO GG-9141 CU.isDrSystemCache(cacheName) ? DR_POOL : SYSTEM_POOL;
-
depEnabled = cctx.gridDeploy().enabled();
cctx.gridIO().addMessageListener(TOPIC_CACHE, lsnr);
@@ -333,7 +326,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
* @param node Node to send the message to.
* @param msg Message to send.
* @throws IgniteCheckedException If sending failed.
- * @throws org.apache.ignite.cluster.ClusterTopologyException If receiver left.
+ * @throws ClusterTopologyException If receiver left.
*/
public void send(ClusterNode node, GridCacheMessage<K, V> msg) throws IgniteCheckedException {
send(node, msg, SYSTEM_POOL);
@@ -345,7 +338,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
* @param node Node to send the message to.
* @param msg Message to send.
* @throws IgniteCheckedException If sending failed.
- * @throws org.apache.ignite.cluster.ClusterTopologyException If receiver left.
+ * @throws ClusterTopologyException If receiver left.
*/
public void send(ClusterNode node, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException {
assert !node.isLocal();
@@ -444,7 +437,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
else
msg0 = (GridCacheMessage<K, V>)msg.clone();
- cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, plc);
+ cctx.gridIO().send(nodesView, TOPIC_CACHE, msg0, SYSTEM_POOL);
boolean added = false;
@@ -537,6 +530,23 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
}
/**
+ * Sends communication message.
+ *
+ * @param nodeId ID of node to send the message to.
+ * @param msg Message to send.
+ * @throws IgniteCheckedException If sending failed.
+ */
+ public void send(UUID nodeId, GridCacheMessage<K, V> msg, GridIoPolicy plc) throws IgniteCheckedException {
+ ClusterNode n = cctx.discovery().node(nodeId);
+
+ if (n == null)
+ throw new ClusterTopologyException("Failed to send message because node left grid [node=" + n + ", msg=" +
+ msg + ']');
+
+ send(n, msg, plc);
+ }
+
+ /**
* @param node Destination node.
* @param topic Topic to send the message to.
* @param msgId Ordered message ID.
@@ -554,7 +564,7 @@ public class GridCacheIoManager<K, V> extends GridCacheSharedManagerAdapter<K, V
try {
cnt++;
- cctx.gridIO().sendOrderedMessage(node, topic, msgId, msg, plc, timeout, false);
+ cctx.gridIO().sendOrderedMessage(node, topic, msgId, msg, SYSTEM_POOL, timeout, false);
if (log.isDebugEnabled())
log.debug("Sent ordered cache message [topic=" + topic + ", msg=" + msg +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
index 7a32afa..ebd862b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxAdapter.java
@@ -117,6 +117,9 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
/** Internal flag. */
protected boolean internal;
+ /** System transaction flag. */
+ private boolean sys;
+
/** */
protected boolean onePhaseCommit;
@@ -202,6 +205,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
* @param implicit Implicit flag.
* @param implicitSingle Implicit with one key flag.
* @param loc Local flag.
+ * @param sys System transaction flag.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -214,6 +218,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
boolean implicit,
boolean implicitSingle,
boolean loc,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -232,6 +237,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
this.implicit = implicit;
this.implicitSingle = implicitSingle;
this.loc = loc;
+ this.sys = sys;
this.concurrency = concurrency;
this.isolation = isolation;
this.timeout = timeout;
@@ -257,6 +263,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
* @param xidVer Transaction ID.
* @param startVer Start version mark.
* @param threadId Thread ID.
+ * @param sys System transaction flag.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -269,6 +276,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
GridCacheVersion xidVer,
GridCacheVersion startVer,
long threadId,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -282,6 +290,7 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
this.threadId = threadId;
this.xidVer = xidVer;
this.startVer = startVer;
+ this.sys = sys;
this.concurrency = concurrency;
this.isolation = isolation;
this.timeout = timeout;
@@ -394,6 +403,11 @@ public abstract class GridCacheTxAdapter<K, V> extends GridMetadataAwareAdapter
}
/** {@inheritDoc} */
+ @Override public boolean system() {
+ return sys;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean storeUsed() {
return storeEnabled() && store() != null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java
index 877c0f1..0dda62b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxEx.java
@@ -54,6 +54,15 @@ public interface GridCacheTxEx<K, V> extends GridCacheTx, GridTimeoutObject {
public boolean storeUsed();
/**
+ * Checks if this is system cache transaction. System transactions are isolated from user transactions
+ * because some of the public API methods may be invoked inside user transactions and internally start
+ * system cache transactions.
+ *
+ * @return {@code True} if transaction is started for system cache.
+ */
+ public boolean system();
+
+ /**
* @return Last recorded topology version.
*/
public long topologyVersion();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
index fa85566..88def0e 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxHandler.java
@@ -27,6 +27,7 @@ import java.util.*;
import static org.gridgain.grid.cache.GridCacheTxConcurrency.*;
import static org.gridgain.grid.cache.GridCacheTxIsolation.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheTxEx.FinalizationStatus.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
@@ -240,6 +241,7 @@ public class GridCacheTxHandler<K, V> {
}
else {
tx = new GridDhtTxLocal<>(
+ ctx,
nearNode.id(),
req.version(),
req.futureId(),
@@ -247,7 +249,7 @@ public class GridCacheTxHandler<K, V> {
req.threadId(),
/*implicit*/false,
/*implicit-single*/false,
- ctx,
+ req.system(),
req.concurrency(),
req.isolation(),
req.timeout(),
@@ -468,7 +470,7 @@ public class GridCacheTxHandler<K, V> {
req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
try {
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (Throwable e) {
// Double-check.
@@ -491,6 +493,7 @@ public class GridCacheTxHandler<K, V> {
// Create transaction and add entries.
tx = ctx.tm().onCreated(
new GridDhtTxLocal<>(
+ ctx,
nodeId,
req.version(),
req.futureId(),
@@ -498,7 +501,7 @@ public class GridCacheTxHandler<K, V> {
req.threadId(),
true,
false, /* we don't know, so assume false. */
- ctx,
+ req.system(),
PESSIMISTIC,
READ_COMMITTED,
/*timeout */0,
@@ -666,7 +669,7 @@ public class GridCacheTxHandler<K, V> {
try {
// Reply back to sender.
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
if (e instanceof ClusterTopologyException) {
@@ -864,7 +867,7 @@ public class GridCacheTxHandler<K, V> {
GridCacheMessage<K, V> res = new GridDhtTxFinishResponse<>(req.version(), req.futureId(), req.miniId());
try {
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (Throwable e) {
// Double-check.
@@ -895,6 +898,7 @@ public class GridCacheTxHandler<K, V> {
if (tx == null) {
tx = new GridDhtTxRemote<>(
+ ctx,
req.nearNodeId(),
req.futureId(),
nodeId,
@@ -902,11 +906,11 @@ public class GridCacheTxHandler<K, V> {
req.topologyVersion(),
req.version(),
req.commitVersion(),
+ req.system(),
req.concurrency(),
req.isolation(),
req.isInvalidate(),
req.timeout(),
- ctx,
req.writes() != null ? Math.max(req.writes().size(), req.txSize()) : req.txSize(),
req.groupLockKey(),
req.nearXidVersion(),
@@ -1012,18 +1016,19 @@ public class GridCacheTxHandler<K, V> {
if (tx == null) {
tx = new GridNearTxRemote<>(
+ ctx,
ldr,
nodeId,
req.nearNodeId(),
req.threadId(),
req.version(),
req.commitVersion(),
+ req.system(),
req.concurrency(),
req.isolation(),
req.isInvalidate(),
req.timeout(),
req.nearWrites(),
- ctx,
req.txSize(),
req.groupLockKey(),
req.subjectId(),
@@ -1100,6 +1105,7 @@ public class GridCacheTxHandler<K, V> {
if (tx == null) {
tx = new GridDhtTxRemote<>(
+ ctx,
req.nearNodeId(),
req.futureId(),
nodeId,
@@ -1109,11 +1115,11 @@ public class GridCacheTxHandler<K, V> {
req.topologyVersion(),
req.version(),
/*commitVer*/null,
+ req.system(),
PESSIMISTIC,
req.isolation(),
req.isInvalidate(),
0,
- ctx,
req.txSize(),
req.groupLockKey(),
req.subjectId(),
@@ -1241,6 +1247,7 @@ public class GridCacheTxHandler<K, V> {
if (tx == null) {
tx = new GridNearTxRemote<>(
+ ctx,
nodeId,
req.nearNodeId(),
// We can pass null as nearXidVer as transaction will be committed right away.
@@ -1248,11 +1255,11 @@ public class GridCacheTxHandler<K, V> {
req.threadId(),
req.version(),
null,
+ req.system(),
PESSIMISTIC,
req.isolation(),
req.isInvalidate(),
0,
- ctx,
req.txSize(),
req.groupLockKey(),
req.subjectId(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
index 59f0e97..384b243 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxLocalAdapter.java
@@ -94,6 +94,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
* @param implicit {@code True} if transaction was implicitly started by the system,
* {@code false} if it was started explicitly by user.
* @param implicitSingle {@code True} if transaction is implicit with only one key.
+ * @param sys System flag.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -106,6 +107,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
GridCacheVersion xidVer,
boolean implicit,
boolean implicitSingle,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -117,7 +119,7 @@ public abstract class GridCacheTxLocalAdapter<K, V> extends GridCacheTxAdapter<K
@Nullable UUID subjId,
int taskNameHash
) {
- super(cctx, xidVer, implicit, implicitSingle, /*local*/true, concurrency, isolation, timeout, invalidate,
+ super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate,
storeEnabled, txSize, grpLockKey, subjId, taskNameHash);
assert !partLock || grpLockKey != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java
index cc37438..cad83e5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheTxManager.java
@@ -361,6 +361,7 @@ public class GridCacheTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V
public GridCacheTxLocalAdapter<K, V> newTx(
boolean implicit,
boolean implicitSingle,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -377,6 +378,7 @@ public class GridCacheTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V
cctx,
implicit,
implicitSingle,
+ sys,
concurrency,
isolation,
timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
index d28f728..acd81ef 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java
@@ -14,7 +14,6 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.fs.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
@@ -51,9 +50,6 @@ public class GridCacheUtils {
/** Security system cache name. */
public static final String UTILITY_CACHE_NAME = "gg-sys-cache";
- /** Flag to turn off DHT cache for debugging purposes. */
- public static final boolean DHT_ENABLED = true;
-
/** Default mask name. */
private static final String DEFAULT_MASK_NAME = "<default>";
@@ -1522,6 +1518,15 @@ public class GridCacheUtils {
}
/**
+ * @return Cache ID for utility cache.
+ */
+ public static int utilityCacheId() {
+ int hc = UTILITY_CACHE_NAME.hashCode();
+
+ return hc == 0 ? 1 : hc;
+ }
+
+ /**
* Validates that cache key or cache value implements {@link Externalizable}
*
* @param log Logger used to log warning message.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index d2093c2..15d2ce3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -82,6 +82,9 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
/** Group lock key bytes. */
private byte[] grpLockKeyBytes;
+ /** System flag. */
+ private boolean sys;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -96,6 +99,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
* @param commitVer Commit version.
* @param commit Commit flag.
* @param invalidate Invalidate flag.
+ * @param sys System flag.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -112,6 +116,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
long threadId,
boolean commit,
boolean invalidate,
+ boolean sys,
boolean syncCommit,
boolean syncRollback,
GridCacheVersion baseVer,
@@ -130,6 +135,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
this.threadId = threadId;
this.commit = commit;
this.invalidate = invalidate;
+ this.sys = sys;
this.syncCommit = syncCommit;
this.syncRollback = syncRollback;
this.baseVer = baseVer;
@@ -164,6 +170,13 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
}
/**
+ * @return System flag.
+ */
+ public boolean system() {
+ return sys;
+ }
+
+ /**
* @return Future ID.
*/
public IgniteUuid futureId() {
@@ -350,6 +363,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
_clone.txSize = txSize;
_clone.grpLockKey = grpLockKey;
_clone.grpLockKeyBytes = grpLockKeyBytes;
+ _clone.sys = sys;
}
/** {@inheritDoc} */
@@ -482,6 +496,11 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
commState.idx++;
+ case 20:
+ if (!commState.putBoolean(sys))
+ return false;
+
+ commState.idx++;
}
return true;
@@ -642,6 +661,13 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
commState.idx++;
+ case 20:
+ if (buf.remaining() < 1)
+ return false;
+
+ sys = commState.getBoolean();
+
+ commState.idx++;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 791c4f7..6c48627 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -103,6 +103,9 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
/** */
private byte[] txNodesBytes;
+ /** System flag. */
+ private boolean sys;
+
/**
* Required by {@link Externalizable}.
*/
@@ -135,6 +138,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
timeout = tx.timeout();
invalidate = tx.isInvalidate();
txSize = tx.size();
+ sys = tx.system();
this.reads = reads;
this.writes = writes;
@@ -151,6 +155,13 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
}
/**
+ * @return System flag.
+ */
+ public boolean system() {
+ return sys;
+ }
+
+ /**
* Adds version to be verified on remote node.
*
* @param key Key for which version is verified.
@@ -415,6 +426,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
_clone.txSize = txSize;
_clone.txNodes = txNodes;
_clone.txNodesBytes = txNodesBytes;
+ _clone.sys = sys;
}
/** {@inheritDoc} */
@@ -553,6 +565,11 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
commState.idx++;
+ case 21:
+ if (!commState.putBoolean(sys))
+ return false;
+
+ commState.idx++;
}
return true;
@@ -725,6 +742,13 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
commState.idx++;
+ case 21:
+ if (buf.remaining() < 1)
+ return false;
+
+ sys = commState.getBoolean();
+
+ commState.idx++;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 3cd3e2d..f38d483 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -74,6 +74,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K,
* @param rmtThreadId Remote thread ID.
* @param xidVer XID version.
* @param commitVer Commit version.
+ * @param sys System flag.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
@@ -87,6 +88,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K,
long rmtThreadId,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
boolean invalidate,
@@ -102,6 +104,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends GridCacheTxAdapter<K,
xidVer,
ctx.versions().last(),
Thread.currentThread().getId(),
+ sys,
concurrency,
isolation,
timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
index be4153a..0187e5f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.dht;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
@@ -30,6 +29,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.IgniteEventType.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
import static org.gridgain.grid.kernal.processors.dr.GridDrType.*;
/**
@@ -862,7 +862,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
if (log.isDebugEnabled())
log.debug("Sending DHT lock request to DHT node [node=" + n.id() + ", req=" + req + ']');
- cctx.io().send(n, req);
+ cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
// Fail the whole thing.
@@ -924,7 +924,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
log.debug("Sending DHT lock request to near node [node=" + n.id() +
", req=" + req + ']');
- cctx.io().send(n, req);
+ cctx.io().send(n, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException e) {
fut.onResult(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 3f30801..2113a68 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -29,6 +29,7 @@ import java.util.*;
import static org.gridgain.grid.cache.GridCacheTxConcurrency.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
@@ -179,6 +180,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (tx == null) {
tx = new GridDhtTxRemote<>(
+ ctx.shared(),
req.nodeId(),
req.futureId(),
nodeId,
@@ -187,11 +189,11 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.topologyVersion(),
req.version(),
/*commitVer*/null,
+ ctx.system(),
PESSIMISTIC,
req.isolation(),
req.isInvalidate(),
req.timeout(),
- ctx.shared(),
req.txSize(),
req.groupLockKey(),
req.subjectId(),
@@ -423,7 +425,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (res != null) {
try {
// Reply back to sender.
- ctx.io().send(nodeId, res);
+ ctx.io().send(nodeId, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException ignored) {
U.warn(log, "Failed to send lock reply to remote node because it left grid: " + nodeId);
@@ -748,6 +750,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (req.inTx()) {
if (tx == null) {
tx = new GridDhtTxLocal<>(
+ ctx.shared(),
nearNode.id(),
req.version(),
req.futureId(),
@@ -755,7 +758,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.threadId(),
req.implicitTx(),
req.implicitSingleTx(),
- ctx.shared(),
+ ctx.system(),
PESSIMISTIC,
req.isolation(),
req.timeout(),
@@ -1059,7 +1062,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
try {
// Don't send reply message to this node or if lock was cancelled.
if (!nearNode.id().equals(ctx.nodeId()) && !X.hasCause(err, GridDistributedLockCancelledException.class))
- ctx.io().send(nearNode, res);
+ ctx.io().send(nearNode, res, ctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send lock reply to originating node (will rollback transaction) [node=" +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index a836ff8..d65de8c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -26,6 +26,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
/**
*
@@ -304,6 +305,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.isolation(),
commit,
tx.isInvalidate(),
+ tx.system(),
tx.isSystemInvalidate(),
tx.syncCommit(),
tx.syncRollback(),
@@ -324,7 +326,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
req.writeVersion(tx.writeVersion());
try {
- cctx.io().send(n, req);
+ cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
if (sync)
res = true;
@@ -361,6 +363,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.isolation(),
commit,
tx.isInvalidate(),
+ tx.system(),
tx.isSystemInvalidate(),
tx.syncCommit(),
tx.syncRollback(),
@@ -381,7 +384,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
req.writeVersion(tx.writeVersion());
try {
- cctx.io().send(nearMapping.node(), req);
+ cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
if (sync)
res = true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index c5db862..21cccf0 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -92,6 +92,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
* @param isolation Transaction isolation.
* @param commit Commit flag.
* @param invalidate Invalidate flag.
+ * @param sys System flag.
* @param sysInvalidate System invalidation flag.
* @param baseVer Base version.
* @param committedVers Committed versions.
@@ -115,6 +116,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
GridCacheTxIsolation isolation,
boolean commit,
boolean invalidate,
+ boolean sys,
boolean sysInvalidate,
boolean syncCommit,
boolean syncRollback,
@@ -131,8 +133,8 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
@Nullable UUID subjId,
int taskNameHash
) {
- super(xidVer, futId, commitVer, threadId, commit, invalidate, syncCommit, syncRollback, baseVer, committedVers,
- rolledbackVers, txSize, writes, recoverWrites, grpLockKey);
+ super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer,
+ committedVers, rolledbackVers, txSize, writes, recoverWrites, grpLockKey);
assert miniId != null;
assert nearNodeId != null;
@@ -322,25 +324,25 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
}
switch (commState.idx) {
- case 20:
+ case 21:
if (!commState.putEnum(isolation))
return false;
commState.idx++;
- case 21:
+ case 22:
if (!commState.putGridUuid(miniId))
return false;
commState.idx++;
- case 22:
+ case 23:
if (!commState.putUuid(nearNodeId))
return false;
commState.idx++;
- case 23:
+ case 24:
if (nearWritesBytes != null) {
if (commState.it == null) {
if (!commState.putInt(nearWritesBytes.size()))
@@ -367,13 +369,13 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 24:
+ case 25:
if (!commState.putBoolean(onePhaseCommit))
return false;
commState.idx++;
- case 25:
+ case 26:
if (pendingVers != null) {
if (commState.it == null) {
if (!commState.putInt(pendingVers.size()))
@@ -400,31 +402,31 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 26:
+ case 27:
if (!commState.putBoolean(sysInvalidate))
return false;
commState.idx++;
- case 27:
+ case 28:
if (!commState.putLong(topVer))
return false;
commState.idx++;
- case 28:
+ case 29:
if (!commState.putCacheVersion(writeVer))
return false;
commState.idx++;
- case 29:
+ case 30:
if (!commState.putUuid(subjId))
return false;
commState.idx++;
- case 30:
+ case 31:
if (!commState.putInt(taskNameHash))
return false;
@@ -444,7 +446,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
return false;
switch (commState.idx) {
- case 20:
+ case 21:
if (buf.remaining() < 1)
return false;
@@ -454,7 +456,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 21:
+ case 22:
IgniteUuid miniId0 = commState.getGridUuid();
if (miniId0 == GRID_UUID_NOT_READ)
@@ -464,7 +466,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 22:
+ case 23:
UUID nearNodeId0 = commState.getUuid();
if (nearNodeId0 == UUID_NOT_READ)
@@ -474,7 +476,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 23:
+ case 24:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -503,7 +505,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 24:
+ case 25:
if (buf.remaining() < 1)
return false;
@@ -511,7 +513,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 25:
+ case 26:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -540,7 +542,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 26:
+ case 27:
if (buf.remaining() < 1)
return false;
@@ -548,7 +550,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 27:
+ case 28:
if (buf.remaining() < 8)
return false;
@@ -556,7 +558,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 28:
+ case 29:
GridCacheVersion writeVer0 = commState.getCacheVersion();
if (writeVer0 == CACHE_VER_NOT_READ)
@@ -566,7 +568,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 29:
+ case 30:
UUID subjId0 = commState.getUuid();
if (subjId0 == UUID_NOT_READ)
@@ -576,7 +578,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 30:
+ case 31:
if (buf.remaining() < 4)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
index b53c1c8..9fa40d3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -26,6 +26,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheUtils.*;
/**
@@ -87,6 +88,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
* @param txNodes Transaction nodes mapping.
*/
public GridDhtTxLocal(
+ GridCacheSharedContext<K, V> cctx,
UUID nearNodeId,
GridCacheVersion nearXidVer,
IgniteUuid nearFutId,
@@ -94,7 +96,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
long nearThreadId,
boolean implicit,
boolean implicitSingle,
- GridCacheSharedContext<K, V> cctx,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -108,10 +110,11 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
int taskNameHash
) {
super(
+ cctx,
cctx.versions().onReceivedAndNext(nearNodeId, nearXidVer),
implicit,
implicitSingle,
- cctx,
+ sys,
concurrency,
isolation,
timeout,
@@ -625,7 +628,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
nearFinMiniId, err);
try {
- cctx.io().send(nearNodeId, res);
+ cctx.io().send(nearNodeId, res, system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException ignored) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 461ea04..035f9a2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -69,6 +69,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte
* @param implicit Implicit flag.
* @param implicitSingle Implicit-with-single-key flag.
* @param cctx Cache context.
+ * @param sys System flag.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -77,10 +78,11 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte
* @param partLock If this is a group-lock transaction and the whole partition should be locked.
*/
protected GridDhtTxLocalAdapter(
+ GridCacheSharedContext<K, V> cctx,
GridCacheVersion xidVer,
boolean implicit,
boolean implicitSingle,
- GridCacheSharedContext<K, V> cctx,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
long timeout,
@@ -92,8 +94,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends GridCacheTxLocalAdapte
@Nullable UUID subjId,
int taskNameHash
) {
- super(cctx, xidVer, implicit, implicitSingle, concurrency, isolation, timeout, invalidate, storeEnabled, txSize,
- grpLockKey, partLock, subjId, taskNameHash);
+ super(cctx, xidVer, implicit, implicitSingle, sys, concurrency, isolation, timeout, invalidate, storeEnabled,
+ txSize, grpLockKey, partLock, subjId, taskNameHash);
assert cctx != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index f872cf9..84c8f0f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
import static org.apache.ignite.events.IgniteEventType.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
/**
*
@@ -276,7 +277,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
nearMiniId, tx.xidVersion(), Collections.<Integer>emptySet(), t);
try {
- cctx.io().send(tx.nearNodeId(), res);
+ cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send reply to originating near node (will rollback): " + tx.nearNodeId(), e);
@@ -386,7 +387,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
res.pending(localDhtPendingVersions(tx.writeEntries(), min));
- cctx.io().send(tx.nearNodeId(), res);
+ cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
return true;
@@ -676,7 +677,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
//noinspection TryWithIdenticalCatches
try {
- cctx.io().send(n, req);
+ cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException e) {
fut.onResult(e);
@@ -730,7 +731,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
//noinspection TryWithIdenticalCatches
try {
- cctx.io().send(nearMapping.node(), req);
+ cctx.io().send(nearMapping.node(), req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException e) {
fut.onResult(e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index 61dfe70..e6f9051 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -352,37 +352,37 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
}
switch (commState.idx) {
- case 21:
+ case 22:
if (!commState.putGridUuid(futId))
return false;
commState.idx++;
- case 22:
+ case 23:
if (!commState.putBitSet(invalidateNearEntries))
return false;
commState.idx++;
- case 23:
+ case 24:
if (!commState.putBoolean(last))
return false;
commState.idx++;
- case 24:
+ case 25:
if (!commState.putGridUuid(miniId))
return false;
commState.idx++;
- case 25:
+ case 26:
if (!commState.putUuid(nearNodeId))
return false;
commState.idx++;
- case 26:
+ case 27:
if (nearWritesBytes != null) {
if (commState.it == null) {
if (!commState.putInt(nearWritesBytes.size()))
@@ -409,37 +409,37 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 27:
+ case 28:
if (!commState.putCacheVersion(nearXidVer))
return false;
commState.idx++;
- case 28:
+ case 29:
if (!commState.putByteArray(ownedBytes))
return false;
commState.idx++;
- case 29:
+ case 30:
if (!commState.putLong(topVer))
return false;
commState.idx++;
- case 30:
+ case 31:
if (!commState.putUuid(subjId))
return false;
commState.idx++;
- case 31:
+ case 32:
if (!commState.putInt(taskNameHash))
return false;
commState.idx++;
- case 32:
+ case 33:
if (!commState.putBitSet(preloadKeys))
return false;
@@ -459,7 +459,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
return false;
switch (commState.idx) {
- case 21:
+ case 22:
IgniteUuid futId0 = commState.getGridUuid();
if (futId0 == GRID_UUID_NOT_READ)
@@ -469,7 +469,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 22:
+ case 23:
BitSet invalidateNearEntries0 = commState.getBitSet();
if (invalidateNearEntries0 == BIT_SET_NOT_READ)
@@ -479,7 +479,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 23:
+ case 24:
if (buf.remaining() < 1)
return false;
@@ -487,7 +487,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 24:
+ case 25:
IgniteUuid miniId0 = commState.getGridUuid();
if (miniId0 == GRID_UUID_NOT_READ)
@@ -497,7 +497,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 25:
+ case 26:
UUID nearNodeId0 = commState.getUuid();
if (nearNodeId0 == UUID_NOT_READ)
@@ -507,7 +507,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 26:
+ case 27:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -536,7 +536,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 27:
+ case 28:
GridCacheVersion nearXidVer0 = commState.getCacheVersion();
if (nearXidVer0 == CACHE_VER_NOT_READ)
@@ -546,7 +546,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 28:
+ case 29:
byte[] ownedBytes0 = commState.getByteArray();
if (ownedBytes0 == BYTE_ARR_NOT_READ)
@@ -556,7 +556,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 29:
+ case 30:
if (buf.remaining() < 8)
return false;
@@ -564,7 +564,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 30:
+ case 31:
UUID subjId0 = commState.getUuid();
if (subjId0 == UUID_NOT_READ)
@@ -574,7 +574,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 31:
+ case 32:
if (buf.remaining() < 4)
return false;
@@ -582,7 +582,7 @@ public class GridDhtTxPrepareRequest<K, V> extends GridDistributedTxPrepareReque
commState.idx++;
- case 32:
+ case 33:
BitSet preloadKeys0 = commState.getBitSet();
if (preloadKeys0 == BIT_SET_NOT_READ)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 3bc41b2..e71b0fb 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -59,6 +59,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param topVer Topology version.
* @param xidVer XID version.
* @param commitVer Commit version.
+ * @param sys System flag.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
@@ -70,6 +71,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param txNodes Transaction nodes mapping.
*/
public GridDhtTxRemote(
+ GridCacheSharedContext<K, V> ctx,
UUID nearNodeId,
IgniteUuid rmtFutId,
UUID nodeId,
@@ -77,11 +79,11 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
long topVer,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
boolean invalidate,
long timeout,
- GridCacheSharedContext<K, V> ctx,
int txSize,
@Nullable GridCacheTxKey grpLockKey,
GridCacheVersion nearXidVer,
@@ -89,7 +91,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize,
+ super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
grpLockKey, subjId, taskNameHash);
assert nearNodeId != null;
@@ -118,6 +120,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param topVer Topology version.
* @param xidVer XID version.
* @param commitVer Commit version.
+ * @param sys System flag.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
@@ -127,6 +130,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
* @param grpLockKey Group lock key if transaction is group-lock.
*/
public GridDhtTxRemote(
+ GridCacheSharedContext<K, V> ctx,
UUID nearNodeId,
IgniteUuid rmtFutId,
UUID nodeId,
@@ -135,17 +139,17 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
long topVer,
GridCacheVersion xidVer,
GridCacheVersion commitVer,
+ boolean sys,
GridCacheTxConcurrency concurrency,
GridCacheTxIsolation isolation,
boolean invalidate,
long timeout,
- GridCacheSharedContext<K, V> ctx,
int txSize,
@Nullable GridCacheTxKey grpLockKey,
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, concurrency, isolation, invalidate, timeout, txSize,
+ super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
grpLockKey, subjId, taskNameHash);
assert nearNodeId != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 535f46e..baa7ead 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -32,6 +32,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.IgniteEventType.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
/**
* Colocated cache lock future.
@@ -832,7 +833,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
- cctx.io().send(node, req);
+ cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException ex) {
assert fut != null;
@@ -847,7 +848,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
- cctx.io().send(node, req);
+ cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException ex) {
assert fut != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
index b67229d..f0ce36b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -31,6 +31,7 @@ import java.util.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.events.IgniteEventType.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
/**
* Cache lock future.
@@ -1099,7 +1100,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
- cctx.io().send(node, req);
+ cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException ex) {
assert fut != null;
@@ -1114,7 +1115,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
if (log.isDebugEnabled())
log.debug("Sending near lock request [node=" + node.id() + ", req=" + req + ']');
- cctx.io().send(node, req);
+ cctx.io().send(node, req, cctx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
catch (ClusterTopologyException ex) {
assert fut != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 9b4d117..7a71452 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -247,17 +247,18 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
if (tx == null) {
tx = new GridNearTxRemote<>(
+ ctx.shared(),
nodeId,
req.nearNodeId(),
req.nearXidVersion(),
req.threadId(),
req.version(),
null,
+ ctx.system(),
PESSIMISTIC,
req.isolation(),
req.isInvalidate(),
req.timeout(),
- ctx.shared(),
req.txSize(),
req.groupLockKey(),
req.subjectId(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 176fdd0..4a438bf 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -12,7 +12,6 @@ package org.gridgain.grid.kernal.processors.cache.distributed.near;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.lang.*;
-import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.*;
@@ -28,6 +27,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.gridgain.grid.cache.GridCacheTxState.*;
+import static org.gridgain.grid.kernal.managers.communication.GridIoPolicy.*;
import static org.gridgain.grid.kernal.processors.cache.GridCacheOperation.*;
/**
@@ -338,6 +338,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
tx.threadId(),
commit,
tx.isInvalidate(),
+ tx.system(),
tx.syncCommit(),
tx.syncRollback(),
m.explicitLock(),
@@ -373,7 +374,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
cctx.tm().beforeFinishRemote(n.id(), tx.threadId());
try {
- cctx.io().send(n, req);
+ cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
// If we don't wait for result, then mark future as done.
if (!isSync() && !m.explicitLock())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/88a2d8da/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index 90cdfe6..76976e8 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -58,6 +58,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
* @param threadId Thread ID.
* @param commit Commit flag.
* @param invalidate Invalidate flag.
+ * @param sys System flag.
* @param explicitLock Explicit lock flag.
* @param topVer Topology version.
* @param baseVer Base version.
@@ -73,6 +74,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
long threadId,
boolean commit,
boolean invalidate,
+ boolean sys,
boolean syncCommit,
boolean syncRollback,
boolean explicitLock,
@@ -85,7 +87,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
Collection<GridCacheTxEntry<K, V>> recoverEntries,
@Nullable UUID subjId,
int taskNameHash) {
- super(xidVer, futId, null, threadId, commit, invalidate, syncCommit, syncRollback, baseVer, committedVers,
+ super(xidVer, futId, null, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer, committedVers,
rolledbackVers, txSize, writeEntries, recoverEntries, null);
this.explicitLock = explicitLock;
@@ -175,31 +177,31 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
}
switch (commState.idx) {
- case 20:
+ case 21:
if (!commState.putBoolean(explicitLock))
return false;
commState.idx++;
- case 21:
+ case 22:
if (!commState.putGridUuid(miniId))
return false;
commState.idx++;
- case 22:
+ case 23:
if (!commState.putLong(topVer))
return false;
commState.idx++;
- case 23:
+ case 24:
if (!commState.putUuid(subjId))
return false;
commState.idx++;
- case 24:
+ case 25:
if (!commState.putInt(taskNameHash))
return false;
@@ -219,7 +221,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
return false;
switch (commState.idx) {
- case 20:
+ case 21:
if (buf.remaining() < 1)
return false;
@@ -227,7 +229,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
commState.idx++;
- case 21:
+ case 22:
IgniteUuid miniId0 = commState.getGridUuid();
if (miniId0 == GRID_UUID_NOT_READ)
@@ -237,7 +239,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
commState.idx++;
- case 22:
+ case 23:
if (buf.remaining() < 8)
return false;
@@ -245,7 +247,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
commState.idx++;
- case 23:
+ case 24:
UUID subjId0 = commState.getUuid();
if (subjId0 == UUID_NOT_READ)
@@ -255,7 +257,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
commState.idx++;
- case 24:
+ case 25:
if (buf.remaining() < 4)
return false;