You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/16 15:26:02 UTC
[7/7] ignite git commit: Internal cache API cleanup.
Internal cache API cleanup.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/decb0c7a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/decb0c7a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/decb0c7a
Branch: refs/heads/ignite-2.0
Commit: decb0c7aa62f9354b25ee0a09ca19b424a688e8b
Parents: 82f016f
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 16 18:25:36 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 16 18:25:36 2017 +0300
----------------------------------------------------------------------
.../ClientAbstractMultiNodeSelfTest.java | 13 +-
.../ignite/internal/IgniteTransactionsEx.java | 8 +-
.../processors/cache/GridCacheAdapter.java | 98 +-
.../processors/cache/GridCacheProxyImpl.java | 6 +-
.../cache/GridCacheSharedContext.java | 11 +-
.../processors/cache/GridCacheUtils.java | 6 +-
.../processors/cache/IgniteInternalCache.java | 5 +-
.../distributed/GridCacheCommittedTxInfo.java | 117 -
.../GridDistributedCacheAdapter.java | 2 +-
.../GridDistributedTxRemoteAdapter.java | 59 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 57 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 126 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 28 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 65 +-
.../dht/colocated/GridDhtColocatedCache.java | 8 +-
.../colocated/GridDhtColocatedLockFuture.java | 7 +-
.../distributed/near/GridNearLockFuture.java | 4 +-
.../distributed/near/GridNearLockRequest.java | 200 +-
.../near/GridNearTransactionalCache.java | 6 +-
.../near/GridNearTxFinishFuture.java | 4 +-
.../cache/distributed/near/GridNearTxLocal.java | 2732 ++++++++++++++++-
.../near/GridNearTxPrepareFutureAdapter.java | 5 +-
.../near/GridNearTxPrepareRequest.java | 2 +-
.../distributed/near/GridNearTxRemote.java | 4 +-
.../store/GridCacheStoreManagerAdapter.java | 142 +-
.../cache/transactions/IgniteInternalTx.java | 80 +-
.../transactions/IgniteTransactionsImpl.java | 12 +-
.../cache/transactions/IgniteTxAdapter.java | 165 +-
.../cache/transactions/IgniteTxEntry.java | 4 +-
.../cache/transactions/IgniteTxHandler.java | 67 +-
.../IgniteTxImplicitSingleStateImpl.java | 4 +-
.../transactions/IgniteTxLocalAdapter.java | 2801 ++----------------
.../cache/transactions/IgniteTxLocalEx.java | 145 +-
.../cache/transactions/IgniteTxManager.java | 293 +-
.../cache/transactions/IgniteTxRemoteEx.java | 11 +
.../IgniteTxRemoteStateAdapter.java | 2 +-
.../cache/transactions/IgniteTxState.java | 2 +-
.../cache/transactions/IgniteTxStateImpl.java | 4 +-
.../transactions/TransactionProxyImpl.java | 13 +-
.../datastructures/DataStructuresProcessor.java | 32 +-
.../datastructures/GridCacheAtomicLongImpl.java | 18 +-
.../GridCacheAtomicReferenceImpl.java | 6 +-
.../GridCacheAtomicSequenceImpl.java | 4 +-
.../GridCacheAtomicStampedImpl.java | 6 +-
.../GridCacheCountDownLatchImpl.java | 6 +-
.../datastructures/GridCacheLockImpl.java | 11 +-
.../datastructures/GridCacheSemaphoreImpl.java | 18 +-
.../GridTransactionalCacheQueueImpl.java | 10 +-
.../processors/igfs/IgfsDataManager.java | 55 +-
.../processors/igfs/IgfsMetaManager.java | 73 +-
.../service/GridServiceProcessor.java | 6 +-
.../internal/TestRecordingCommunicationSpi.java | 29 +
.../processors/cache/GridCacheTestStore.java | 2 -
.../cache/IgniteTxConfigCacheSelfTest.java | 4 +-
.../IgniteCacheSystemTransactionsSelfTest.java | 7 +-
.../IgniteTxCachePrimarySyncTest.java | 5 +
...xOriginatingNodeFailureAbstractSelfTest.java | 6 +-
...cOriginatingNodeFailureAbstractSelfTest.java | 4 +-
...ePrimaryNodeFailureRecoveryAbstractTest.java | 9 +-
.../dht/IgniteCacheTxRecoveryRollbackTest.java | 501 ++++
.../GridCachePartitionedTxSalvageSelfTest.java | 7 +-
.../TxOptimisticDeadlockDetectionTest.java | 30 +-
...lockMessageSystemPoolStarvationSelfTest.java | 6 +-
.../IgniteCacheRestartTestSuite2.java | 3 +-
.../IgniteCacheTxRecoverySelfTestSuite.java | 3 +
.../HibernateReadWriteAccessStrategy.java | 12 +-
.../processors/cache/jta/CacheJtaManager.java | 3 +-
.../processors/cache/jta/CacheJtaResource.java | 8 +-
68 files changed, 4148 insertions(+), 4054 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
index 7fb2385..2fba49a 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractMultiNodeSelfTest.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockRequest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
@@ -480,11 +481,11 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract
@SuppressWarnings("unchecked")
private static class TestCommunicationSpi extends TcpCommunicationSpi {
/** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
checkSyncFlags((GridIoMessage)msg);
- super.sendMessage(node, msg, ackClosure);
+ super.sendMessage(node, msg, ackC);
}
/**
@@ -512,13 +513,13 @@ public abstract class ClientAbstractMultiNodeSelfTest extends GridCommonAbstract
IgniteInternalTx t = tm.tx(v);
if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x1"))))
- assertEquals("Invalid tx flags: " + t, FULL_ASYNC, t.syncMode());
+ assertEquals("Invalid tx flags: " + t, FULL_ASYNC, ((IgniteTxLocalAdapter)t).syncMode());
else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x2"))))
- assertEquals("Invalid tx flags: " + t, FULL_SYNC, t.syncMode());
+ assertEquals("Invalid tx flags: " + t, FULL_SYNC, ((IgniteTxLocalAdapter)t).syncMode());
else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x3"))))
- assertEquals("Invalid tx flags: " + t, FULL_ASYNC, t.syncMode());
+ assertEquals("Invalid tx flags: " + t, FULL_ASYNC, ((IgniteTxLocalAdapter)t).syncMode());
else if (t.hasWriteKey(cacheCtx.txKey(cacheCtx.toCacheKeyObject("x4"))))
- assertEquals("Invalid tx flags: " + t, FULL_SYNC, t.syncMode());
+ assertEquals("Invalid tx flags: " + t, FULL_SYNC, ((IgniteTxLocalAdapter)t).syncMode());
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
index 9772dcc..4133ddc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTransactionsEx.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -35,7 +35,7 @@ public interface IgniteTransactionsEx extends IgniteTransactions {
* @param txSize Number of entries participating in transaction (may be approximate).
* @return New transaction.
*/
- public IgniteInternalTx txStartEx(GridCacheContext ctx,
+ public GridNearTxLocal txStartEx(GridCacheContext ctx,
TransactionConcurrency concurrency,
TransactionIsolation isolation,
long timeout,
@@ -47,5 +47,7 @@ public interface IgniteTransactionsEx extends IgniteTransactions {
* @param isolation Isolation.
* @return New transaction.
*/
- public IgniteInternalTx txStartEx(GridCacheContext ctx, TransactionConcurrency concurrency, TransactionIsolation isolation);
+ public GridNearTxLocal txStartEx(GridCacheContext ctx,
+ TransactionConcurrency concurrency,
+ TransactionIsolation isolation);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 71be718..3bfd1f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -86,18 +86,16 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
-import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
import org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
@@ -1876,7 +1874,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
- IgniteTxLocalAdapter tx = null;
+ GridNearTxLocal tx = null;
if (checkTx) {
try {
@@ -2132,7 +2130,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
else {
return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) {
- @Override public IgniteInternalFuture<Map<K1, V1>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<Map<K1, V1>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.getAllAsync(ctx,
readyTopVer,
keys,
@@ -2187,7 +2185,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
protected V getAndPut0(final K key, final V val, @Nullable final CacheEntryPredicate filter)
throws IgniteCheckedException {
return syncOp(new SyncOp<V>(true) {
- @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public V op(GridNearTxLocal tx) throws IgniteCheckedException {
return (V)tx.putAsync(ctx, null, key, val, true, filter).get().value();
}
@@ -2237,7 +2235,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
@Nullable final CacheEntryPredicate filter)
{
return asyncOp(new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.putAsync(ctx, readyTopVer, key, val, true, filter)
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
}
@@ -2293,7 +2291,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
protected boolean put0(final K key, final V val, final CacheEntryPredicate filter)
throws IgniteCheckedException {
Boolean res = syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(GridNearTxLocal tx) throws IgniteCheckedException {
return tx.putAsync(ctx, null, key, val, false, filter).get().success();
}
@@ -2316,7 +2314,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(drMap.size() == 1) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException {
tx.putAllDrAsync(ctx, drMap).get();
}
@@ -2335,7 +2333,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
return asyncOp(new AsyncOp(drMap.keySet()) {
- @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.putAllDrAsync(ctx, drMap);
}
@@ -2380,7 +2378,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
- @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
+ @Nullable @Override public EntryProcessorResult<T> op(GridNearTxLocal tx)
throws IgniteCheckedException {
assert topVer == null || tx.implicit();
@@ -2418,7 +2416,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
- @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
new C1<K, EntryProcessor<K, V, Object>>() {
@@ -2448,7 +2446,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
IgniteInternalFuture<?> fut = asyncOp(new AsyncOp() {
- @Override public IgniteInternalFuture op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor);
@@ -2491,7 +2489,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) {
- @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx,
+ @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx,
AffinityTopologyVersion readyTopVer) {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
@Override public EntryProcessor apply(K k) {
@@ -2532,7 +2530,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(map.keySet());
IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(map.keySet()) {
- @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.invokeAsync(ctx,
readyTopVer,
(Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map,
@@ -2568,7 +2566,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(map.keySet());
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) {
- @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
throws IgniteCheckedException {
IgniteInternalFuture<GridCacheReturn> fut =
tx.invokeAsync(ctx, null, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args);
@@ -2616,7 +2614,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val,
@Nullable final CacheEntryPredicate filter) {
return asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<Boolean> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.putAsync(ctx,
readyTopVer,
key,
@@ -2721,7 +2719,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected void putAll0(final Map<? extends K, ? extends V> m) throws IgniteCheckedException {
syncOp(new SyncInOp(m.size() == 1) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException {
tx.putAllAsync(ctx, null, m, false).get();
}
@@ -2748,7 +2746,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected IgniteInternalFuture<?> putAllAsync0(final Map<? extends K, ? extends V> m) {
return asyncOp(new AsyncOp(m.keySet()) {
- @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.putAllAsync(ctx,
readyTopVer,
m,
@@ -2789,7 +2787,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
final boolean keepBinary = ctx.keepBinary();
return syncOp(new SyncOp<V>(true) {
- @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public V op(GridNearTxLocal tx) throws IgniteCheckedException {
K key0 = keepBinary ? (K) ctx.toCacheKeyObject(key) : key;
V ret = tx.removeAllAsync(ctx,
@@ -2839,7 +2837,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected IgniteInternalFuture<V> getAndRemoveAsync0(final K key) {
return asyncOp(new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
// TODO should we invoke interceptor here?
return tx.removeAllAsync(ctx,
readyTopVer,
@@ -2897,7 +2895,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected void removeAll0(final Collection<? extends K> keys) throws IgniteCheckedException {
syncOp(new SyncInOp(keys.size() == 1) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException {
tx.removeAllAsync(ctx,
null,
keys,
@@ -2938,7 +2936,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected IgniteInternalFuture<Object> removeAllAsync0(final Collection<? extends K> keys) {
return asyncOp(new AsyncOp(keys) {
- @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.removeAllAsync(ctx,
readyTopVer,
keys,
@@ -2990,7 +2988,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected boolean remove0(final K key, final CacheEntryPredicate filter) throws IgniteCheckedException {
Boolean res = syncOp(new SyncOp<Boolean>(true) {
- @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(GridNearTxLocal tx) throws IgniteCheckedException {
return tx.removeAllAsync(ctx,
null,
Collections.singletonList(key),
@@ -3046,7 +3044,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
protected IgniteInternalFuture<Boolean> removeAsync0(final K key, @Nullable final CacheEntryPredicate filter) {
return asyncOp(new AsyncOp<Boolean>() {
- @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<Boolean> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.removeAllAsync(ctx,
readyTopVer,
Collections.singletonList(key),
@@ -3071,8 +3069,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(false) {
- @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- tx.removeAllDrAsync(ctx, (Map)drMap).get();
+ @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException {
+ tx.removeAllDrAsync(ctx, drMap).get();
}
@Override public String toString() {
@@ -3090,8 +3088,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
return asyncOp(new AsyncOp(drMap.keySet()) {
- @Override public IgniteInternalFuture<?> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
- return tx.removeAllDrAsync(ctx, (Map)drMap);
+ @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
+ return tx.removeAllDrAsync(ctx, drMap);
}
@Override public String toString() {
@@ -3160,10 +3158,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Nullable @Override public Transaction tx() {
- IgniteTxAdapter tx = ctx.tm().threadLocalTx(ctx);
-
- return tx == null ? null : new TransactionProxyImpl<>(tx, ctx.shared(), false);
+ @Nullable @Override public GridNearTxLocal tx() {
+ return ctx.tm().threadLocalTx(ctx);
}
/** {@inheritDoc} */
@@ -3291,7 +3287,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Override public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) {
+ @Override public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) {
IgniteTransactionsEx txs = ctx.kernalContext().cache().transactions();
return txs.txStartEx(ctx, concurrency, isolation);
@@ -4142,7 +4138,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Transaction commit future.
*/
@SuppressWarnings("unchecked")
- public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(final IgniteInternalTx tx) {
+ IgniteInternalFuture<IgniteInternalTx> commitTxAsync(final GridNearTxLocal tx) {
FutureHolder holder = lastFut.get();
holder.lock();
@@ -4154,7 +4150,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut,
new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() {
@Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception e) {
- return tx.commitAsync();
+ return tx.commitNearTxLocalAsync();
}
});
@@ -4163,7 +4159,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return f;
}
- IgniteInternalFuture<IgniteInternalTx> f = tx.commitAsync();
+ IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync();
saveFuture(holder, f);
@@ -4208,7 +4204,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
awaitLastFut();
- IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+ GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
if (tx == null || tx.implicit()) {
TransactionConfiguration tCfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config());
@@ -4304,7 +4300,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (log.isDebugEnabled())
log.debug("Performing async op: " + op);
- IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+ GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -4348,7 +4344,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
@SuppressWarnings("unchecked")
protected <T> IgniteInternalFuture<T> asyncOp(
- IgniteTxLocalAdapter tx,
+ GridNearTxLocal tx,
final AsyncOp<T> op,
final CacheOperationContext opCtx
) {
@@ -4364,7 +4360,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
try {
IgniteInternalFuture fut = holder.future();
- final IgniteTxLocalAdapter tx0 = tx;
+ final GridNearTxLocal tx0 = tx;
if (fut != null && !fut.isDone()) {
IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
@@ -4383,7 +4379,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
throw e;
}
catch (IgniteCheckedException e1) {
- tx0.rollbackAsync();
+ tx0.rollbackNearTxLocalAsync();
throw e1;
}
@@ -4409,7 +4405,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
throw e;
}
catch (IgniteCheckedException e1) {
- tx0.rollbackAsync();
+ tx0.rollbackNearTxLocalAsync();
throw e1;
}
@@ -4925,7 +4921,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
private int retries;
/** */
- private IgniteTxLocalAdapter tx;
+ private GridNearTxLocal tx;
/** */
private CacheOperationContext opCtx;
@@ -5173,7 +5169,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Operation return value.
* @throws IgniteCheckedException If failed.
*/
- @Nullable public abstract T op(IgniteTxLocalAdapter tx) throws IgniteCheckedException;
+ @Nullable public abstract T op(GridNearTxLocal tx) throws IgniteCheckedException;
}
/**
@@ -5188,7 +5184,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Nullable @Override public final Object op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Nullable @Override public final Object op(GridNearTxLocal tx) throws IgniteCheckedException {
inOp(tx);
return null;
@@ -5198,7 +5194,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param tx Transaction.
* @throws IgniteCheckedException If failed.
*/
- public abstract void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException;
+ public abstract void inOp(GridNearTxLocal tx) throws IgniteCheckedException;
}
/**
@@ -5234,14 +5230,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param readyTopVer Ready topology version.
* @return Operation future.
*/
- public abstract IgniteInternalFuture<T> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer);
+ public abstract IgniteInternalFuture<T> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer);
/**
* @param tx Transaction.
* @param opCtx Operation context.
* @return Operation future.
*/
- public IgniteInternalFuture<T> op(final IgniteTxLocalAdapter tx, CacheOperationContext opCtx) {
+ public IgniteInternalFuture<T> op(final GridNearTxLocal tx, CacheOperationContext opCtx) {
AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot();
if (txTopVer != null)
@@ -5267,7 +5263,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
private IgniteInternalFuture<T> waitTopologyFuture(IgniteInternalFuture<?> topFut,
final AffinityTopologyVersion topVer,
- final IgniteTxLocalAdapter tx,
+ final GridNearTxLocal tx,
final CacheOperationContext opCtx) {
final GridFutureAdapter fut0 = new GridFutureAdapter();
@@ -5304,7 +5300,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @param opCtx Operation context.
* @return Future.
*/
- private IgniteInternalFuture<T> runOp(IgniteTxLocalAdapter tx,
+ private IgniteInternalFuture<T> runOp(GridNearTxLocal tx,
AffinityTopologyVersion topVer,
CacheOperationContext opCtx) {
ctx.operationContextPerCall(opCtx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 00898ec..787a767 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -40,8 +40,8 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -939,7 +939,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
- @Override public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) {
+ @Override public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) {
CacheOperationContext prev = gate.enter(opCtx);
try {
@@ -977,7 +977,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
}
/** {@inheritDoc} */
- @Override public Transaction tx() {
+ @Override public GridNearTxLocal tx() {
CacheOperationContext prev = gate.enter(opCtx);
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 0f79100..39a3baa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.jta.CacheJtaManagerAdapter;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -731,7 +732,7 @@ public class GridCacheSharedContext<K, V> {
* @param tx Transaction to close.
* @throws IgniteCheckedException If failed.
*/
- public void endTx(IgniteInternalTx tx) throws IgniteCheckedException {
+ public void endTx(GridNearTxLocal tx) throws IgniteCheckedException {
tx.txState().awaitLastFut(this);
tx.close();
@@ -742,13 +743,13 @@ public class GridCacheSharedContext<K, V> {
* @return Commit future.
*/
@SuppressWarnings("unchecked")
- public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(IgniteInternalTx tx) {
+ public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(GridNearTxLocal tx) {
GridCacheContext ctx = tx.txState().singleCacheContext(this);
if (ctx == null) {
tx.txState().awaitLastFut(this);
- return tx.commitAsync();
+ return tx.commitNearTxLocalAsync();
}
else
return ctx.cache().commitTxAsync(tx);
@@ -759,10 +760,10 @@ public class GridCacheSharedContext<K, V> {
* @throws IgniteCheckedException If failed.
* @return Rollback future.
*/
- public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException {
+ public IgniteInternalFuture rollbackTxAsync(GridNearTxLocal tx) throws IgniteCheckedException {
tx.txState().awaitLastFut(this);
- return tx.rollbackAsync();
+ return tx.rollbackNearTxLocalAsync();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 3e68b70..7131612 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -97,7 +98,6 @@ import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.LOCAL;
-import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
@@ -891,7 +891,7 @@ public class GridCacheUtils {
* @param isolation Isolation.
* @return New transaction.
*/
- public static IgniteInternalTx txStartInternal(GridCacheContext ctx, IgniteInternalCache prj,
+ public static GridNearTxLocal txStartInternal(GridCacheContext ctx, IgniteInternalCache prj,
TransactionConcurrency concurrency, TransactionIsolation isolation) {
assert ctx != null;
assert prj != null;
@@ -1257,7 +1257,7 @@ public class GridCacheUtils {
public static <K, V> void inTx(IgniteInternalCache<K, V> cache, TransactionConcurrency concurrency,
TransactionIsolation isolation, IgniteInClosureX<IgniteInternalCache<K ,V>> clo) throws IgniteCheckedException {
- try (IgniteInternalTx tx = cache.txStartEx(concurrency, isolation);) {
+ try (GridNearTxLocal tx = cache.txStartEx(concurrency, isolation);) {
clo.applyx(cache);
tx.commit();
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
index 0ac98fb..5471335 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java
@@ -43,6 +43,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -952,7 +953,7 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
* @param isolation Isolation.
* @return New transaction.
*/
- public IgniteInternalTx txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation);
+ public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation);
/**
* Starts transaction with specified isolation, concurrency, timeout, invalidation flag,
@@ -976,7 +977,7 @@ public interface IgniteInternalCache<K, V> extends Iterable<Cache.Entry<K, V>> {
* @return Transaction started by this thread or {@code null} if this thread
* does not have a transaction.
*/
- @Nullable public Transaction tx();
+ @Nullable public GridNearTxLocal tx();
/**
* Evicts entry associated with given key from cache. Note, that entry will be evicted
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
deleted file mode 100644
index 875ada0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheCommittedTxInfo.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Committed transaction information. Contains recovery writes that will be used to set commit values
- * in case if originating node crashes.
- */
-@Deprecated
-public class GridCacheCommittedTxInfo implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Originating transaction ID. */
- private GridCacheVersion originatingTxId;
-
- /** Originating node ID. */
- private UUID originatingNodeId;
-
- /** Recovery writes, i.e. values that have never been sent to remote nodes. */
- @GridToStringInclude
- private Collection<IgniteTxEntry> recoveryWrites;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public GridCacheCommittedTxInfo() {
- // No-op.
- }
-
- /**
- * @param tx Committed cache transaction.
- */
- public GridCacheCommittedTxInfo(IgniteInternalTx tx) {
- assert !tx.local() || !tx.replicated();
-
- originatingTxId = tx.nearXidVersion();
- originatingNodeId = tx.eventNodeId();
- }
-
- /**
- * @return Originating transaction ID (the transaction ID for replicated cache and near transaction ID
- * for partitioned cache).
- */
- public GridCacheVersion originatingTxId() {
- return originatingTxId;
- }
-
- /**
- * @return Originating node ID (the local transaction node ID for replicated cache and near node ID
- * for partitioned cache).
- */
- public UUID originatingNodeId() {
- return originatingNodeId;
- }
-
- /**
- * @return Collection of recovery writes.
- */
- public Collection<IgniteTxEntry> recoveryWrites() {
- return recoveryWrites == null ? Collections.<IgniteTxEntry>emptyList() : recoveryWrites;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- originatingTxId.writeExternal(out);
-
- U.writeUuid(out, originatingNodeId);
-
- U.writeCollection(out, recoveryWrites);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- originatingTxId = new GridCacheVersion();
-
- originatingTxId.readExternal(in);
-
- originatingNodeId = U.readUuid(in);
-
- recoveryWrites = U.readCollection(in);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheCommittedTxInfo.class, this, "recoveryWrites", recoveryWrites);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 7e4deff..00bc6d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -111,7 +111,7 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> lockAllAsync(Collection<? extends K> keys, long timeout) {
- IgniteTxLocalEx tx = ctx.tm().userTxx();
+ IgniteTxLocalEx tx = ctx.tm().userTx();
// Return value flag is true because we choose to bring values for explicit locks.
return lockAllAsync(ctx.cacheKeysView(keys),
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 68c0e57..b31a7be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -83,7 +83,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
/**
* Transaction created by system implicitly on remote nodes.
*/
-public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
+public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
implements IgniteTxRemoteEx {
/** */
private static final long serialVersionUID = 0L;
@@ -180,11 +180,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
- @Override public Collection<UUID> masterNodeIds() {
- return Collections.singleton(nodeId);
- }
-
- /** {@inheritDoc} */
@Override public UUID originatingNodeId() {
return nodeId;
}
@@ -347,12 +342,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<IgniteInternalTx> prepareAsync() {
- assert false;
- return null;
- }
-
- /** {@inheritDoc} */
@Override public Set<IgniteTxKey> readSet() {
return txState.readSet();
}
@@ -378,11 +367,9 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/**
- * Prepare phase.
- *
- * @throws IgniteCheckedException If prepare failed.
+ * @throws IgniteCheckedException If failed.
*/
- @Override public void prepare() throws IgniteCheckedException {
+ public final void prepareRemoteTx() throws IgniteCheckedException {
// If another thread is doing prepare or rollback.
if (!state(PREPARING)) {
// In optimistic mode prepare may be called multiple times.
@@ -729,7 +716,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
- @Override public void commit() throws IgniteCheckedException {
+ @Override public final void commitRemoteTx() throws IgniteCheckedException {
if (optimistic())
state(PREPARED);
@@ -748,7 +735,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
if (!isSystemInvalidate())
throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']');
- rollback();
+ rollbackRemoteTx();
}
commitIfLocked();
@@ -766,7 +753,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
/** {@inheritDoc} */
@Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
try {
- commit();
+ commitRemoteTx();
return new GridFinishedFuture<IgniteInternalTx>(this);
}
@@ -776,8 +763,36 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
}
/** {@inheritDoc} */
- @SuppressWarnings({"CatchGenericClass"})
- @Override public void rollback() {
+ @Override public final IgniteInternalFuture<?> salvageTx() {
+ try {
+ systemInvalidate(true);
+
+ prepareRemoteTx();
+
+ if (state() == PREPARING) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring transaction in PREPARING state as it is currently handled " +
+ "by another thread: " + this);
+
+ return null;
+ }
+
+ doneRemote(xidVersion(),
+ Collections.<GridCacheVersion>emptyList(),
+ Collections.<GridCacheVersion>emptyList(),
+ Collections.<GridCacheVersion>emptyList());
+
+ commitRemoteTx();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to invalidate transaction: " + xidVersion(), e);
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final void rollbackRemoteTx() {
try {
// Note that we don't evict near entries here -
// they will be deleted by their corresponding transactions.
@@ -796,7 +811,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
/** {@inheritDoc} */
@Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
- rollback();
+ rollbackRemoteTx();
return new GridFinishedFuture<IgniteInternalTx>(this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index dea4072..1e09eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -178,7 +178,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @throws GridDistributedLockCancelledException If lock has been cancelled.
*/
@SuppressWarnings({"RedundantTypeArguments"})
- @Nullable GridDhtTxRemote startRemoteTx(UUID nodeId,
+ @Nullable private GridDhtTxRemote startRemoteTx(UUID nodeId,
GridDhtLockRequest req,
GridDhtLockResponse res)
throws IgniteCheckedException, GridDistributedLockCancelledException {
@@ -307,7 +307,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (tx.state() == COMMITTING)
tx.forceCommit();
else
- tx.rollback();
+ tx.rollbackRemoteTx();
}
return null;
@@ -362,7 +362,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (log.isDebugEnabled())
log.debug("Rolling back remote DHT transaction because it is empty [req=" + req + ", res=" + res + ']');
- tx.rollback();
+ tx.rollbackRemoteTx();
tx = null;
}
@@ -374,7 +374,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @param nodeId Node ID.
* @param req Request.
*/
- protected final void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) {
+ private void processDhtLockRequest(final UUID nodeId, final GridDhtLockRequest req) {
if (txLockMsgLog.isDebugEnabled()) {
txLockMsgLog.debug("Received dht lock request [txId=" + req.nearXidVersion() +
", dhtTxId=" + req.version() +
@@ -452,7 +452,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @param nodeId Node ID.
* @param req Request.
*/
- protected final void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) {
+ private void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) {
assert nodeId != null;
assert req != null;
assert !nodeId.equals(locNodeId);
@@ -561,10 +561,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (fail) {
if (dhtTx != null)
- dhtTx.rollback();
+ dhtTx.rollbackRemoteTx();
if (nearTx != null) // Even though this should never happen, we leave this check for consistency.
- nearTx.rollback();
+ nearTx.rollbackRemoteTx();
List<KeyCacheObject> keys = req.keys();
@@ -602,7 +602,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
* @param nodeId Node ID.
* @param req Request.
*/
- protected void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest req) {
+ private void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest req) {
clearLocks(nodeId, req);
if (isNearEnabled(cacheCfg))
@@ -961,8 +961,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
req.futureId(),
req.miniId(),
req.threadId(),
- req.implicitTx(),
- req.implicitSingleTx(),
+ /*implicitTx*/false,
+ /*implicitSingleTx*/false,
ctx.systemTx(),
false,
ctx.ioPolicy(),
@@ -989,7 +989,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
U.warn(log, msg);
if (tx != null)
- tx.rollback();
+ tx.rollbackDhtLocal();
return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg));
}
@@ -1038,31 +1038,12 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
t.xidVersion(),
e);
- if (resp.error() == null && t.onePhaseCommit()) {
- assert t.implicit();
-
- return t.commitAsync().chain(
- new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() {
- @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) {
- try {
- // Check for error.
- f.get();
- }
- catch (IgniteCheckedException e1) {
- resp.error(e1);
- }
-
- sendLockReply(nearNode, t, req, resp);
-
- return resp;
- }
- });
- }
- else {
- sendLockReply(nearNode, t, req, resp);
+ assert !t.implicit() : t;
+ assert !t.onePhaseCommit() : t;
- return new GridFinishedFuture<>(resp);
- }
+ sendLockReply(nearNode, t, req, resp);
+
+ return new GridFinishedFuture<>(resp);
}
}
);
@@ -1105,7 +1086,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (tx != null) {
try {
- tx.rollback();
+ tx.rollbackDhtLocal();
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to rollback the transaction: " + tx, ex);
@@ -1309,7 +1290,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
*/
private void sendLockReply(
ClusterNode nearNode,
- @Nullable IgniteInternalTx tx,
+ @Nullable GridDhtTxLocal tx,
GridNearLockRequest req,
GridNearLockResponse res
) {
@@ -1347,7 +1328,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
", res=" + res + ']', e);
if (tx != null)
- tx.rollbackAsync();
+ tx.rollbackDhtLocalAsync();
// Convert to closure exception as this method is only called form closures.
throw new GridClosureException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index bff69bc..b1c7e5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -292,82 +292,22 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> prepareAsync() {
- if (optimistic()) {
- assert isSystemInvalidate();
-
- return prepareAsync(
- null,
- null,
- Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
- 0,
- nearMiniId,
- null,
- true);
- }
-
- long timeout = remainingTime();
+ @Override public IgniteInternalFuture<?> salvageTx() {
+ systemInvalidate(true);
- // For pessimistic mode we don't distribute prepare request.
- GridDhtTxPrepareFuture fut = prepFut;
+ state(PREPARED);
- if (fut == null) {
- // Future must be created before any exception can be thrown.
- if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture(
- cctx,
- this,
- timeout,
- nearMiniId,
- Collections.<IgniteTxKey, GridCacheVersion>emptyMap(),
- true,
- needReturnValue()))) {
- if (timeout == -1)
- prepFut.onError(timeoutException());
+ if (state() == PREPARING) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring transaction in PREPARING state as it is currently handled " +
+ "by another thread: " + this);
- return prepFut;
- }
- }
- else
- // Prepare was called explicitly.
- return fut;
-
- if (!state(PREPARING)) {
- if (setRollbackOnly()) {
- if (timeout == -1)
- fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " +
- this));
- else
- fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() +
- ", tx=" + this + ']'));
- }
- else
- fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare [state=" +
- state() + ", tx=" + this + ']'));
-
- return fut;
+ return null;
}
- try {
- userPrepare();
-
- if (!state(PREPARED)) {
- setRollbackOnly();
-
- fut.onError(new IgniteCheckedException("Invalid transaction state for commit [state=" + state() +
- ", tx=" + this + ']'));
-
- return fut;
- }
+ setRollbackOnly();
- fut.complete();
-
- return fut;
- }
- catch (IgniteCheckedException e) {
- fut.onError(e);
-
- return fut;
- }
+ return rollbackDhtLocalAsync();
}
/**
@@ -382,7 +322,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
* @param last {@code True} if this is last prepare request.
* @return Future that will be completed when locks are acquired.
*/
- public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
+ public final IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync(
@Nullable Collection<IgniteTxEntry> reads,
@Nullable Collection<IgniteTxEntry> writes,
Map<IgniteTxKey, GridCacheVersion> verMap,
@@ -478,7 +418,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
fut.onError(new IgniteTxRollbackCheckedException("Failed to prepare transaction: " + this, e));
try {
- rollback();
+ rollbackDhtLocal();
}
catch (IgniteTxOptimisticCheckedException e1) {
if (log.isDebugEnabled())
@@ -523,7 +463,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
if (prepFut != null)
prepFut.get(); // Check for errors.
- boolean finished = finish(commit);
+ boolean finished = localFinish(commit);
if (!finished)
err = new IgniteCheckedException("Failed to finish transaction [commit=" + commit +
@@ -544,16 +484,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
fut.finish(commit);
}
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
+ /**
+ * @return Commit future.
+ */
+ public IgniteInternalFuture<IgniteInternalTx> commitDhtLocalAsync() {
if (log.isDebugEnabled())
log.debug("Committing dht local tx: " + this);
- // In optimistic mode prepare was called explicitly.
- if (pessimistic())
- prepareAsync();
-
final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, true);
cctx.mvcc().addFuture(fut, fut.futureId());
@@ -581,15 +518,29 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public IgniteInternalFuture<IgniteInternalTx> commitAsync() {
+ return commitDhtLocalAsync();
+ }
+
+ /** {@inheritDoc} */
@Override protected void clearPrepareFuture(GridDhtTxPrepareFuture fut) {
assert optimistic();
PREP_FUT_UPD.compareAndSet(this, fut, null);
}
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ public void rollbackDhtLocal() throws IgniteCheckedException {
+ rollbackDhtLocalAsync().get();
+ }
+
+ /**
+ * @return Rollback future.
+ */
+ public IgniteInternalFuture<IgniteInternalTx> rollbackDhtLocalAsync() {
final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, false);
cctx.mvcc().addFuture(fut, fut.futureId());
@@ -612,8 +563,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture<IgniteInternalTx> rollbackAsync() {
+ return rollbackDhtLocalAsync();
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
- @Override public boolean finish(boolean commit) throws IgniteCheckedException {
+ @Override public boolean localFinish(boolean commit) throws IgniteCheckedException {
assert nearFinFutId != null || isInvalidate() || !commit || isSystemInvalidate()
|| onePhaseCommit() || state() == PREPARED :
"Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit +
@@ -621,7 +577,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa
assert nearMiniId != 0;
- return super.finish(commit);
+ return super.localFinish(commit);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 67e1993..0329386 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -29,7 +29,6 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -161,7 +160,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/**
* @param node Node.
*/
- public void addLockTransactionNode(ClusterNode node) {
+ void addLockTransactionNode(ClusterNode node) {
assert node != null;
assert !node.isLocal();
@@ -185,7 +184,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
*
* @return Has near cache flag.
*/
- public boolean nearOnOriginatingNode() {
+ boolean nearOnOriginatingNode() {
return nearOnOriginatingNode;
}
@@ -206,7 +205,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/**
* @return Nodes where transactions were started on lock step.
*/
- @Nullable public Set<ClusterNode> lockTransactionNodes() {
+ @Nullable Set<ClusterNode> lockTransactionNodes() {
return lockTxNodes;
}
@@ -349,14 +348,14 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/**
* @param mappings Mappings to add.
*/
- void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
+ private void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
addMapping(mappings, dhtMap);
}
/**
* @param mappings Mappings to add.
*/
- void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
+ private void addNearNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) {
addMapping(mappings, nearMap);
}
@@ -654,7 +653,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
needRetVal,
createTtl,
accessTtl,
- null,
skipStore,
keepBinary);
}
@@ -673,7 +671,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
* @param needRetVal Return value flag.
* @param createTtl TTL for create operation.
* @param accessTtl TTL for read operation.
- * @param filter Entry write filter.
* @param skipStore Skip store flag.
* @return Future for lock acquisition.
*/
@@ -685,7 +682,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
final boolean needRetVal,
final long createTtl,
final long accessTtl,
- @Nullable final CacheEntryPredicate[] filter,
boolean skipStore,
boolean keepBinary) {
if (log.isDebugEnabled())
@@ -729,7 +725,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/*retval*/false,
/*read*/read,
accessTtl,
- filter == null ? CU.empty0() : filter,
+ CU.empty0(),
/*computeInvoke*/false);
return ret;
@@ -740,7 +736,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
/** {@inheritDoc} */
@SuppressWarnings({"CatchGenericClass", "ThrowableInstanceNeverThrown"})
- @Override public boolean finish(boolean commit) throws IgniteCheckedException {
+ @Override public boolean localFinish(boolean commit) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Finishing dht local tx [tx=" + this + ", commit=" + commit + "]");
@@ -858,16 +854,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
}
/** {@inheritDoc} */
- @Override public void rollback() throws IgniteCheckedException {
- try {
- rollbackAsync().get();
- }
- finally {
- cctx.tm().resetContext();
- }
- }
-
- /** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(),
"dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 56884ff..93ea30d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -718,18 +718,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
CIX1<IgniteInternalFuture<IgniteInternalTx>> resClo =
new CIX1<IgniteInternalFuture<IgniteInternalTx>>() {
@Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) {
- try {
- if (REPLIED_UPD.compareAndSet(GridDhtTxPrepareFuture.this, 0, 1))
- sendPrepareResponse(res);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + tx.nearNodeId() +
- ", res=" + res,
- ", tx=" + tx,
- e);
- }
+ if (REPLIED_UPD.compareAndSet(GridDhtTxPrepareFuture.this, 0, 1))
+ sendPrepareResponse(res);
}
};
@@ -761,18 +751,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
}
else {
- try {
- if (REPLIED_UPD.compareAndSet(this, 0, 1))
- sendPrepareResponse(res);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + tx.nearNodeId() +
- ", res=" + res,
- ", tx=" + tx,
- e);
- }
+ if (REPLIED_UPD.compareAndSet(this, 0, 1))
+ sendPrepareResponse(res);
}
return true;
@@ -784,14 +764,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
try {
sendPrepareResponse(res);
}
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," +
- ", dhtTxId=" + tx.xidVersion() +
- ", node=" + tx.nearNodeId() +
- ", res=" + res,
- ", tx=" + tx,
- e);
- }
finally {
// Will call super.onDone().
onComplete(res);
@@ -819,9 +791,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/**
* @param res Response.
- * @throws IgniteCheckedException If failed to send response.
*/
- private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException {
+ private void sendPrepareResponse(GridNearTxPrepareResponse res) {
if (!tx.nearNodeId().equals(cctx.localNodeId())) {
Throwable err = this.err;
@@ -837,13 +808,31 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
return;
}
- cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
+ try {
+ cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy());
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DHT prepare fut, sent response [txId=" + tx.nearXidVersion() +
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DHT prepare fut, sent response [txId=" + tx.nearXidVersion() +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + tx.nearNodeId() +
+ ", res=" + res + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Failed to send prepare response, node left [txId=" + tx.nearXidVersion() + "," +
+ ", dhtTxId=" + tx.xidVersion() +
+ ", node=" + tx.nearNodeId() +
+ ", res=" + res + ']');
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(msgLog, "Failed to send prepare response [txId=" + tx.nearXidVersion() + "," +
", dhtTxId=" + tx.xidVersion() +
", node=" + tx.nearNodeId() +
- ", res=" + res + ']');
+ ", res=" + res,
+ ", tx=" + tx + ']',
+ e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index e1e0ec2..03bbfe0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -207,13 +207,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (keyCheck)
validateCacheKey(key);
- IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+ GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
final CacheOperationContext opCtx = ctx.operationContextPerCall();
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<V>() {
- @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx,
readyTopVer,
Collections.singleton(ctx.toCacheKeyObject(key)),
@@ -289,13 +289,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (keyCheck)
validateCacheKeys(keys);
- IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx);
+ GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
final CacheOperationContext opCtx = ctx.operationContextPerCall();
if (tx != null && !tx.implicit() && !skipTx) {
return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) {
- @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx, AffinityTopologyVersion readyTopVer) {
+ @Override public IgniteInternalFuture<Map<K, V>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) {
return tx.getAllAsync(ctx,
readyTopVer,
ctx.cacheKeysView(keys),
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 0ce380d..79c15fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -917,6 +917,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
first = false;
}
+ assert !implicitTx() && !implicitSingleTx() : tx;
+
req = new GridNearLockRequest(
cctx.cacheId(),
topVer,
@@ -925,8 +927,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
futId,
lockVer,
inTx(),
- implicitTx(),
- implicitSingleTx(),
read,
retval,
isolation(),
@@ -982,9 +982,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
}
}
- if (inTx() && req != null)
- req.hasTransforms(tx.hasTransforms());
-
if (!distributedKeys.isEmpty()) {
mapping.distributedKeys(distributedKeys);
http://git-wip-us.apache.org/repos/asf/ignite/blob/decb0c7a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index ffc84d8..1948df0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1045,6 +1045,8 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
first = false;
}
+ assert !implicitTx() && !implicitSingleTx() : tx;
+
req = new GridNearLockRequest(
cctx.cacheId(),
topVer,
@@ -1053,8 +1055,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
futId,
lockVer,
inTx(),
- implicitTx(),
- implicitSingleTx(),
read,
retval,
isolation(),