You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/03/04 01:49:16 UTC
[3/6] incubator-ignite git commit: IGNITE-141 - Marshallers refactoring
IGNITE-141 - Marshallers refactoring
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0fd5967f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0fd5967f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0fd5967f
Branch: refs/heads/ignite-141
Commit: 0fd5967f471228edf89a87e7370e7e2512108f12
Parents: 74078f6
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Tue Mar 3 16:15:22 2015 -0800
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Tue Mar 3 16:15:22 2015 -0800
----------------------------------------------------------------------
.../ignite/internal/GridKernalContext.java | 9 +-
.../ignite/internal/GridKernalContextImpl.java | 10 +
.../apache/ignite/internal/IgniteKernal.java | 2 +
.../org/apache/ignite/internal/IgnitionEx.java | 18 +-
.../managers/communication/GridIoManager.java | 13 +-
.../managers/communication/GridIoPolicy.java | 5 +-
.../processors/cache/GridCacheContext.java | 2 +-
.../GridDistributedTxFinishRequest.java | 28 +-
.../GridDistributedTxPrepareRequest.java | 12 +
.../GridDistributedTxRemoteAdapter.java | 4 +
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +
.../distributed/dht/GridDhtTxFinishFuture.java | 2 +
.../distributed/dht/GridDhtTxFinishRequest.java | 4 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 3 +
.../distributed/dht/GridDhtTxLocalAdapter.java | 6 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 11 +-
.../near/GridNearTransactionalCache.java | 1 +
.../near/GridNearTxFinishFuture.java | 1 +
.../near/GridNearTxFinishRequest.java | 6 +-
.../cache/distributed/near/GridNearTxLocal.java | 3 +
.../distributed/near/GridNearTxRemote.java | 11 +-
.../processors/cache/local/GridLocalTx.java | 206 -----------
.../cache/local/GridLocalTxFuture.java | 351 -------------------
.../cache/transactions/IgniteTxAdapter.java | 12 +-
.../cache/transactions/IgniteTxHandler.java | 6 +-
.../transactions/IgniteTxLocalAdapter.java | 7 +-
.../cache/transactions/IgniteTxManager.java | 2 +
.../junits/GridTestKernalContext.java | 1 +
28 files changed, 152 insertions(+), 586 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index f1a135f..bd6d3be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -287,11 +287,18 @@ public interface GridKernalContext extends Iterable<GridComponent> {
/**
* Gets utility cache pool.
*
- * @return DR pool.
+ * @return Utility cache pool.
*/
public ExecutorService utilityCachePool();
/**
+ * Gets marshaller cache pool.
+ *
+ * @return Marshaller cache pool.
+ */
+ public ExecutorService marshallerCachePool();
+
+ /**
* Gets portable processor.
*
* @return Portable processor.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 8544c60..b63d65b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -288,6 +288,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
private ExecutorService utilityCachePool;
/** */
+ private ExecutorService marshCachePool;
+
+ /** */
private IgniteConfiguration cfg;
/** */
@@ -338,6 +341,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
GridKernalGateway gw,
IgniteExceptionRegistry registry,
ExecutorService utilityCachePool,
+ ExecutorService marshCachePool,
ExecutorService execSvc,
ExecutorService sysExecSvc,
ExecutorService p2pExecSvc,
@@ -353,6 +357,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
this.gw = gw;
this.registry = registry;
this.utilityCachePool = utilityCachePool;
+ this.marshCachePool = marshCachePool;
this.execSvc = execSvc;
this.sysExecSvc = sysExecSvc;
this.p2pExecSvc = p2pExecSvc;
@@ -695,6 +700,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public ExecutorService marshallerCachePool() {
+ return marshCachePool;
+ }
+
+ /** {@inheritDoc} */
@Override public GridPortableProcessor portable() {
return portableProc;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index a6a5bde..c682a80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -547,6 +547,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
@SuppressWarnings({"CatchGenericClass", "unchecked"})
public void start(final IgniteConfiguration cfg,
ExecutorService utilityCachePool,
+ ExecutorService marshCachePool,
final ExecutorService execSvc,
final ExecutorService sysExecSvc,
ExecutorService p2pExecSvc,
@@ -668,6 +669,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
gw,
new IgniteExceptionRegistry(log),
utilityCachePool,
+ marshCachePool,
execSvc,
sysExecSvc,
p2pExecSvc,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 7844522..d818381 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1165,6 +1165,9 @@ public class IgnitionEx {
/** Utility cache executor service. */
private ExecutorService utilityCacheExecSvc;
+ /** Marshaller cache executor service. */
+ private ExecutorService marshCacheExecSvc;
+
/** Grid state. */
private volatile IgniteState state = STOPPED;
@@ -1385,6 +1388,13 @@ public class IgnitionEx {
DFLT_SYSTEM_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+ marshCacheExecSvc = new IgniteThreadPoolExecutor(
+ "marshaller-cache-" + cfg.getGridName(),
+ DFLT_SYSTEM_CORE_THREAD_CNT,
+ DFLT_SYSTEM_MAX_THREAD_CNT,
+ DFLT_SYSTEM_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+
// Register Ignite MBean for current grid instance.
registerFactoryMbean(myCfg.getMBeanServer());
@@ -1396,8 +1406,8 @@ public class IgnitionEx {
// Init here to make grid available to lifecycle listeners.
grid = grid0;
- grid0.start(myCfg, utilityCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, igfsExecSvc,
- restExecSvc,
+ grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc,
+ igfsExecSvc, restExecSvc,
new CA() {
@Override public void apply() {
startLatch.countDown();
@@ -2046,6 +2056,10 @@ public class IgnitionEx {
U.shutdownNow(getClass(), utilityCacheExecSvc, log);
utilityCacheExecSvc = null;
+
+ U.shutdownNow(getClass(), marshCacheExecSvc, log);
+
+ marshCacheExecSvc = null;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 968e93a..ca84cb1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -83,6 +83,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** Utility cache pool. */
private ExecutorService utilityCachePool;
+ /** Marshaller cache pool. */
+ private ExecutorService marshCachePool;
+
/** Discovery listener. */
private GridLocalEventListener discoLsnr;
@@ -188,6 +191,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
sysPool = ctx.getSystemExecutorService();
mgmtPool = ctx.getManagementExecutorService();
utilityCachePool = ctx.utilityCachePool();
+ marshCachePool = ctx.marshallerCachePool();
affPool = Executors.newFixedThreadPool(1);
getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() {
@@ -498,7 +502,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
case SYSTEM_POOL:
case MANAGEMENT_POOL:
case AFFINITY_POOL:
- case UTILITY_CACHE_POOL: {
+ case UTILITY_CACHE_POOL:
+ case MARSH_CACHE_POOL: {
if (msg.isOrdered())
processOrderedMessage(nodeId, msg, plc, msgC);
else
@@ -534,11 +539,17 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return mgmtPool;
case AFFINITY_POOL:
return affPool;
+
case UTILITY_CACHE_POOL:
assert utilityCachePool != null : "Utility cache pool is not configured.";
return utilityCachePool;
+ case MARSH_CACHE_POOL:
+ assert marshCachePool != null : "Marshaller cache pool is not configured.";
+
+ return marshCachePool;
+
default: {
assert false : "Invalid communication policy: " + plc;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index 78ceab4..6e45043 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -40,7 +40,10 @@ public enum GridIoPolicy {
AFFINITY_POOL,
/** Utility cache execution pool. */
- UTILITY_CACHE_POOL;
+ UTILITY_CACHE_POOL,
+
+ /** Marshaller cache execution pool. */
+ MARSH_CACHE_POOL;
/** Enum values. */
private static final GridIoPolicy[] VALS = values();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index f2e71ab..38b58d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -314,7 +314,7 @@ public class GridCacheContext<K, V> implements Externalizable {
sys = ctx.cache().systemCache(cacheName);
- plc = sys ? UTILITY_CACHE_POOL : SYSTEM_POOL;
+ plc = CU.isMarshallerCache(cacheName) ? MARSH_CACHE_POOL : sys ? UTILITY_CACHE_POOL : SYSTEM_POOL;
Factory<ExpiryPolicy> factory = cacheCfg.getExpiryPolicyFactory();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
index 8f954e8..cb5968d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
@@ -73,9 +74,12 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
/** Group lock key bytes. */
private byte[] grpLockKeyBytes;
- /** System flag. */
+ /** System transaction flag. */
private boolean sys;
+ /** IO policy. */
+ private GridIoPolicy plc;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -90,7 +94,8 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
* @param commitVer Commit version.
* @param commit Commit flag.
* @param invalidate Invalidate flag.
- * @param sys System flag.
+ * @param sys System transaction flag.
+ * @param plc IO policy.
* @param baseVer Base version.
* @param committedVers Committed versions.
* @param rolledbackVers Rolled back versions.
@@ -105,6 +110,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
boolean commit,
boolean invalidate,
boolean sys,
+ GridIoPolicy plc,
boolean syncCommit,
boolean syncRollback,
GridCacheVersion baseVer,
@@ -122,6 +128,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
this.commit = commit;
this.invalidate = invalidate;
this.sys = sys;
+ this.plc = plc;
this.syncCommit = syncCommit;
this.syncRollback = syncRollback;
this.baseVer = baseVer;
@@ -132,13 +139,20 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
}
/**
- * @return System flag.
+ * @return System transaction flag.
*/
public boolean system() {
return sys;
}
/**
+ * @return IO policy.
+ */
+ public GridIoPolicy policy() {
+ return plc;
+ }
+
+ /**
* @return Future ID.
*/
public IgniteUuid futureId() {
@@ -309,7 +323,7 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
writer.incrementState();
case 16:
- if (!writer.writeBoolean("sys", sys))
+ if (!writer.writeByte("plc", plc != null ? (byte)plc.ordinal() : -1))
return false;
writer.incrementState();
@@ -407,11 +421,15 @@ public class GridDistributedTxFinishRequest<K, V> extends GridDistributedBaseMes
reader.incrementState();
case 16:
- sys = reader.readBoolean("sys");
+ byte plcOrd;
+
+ plcOrd = reader.readByte("plc");
if (!reader.isLastRead())
return false;
+ plc = GridIoPolicy.fromOrdinal(plcOrd);
+
reader.incrementState();
case 17:
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 8f3742b..6dced98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
@@ -119,6 +120,9 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
/** System flag. */
private boolean sys;
+ /** IO policy. */
+ private GridIoPolicy plc;
+
/**
* Required by {@link Externalizable}.
*/
@@ -154,6 +158,7 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
invalidate = tx.isInvalidate();
txSize = tx.size();
sys = tx.system();
+ plc = tx.ioPolicy();
this.reads = reads;
this.writes = writes;
@@ -178,6 +183,13 @@ public class GridDistributedTxPrepareRequest<K, V> extends GridDistributedBaseMe
}
/**
+ * @return IO policy.
+ */
+ public GridIoPolicy policy() {
+ return plc;
+ }
+
+ /**
* Adds version to be verified on remote node.
*
* @param key Key for which version is verified.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index dbf82dd..02c4b97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -88,6 +89,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
* @param xidVer XID version.
* @param commitVer Commit version.
* @param sys System flag.
+ * @param plc IO policy.
* @param concurrency Concurrency level (should be pessimistic).
* @param isolation Transaction isolation.
* @param invalidate Invalidate flag.
@@ -104,6 +106,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
GridCacheVersion xidVer,
GridCacheVersion commitVer,
boolean sys,
+ GridIoPolicy plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
boolean invalidate,
@@ -120,6 +123,7 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
ctx.versions().last(),
Thread.currentThread().getId(),
sys,
+ plc,
concurrency,
isolation,
timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index 3fa0b89..ca99241 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -197,6 +197,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.version(),
/*commitVer*/null,
ctx.system(),
+ ctx.ioPolicy(),
PESSIMISTIC,
req.isolation(),
req.isInvalidate(),
@@ -790,6 +791,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.implicitTx(),
req.implicitSingleTx(),
ctx.system(),
+ ctx.ioPolicy(),
PESSIMISTIC,
req.isolation(),
req.timeout(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 7dac17b..38705df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -318,6 +318,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
commit,
tx.isInvalidate(),
tx.system(),
+ tx.ioPolicy(),
tx.isSystemInvalidate(),
tx.syncCommit(),
tx.syncRollback(),
@@ -369,6 +370,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
commit,
tx.isInvalidate(),
tx.system(),
+ tx.ioPolicy(),
tx.isSystemInvalidate(),
tx.syncCommit(),
tx.syncRollback(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 2835844..4e84426 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
@@ -111,6 +112,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
boolean commit,
boolean invalidate,
boolean sys,
+ GridIoPolicy plc,
boolean sysInvalidate,
boolean syncCommit,
boolean syncRollback,
@@ -123,7 +125,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
@Nullable UUID subjId,
int taskNameHash
) {
- super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer,
+ super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer,
committedVers, rolledbackVers, txSize, grpLockKey);
assert miniId != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 6aa159c..a77b560 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@ -105,6 +106,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
boolean implicit,
boolean implicitSingle,
boolean sys,
+ GridIoPolicy plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
@@ -123,6 +125,7 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
implicit,
implicitSingle,
sys,
+ plc,
concurrency,
isolation,
timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 1c71f12..eb5c356 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -95,6 +96,7 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
boolean implicit,
boolean implicitSingle,
boolean sys,
+ GridIoPolicy plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
@@ -106,8 +108,8 @@ public abstract class GridDhtTxLocalAdapter<K, V> extends IgniteTxLocalAdapter<K
@Nullable UUID subjId,
int taskNameHash
) {
- super(cctx, xidVer, implicit, implicitSingle, sys, concurrency, isolation, timeout, invalidate, storeEnabled,
- txSize, grpLockKey, partLock, subjId, taskNameHash);
+ super(cctx, xidVer, implicit, implicitSingle, sys, plc, concurrency, isolation, timeout, invalidate,
+ storeEnabled, txSize, grpLockKey, partLock, subjId, taskNameHash);
assert cctx != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 506888b..d818c1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -89,6 +90,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
GridCacheVersion xidVer,
GridCacheVersion commitVer,
boolean sys,
+ GridIoPolicy plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
boolean invalidate,
@@ -100,8 +102,8 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
- grpLockKey, subjId, taskNameHash);
+ super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
+ txSize, grpLockKey, subjId, taskNameHash);
assert nearNodeId != null;
assert rmtFutId != null;
@@ -149,6 +151,7 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
GridCacheVersion xidVer,
GridCacheVersion commitVer,
boolean sys,
+ GridIoPolicy plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
boolean invalidate,
@@ -158,8 +161,8 @@ public class GridDhtTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
- grpLockKey, subjId, taskNameHash);
+ super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
+ txSize, grpLockKey, subjId, taskNameHash);
assert nearNodeId != null;
assert rmtFutId != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 6255588..c25a7a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -292,6 +292,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
req.version(),
null,
ctx.system(),
+ ctx.ioPolicy(),
PESSIMISTIC,
req.isolation(),
req.isInvalidate(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index f3811c6..dddeb23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -350,6 +350,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
commit,
tx.isInvalidate(),
tx.system(),
+ tx.ioPolicy(),
tx.syncCommit(),
tx.syncRollback(),
m.explicitLock(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
index f29cfea..1ac8aed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.tostring.*;
@@ -82,6 +83,7 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
boolean commit,
boolean invalidate,
boolean sys,
+ GridIoPolicy plc,
boolean syncCommit,
boolean syncRollback,
boolean explicitLock,
@@ -93,8 +95,8 @@ public class GridNearTxFinishRequest<K, V> extends GridDistributedTxFinishReques
int txSize,
@Nullable UUID subjId,
int taskNameHash) {
- super(xidVer, futId, null, threadId, commit, invalidate, sys, syncCommit, syncRollback, baseVer, committedVers,
- rolledbackVers, txSize, null);
+ super(xidVer, futId, null, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer,
+ committedVers, rolledbackVers, txSize, null);
this.explicitLock = explicitLock;
this.storeEnabled = storeEnabled;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 06e4767..7a5f9d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
@@ -113,6 +114,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
boolean implicit,
boolean implicitSingle,
boolean sys,
+ GridIoPolicy plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
@@ -130,6 +132,7 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
implicit,
implicitSingle,
sys,
+ plc,
concurrency,
isolation,
timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index 5f9a0b7..1c69548 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
import org.apache.ignite.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -92,6 +93,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
GridCacheVersion xidVer,
GridCacheVersion commitVer,
boolean sys,
+ GridIoPolicy plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
boolean invalidate,
@@ -102,8 +104,8 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
@Nullable UUID subjId,
int taskNameHash
) throws IgniteCheckedException {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
- grpLockKey, subjId, taskNameHash);
+ super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
+ txSize, grpLockKey, subjId, taskNameHash);
assert nearNodeId != null;
@@ -149,6 +151,7 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
GridCacheVersion xidVer,
GridCacheVersion commitVer,
boolean sys,
+ GridIoPolicy plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
boolean invalidate,
@@ -158,8 +161,8 @@ public class GridNearTxRemote<K, V> extends GridDistributedTxRemoteAdapter<K, V>
@Nullable UUID subjId,
int taskNameHash
) {
- super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, concurrency, isolation, invalidate, timeout, txSize,
- grpLockKey, subjId, taskNameHash);
+ super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
+ txSize, grpLockKey, subjId, taskNameHash);
assert nearNodeId != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java
deleted file mode 100644
index 6727f7e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTx.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.local;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.transactions.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.transactions.TransactionState.*;
-
-/**
- * Local cache transaction.
- */
-class GridLocalTx<K, V> extends IgniteTxLocalAdapter<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Transaction future. */
- private final AtomicReference<GridLocalTxFuture<K, V>> fut = new AtomicReference<>();
-
- /**
- * Empty constructor required for {@link Externalizable}.
- */
- public GridLocalTx() {
- // No-op.
- }
-
- /**
- * @param ctx Cache registry.
- * @param implicit {@code True} if transaction is implicitly created by the system,
- * {@code false} if user explicitly created the transaction.
- * @param implicitSingle Implicit with single kye flag.
- * @param concurrency Concurrency.
- * @param isolation Isolation.
- * @param timeout Timeout.
- */
- GridLocalTx(
- GridCacheSharedContext<K, V> ctx,
- boolean implicit,
- boolean implicitSingle,
- TransactionConcurrency concurrency,
- TransactionIsolation isolation,
- long timeout,
- int txSize,
- @Nullable UUID subjId,
- int taskNameHash
- ) {
- super(ctx, ctx.versions().next(), implicit, implicitSingle, false, concurrency, isolation, timeout, false, true,
- txSize, null, false, subjId, taskNameHash);
- }
-
- /** {@inheritDoc} */
- @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) {
- GridLocalTxFuture<K, V> fut = this.fut.get();
-
- return fut != null && fut.onOwnerChanged(entry, owner);
- }
-
- /** {@inheritDoc} */
- @Override public void prepare() throws IgniteCheckedException {
- if (!state(PREPARING)) {
- TransactionState state = state();
-
- // If other thread is doing "prepare", then no-op.
- if (state == PREPARING || state == PREPARED || state == COMMITTING || state == COMMITTED)
- return;
-
- setRollbackOnly();
-
- throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + state + ", tx=" + this + ']');
- }
-
- try {
- userPrepare();
-
- state(PREPARED);
- }
- catch (IgniteCheckedException e) {
- setRollbackOnly();
-
- throw e;
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<IgniteInternalTx<K, V>> prepareAsync() {
- try {
- prepare();
-
- return new GridFinishedFuture<IgniteInternalTx<K, V>>(cctx.kernalContext(), this);
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(cctx.kernalContext(), e);
- }
- }
-
- /**
- * Commits without prepare.
- *
- * @throws IgniteCheckedException If commit failed.
- */
- void commit0() throws IgniteCheckedException {
- if (state(COMMITTING)) {
- try {
- userCommit();
- }
- finally {
- if (!done()) {
- if (isRollbackOnly()) {
- state(ROLLING_BACK);
-
- userRollback();
-
- state(ROLLED_BACK);
- }
- else
- state(COMMITTED);
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings( {"unchecked", "RedundantCast"})
- @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
- try {
- prepare();
- }
- catch (IgniteCheckedException e) {
- state(UNKNOWN);
-
- return new GridFinishedFuture<>(cctx.kernalContext(), e);
- }
-
- GridLocalTxFuture<K, V> fut = this.fut.get();
-
- if (fut == null) {
- if (this.fut.compareAndSet(null, fut = new GridLocalTxFuture<>(cctx, this))) {
- cctx.mvcc().addFuture(fut);
-
- fut.checkLocks();
-
- return (IgniteInternalFuture)fut;
- }
- }
-
- return (IgniteInternalFuture)this.fut.get();
- }
-
- /** {@inheritDoc} */
- @Override public void rollback() throws IgniteCheckedException {
- rollbackAsync().get();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
- try {
- state(ROLLING_BACK);
-
- userRollback();
-
- state(ROLLED_BACK);
-
- return new GridFinishedFuture<IgniteInternalTx>(cctx.kernalContext(), this);
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(cctx.kernalContext(), e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean finish(boolean commit) throws IgniteCheckedException {
- assert false;
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return GridToStringBuilder.toString(GridLocalTx.class, this, "super", super.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java
deleted file mode 100644
index 66a5eb2..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.local;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.transactions.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.transactions.TransactionState.*;
-
-/**
- * Replicated cache transaction future.
- */
-final class GridLocalTxFuture<K, V> extends GridFutureAdapter<IgniteInternalTx<K, V>>
- implements GridCacheMvccFuture<K, V, IgniteInternalTx<K, V>> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Logger reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Future ID. */
- private IgniteUuid futId = IgniteUuid.randomUuid();
-
- /** Cache. */
- @GridToStringExclude
- private GridCacheSharedContext<K, V> cctx;
-
- /** Cache transaction. */
- @GridToStringExclude // Need to exclude due to circular dependencies.
- private GridLocalTx<K, V> tx;
-
- /** Error. */
- private AtomicReference<Throwable> err = new AtomicReference<>(null);
-
- /** Commit flag. */
- private AtomicBoolean commit = new AtomicBoolean(false);
-
- /** Logger. */
- @GridToStringExclude
- private IgniteLogger log;
-
- /** Trackable flag. */
- private boolean trackable = true;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public GridLocalTxFuture() {
- // No-op.
- }
-
- /**
- * @param cctx Context.
- * @param tx Cache transaction.
- */
- GridLocalTxFuture(
- GridCacheSharedContext<K, V> cctx,
- GridLocalTx<K, V> tx) {
- super(cctx.kernalContext());
-
- assert cctx != null;
- assert tx != null;
-
- this.cctx = cctx;
- this.tx = tx;
-
- log = U.logger(ctx, logRef, GridLocalTxFuture.class);
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return tx.xidVersion();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- return Collections.emptyList();
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- // No-op.
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return trackable;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- trackable = false;
- }
-
- /**
- * @return Lock version.
- */
- GridLocalTx<K, V> tx() {
- return tx;
- }
-
- /**
- *
- */
- void complete() {
- onComplete();
- }
-
- /**
- * @param e Error.
- */
- void onError(Throwable e) {
- if (err.compareAndSet(null, e)) {
- tx.setRollbackOnly();
-
- onComplete();
- }
- }
-
- /**
- * @param e Error.
- */
- @SuppressWarnings({"TypeMayBeWeakened"})
- void onError(IgniteTxOptimisticCheckedException e) {
- if (err.compareAndSet(null, e)) {
- tx.setRollbackOnly();
-
- onComplete();
- }
- }
-
- /**
- * @param e Error.
- */
- @SuppressWarnings({"TypeMayBeWeakened"})
- void onError(IgniteTxRollbackCheckedException e) {
- if (err.compareAndSet(null, e)) {
- // Attempt rollback.
- if (tx.setRollbackOnly()) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to rollback the transaction: " + tx, ex);
- }
- }
-
- onComplete();
- }
- }
-
- /**
- * Callback for whenever all replies are received.
- */
- @SuppressWarnings({"ThrowableInstanceNeverThrown"})
- void checkLocks() {
- for (IgniteTxEntry<K, V> txEntry : tx.writeMap().values()) {
- while (true) {
- try {
- GridCacheEntryEx<K, V> entry = txEntry.cached();
-
- if (entry == null) {
- onError(new IgniteTxRollbackCheckedException("Failed to find cache entry for " +
- "transaction key (will rollback) [key=" + txEntry.key() + ", tx=" + tx + ']'));
-
- break;
- }
-
- // Another thread or transaction owns some lock.
- if (!entry.lockedByThread(tx.threadId())) {
- if (tx.pessimistic())
- onError(new IgniteCheckedException("Pessimistic transaction does not own lock for commit: " + tx));
-
- if (log.isDebugEnabled())
- log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + entry +
- ", tx=" + tx + ']');
-
- return;
- }
-
- break; // While.
- }
- // If entry cached within transaction got removed before lock.
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in checkLocks method (will retry): " + txEntry);
-
- txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
- }
- }
- }
-
- commit();
- }
-
- /**
- *
- * @param entry Entry.
- * @param owner Owner.
- */
- @SuppressWarnings({"ThrowableInstanceNeverThrown"})
- @Override public boolean onOwnerChanged(GridCacheEntryEx<K, V> entry, GridCacheMvccCandidate<K> owner) {
- if (log.isDebugEnabled())
- log.debug("Transaction future received owner changed callback [owner=" + owner + ", entry=" + entry + ']');
-
- for (IgniteTxEntry<K, V> txEntry : tx.writeMap().values()) {
- while (true) {
- try {
- GridCacheEntryEx<K,V> cached = txEntry.cached();
-
- if (entry == null) {
- onError(new IgniteTxRollbackCheckedException("Failed to find cache entry for " +
- "transaction key (will rollback) [key=" + txEntry.key() + ", tx=" + tx + ']'));
-
- return true;
- }
-
- // Don't compare entry against itself.
- if (cached != entry && !cached.lockedLocally(tx.xidVersion())) {
- if (log.isDebugEnabled())
- log.debug("Transaction entry is not locked by transaction (will wait) [entry=" + entry +
- ", tx=" + tx + ']');
-
- return true;
- }
-
- break;
- }
- // If entry cached within transaction got removed before lock.
- catch (GridCacheEntryRemovedException ignore) {
- if (log.isDebugEnabled())
- log.debug("Got removed entry in onOwnerChanged method (will retry): " + txEntry);
-
- txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), txEntry.keyBytes());
- }
- }
- }
-
- commit();
-
- return false;
- }
-
- /**
- * Callback invoked when all locks succeeded.
- */
- @SuppressWarnings({"ThrowableInstanceNeverThrown"})
- private void commit() {
- if (commit.compareAndSet(false, true)) {
- try {
- tx.commit0();
-
- onComplete();
- }
- catch (IgniteTxTimeoutCheckedException e) {
- onError(e);
- }
- catch (IgniteCheckedException e) {
- if (tx.state() == UNKNOWN) {
- onError(new IgniteTxHeuristicCheckedException("Commit only partially succeeded " +
- "(entries will be invalidated on remote nodes once transaction timeout passes): " +
- tx, e));
- }
- else {
- onError(new IgniteTxRollbackCheckedException(
- "Failed to commit transaction (will attempt rollback): " + tx, e));
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"ThrowableInstanceNeverThrown"})
- @Override public boolean cancel() {
- if (log.isDebugEnabled())
- log.debug("Attempting to cancel transaction: " + tx);
-
- // Attempt rollback.
- if (onCancelled()) {
- try {
- tx.rollback();
- }
- catch (IgniteCheckedException ex) {
- U.error(log, "Failed to rollback the transaction: " + tx, ex);
- }
-
- if (log.isDebugEnabled())
- log.debug("Transaction was cancelled and rolled back: " + tx);
-
- return true;
- }
-
- return isCancelled();
- }
-
- /**
- * Completeness callback.
- */
- private void onComplete() {
- if (onDone(tx, err.get()))
- cctx.mvcc().removeFuture(this);
- }
-
- /**
- * Checks for errors.
- *
- * @throws IgniteCheckedException If execution failed.
- */
- private void checkError() throws IgniteCheckedException {
- if (err.get() != null)
- throw U.cast(err.get());
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return GridToStringBuilder.toString(GridLocalTxFuture.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index abdb99c..6200593 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -43,7 +43,6 @@ import java.util.concurrent.atomic.*;
import java.util.concurrent.locks.*;
import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.*;
import static org.apache.ignite.transactions.TransactionConcurrency.*;
@@ -135,6 +134,9 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
/** System transaction flag. */
private boolean sys;
+ /** IO policy. */
+ private GridIoPolicy plc;
+
/** */
protected boolean onePhaseCommit;
@@ -225,6 +227,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
* @param implicitSingle Implicit with one key flag.
* @param loc Local flag.
* @param sys System transaction flag.
+ * @param plc IO policy.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -238,6 +241,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
boolean implicitSingle,
boolean loc,
boolean sys,
+ GridIoPolicy plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
@@ -257,6 +261,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
this.implicitSingle = implicitSingle;
this.loc = loc;
this.sys = sys;
+ this.plc = plc;
this.concurrency = concurrency;
this.isolation = isolation;
this.timeout = timeout;
@@ -283,6 +288,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
* @param startVer Start version mark.
* @param threadId Thread ID.
* @param sys System transaction flag.
+ * @param plc IO policy.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -296,6 +302,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
GridCacheVersion startVer,
long threadId,
boolean sys,
+ GridIoPolicy plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
@@ -310,6 +317,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
this.xidVer = xidVer;
this.startVer = startVer;
this.sys = sys;
+ this.plc = plc;
this.concurrency = concurrency;
this.isolation = isolation;
this.timeout = timeout;
@@ -412,7 +420,7 @@ public abstract class IgniteTxAdapter<K, V> extends GridMetadataAwareAdapter
/** {@inheritDoc} */
@Override public GridIoPolicy ioPolicy() {
- return sys ? UTILITY_CACHE_POOL : SYSTEM_POOL;
+ return plc;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index a14902d..430f073 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -269,6 +269,7 @@ public class IgniteTxHandler<K, V> {
req.implicitSingle(),
req.implicitSingle(),
req.system(),
+ req.policy(),
req.concurrency(),
req.isolation(),
req.timeout(),
@@ -506,7 +507,7 @@ public class IgniteTxHandler<K, V> {
req.miniId(), new IgniteCheckedException("Transaction has been already completed."));
try {
- ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
+ ctx.io().send(nodeId, res, req.policy());
}
catch (Throwable e) {
// Double-check.
@@ -538,6 +539,7 @@ public class IgniteTxHandler<K, V> {
true,
false, /* we don't know, so assume false. */
req.system(),
+ req.policy(),
PESSIMISTIC,
READ_COMMITTED,
/*timeout */0,
@@ -919,6 +921,7 @@ public class IgniteTxHandler<K, V> {
req.version(),
null,
req.system(),
+ req.policy(),
req.concurrency(),
req.isolation(),
req.isInvalidate(),
@@ -1038,6 +1041,7 @@ public class IgniteTxHandler<K, V> {
req.version(),
null,
req.system(),
+ req.policy(),
req.concurrency(),
req.isolation(),
req.isInvalidate(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 8bc5230..ab6721c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions;
import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.dr.*;
@@ -116,6 +117,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
* {@code false} if it was started explicitly by user.
* @param implicitSingle {@code True} if transaction is implicit with only one key.
* @param sys System flag.
+ * @param plc IO policy.
* @param concurrency Concurrency.
* @param isolation Isolation.
* @param timeout Timeout.
@@ -129,6 +131,7 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
boolean implicit,
boolean implicitSingle,
boolean sys,
+ GridIoPolicy plc,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
@@ -140,8 +143,8 @@ public abstract class IgniteTxLocalAdapter<K, V> extends IgniteTxAdapter<K, V>
@Nullable UUID subjId,
int taskNameHash
) {
- super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, concurrency, isolation, timeout, invalidate,
- storeEnabled, txSize, grpLockKey, subjId, taskNameHash);
+ super(cctx, xidVer, implicit, implicitSingle, /*local*/true, sys, plc, concurrency, isolation, timeout,
+ invalidate, storeEnabled, txSize, grpLockKey, subjId, taskNameHash);
assert !partLock || grpLockKey != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index bcfe1c2..b92a542 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions;
import org.apache.ignite.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
@@ -376,6 +377,7 @@ public class IgniteTxManager<K, V> extends GridCacheSharedManagerAdapter<K, V> {
implicit,
implicitSingle,
sysCacheCtx != null,
+ sysCacheCtx != null ? sysCacheCtx.ioPolicy() : GridIoPolicy.SYSTEM_POOL,
concurrency,
isolation,
timeout,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0fd5967f/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index afce47b..4911f1b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -46,6 +46,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
null,
null,
null,
+ null,
null);
GridTestUtils.setFieldValue(grid(), "cfg", config());