You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/02/03 07:32:46 UTC
[45/46] incubator-ignite git commit: GG-9655 - Fixing tests after merge.
GG-9655 - Fixing tests after merge.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/be5b908c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/be5b908c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/be5b908c
Branch: refs/heads/ingite-9655-merge
Commit: be5b908c0c1c358a63fffe1ec5f4e6586eec3368
Parents: f004f9d
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Mon Feb 2 17:18:40 2015 -0800
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Mon Feb 2 17:18:40 2015 -0800
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 9 +-
.../distributed/GridDistributedTxMapping.java | 6 +-
.../distributed/dht/GridDhtLockFuture.java | 8 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 22 ---
.../distributed/dht/GridDhtTxFinishRequest.java | 114 ++-------------
.../cache/distributed/dht/GridDhtTxLocal.java | 39 +++++-
.../distributed/dht/GridDhtTxPrepareFuture.java | 103 +++++++++-----
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../cache/distributed/near/GridNearTxLocal.java | 21 ++-
.../near/GridNearTxPrepareFuture.java | 139 ++++---------------
.../cache/transactions/IgniteTxHandler.java | 73 ++++++----
.../IgniteCacheExpiryPolicyAbstractTest.java | 6 +-
12 files changed, 213 insertions(+), 329 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 835aa39..a5d39ab 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -272,14 +272,19 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
U.join(exchWorker, log);
- exchFuts = null;
-
ResendTimeoutObject resendTimeoutObj = pendingResend.getAndSet(null);
if (resendTimeoutObj != null)
cctx.time().removeTimeoutObject(resendTimeoutObj);
}
+ /** {@inheritDoc} */
+ @Override protected void stop0(boolean cancel) {
+ super.stop0(cancel);
+
+ exchFuts = null;
+ }
+
public GridDhtPartitionTopology<K, V> clientTopology(int cacheId, GridDhtPartitionExchangeId exchId) {
GridClientPartitionTopology<K, V> top = clientTops.get(cacheId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
index bda5db0..257a331 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java
@@ -21,6 +21,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.tostring.*;
@@ -28,7 +29,6 @@ import org.jetbrains.annotations.*;
import java.io.*;
import java.util.*;
-import java.util.concurrent.*;
/**
* Transaction node mapping.
@@ -76,7 +76,7 @@ public class GridDistributedTxMapping<K, V> implements Externalizable {
public GridDistributedTxMapping(ClusterNode node) {
this.node = node;
- entries = new ConcurrentLinkedQueue<>();
+ entries = new GridConcurrentLinkedHashSet<>();
}
/**
@@ -271,7 +271,7 @@ public class GridDistributedTxMapping<K, V> implements Externalizable {
*/
private void ensureModifiable() {
if (readOnly) {
- entries = new ConcurrentLinkedQueue<>(entries);
+ entries = new GridConcurrentLinkedHashSet<>(entries);
readOnly = false;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index b5da263..efc3452 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -780,12 +780,8 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
}
}
- if (tx != null) {
- tx.addDhtNodeEntryMapping(dhtMap);
- tx.addNearNodeEntryMapping(nearMap);
-
+ if (tx != null)
tx.needsCompletedVersions(hasRmtNodes);
- }
if (isDone()) {
if (log.isDebugEnabled())
@@ -878,8 +874,6 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
boolean invalidateRdr = e.readerId(n.id()) != null;
- IgniteTxEntry<K, V> entry = tx != null ? tx.entry(e.txKey()) : null;
-
req.addDhtKey(
e.key(),
e.getOrMarshalKeyBytes(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index f6054c7..922e644 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.future.*;
@@ -330,20 +329,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.subjectId(),
tx.taskNameHash());
- if (!tx.pessimistic()) {
- int idx = 0;
-
- for (IgniteTxEntry<K, V> e : dhtMapping.writes())
- req.ttl(idx++, e.ttl());
-
- if (nearMapping != null) {
- idx = 0;
-
- for (IgniteTxEntry<K, V> e : nearMapping.writes())
- req.nearTtl(idx++, e.ttl());
- }
- }
-
try {
cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
@@ -395,13 +380,6 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur
tx.subjectId(),
tx.taskNameHash());
- if (!tx.pessimistic()) {
- int idx = 0;
-
- for (IgniteTxEntry<K, V> e : nearMapping.writes())
- req.nearTtl(idx++, e.ttl());
- }
-
if (tx.onePhaseCommit())
req.writeVersion(tx.writeVersion());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 395edd9..9c39c7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -18,10 +18,8 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
@@ -72,12 +70,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
@GridDirectVersion(2)
private int taskNameHash;
- /** TTLs for optimistic transaction. */
- private GridLongList ttls;
-
- /** Near cache TTLs for optimistic transaction. */
- private GridLongList nearTtls;
-
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -228,56 +220,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers;
}
- /**
- * @param idx Entry index.
- * @param ttl TTL.
- */
- public void ttl(int idx, long ttl) {
- if (ttl != -1L) {
- if (ttls == null) {
- ttls = new GridLongList();
-
- for (int i = 0; i < idx - 1; i++)
- ttls.add(-1L);
- }
- }
-
- if (ttls != null)
- ttls.add(ttl);
- }
-
- /**
- * @return TTLs for optimistic transaction.
- */
- public GridLongList ttls() {
- return ttls;
- }
-
- /**
- * @param idx Entry index.
- * @param ttl TTL.
- */
- public void nearTtl(int idx, long ttl) {
- if (ttl != -1L) {
- if (nearTtls == null) {
- nearTtls = new GridLongList();
-
- for (int i = 0; i < idx - 1; i++)
- nearTtls.add(-1L);
- }
- }
-
- if (nearTtls != null)
- nearTtls.add(ttl);
- }
-
- /**
- * @return TTLs for optimistic transaction.
- */
- public GridLongList nearTtls() {
- return nearTtls;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtTxFinishRequest.class, this, super.toString());
@@ -308,8 +250,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
_clone.writeVer = writeVer;
_clone.subjId = subjId;
_clone.taskNameHash = taskNameHash;
- _clone.ttls = ttls;
- _clone.nearTtls = nearTtls;
}
/** {@inheritDoc} */
@@ -347,12 +287,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
case 22:
- if (!commState.putLongList(nearTtls))
- return false;
-
- commState.idx++;
-
- case 23:
if (pendingVers != null) {
if (commState.it == null) {
if (!commState.putInt(pendingVers.size()))
@@ -379,42 +313,35 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 24:
+ case 23:
if (!commState.putBoolean(sysInvalidate))
return false;
commState.idx++;
- case 25:
+ case 24:
if (!commState.putLong(topVer))
return false;
commState.idx++;
- case 26:
- if (!commState.putLongList(ttls))
- return false;
-
- commState.idx++;
-
- case 27:
+ case 25:
if (!commState.putCacheVersion(writeVer))
return false;
commState.idx++;
- case 28:
+ case 26:
if (!commState.putUuid(subjId))
return false;
commState.idx++;
- case 29:
+ case 27:
if (!commState.putInt(taskNameHash))
return false;
commState.idx++;
-
}
return true;
@@ -460,16 +387,6 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
case 22:
- GridLongList nearTtls0 = commState.getLongList();
-
- if (nearTtls0 == LONG_LIST_NOT_READ)
- return false;
-
- nearTtls = nearTtls0;
-
- commState.idx++;
-
- case 23:
if (commState.readSize == -1) {
if (buf.remaining() < 4)
return false;
@@ -498,7 +415,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 24:
+ case 23:
if (buf.remaining() < 1)
return false;
@@ -506,7 +423,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 25:
+ case 24:
if (buf.remaining() < 8)
return false;
@@ -514,17 +431,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 26:
- GridLongList ttls0 = commState.getLongList();
-
- if (ttls0 == LONG_LIST_NOT_READ)
- return false;
-
- ttls = ttls0;
-
- commState.idx++;
-
- case 27:
+ case 25:
GridCacheVersion writeVer0 = commState.getCacheVersion();
if (writeVer0 == CACHE_VER_NOT_READ)
@@ -534,7 +441,7 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 28:
+ case 26:
UUID subjId0 = commState.getUuid();
if (subjId0 == UUID_NOT_READ)
@@ -544,14 +451,13 @@ public class GridDhtTxFinishRequest<K, V> extends GridDistributedTxFinishRequest
commState.idx++;
- case 29:
+ case 27:
if (buf.remaining() < 4)
return false;
taskNameHash = commState.getInt();
commState.idx++;
-
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
index 7bd05da..4077275 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
@@ -280,7 +280,15 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
if (optimistic()) {
assert isSystemInvalidate();
- return prepareAsync(null, null, Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), 0, nearMiniId, null, true,
+ return prepareAsync(
+ null,
+ null,
+ Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(),
+ 0,
+ nearMiniId,
+ null,
+ true,
+ null,
null);
}
@@ -289,8 +297,15 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
if (fut == null) {
// Future must be created before any exception can be thrown.
- if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(cctx, this, nearMiniId,
- Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), true, needReturnValue(), null)))
+ if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(
+ cctx,
+ this,
+ nearMiniId,
+ Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(),
+ true,
+ needReturnValue(),
+ null,
+ null)))
return prepFut.get();
}
else
@@ -348,14 +363,17 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
* @param lastBackups IDs of backup nodes receiving last prepare request.
* @return Future that will be completed when locks are acquired.
*/
- public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsync(@Nullable Iterable<IgniteTxEntry<K, V>> reads,
+ public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsync(
+ @Nullable Iterable<IgniteTxEntry<K, V>> reads,
@Nullable Iterable<IgniteTxEntry<K, V>> writes,
Map<IgniteTxKey<K>, GridCacheVersion> verMap,
long msgId,
IgniteUuid nearMiniId,
Map<UUID, Collection<UUID>> txNodes,
boolean last,
- Collection<UUID> lastBackups) {
+ Collection<UUID> lastBackups,
+ IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb
+ ) {
// In optimistic mode prepare still can be called explicitly from salvageTx.
GridDhtTxPrepareFuture<K, V> fut = prepFut.get();
@@ -363,8 +381,15 @@ public class GridDhtTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> implements
init();
// Future must be created before any exception can be thrown.
- if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(cctx, this, nearMiniId, verMap, last,
- needReturnValue(), lastBackups))) {
+ if (!prepFut.compareAndSet(null, fut = new GridDhtTxPrepareFuture<>(
+ cctx,
+ this,
+ nearMiniId,
+ verMap,
+ last,
+ needReturnValue(),
+ lastBackups,
+ completeCb))) {
GridDhtTxPrepareFuture<K, V> f = prepFut.get();
assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 903de41..406a589 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -124,6 +124,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
/** Locks ready flag. */
private volatile boolean locksReady;
+ /** */
+ private IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -146,7 +149,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
Map<IgniteTxKey<K>, GridCacheVersion> dhtVerMap,
boolean last,
boolean retVal,
- Collection<UUID> lastBackups
+ Collection<UUID> lastBackups,
+ IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb
) {
super(cctx.kernalContext(), new IgniteReducer<IgniteTxEx<K, V>, IgniteTxEx<K, V>>() {
@Override public boolean collect(IgniteTxEx<K, V> e) {
@@ -176,6 +180,8 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
this.retVal = retVal;
+ this.completeCb = completeCb;
+
assert dhtMap != null;
assert nearMap != null;
}
@@ -273,13 +279,13 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
GridCacheEntryEx<K, V> cached = txEntry.cached();
- try {
- if (txEntry.op() == CREATE || txEntry.op() == UPDATE && txEntry.drExpireTime() == -1L) {
- ExpiryPolicy expiry = txEntry.expiry();
+ ExpiryPolicy expiry = txEntry.expiry();
- if (expiry == null)
- expiry = cacheCtx.expiry();
+ if (expiry == null)
+ expiry = cacheCtx.expiry();
+ try {
+ if (txEntry.op() == CREATE || txEntry.op() == UPDATE && txEntry.drExpireTime() == -1L) {
if (expiry != null) {
Duration duration = cached.hasValue() ?
expiry.getExpiryForUpdate() : expiry.getExpiryForCreation();
@@ -342,6 +348,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
}
if (hasFilters && !cacheCtx.isAll(cached, txEntry.filters())) {
+ if (expiry != null)
+ txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
+
txEntry.op(GridCacheOperation.NOOP);
if (filterFailedKeys == null)
@@ -385,10 +394,9 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
// U.error(log, "Failed to automatically rollback transaction: " + tx, ex);
// }
//
- // If not local node.
- if (!tx.nearNodeId().equals(cctx.localNodeId())) {
+ try {
// Send reply back to near node.
- GridCacheMessage<K, V> res = new GridNearTxPrepareResponse<>(
+ GridNearTxPrepareResponse<K, V> res = new GridNearTxPrepareResponse<>(
tx.nearXidVersion(),
tx.nearFutureId(),
nearMiniId,
@@ -397,14 +405,10 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
ret,
t);
- try {
- cctx.io().send(tx.nearNodeId(), res, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send reply to originating near node (will rollback): " + tx.nearNodeId(), e);
-
- tx.rollbackAsync();
- }
+ sendPrepareResponse(res);
+ }
+ catch (IgniteCheckedException ignore) {
+ tx.rollbackAsync();
}
onComplete();
@@ -521,20 +525,31 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
onComplete();
- if (!tx.colocated() && tx.markFinalizing(IgniteTxEx.FinalizationStatus.USER_FINISH)) {
- IgniteInternalFuture<IgniteTx> fut = this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync();
+ if (!tx.near()) {
+ if (tx.markFinalizing(IgniteTxEx.FinalizationStatus.USER_FINISH)) {
+ IgniteInternalFuture<IgniteTx> fut = this.err.get() == null ? tx.commitAsync() : tx.rollbackAsync();
- fut.listenAsync(new CIX1<IgniteInternalFuture<IgniteTx>>() {
- @Override public void applyx(IgniteInternalFuture<IgniteTx> gridCacheTxGridFuture) {
- try {
- if (replied.compareAndSet(false, true))
- sendPrepareResponse(res);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send prepare response for transaction: " + tx, e);
+ fut.listenAsync(new CIX1<IgniteInternalFuture<IgniteTx>>() {
+ @Override public void applyx(IgniteInternalFuture<IgniteTx> gridCacheTxGridFuture) {
+ try {
+ if (replied.compareAndSet(false, true))
+ sendPrepareResponse(res);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send prepare response for transaction: " + tx, e);
+ }
}
- }
- });
+ });
+ }
+ }
+ else {
+ try {
+ if (replied.compareAndSet(false, true))
+ sendPrepareResponse(res);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send prepare response for transaction: " + tx, e);
+ }
}
return true;
@@ -579,6 +594,11 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
private void sendPrepareResponse(GridNearTxPrepareResponse<K, V> res) throws IgniteCheckedException {
if (!tx.nearNodeId().equals(cctx.localNodeId()))
cctx.io().send(tx.nearNodeId(), res);
+ else {
+ assert completeCb != null;
+
+ completeCb.apply(res);
+ }
}
/**
@@ -778,16 +798,16 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
boolean hasRemoteNodes = false;
// Assign keys to primary nodes.
- if (!F.isEmpty(reads)) {
- for (IgniteTxEntry<K, V> read : reads)
- hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap);
- }
-
if (!F.isEmpty(writes)) {
for (IgniteTxEntry<K, V> write : writes)
hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap);
}
+ if (!F.isEmpty(reads)) {
+ for (IgniteTxEntry<K, V> read : reads)
+ hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap);
+ }
+
tx.needsCompletedVersions(hasRemoteNodes);
}
@@ -990,12 +1010,23 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
GridDhtCacheEntry<K, V> cached = (GridDhtCacheEntry<K, V>)entry.cached();
- boolean ret;
-
GridCacheContext<K, V> cacheCtx = entry.context();
GridDhtCacheAdapter<K, V> dht = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht();
+ ExpiryPolicy expiry = entry.expiry();
+
+ if (expiry == null)
+ expiry = cacheCtx.expiry();
+
+ if (expiry != null && entry.op() == READ) {
+ entry.op(NOOP);
+
+ entry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
+ }
+
+ boolean ret;
+
while (true) {
try {
Collection<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index d5e9714..6bb7c79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -119,7 +119,7 @@ public class GridDhtPreloader<K, V> extends GridCachePreloaderAdapter<K, V> {
top = cctx.dht().topology();
- startFut = new GridFutureAdapter<>(cctx.kernalContext());
+ startFut = new GridFutureAdapter<>(cctx.kernalContext(), false);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index b8bda46..e77689f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -873,9 +873,13 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
* @return Future that will be completed when locks are acquired.
*/
@SuppressWarnings("TypeMayBeWeakened")
- public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsyncLocal(@Nullable Collection<IgniteTxEntry<K, V>> reads,
- @Nullable Collection<IgniteTxEntry<K, V>> writes, Map<UUID, Collection<UUID>> txNodes, boolean last,
- Collection<UUID> lastBackups) {
+ public IgniteInternalFuture<IgniteTxEx<K, V>> prepareAsyncLocal(
+ @Nullable Collection<IgniteTxEntry<K, V>> reads,
+ @Nullable Collection<IgniteTxEntry<K, V>> writes,
+ Map<UUID, Collection<UUID>> txNodes, boolean last,
+ Collection<UUID> lastBackups,
+ IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb
+ ) {
if (state() != PREPARING) {
if (timedOut())
return new GridFinishedFuture<>(cctx.kernalContext(),
@@ -889,8 +893,15 @@ public class GridNearTxLocal<K, V> extends GridDhtTxLocalAdapter<K, V> {
init();
- GridDhtTxPrepareFuture<K, V> fut = new GridDhtTxPrepareFuture<>(cctx, this, IgniteUuid.randomUuid(),
- Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(), last, needReturnValue() && implicit(), lastBackups);
+ GridDhtTxPrepareFuture<K, V> fut = new GridDhtTxPrepareFuture<>(
+ cctx,
+ this,
+ IgniteUuid.randomUuid(),
+ Collections.<IgniteTxKey<K>, GridCacheVersion>emptyMap(),
+ last,
+ needReturnValue() && implicit(),
+ lastBackups,
+ completeCb);
try {
// At this point all the entries passed in must be enlisted in transaction because this is an
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index ee22b3e..fb422a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.*;
import org.jdk8.backport.*;
import org.jetbrains.annotations.*;
+import javax.cache.expiry.*;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.*;
@@ -586,7 +587,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
futId,
tx.topologyVersion(),
tx,
- tx.optimistic() && tx.serializable() ? m.reads() : null,
+ m.reads(),
m.writes(),
/*grp lock key*/null,
/*part lock*/false,
@@ -605,63 +606,20 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
req.addDhtVersion(txEntry.txKey(), null);
}
- if (node.isLocal()) {
- IgniteInternalFuture<IgniteTxEx<K, V>> fut = cctx.tm().txHandler().prepareTx(node.id(), tx, req);
-
- // Add new future.
- add(new GridEmbeddedFuture<>(
- cctx.kernalContext(),
- fut,
- new C2<IgniteTxEx<K, V>, Exception, IgniteTxEx<K, V>>() {
- @Override public IgniteTxEx<K, V> apply(IgniteTxEx<K, V> t, Exception ex) {
- if (ex != null) {
- onError(node.id(), null, ex);
-
- return t;
- }
-
- IgniteTxLocalEx<K, V> dhtTx = (IgniteTxLocalEx<K, V>)t;
-
- Collection<Integer> invalidParts = dhtTx.invalidPartitions();
-
- assert F.isEmpty(invalidParts);
-
- if (!m.empty()) {
- for (IgniteTxEntry<K, V> writeEntry : m.entries()) {
- IgniteTxKey<K> key = writeEntry.txKey();
-
- IgniteTxEntry<K, V> dhtTxEntry = dhtTx.entry(key);
-
- assert dhtTxEntry != null;
-
- if (dhtTxEntry.op() == NOOP) {
- IgniteTxEntry<K, V> txEntry = tx.entry(key);
-
- assert txEntry != null;
-
- txEntry.op(NOOP);
- }
- }
-
- tx.addDhtVersion(m.node().id(), dhtTx.xidVersion());
+ final MiniFuture fut = new MiniFuture(m, null);
- m.dhtVersion(dhtTx.xidVersion());
- }
+ req.miniId(fut.futureId());
- tx.implicitSingleResult(dhtTx.implicitSingleResult());
+ add(fut);
- return tx;
- }
+ if (node.isLocal()) {
+ cctx.tm().txHandler().prepareTx(node.id(), tx, req, new CI1<GridNearTxPrepareResponse<K, V>>() {
+ @Override public void apply(GridNearTxPrepareResponse<K, V> res) {
+ fut.onResult(node.id(), res);
}
- ));
+ });
}
else {
- MiniFuture fut = new MiniFuture(m, null);
-
- req.miniId(fut.futureId());
-
- add(fut); // Append new future.
-
try {
cctx.io().send(node, req);
}
@@ -750,76 +708,27 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
}
}
+ final MiniFuture fut = new MiniFuture(m, mappings);
+
+ req.miniId(fut.futureId());
+
+ add(fut); // Append new future.
+
// If this is the primary node for the keys.
if (n.isLocal()) {
- req.miniId(IgniteUuid.randomUuid());
-
// At this point, if any new node joined, then it is
// waiting for this transaction to complete, so
// partition reassignments are not possible here.
- IgniteInternalFuture<IgniteTxEx<K, V>> fut = cctx.tm().txHandler().prepareTx(n.id(), tx, req);
-
- // Add new future.
- add(new GridEmbeddedFuture<>(
- cctx.kernalContext(),
- fut,
- new C2<IgniteTxEx<K, V>, Exception, IgniteTxEx<K, V>>() {
- @Override public IgniteTxEx<K, V> apply(IgniteTxEx<K, V> t, Exception ex) {
- if (ex != null) {
- onError(n.id(), mappings, ex);
-
- return t;
- }
-
- IgniteTxLocalEx<K, V> dhtTx = (IgniteTxLocalEx<K, V>)t;
-
- Collection<Integer> invalidParts = dhtTx.invalidPartitions();
-
- assert F.isEmpty(invalidParts);
-
- tx.implicitSingleResult(dhtTx.implicitSingleResult());
-
- if (!m.empty()) {
- for (IgniteTxEntry<K, V> writeEntry : m.entries()) {
- IgniteTxKey<K> key = writeEntry.txKey();
-
- IgniteTxEntry<K, V> dhtTxEntry = dhtTx.entry(key);
-
- if (dhtTxEntry.op() == NOOP)
- tx.entry(key).op(NOOP);
- }
-
- tx.addDhtVersion(m.node().id(), dhtTx.xidVersion());
-
- m.dhtVersion(dhtTx.xidVersion());
-
- GridCacheVersion min = dhtTx.minVersion();
-
- IgniteTxManager<K, V> tm = cctx.tm();
-
- if (m.near())
- tx.readyNearLocks(m, Collections.<GridCacheVersion>emptyList(),
- tm.committedVersions(min), tm.rolledbackVersions(min));
- }
-
- // Continue prepare before completing the future.
- proceedPrepare(mappings);
-
- return tx;
- }
+ cctx.tm().txHandler().prepareTx(n.id(), tx, req, new CI1<GridNearTxPrepareResponse<K, V>>() {
+ @Override public void apply(GridNearTxPrepareResponse<K, V> res) {
+ fut.onResult(n.id(), res);
}
- ));
+ });
}
else {
assert !tx.groupLock() : "Got group lock transaction that is mapped on remote node [tx=" + tx +
", nodeId=" + n.id() + ']';
- MiniFuture fut = new MiniFuture(m, mappings);
-
- req.miniId(fut.futureId());
-
- add(fut); // Append new future.
-
try {
cctx.io().send(n, req, tx.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
}
@@ -1054,6 +963,14 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
assert txEntry != null : "Missing tx entry for write key: " + key;
txEntry.op(NOOP);
+
+ ExpiryPolicy expiry = txEntry.expiry();
+
+ if (expiry == null)
+ expiry = txEntry.context().expiry();
+
+ if (expiry != null)
+ txEntry.ttl(CU.toTtl(expiry.getExpiryForAccess()));
}
if (!m.empty()) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index 6a0d200..911516b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -23,7 +23,7 @@ import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.util.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.transactions.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@ -54,7 +54,7 @@ public class IgniteTxHandler<K, V> {
public IgniteInternalFuture<IgniteTxEx<K, V>> processNearTxPrepareRequest(final UUID nearNodeId,
final GridNearTxPrepareRequest<K, V> req) {
- return prepareTx(nearNodeId, null, req);
+ return prepareTx(nearNodeId, null, req, null);
}
/**
@@ -134,23 +134,29 @@ public class IgniteTxHandler<K, V> {
* @param req Near prepare request.
* @return Future for transaction.
*/
- public IgniteInternalFuture<IgniteTxEx<K, V>> prepareTx(final UUID nearNodeId, @Nullable GridNearTxLocal<K, V> locTx,
- final GridNearTxPrepareRequest<K, V> req) {
+ public IgniteInternalFuture<IgniteTxEx<K, V>> prepareTx(
+ UUID nearNodeId,
+ @Nullable GridNearTxLocal<K, V> locTx,
+ GridNearTxPrepareRequest<K, V> req,
+ @Nullable IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb
+ ) {
assert nearNodeId != null;
assert req != null;
if (locTx != null) {
+ assert completeCb != null;
+
if (req.near()) {
// Make sure not to provide Near entries to DHT cache.
req.cloneEntries();
- return prepareNearTx(nearNodeId, req);
+ return prepareNearTx(nearNodeId, req, completeCb);
}
else
- return prepareColocatedTx(locTx, req);
+ return prepareColocatedTx(locTx, req, completeCb);
}
else
- return prepareNearTx(nearNodeId, req);
+ return prepareNearTx(nearNodeId, req, null);
}
/**
@@ -160,8 +166,11 @@ public class IgniteTxHandler<K, V> {
* @param req Near prepare request.
* @return Prepare future.
*/
- private IgniteInternalFuture<IgniteTxEx<K, V>> prepareColocatedTx(final GridNearTxLocal<K, V> locTx,
- final GridNearTxPrepareRequest<K, V> req) {
+ private IgniteInternalFuture<IgniteTxEx<K, V>> prepareColocatedTx(
+ final GridNearTxLocal<K, V> locTx,
+ final GridNearTxPrepareRequest<K, V> req,
+ final IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb
+ ) {
IgniteInternalFuture<Object> fut = new GridFinishedFutureEx<>(); // TODO force preload keys.
@@ -173,8 +182,13 @@ public class IgniteTxHandler<K, V> {
if (ex != null)
throw new GridClosureException(ex);
- IgniteInternalFuture<IgniteTxEx<K, V>> fut = locTx.prepareAsyncLocal(req.reads(), req.writes(),
- req.transactionNodes(), req.last(), req.lastBackups());
+ IgniteInternalFuture<IgniteTxEx<K, V>> fut = locTx.prepareAsyncLocal(
+ req.reads(),
+ req.writes(),
+ req.transactionNodes(),
+ req.last(),
+ req.lastBackups(),
+ completeCb);
if (locTx.isRollbackOnly())
locTx.rollbackAsync();
@@ -206,8 +220,11 @@ public class IgniteTxHandler<K, V> {
* @param req Near prepare request.
* @return Prepare future.
*/
- private IgniteInternalFuture<IgniteTxEx<K, V>> prepareNearTx(final UUID nearNodeId,
- final GridNearTxPrepareRequest<K, V> req) {
+ private IgniteInternalFuture<IgniteTxEx<K, V>> prepareNearTx(
+ final UUID nearNodeId,
+ final GridNearTxPrepareRequest<K, V> req,
+ IgniteInClosure<GridNearTxPrepareResponse<K, V>> completeCb
+ ) {
ClusterNode nearNode = ctx.node(nearNodeId);
if (nearNode == null) {
@@ -286,9 +303,16 @@ public class IgniteTxHandler<K, V> {
if (req.returnValue())
tx.needReturnValue(true);
- IgniteInternalFuture<IgniteTxEx<K, V>> fut = tx.prepareAsync(req.reads(), req.writes(),
- req.dhtVersions(), req.messageId(), req.miniId(), req.transactionNodes(), req.last(),
- req.lastBackups());
+ IgniteInternalFuture<IgniteTxEx<K, V>> fut = tx.prepareAsync(
+ req.reads(),
+ req.writes(),
+ req.dhtVersions(),
+ req.messageId(),
+ req.miniId(),
+ req.transactionNodes(),
+ req.last(),
+ req.lastBackups(),
+ completeCb);
if (tx.isRollbackOnly()) {
try {
@@ -722,10 +746,10 @@ public class IgniteTxHandler<K, V> {
if (nearTx != null && nearTx.local())
nearTx = null;
- finish(nodeId, dhtTx, req, req.ttls());
+ finish(nodeId, dhtTx, req);
if (nearTx != null)
- finish(nodeId, nearTx, req, req.nearTtls());
+ finish(nodeId, nearTx, req);
if (dhtTx != null && !dhtTx.done()) {
dhtTx.finishFuture().listenAsync(new CI1<IgniteInternalFuture<IgniteTx>>() {
@@ -742,13 +766,11 @@ public class IgniteTxHandler<K, V> {
* @param nodeId Node ID.
* @param tx Transaction.
* @param req Request.
- * @param ttls TTLs for optimistic transaction.
*/
protected void finish(
UUID nodeId,
IgniteTxRemoteEx<K, V> tx,
- GridDhtTxFinishRequest<K, V> req,
- @Nullable GridLongList ttls) {
+ GridDhtTxFinishRequest<K, V> req) {
// We don't allow explicit locks for transactions and
// therefore immediately return if transaction is null.
// However, we may decide to relax this restriction in
@@ -770,21 +792,12 @@ public class IgniteTxHandler<K, V> {
log.debug("Received finish request for transaction [senderNodeId=" + nodeId + ", req=" + req +
", tx=" + tx + ']');
- assert ttls == null || tx.concurrency() == OPTIMISTIC;
-
try {
if (req.commit() || req.isSystemInvalidate()) {
if (tx.commitVersion(req.commitVersion())) {
tx.invalidate(req.isInvalidate());
tx.systemInvalidate(req.isSystemInvalidate());
- if (tx.concurrency() == OPTIMISTIC && ttls != null) {
- int idx = 0;
-
- for (IgniteTxEntry<K, V> e : tx.writeEntries())
- e.ttl(ttls.get(idx));
- }
-
// Complete remote candidates.
tx.doneRemote(req.baseVersion(), null, null, null);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be5b908c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index c490156..4179310 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -885,7 +885,11 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs
}, 3000);
}
- assertEquals("Unexpected ttl [grid=" + i + ", key=" + key +']', ttl, e.ttl());
+ boolean primary = cache.entry(key).primary();
+ boolean backup = cache.entry(key).backup();
+
+ assertEquals("Unexpected ttl [grid=" + i + ", key=" + key + ", e=" + e +
+ ", primary=" + primary + ", backup=" + backup + ']', ttl, e.ttl());
if (ttl > 0)
assertTrue(e.expireTime() > 0);